Python async processing – make your data pipelines scream

Easily control asynchronous execution of individual functions inside a program instead of parallelizing the processing

Preamble

“Well, can’t you throw a bunch of additional servers at the problem, make the process run in parallel and make it faster?” asks an exasperated Tom, the Chief Technology Officer at Acme Widgets.

Acme’s data management team has developed multiple data pipelines to transport, transform and deliver data to various systems for consumption. The pipelines read data from various sources--such as Kafka streams, queues, disks and even a network socket--and execute important computations on the data before sending them to the destination systems. While the timing of the computation is rather predictable, the arrival of the data is sporadic, unpredictable and often much slower than the computation. Executed in sequential steps, the entire pipeline slows down, causing lost opportunities and massive customer dissatisfaction.

But Acme has access to a practically unlimited number of virtual servers on their cloud service. Can’t they just spin up multiple servers to run this pipeline multi-threaded and make it faster? That was the question from Tom. But Debra, the head of development, thinks rewriting the process from a single-threaded one to a multi-threaded one is an overly complex solution that is expensive, error-prone, and sometimes not even possible. Surely there must be a better solution. They both turn to Amanda for some enlightenment.

This is not a new situation for Amanda. As a data leader in many organizations before coming to Acme, she has had to educate many other technology leaders on the delicate nuances of data patterns, which includes careful designs to avoid the risk of untrusted data. She knows from past experience that explaining what can or cannot be done with clear examples is often the best way to prove the point of what is best for Acme.

Both Tom and Debra have valid points, agrees Amanda. “There is good news,” she declares. “And bad. First, the good news: it is possible to achieve a pseudo multi-threaded capability in Python in a much simpler manner to make the pipeline faster. And the bad: there are nuances to the approach and not everything can be multi-threaded easily. But overall, it should be faster.”

Everyone urges her to explain in detail.

A pipeline

First, Amanda explains why some processes are slow and multi-threading will not likely make them faster. She gives an example of the following parts of the process:

  1. Read data. This function waits for the data to be read from somewhere, e.g. a Kafka stream, a network socket or even the disk. One example is getting the number of clicks on the website by customers.
  2. Compute on data. This function applies a computation on the data that is received, such as adding all the clicks from a specific type of browser.
  3. Write data. This function writes the results to some store or sends to a queue.

It resembles the following loop:

[Figure: red, purple, and green rectangles with black text in the middle, next to a black circular process loop]

If executed sequentially, it resembles this flow, where the length of each chevron signifies the elapsed time for that function. Amanda points out that the read() functions often vary in execution time.

[Figure: process timeline made of red and green interlocking arrow segments, above a bold black line signifying elapsed time]

As the number of read operations increases, the elapsed time also increases. To make the pipeline multi-threaded, Amanda explains, we have to execute multiple instances of the compute() function; but then we will need to know how to divide the output of the read() function and keep track of which instance has done how much. This makes the design very complex and difficult to maintain, as Debra was saying. To add to the problem, in some cases creating multiple instances of compute() is not even possible since the data may need to be processed in real time.

A more practical approach, Amanda explains, is to run only a few functions in parallel and the others in sequence. Typically the read operations are slow and have unpredictable timings; hence those can be executed in parallel. The compute operation, where the state of the data is more important, can be executed sequentially as shown in the following figure.

[Figure: staggered, stacked red and green arrows above a bold black line representing elapsed time]

Here, Amanda explains, the read() functions run in parallel. The compute() function, running sequentially, starts right after the first output from any of the read() functions, without waiting for all of them to finish. It then picks up the results of the other read() functions as they finish. Interleaving the functions this way dramatically shrinks the overall elapsed time.

But this is not parallel processing, Debra interjects, because not all components run in parallel.

“No, it’s not,” responds Amanda, “It’s selectively running components asynchronously. The trick is to run only some components this way. And it is a relatively trivial task to implement it in Python.”
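The interleaving Amanda describes can be sketched roughly as follows. This is only an illustration under stated assumptions: the names read(), compute() and pipeline(), the source names, the delays and the click counts are all made up for the example and are not Acme’s actual pipeline. The sketch uses asyncio.as_completed() so that a sequential compute step picks up each read result as soon as it arrives.

```python
import asyncio


async def read(source, delay, clicks):
    # Hypothetical reader: the sleep stands in for waiting on a stream, socket or disk.
    await asyncio.sleep(delay)
    return source, clicks


def compute(total, item):
    # Hypothetical sequential step: keep a running total of the clicks received.
    _source, clicks = item
    return total + clicks


async def pipeline():
    # Start all the reads at once; they wait concurrently.
    reads = [
        asyncio.create_task(read("kafka", 0.03, 10)),
        asyncio.create_task(read("socket", 0.01, 20)),
        asyncio.create_task(read("disk", 0.02, 30)),
    ]
    order, total = [], 0
    # as_completed yields the reads in the order they finish, not the order listed.
    for finished in asyncio.as_completed(reads):
        item = await finished
        order.append(item[0])
        total = compute(total, item)  # runs one result at a time
    return order, total


if __name__ == "__main__":
    print(asyncio.run(pipeline()))
```

With these made-up delays the socket read finishes first, so compute() starts on it while the other two reads are still waiting.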

The “traditional” way

First, Amanda examines the traditional way a program is written. She shows an example program named sync.py in which a function called myproc() does something that takes 5 seconds. For the sake of simplicity she just uses sleep(5) to simulate the 5-second execution time. When the function is called, the program prints the line “myProc started …”. Likewise, it prints “myProc finished …” when the function finishes its execution. Finally, using the time module, she records the elapsed time.

    # sync.py
    import time

    def myproc():
        print("myProc started ...")
        t1 = time.perf_counter()
        time.sleep(5)
        t = time.perf_counter() - t1
        print(f"   myProc finished in {t:0.5f} seconds.")

    def main():
        for _ in range(5):
            myproc()

    if __name__ == "__main__":
        start_sec = time.perf_counter()
        main()
        elapsed_secs = time.perf_counter() - start_sec
        print(f"Job finished in {elapsed_secs:0.5f} seconds.")
  

Amanda executes this program in Python and shows the output:

    myProc started ...
       myProc finished in 5.00262 seconds.
    myProc started ...
       myProc finished in 5.00281 seconds.
    myProc started ...
       myProc finished in 5.00011 seconds.
    myProc started ...
       myProc finished in 5.00042 seconds.
    myProc started ...
       myProc finished in 5.00504 seconds.
    Job finished in 25.01145 seconds.
  

The job took a total of 25 seconds, as expected, since each run of the function myproc() takes 5 seconds. Running it 5 times, sequentially, it finished the job in 25 seconds.

The async way

That’s how the data pipelines are or have been traditionally developed, Amanda points out. Now, she rewrites the program in an asynchronous manner. Here is the modified code, named async1.py. She put comments (preceded by #) to identify the modified parts.

    # async1.py
    import asyncio  # new module
    import time

    async def myproc():  # async is new
        print("myProc started ...")
        t1 = time.perf_counter()
        await asyncio.sleep(5)  # await asyncio is new
        t = time.perf_counter() - t1
        print(f"   myProc finished in {t:0.5f} seconds.")

    async def main():  # async is new
        await asyncio.gather(  # await asyncio is new
            myproc(),
            myproc(),
            myproc(),
            myproc(),
            myproc()
        )

    if __name__ == "__main__":
        start_sec = time.perf_counter()
        asyncio.run(main())  # asyncio.run is new
        elapsed_secs = time.perf_counter() - start_sec
        print(f"Job finished in {elapsed_secs:0.5f} seconds.")
  

Here is the output when she ran the code:

    myProc started ...
    myProc started ...
    myProc started ...
    myProc started ...
    myProc started ...
       myProc finished in 5.00337 seconds.
       myProc finished in 5.00347 seconds.
       myProc finished in 5.00349 seconds.
       myProc finished in 5.00351 seconds.
       myProc finished in 5.00353 seconds.
    Job finished in 5.00495 seconds.
  

Everyone exclaims. The entire process now finishes in just 5 seconds (which is the time it takes for each run of the function myproc()). How was that possible, they all ask Amanda.

The async/await combo

The trick, Amanda explains, is the inclusion of the async and await keywords in the code. These keywords make sure that the function behaves in an asynchronous manner. She points to the following line:

    await asyncio.sleep(5)
  

The await keyword tells Python to pause myproc() at that line until the sleep completes and, in the meantime, to hand control back to the caller so that other work can proceed.

There is a general sense of confusion in the room, so Amanda repeats the point; it is a powerful concept for the group to understand. When the Python interpreter reaches await, it suspends myproc() and immediately gives control back to the caller (in this case, the event loop that asyncio.run() starts in order to run main()), which is then free to run other calls. The function gets control back once the awaited operation is over and continues with the statements that follow.

Then she points to the following line:

    asyncio.run(main())
  

This, Amanda continues, starts the main() function, which uses asyncio.gather() to launch all five myproc() calls without waiting for any one of them to complete first. The first call runs until it reaches the await keyword and passes control back. Looking at the code, she notes that there is nothing else to do in main(); so the next call to myproc() starts, and it also passes control back as soon as its await keyword is encountered.

Debra seems alarmed. The myproc() function is supposed to perform something useful, such as reading, processing or writing data. If control has gone back to the caller immediately, and the program forgets about what happened to the results of myproc(), how is it useful in the processing of the pipeline, she wonders.

Aha! There lies the magic, explains Amanda. The very moment the awaited sleep in myproc() completes (after about 5 seconds, as designed), control passes back to myproc(), right where it left off. The function then executes its next lines, which print the finishing message along with the elapsed time.
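To see the hand-off in a small trace, here is a minimal sketch. The names worker() and main(), the event strings and the short delays are illustrative assumptions chosen to keep the run quick; they are not part of Amanda’s programs. Each call records when it starts and when it resumes after its await.

```python
import asyncio


async def worker(name, delay, events):
    events.append(f"{name} started")
    # Awaiting suspends this call here and lets the event loop run the other one.
    await asyncio.sleep(delay)
    # Execution resumes on this line only after the awaited sleep has completed.
    events.append(f"{name} finished")


async def main():
    events = []
    await asyncio.gather(
        worker("a", 0.01, events),
        worker("b", 0.02, events),
    )
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['a started', 'b started', 'a finished', 'b finished']
```

Both calls start before either finishes, and each one picks up exactly where it paused.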

The function myproc() took 5 seconds (as expected); but the overall program executed all the function calls in sort of a parallel manner. Hence the overall time was also just 5 seconds.

“Note I called it sort of parallel,” Amanda continues. The execution was actually not in parallel. She put the await keyword in the code of the function to mark the point where it gives control back to the caller. It mimicked parallelism. So, opines Tom, the called function was merely a non-blocking call. Precisely, answers Amanda. “For those of you familiar with the Unix shell, it’s somewhat similar to calling a command with nohup and the background processing option (with an ‘&’ at the end) multiple times simultaneously.”

    nohup somecommand.sh &
    nohup somecommand.sh &
    nohup somecommand.sh &
    nohup somecommand.sh &
    nohup somecommand.sh &
  

“All of them will run asynchronously. But,” she clarifies, “unlike the Unix shell, my function call gets the control back when the work it was waiting for is complete, and carries on from there. That is what makes it so powerful.”
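Debra’s worry about losing the results can also be addressed in code. As a minimal sketch (the function read_clicks(), the source names and the numbers are hypothetical, chosen only for illustration), asyncio.gather() hands back the return value of every call, in the order the calls were listed:

```python
import asyncio


async def read_clicks(source, delay, clicks):
    # Hypothetical reader; the sleep simulates waiting for data to arrive.
    await asyncio.sleep(delay)
    return f"{source}: {clicks}"


async def main():
    # The second read finishes first, but gather still returns the
    # results in the order the calls were listed.
    return await asyncio.gather(
        read_clicks("web", 0.02, 120),
        read_clicks("mobile", 0.01, 45),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['web: 120', 'mobile: 45']
```

Unlike a backgrounded shell command, nothing is forgotten: the caller gets every result once the awaited calls are done.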

We control the async part

All eyes turn to Debra to confirm if this will reduce the long elapsed times; but she is skeptical. “I understand that you can call functions in asynchronous manner to execute in a pseudo-parallel manner and reduce the execution time,” she explains, “but not all the function calls can be made asynchronous. For instance, functions to read data can be asynchronous; but not functions that transform. The latter functions, if called asynchronously, will introduce data errors.”

Fair concern, Amanda concedes, and explains that it is possible to selectively control which parts of a function are asynchronous. To illustrate, she modifies the code async1.py to async2.py, with the changes shown with comments. In this modified code she assumes two tasks taking 2.5 seconds each. One part is asynchronous and can overlap with the other calls. In real life this will be akin to reading data from disk, socket, queue, etc. The other part, which also takes 2.5 seconds, cannot be asynchronous. In real life this will be akin to transformations.

    # async2.py
    import asyncio
    import time

    async def myproc():
        print("myProc started ...")
        t1 = time.perf_counter()
        # the following is an async call that takes 2.5 secs
        await asyncio.sleep(2.5)
        # the following is a sync (blocking) call that takes 2.5 secs
        time.sleep(2.5)
        t = time.perf_counter() - t1
        print(f"   myProc finished in {t:0.5f} seconds.")

    async def main():
        await asyncio.gather(
            myproc(),
            myproc(),
            myproc(),
            myproc(),
            myproc()
        )

    if __name__ == "__main__":
        start_sec = time.perf_counter()
        asyncio.run(main())
        elapsed_secs = time.perf_counter() - start_sec
        print(f"Job finished in {elapsed_secs:0.5f} seconds.")
  

Here is the result when she ran the script:

    myProc started ...
    myProc started ...
    myProc started ...
    myProc started ...
    myProc started ...
       myProc finished in 5.00751 seconds.
       myProc finished in 7.50905 seconds.
       myProc finished in 10.01197 seconds.
       myProc finished in 12.51726 seconds.
       myProc finished in 15.02254 seconds.
    Job finished in 15.02414 seconds.
  

Everyone finds the output interesting. Of course, all the function calls started immediately, as expected. However, Debra observes, each call now takes progressively more time, in fact exactly 2.5 seconds more than the previous one. That’s peculiar and she wants to know why.

It’s because, explains Amanda, only the first 2.5 seconds of the call, not the entire call, was asynchronous. Once that asynchronous sleep completed, control returned to the function myproc() to execute the rest of the lines. The immediate next line was the second sleep(), which was not async. So myproc() did not give control back until that sleep was complete; it was a blocking call. This is why the second call had to wait for the first call’s blocking sleep of 2.5 seconds before it could continue, the third had to wait for the second’s, and so on.

This example by Amanda makes everyone realize why this technique is not really parallel execution. It was merely a selective async (non-blocking) execution that she controlled to achieve pseudo-parallelism; but they all conceded that the technique might be enough for many use cases.
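The staggered finish times in the async2.py output can be reproduced with simple arithmetic. The following is a simplified model, not a measurement: it assumes the calls resume one at a time in the order they were started and it ignores scheduling overhead. The function name is made up for this sketch.

```python
def finish_times_async_first(n_calls, async_secs, sync_secs):
    # All calls wait out the async sleep together; their blocking sleeps
    # then run back to back, one call at a time.
    return [async_secs + k * sync_secs for k in range(1, n_calls + 1)]


print(finish_times_async_first(5, 2.5, 2.5))  # [5.0, 7.5, 10.0, 12.5, 15.0]
```

The last value is also the total job time: one shared async wait plus five blocking sleeps, or 2.5 + 5 × 2.5 = 15 seconds.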

Order of calls matters

“Does the order in which you call the async and sync parts inside a function matter?” ponders Tom.

Yes, and it is a great concern to address, warns Amanda. To illustrate how the order can be useful and important, she makes another small change to the program and names it async3.py. In this version she simply reverses the sequence of the async and sync sleep calls, as shown with comments:

    # async3.py
    import asyncio
    import time

    async def myproc():
        print("myProc started ...")
        t1 = time.perf_counter()
        # Now it is sync first and then async
        time.sleep(2.5)
        await asyncio.sleep(2.5)
        t = time.perf_counter() - t1
        print(f"   myProc finished in {t:0.5f} seconds.")

    async def main():
        await asyncio.gather(
            myproc(),
            myproc(),
            myproc(),
            myproc(),
            myproc()
        )

    if __name__ == "__main__":
        start_sec = time.perf_counter()
        asyncio.run(main())
        elapsed_secs = time.perf_counter() - start_sec
        print(f"Job finished in {elapsed_secs:0.5f} seconds.")
  

She runs the program and gets this output:

    myProc started ...
    myProc started ...
    myProc started ...
    myProc started ...
    myProc started ...
       myProc finished in 12.51053 seconds.
       myProc finished in 10.00526 seconds.
       myProc finished in 7.50407 seconds.
       myProc finished in 5.00093 seconds.
       myProc finished in 5.00068 seconds.
    Job finished in 15.01211 seconds.
  

While it is not visible in this static output, the first line of each function call, “myProc started …”, now comes up staggered, with 2.5 seconds between them. Earlier, all the “myProc started …” lines came up at the same time. The audience is curious about this change first.

Amanda points their attention to the first sleep call of 2.5 seconds, which is synchronous; hence myproc() has to wait that long before giving up control. The next call to myproc() cannot start until then. That’s why each message comes exactly 2.5 seconds after the previous one, she explains.

The second thing the audience is curious about is the “myProc finished …” lines, which are ordered from the longest execution time to the shortest, unlike last time, when they ran from the shortest to the longest.

Amanda explains: after 2.5 seconds, the first call to myproc() encounters the async sleep. Since it is async, the call immediately gives up control, and the second call to myproc() starts. The same thing happens in the second call, i.e. it blocks for 2.5 seconds before reaching its own async sleep. Only after the fifth call has finished its blocking sleep does control go back to the first call of myproc(), whose async sleep expired long before. Hence we have an interesting pattern here, Debra points out: the first call to myproc() was the longest, as opposed to the previous case, where the last call was the longest. The overall time remained the same, though.

This is why, Amanda warns, the order in which you call the sync and async parts of the function matters. It determines the order and timing in which the calls finish and return their values. The overall time will not vary.
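The reversed pattern in the async3.py output can be modeled the same way. This is a simplified sketch under one stated assumption, which the output above suggests: the blocking part of every call runs before any call whose async sleep has expired is resumed. The function name is hypothetical and scheduling overhead is ignored.

```python
def elapsed_sync_first(n_calls, sync_secs, async_secs):
    # The loop is busy until every call's blocking sleep has run.
    loop_free = n_calls * sync_secs
    elapsed = []
    for k in range(n_calls):
        # Call k cannot start until the k blocking sleeps before it are done.
        start = k * sync_secs
        timer_due = start + sync_secs + async_secs
        # A call resumes once its timer is due AND the loop is free.
        finish = max(timer_due, loop_free)
        elapsed.append(finish - start)
    return elapsed


print(elapsed_sync_first(5, 2.5, 2.5))  # [12.5, 10.0, 7.5, 5.0, 5.0]
```

The first four calls all resume at about the 12.5-second mark, so the earlier a call started, the longer it appears to have taken. The fifth call resumes at 15 seconds, which is the total job time.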

Law of the order

But Tom is not convinced. Is it really relevant in a real-world scenario, he wants to know.

Fair point, agrees Amanda and considers another small task in this program. Suppose, she continues, we get the clicks on the website and we need to compute the sum inside the function call. To keep it simple, she just increments the variable named sum by 1 at each invocation of the function. She also adds some code to display how the functions are called after one another. She changes the function code to accept a parameter called callid which is merely a number to represent each call to the function distinctly. Then she prints a chain of the callids to show which iteration of the functions was called.

    import asyncio
    import time

    chain = ""
    sum = 0

    async def myproc(callid):
        global chain
        global sum
        print(f"myProc {callid} started ...")
        t1 = time.perf_counter()
        await asyncio.sleep(2.5)
        chain = chain + "->" + str(callid)
        sum = sum + 1
        time.sleep(2.5)
        t = time.perf_counter() - t1
        print(f"   myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")

    async def main():
        await asyncio.gather(
            myproc(1),
            myproc(2),
            myproc(3),
            myproc(4),
            myproc(5)
        )

    if __name__ == "__main__":
        start_sec = time.perf_counter()
        asyncio.run(main())
        elapsed_secs = time.perf_counter() - start_sec
        print(f"Job finished in {elapsed_secs:0.5f} seconds.")
  

Here is the output:

    myProc 1 started ...
    myProc 2 started ...
    myProc 3 started ...
    myProc 4 started ...
    myProc 5 started ...
       myProc 1 finished in 5.00606 seconds. sum = 1 chain ->1
       myProc 2 finished in 7.51137 seconds. sum = 2 chain ->1->2
       myProc 3 finished in 10.01224 seconds. sum = 3 chain ->1->2->3
       myProc 4 finished in 12.51499 seconds. sum = 4 chain ->1->2->3->4
       myProc 5 finished in 15.01671 seconds. sum = 5 chain ->1->2->3->4->5
    Job finished in 15.01861 seconds.
  

The audience notes that the functions ran in the sequence in which she called them, as shown in the chain; but the value of the sum variable is different in each function call. They wonder why that is the case.

Amanda points their attention to the code of the function myproc(). When it was called for the first time, it encountered the async sleep; so it gave up control immediately, and the second myproc() call started. However, by the time the second function call (callid=2) got to execute the sync part, the first function call (callid=1) had already done its computation. At that time the sum was 0; so the first call added 1 to it to come up with 1. By the time the second function call came to the compute portion of the code, the sum was already 1; so it came up with 1 + 1 = 2. The chain grew the same way for the rest of the calls.

So, Tom again implores, why is it relevant?

If all you need is the final sum, Amanda explains, which is available at the end of the job, then it is no problem. The number will be correct either way. However, if you are using the sum inside the function, e.g. checking if the sum is greater than 3 to execute some other action, then clearly calls 1, 2 and 3 will fail the check but 4 and 5 will pass it. This could inadvertently introduce a bug, she warns. For instance, if you have different types of functions, ones that execute some computation and others that observe and fine-tune the execution of the computing functions, it’s important to put them in the proper order; otherwise you will get into a race condition.
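The check Amanda warns about can be sketched as follows. This is a hypothetical, scaled-down variation rather than her program: the names handle(), state and triggered are made up, and the staggered delays are an assumption added so that the calls resume in a fixed order.

```python
import asyncio


async def handle(callid, state, triggered):
    # Hypothetical arrival delay, staggered so calls resume in order 1..5.
    await asyncio.sleep(0.01 * callid)
    state["total"] += 1
    # The check sees the total as of this call, not the final total.
    triggered[callid] = state["total"] > 3


async def main():
    state, triggered = {"total": 0}, {}
    await asyncio.gather(*(handle(i, state, triggered) for i in range(1, 6)))
    return state["total"], triggered


if __name__ == "__main__":
    print(asyncio.run(main()))
    # (5, {1: False, 2: False, 3: False, 4: True, 5: True})
```

The final total is correct, yet only the last two calls ever see a value above 3, which is exactly the kind of silent difference that can turn into a bug.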

To illustrate, Amanda makes a small variation where she reverses the order of sync and async calls.

    import asyncio
    import time

    chain = ""
    sum = 0

    async def myproc(callid):
        global chain
        global sum
        print(f"myProc {callid} started ...")
        t1 = time.perf_counter()
        time.sleep(2.5)
        chain = chain + "->" + str(callid)
        sum = sum + 1
        await asyncio.sleep(2.5)
        t = time.perf_counter() - t1
        print(f"   myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")

    async def main():
        await asyncio.gather(
            myproc(1),
            myproc(2),
            myproc(3),
            myproc(4),
            myproc(5)
        )

    if __name__ == "__main__":
        start_sec = time.perf_counter()
        asyncio.run(main())
        elapsed_secs = time.perf_counter() - start_sec
        print(f"Job finished in {elapsed_secs:0.5f} seconds.")
  

Here is the output:

    myProc 1 started ...
    myProc 2 started ...
    myProc 3 started ...
    myProc 4 started ...
    myProc 5 started ...
       myProc 1 finished in 12.51241 seconds. sum = 5 chain ->1->2->3->4->5
       myProc 2 finished in 10.01062 seconds. sum = 5 chain ->1->2->3->4->5
       myProc 3 finished in 7.51010 seconds. sum = 5 chain ->1->2->3->4->5
       myProc 4 finished in 5.00613 seconds. sum = 5 chain ->1->2->3->4->5
       myProc 5 finished in 5.00680 seconds. sum = 5 chain ->1->2->3->4->5
    Job finished in 15.01523 seconds.
  

While the end result is the same, i.e. the job completed in about 15 seconds, the behavior inside the function, Amanda notes, is very different. She draws the audience’s attention to how the value of sum and the chain are the same in all the function calls. That’s because, she explains, the first sleep is sync (not async as in the previous example), so each call updates the values before it gives up control, and no call gets to print until every call has completed its sync part. By the time the values are printed, they have all been assigned and the sum is fully updated. In this case, she observes, it is safe to check the value of the variables after the async call in any iteration.

The bottom line, Amanda cautions, is to pay close attention to which parts of the function call are sync or async and where you check the values of variables that are set inside the function. It could yield different results without you even being aware of it. Tom, Debra and the entire audience were very happy to learn the technique and the potential issues to avoid. They profusely thank Amanda.

Summary

Let’s revisit the scenario described in the beginning of the story--a data pipeline with two example functions:

  1. read(): where you wait for data to arrive
  2. compute(): where you perform computations on the arrived data

The read() function is slow, unpredictable and is a good candidate for parallelizing; the compute() function is fast but cannot be parallelized. Calling them sequentially will slow down the entire pipeline. You can interleave the functions to make the pipeline faster but you have to make sure you call them in the right order.

  • You can use async processing in Python in many cases to mimic parallel processing, using a few syntax changes instead of doing true parallel processing which is generally harder.
  • But it is not multithreading. You control which parts are async and which ones are not.
  • The combination of two keywords makes it possible. async applies to function definitions to tell Python the function can be run asynchronously. And await tells Python to pause the function at that point and pass control back to the caller until the awaited operation completes, after which the function continues with the rest of its statements.
  • Pay attention to how the logic inside a function call behaves for sync and async events. A variable could have the wrong value if it is not accessed or updated at the right place.

Next steps

For more information on async processing read the official Python documentation: https://docs.python.org/3/library/asyncio.html

***

Business vector created by gstudioimagen - www.freepik.com


Arup Nanda, Sr Director, Enterprise Architecture

Data Engineering, Cloud Computing and Data Management Leader in FinTech space. Author of 6 books and 700+ articles, speaker of 500+ sessions, training seminars in 25 countries, mentor, friend, father and husband--not in any particular order. Awarded Oracle’s DBA of the Year in 2003 and Enterprise Architect of the Year in 2012. Blogs at arup.blogspot.com.
