
BLOG.JETBRAINS.COM
Faster Python: Concurrency in async/await and threading
If you have been coding with Python for a while, especially if you have been using frameworks and libraries such as FastAPI and discord.py, then you have probably been using async/await or asyncio. You may have heard statements like "multithreading in Python isn't real", and you may also know about the famous (or infamous) GIL in Python. Given all the skepticism about multithreading in Python, you might be wondering what the difference between async/await and multithreading actually is, especially in Python programming. If so, this is the blog post for you!

## What is multithreading?

In programming, multithreading refers to the ability of a program to execute multiple sequential tasks (called threads) concurrently. These threads can run on a single processor core or across multiple cores. However, due to the limitation of the Global Interpreter Lock (GIL), Python threads only execute on a single core at a time. The exception is free-threaded (also called nogil) Python, which removes the GIL and will be covered in part 2 of this series. For this blog post, we will assume that the GIL is always present.

## What is concurrency?

Concurrency in programming means that the computer is doing more than one thing at a time, or seems to be doing more than one thing at a time, even if the different tasks are executed on a single processor. By managing resources and interactions between different parts of a program, different tasks are allowed to make progress independently and in overlapping time intervals.

## Both asyncio and threading appear concurrent in Python

Loosely speaking, both the asyncio and threading Python libraries enable the appearance of concurrency. However, your CPUs are not doing multiple things at the exact same time; it just seems like they are.

Imagine you are hosting a multi-course dinner for some guests. Some of the dishes take time to cook, for example, the pie that needs to be baked in the oven or the soup simmering on the stove.
While we are waiting for those to cook, we do not just stand around and wait. We do something else in the meantime. This is similar to concurrency in Python. Sometimes your Python program is waiting for something to get done, for example, an input/output (I/O) operation being handled by the operating system, and during that time the Python process is just waiting. We can then use async to let another task run while it waits.

## The difference is who is in charge

If both asyncio and threading appear concurrent, what is the difference between them? The main difference is a matter of who is in charge of which task is running and when. The async/await approach is sometimes called cooperative concurrency: a coroutine or future gives up its control to another coroutine or future to let others have a go. With threading, on the other hand, the operating system's scheduler controls which thread is running.

Cooperative concurrency is like a meeting with a microphone being passed around for people to speak. Whoever has the microphone can talk, and when they are done or have nothing else to say, they pass the microphone to the next person. In contrast, multithreading is a meeting where a chairperson determines who has the floor at any given time.

## Writing concurrent code in Python

Let's have a look at how concurrency works in Python by writing some example code. We will create a fast food restaurant simulation using both asyncio and threading.

## How async/await works in Python

The asyncio package was introduced in Python 3.4, while the async and await keywords were introduced in Python 3.5. One of the main things that make async/await possible is the use of coroutines. Coroutines in Python are actually generators repurposed to be able to pause and pass control back to the main function.

Now, imagine a burger restaurant where only one staff member is working.
The orders are prepared according to a first-in-first-out queue, and no async operations can be performed:

```python
import time

def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    time.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

def main():
    for i in range(3):
        make_burger(i)

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

This will take a while to finish:

```
Preparing burger #0...
Burger made #0
Preparing burger #1...
Burger made #1
Preparing burger #2...
Burger made #2
Orders completed in 15.01 seconds.
```

Now, imagine the restaurant brings in more staff, so that it can perform work concurrently:

```python
import asyncio
import time

async def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    await asyncio.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

async def main():
    order_queue = []
    for i in range(3):
        order_queue.append(make_burger(i))
    await asyncio.gather(*order_queue)

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

We see the difference between the two:

```
Preparing burger #0...
Preparing burger #1...
Preparing burger #2...
Burger made #0
Burger made #1
Burger made #2
Orders completed in 5.00 seconds.
```

Using the functions provided by asyncio, like run and gather, and the keywords async and await, we have created coroutines that can make burgers concurrently.

Now, let's take a step further and create a more complicated simulation.
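Before that, a quick aside: `asyncio.gather` is not the only way to run coroutines concurrently. Here is a minimal sketch (my addition, not from the original post) using `asyncio.create_task`, which schedules each coroutine on the event loop as soon as it is created:

```python
import asyncio
import time

async def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    await asyncio.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

async def main():
    # Each create_task call schedules the coroutine immediately;
    # the awaits below just wait for all of them to finish.
    tasks = [asyncio.create_task(make_burger(i)) for i in range(3)]
    for task in tasks:
        await task

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    print(f"Orders completed in {time.perf_counter() - s:0.2f} seconds.")
```

Like the gather version, this finishes in roughly five seconds, because all three burgers cook at once.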
Imagine we only have two workers, and we can only make two burgers at a time:

```python
import asyncio
import time

order_queue = asyncio.Queue()

def take_order():
    for i in range(3):
        order_queue.put_nowait(make_burger(i))

async def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    await asyncio.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

class Staff:
    def __init__(self, name):
        self.name = name

    async def working(self):
        while order_queue.qsize() > 0:
            print(f"{self.name} is working...")
            task = await order_queue.get()
            await task
            print(f"{self.name} finished a task...")

async def main():
    staff1 = Staff(name="John")
    staff2 = Staff(name="Jane")
    take_order()
    await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

Here we use a queue to hold the tasks, and the staff pick them up:

```
John is working...
Preparing burger #0...
Jane is working...
Preparing burger #1...
Burger made #0
John finished a task...
John is working...
Preparing burger #2...
Burger made #1
Jane finished a task...
Burger made #2
John finished a task...
Orders completed in 10.00 seconds.
```

In this example, we use asyncio.Queue to store the tasks, but it becomes more useful when we have multiple types of tasks, as shown in the following example:

```python
import asyncio
import time

task_queue = asyncio.Queue()
order_num = 0

async def take_order():
    global order_num
    order_num += 1
    print(f"Order burger and fries for order #{order_num:04d}:")
    burger_num = input("Number of burgers:")
    for i in range(int(burger_num)):
        await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))
    fries_num = input("Number of fries:")
    for i in range(int(fries_num)):
        await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))
    print(f"Order #{order_num:04d} queued.")
    await task_queue.put(take_order())

async def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    await asyncio.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

async def make_fries(order_num):
    print(f"Preparing fries #{order_num}...")
    await asyncio.sleep(2)  # time for making fries
    print(f"Fries made #{order_num}")

class Staff:
    def __init__(self, name):
        self.name = name

    async def working(self):
        while True:
            if task_queue.qsize() > 0:
                print(f"{self.name} is working...")
                task = await task_queue.get()
                await task
                print(f"{self.name} finished a task...")
            else:
                await asyncio.sleep(1)  # rest

async def main():
    task_queue.put_nowait(take_order())
    staff1 = Staff(name="John")
    staff2 = Staff(name="Jane")
    await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

In this example, there are multiple types of tasks, including making fries, which takes less time, and taking orders, which involves getting input from the user.

Notice that the program stops to wait for the user's input, and even the staff member who is not taking the order stops working in the background. This is because the input function is not async and therefore is not awaited. Remember, control in async code is only released when something is awaited. To fix that, we can replace:

```python
input("Number of burgers:")
```

with:

```python
await asyncio.to_thread(input, "Number of burgers:")
```

And we do the same for the fries, as in the code below. Note that now the program runs in an infinite loop.
If we need to stop it, we can deliberately crash the program with an invalid input:

```python
import asyncio
import time

task_queue = asyncio.Queue()
order_num = 0

async def take_order():
    global order_num
    order_num += 1
    print(f"Order burger and fries for order #{order_num:04d}:")
    burger_num = await asyncio.to_thread(input, "Number of burgers:")
    for i in range(int(burger_num)):
        await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))
    fries_num = await asyncio.to_thread(input, "Number of fries:")
    for i in range(int(fries_num)):
        await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))
    print(f"Order #{order_num:04d} queued.")
    await task_queue.put(take_order())

async def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    await asyncio.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

async def make_fries(order_num):
    print(f"Preparing fries #{order_num}...")
    await asyncio.sleep(2)  # time for making fries
    print(f"Fries made #{order_num}")

class Staff:
    def __init__(self, name):
        self.name = name

    async def working(self):
        while True:
            if task_queue.qsize() > 0:
                print(f"{self.name} is working...")
                task = await task_queue.get()
                await task
                print(f"{self.name} finished a task...")
            else:
                await asyncio.sleep(1)  # rest

async def main():
    task_queue.put_nowait(take_order())
    staff1 = Staff(name="John")
    staff2 = Staff(name="Jane")
    await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

By using asyncio.to_thread, we have put the input function into a separate thread (see the asyncio documentation for details). Do note, however, that this trick only unblocks I/O-bound tasks while the Python GIL is present.

If you run the code above, you may also see that the standard I/O in the terminal is scrambled. The user I/O and the record of what is happening should be kept separate.
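To see the effect of `asyncio.to_thread` in isolation, here is a minimal sketch (my addition, not from the original post): a blocking call stands in for `input`, and a heartbeat coroutine shows that the event loop keeps running while the blocking call is parked in its own thread:

```python
import asyncio
import time

def blocking_call():
    # stands in for input() or any other blocking function
    time.sleep(2)
    return "got user input"

async def heartbeat():
    # keeps printing while the blocking call runs in a worker thread
    for _ in range(4):
        print("event loop still responsive...")
        await asyncio.sleep(0.5)

async def main():
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_call),  # runs in a worker thread
        heartbeat(),                       # runs on the event loop meanwhile
    )
    print(result)

asyncio.run(main())
```

If `blocking_call` were called directly inside a coroutine instead, the heartbeat would freeze for the full two seconds.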
We can put the record into a log to inspect later:

```python
import asyncio
import logging
import time

logger = logging.getLogger(__name__)
logging.basicConfig(filename='pyburger.log', level=logging.INFO)

task_queue = asyncio.Queue()
order_num = 0
closing = False

async def take_order():
    global order_num, closing
    try:
        order_num += 1
        logger.info(f"Taking Order #{order_num:04d}...")
        print(f"Order burger and fries for order #{order_num:04d}:")
        burger_num = await asyncio.to_thread(input, "Number of burgers:")
        for i in range(int(burger_num)):
            await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))
        fries_num = await asyncio.to_thread(input, "Number of fries:")
        for i in range(int(fries_num)):
            await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))
        logger.info(f"Order #{order_num:04d} queued.")
        print(f"Order #{order_num:04d} queued, please wait.")
        await task_queue.put(take_order())
    except ValueError:
        print("Goodbye!")
        logger.info("Closing down... stop taking orders and finish all tasks.")
        closing = True

async def make_burger(order_num):
    logger.info(f"Preparing burger #{order_num}...")
    await asyncio.sleep(5)  # time for making the burger
    logger.info(f"Burger made #{order_num}")

async def make_fries(order_num):
    logger.info(f"Preparing fries #{order_num}...")
    await asyncio.sleep(2)  # time for making fries
    logger.info(f"Fries made #{order_num}")

class Staff:
    def __init__(self, name):
        self.name = name

    async def working(self):
        while True:
            if task_queue.qsize() > 0:
                logger.info(f"{self.name} is working...")
                task = await task_queue.get()
                await task
                task_queue.task_done()
                logger.info(f"{self.name} finished a task.")
            elif closing:
                return
            else:
                await asyncio.sleep(1)  # rest

async def main():
    task_queue.put_nowait(take_order())
    staff1 = Staff(name="John")
    staff2 = Staff(name="Jane")
    print("Welcome to Pyburger!")
    logger.info("Ready for business!")
    await asyncio.gather(staff1.working(), staff2.working())
    logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    logger.info(f"Orders completed in {elapsed:0.2f} seconds.")
```

In this final code block, we log the simulation information in pyburger.log and reserve the terminal for messages to customers. We also catch invalid input during the ordering process and switch the closing flag to True if the input is invalid, assuming the user wants to quit. Once the closing flag is set to True, the workers return, ending the coroutines' infinite while loops.

## How does threading work in Python?

In the example above, we put an I/O-bound task into another thread. You may wonder if we can put all tasks into separate threads and let them run concurrently. Let's try using threading instead of asyncio.

Consider the code we had before, where burgers are made concurrently with no limitation in place:

```python
import asyncio
import time

async def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    await asyncio.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

async def main():
    order_queue = []
    for i in range(3):
        order_queue.append(make_burger(i))
    await asyncio.gather(*order_queue)

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

Instead of creating async coroutines to make the burgers, we can just send functions down different threads like this:

```python
import threading
import time

def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    time.sleep(5)  # time for making the burger
    print(f"Burger made #{order_num}")

def main():
    order_queue = []
    for i in range(3):
        task = threading.Thread(target=make_burger, args=(i,))
        order_queue.append(task)
        task.start()
    for task in order_queue:
        task.join()

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

In the first for loop in main, the tasks are created in different threads and started. The second for loop makes sure all the burgers are made before the program moves on (that is, before main returns).

It is more complicated when we have only two staff members. Each staff member is represented by a thread, and the threads take tasks from a plain list where the tasks are stored:

```python
import threading
import time

order_queue = []

def take_order():
    for i in range(3):
        order_queue.append(make_burger(i))

def make_burger(order_num):
    def making_burger():
        print(f"Preparing burger #{order_num}...")
        time.sleep(5)  # time for making the burger
        print(f"Burger made #{order_num}")
    return making_burger

def working():
    while len(order_queue) > 0:
        print(f"{threading.current_thread().name} is working...")
        task = order_queue.pop(0)
        task()
        print(f"{threading.current_thread().name} finished a task...")

def main():
    take_order()
    staff1 = threading.Thread(target=working, name="John")
    staff1.start()
    staff2 = threading.Thread(target=working, name="Jane")
    staff2.start()
    staff1.join()
    staff2.join()

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")
```

When you run the code above, an error may occur in one of the threads, saying that it is trying to pop a task from an empty list. You may wonder why this is the case, since the while loop only continues if order_queue is not empty. Nevertheless, we still get an error because we have encountered a race condition.

## Race conditions

Race conditions can occur when multiple threads attempt to access the same resource or data at the same time and cause problems in the system.
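To make this kind of failure reproducible, here is a small sketch (my addition, not from the original post) that forces the race deliberately: a `threading.Barrier` holds both workers after the emptiness check, so both then try to pop the single remaining task:

```python
import threading

order_queue = ["last burger"]   # only one task left
barrier = threading.Barrier(2)  # releases only once both threads arrive
errors = []

def unsafe_worker():
    if len(order_queue) > 0:    # both threads pass this check...
        barrier.wait()          # ...and wait here until both have passed it
        try:
            order_queue.pop(0)  # only one pop can succeed
        except IndexError:
            errors.append("pop from empty list")

workers = [threading.Thread(target=unsafe_worker) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(errors)  # the gap between checking and popping let one worker see stale state
```

This "check-then-act" gap is exactly what happens, nondeterministically, in the restaurant simulation above.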
The timing and order in which the resource is accessed matter to the program logic, and unpredictable timing or the interleaving of multiple threads accessing and modifying shared data can cause errors.

To solve the race condition in our program, we deploy a lock for the task queue:

```python
queue_lock = threading.Lock()
```

In working, we need to make sure we hold the lock when checking the queue's length and getting tasks from it. While we hold it, other threads cannot access the queue:

```python
def working():
    while True:
        with queue_lock:
            if len(order_queue) == 0:
                return
            else:
                task = order_queue.pop(0)
        print(f"{threading.current_thread().name} is working...")
        task()
        print(f"{threading.current_thread().name} finished a task...")
```

Based on what we have learned so far, we can complete our final code with threading like this:

```python
import logging
import threading
import time

logger = logging.getLogger(__name__)
logging.basicConfig(filename="pyburger_threads.log", level=logging.INFO)

queue_lock = threading.Lock()
task_queue = []
order_num = 0
closing = False

def take_order():
    global order_num, closing
    try:
        order_num += 1
        logger.info(f"Taking Order #{order_num:04d}...")
        print(f"Order burger and fries for order #{order_num:04d}:")
        burger_num = input("Number of burgers:")
        for i in range(int(burger_num)):
            with queue_lock:
                task_queue.append(make_burger(f"{order_num:04d}-burger{i:02d}"))
        fries_num = input("Number of fries:")
        for i in range(int(fries_num)):
            with queue_lock:
                task_queue.append(make_fries(f"{order_num:04d}-fries{i:02d}"))
        logger.info(f"Order #{order_num:04d} queued.")
        print(f"Order #{order_num:04d} queued, please wait.")
        with queue_lock:
            task_queue.append(take_order)
    except ValueError:
        print("Goodbye!")
        logger.info("Closing down... stop taking orders and finish all tasks.")
        closing = True

def make_burger(order_num):
    def making_burger():
        logger.info(f"Preparing burger #{order_num}...")
        time.sleep(5)  # time for making the burger
        logger.info(f"Burger made #{order_num}")
    return making_burger

def make_fries(order_num):
    def making_fries():
        logger.info(f"Preparing fries #{order_num}...")
        time.sleep(2)  # time for making fries
        logger.info(f"Fries made #{order_num}")
    return making_fries

def working():
    while True:
        with queue_lock:
            if len(task_queue) == 0:
                if closing:
                    return
                else:
                    task = None
            else:
                task = task_queue.pop(0)
        if task:
            logger.info(f"{threading.current_thread().name} is working...")
            task()
            logger.info(f"{threading.current_thread().name} finished a task...")
        else:
            time.sleep(1)  # rest

def main():
    print("Welcome to Pyburger!")
    logger.info("Ready for business!")
    task_queue.append(take_order)
    staff1 = threading.Thread(target=working, name="John")
    staff1.start()
    staff2 = threading.Thread(target=working, name="Jane")
    staff2.start()
    staff1.join()
    staff2.join()
    logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    logger.info(f"Orders completed in {elapsed:0.2f} seconds.")
```

If you compare the final asyncio and threading code snippets, they should behave similarly. You may wonder which one is better and why you should choose one over the other.

Practically, writing asyncio code is easier than multithreading because we don't have to guard against potential race conditions and deadlocks ourselves. Control is passed between coroutines only at well-defined await points, so no locks are needed. However, Python threads do have the potential to run in parallel, just not most of the time with the GIL in place. We will revisit this when we talk about free-threaded (nogil) Python in the next blog post.

## Benefiting from concurrency

Why do we want to use concurrency in programming? There's one main reason: speed.
As we have illustrated above, tasks can be completed faster if we can cut down on waiting. There are different types of waiting in computing, and for each one, we tend to use different methods to save time.

## I/O-bound tasks

A task or program is considered input/output (I/O) bound when its execution speed is primarily limited by the speed of I/O operations, such as reading from a file or network, or waiting for user input. I/O operations are generally slower than other CPU operations, and therefore, tasks that involve lots of them can take significantly more time. Typical examples include reading data from a database, handling web requests, or working with large files.

Using async/await concurrency can help optimize the waiting time during I/O-bound tasks by unblocking the processing sequence and letting other tasks be handled while waiting.

Async/await concurrency is beneficial in many Python applications, such as web applications that involve a lot of communication with databases and handling of web requests. GUIs (graphical user interfaces) can also benefit from async/await concurrency by allowing background tasks to run while the user is interacting with the application.

## CPU-bound tasks

A task or program is considered CPU-bound when its execution speed is primarily limited by the speed of the CPU. Typical examples include image or video processing, like resizing or editing, and complex mathematical calculations, such as matrix multiplication or training machine learning models.

Contrary to I/O-bound tasks, CPU-bound tasks can rarely be optimized by using async/await concurrency, as the CPU is already busy working on the tasks. If you have more than one CPU core in your machine, or if you can offload some of these tasks to one or more GPUs, then CPU-bound tasks can be finished faster by creating more threads and performing multiprocessing.
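As a minimal sketch of process-based parallelism for CPU-bound work (an illustration of mine, not code from the original post), `concurrent.futures.ProcessPoolExecutor` spreads jobs over worker processes, each with its own interpreter and its own GIL:

```python
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    # stand-in for CPU-bound work such as image processing or matrix math
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":  # required on platforms that spawn worker processes
    jobs = [5_000_000] * 4
    s = time.perf_counter()
    with ProcessPoolExecutor() as pool:  # defaults to one worker per CPU core
        results = list(pool.map(cpu_heavy, jobs))
    print(f"4 jobs finished in {time.perf_counter() - s:0.2f} seconds.")
```

On a multi-core machine the four jobs run genuinely in parallel; the same jobs spread over threads would not speed up while the GIL is in place.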
Multiprocessing can optimize how these CPUs and GPUs are used, which is also why many machine learning and AI models these days are trained on multiple GPUs.

This is, however, tough to do in pure Python code, as Python itself is designed to provide abstraction layers so users do not have to control the lower-level computation processes. Moreover, Python's GIL limits the sharing of Python resources across multiple threads on your computer. Recently, Python 3.13 made it possible to remove the GIL, allowing for true multithreading. We will discuss the GIL, and the ability to go without it, in the next blog post.

Sometimes, none of the methods mentioned above can speed up CPU-bound tasks sufficiently. When that is the case, the CPU-bound tasks may need to be broken into smaller ones so that they can be performed simultaneously over multiple threads, multiple processors, or even multiple machines. This is parallel processing, and you may have to rewrite your code completely to implement it. In Python, the multiprocessing package offers both local and remote concurrency, which can be used to work around the limitation of the GIL. We will also look at some examples of that in the next blog post.

## Debugging concurrent code in PyCharm

Debugging async or concurrent code can be hard, as the program is not executed in sequence, making it difficult to see where and when the code is being executed. Many developers use print to help trace the flow of their code, but this approach is not recommended: it is clumsy, using it to investigate a complex program like a concurrent one isn't easy, and it is messy to tidy up afterwards.

Many IDEs provide debuggers, which are great for inspecting variables and the flow of the program. Debuggers also provide a clear stack trace across multiple threads. Let's see how we can track the task_queue of our example restaurant simulation in PyCharm.

First, we will put down some breakpoints in our code.
You can do that by clicking the line number of the line where you want the debugger to pause. The line number will turn into a red dot, indicating that a breakpoint is set there. We will put breakpoints at lines 23, 27, and 65, where the task_queue is changed in different threads.

Then we can run the program in debug mode by clicking the little bug icon in the top right. After clicking the icon, the Debug window will open, and the program will run until it hits the first breakpoint, which is highlighted in the code.

Here we see that the John thread is trying to pick up a task, and line 65 is highlighted. At this point, the highlighted line has not been executed yet. This is useful when we want to inspect the variables before entering the breakpoint.

Let's inspect what's in the task_queue. You can do so simply by starting to type in the Debug window. Select or type in task_queue, and then press Enter. You will see that the take_order task is in the queue.

Now, let's execute the breakpoint by clicking the Step in button. After pressing it and inspecting the Special Variables window that pops up, we see that the task variable is now take_order in the John thread. When querying the task_queue again, we see that the list is now empty.

Now let's click the Resume Program button and let the program run. When the program hits the user input part, PyCharm will bring us to the Console window so we can provide the input. Let's say we want two burgers. Type 2 and press Enter.

Now we hit the second breakpoint. If we click on Threads & Variables to go back to that window, we'll see that burger_num is 2, as we entered.

Now let's step into the breakpoint and inspect the task_queue, just like we did before. We see that one make_burger task has been added. If we let the program run again and step into the breakpoint when it stops, we see that Jane is picking up the task.

You can inspect the rest of the code yourself.
When you are done, simply press the red Stop button at the top of the window. With the debugger in PyCharm, you can follow the execution of your program across different threads and inspect different variables very easily.

## Conclusion

Now we have learned the basics of concurrency in Python, and I hope you will be able to master it with practice. In the next blog post, we will have a look at the Python GIL, the role it plays, and what changes when it is absent.

PyCharm provides powerful tools for working with concurrent Python code. As demonstrated in this blog post, the debugger allows step-by-step inspection of both async and threaded code, helping you track the execution flow, monitor shared resources, and detect issues. With intuitive breakpoints, real-time variable views, seamless console integration for user input, and robust logging support, PyCharm makes it easier to write, test, and debug applications with confidence and clarity.

Download PyCharm now.