Python Concurrency (v)


Finally, let’s talk about Python’s asyncio module. asyncio is the concurrency module introduced in Python 3.4. It uses coroutines and futures to simplify asynchronous code and make it almost as readable as synchronous code. Its main constructs are event loops, coroutines, and the async/await syntax.

It differs from the more traditional threading or multiprocessing approaches to asynchronous execution in that it uses an event loop to schedule asynchronous tasks.

Note that it is designed to improve the performance of I/O-bound operations, not CPU-bound ones.

The foundation of asyncio is the event loop.

You may already have heard of or used an event loop if you write JavaScript, whose concurrency model is based on an event loop that is responsible for executing the code, collecting and processing events, and executing queued sub-tasks.

Generally, tasks (async functions) are registered with the loop and wait in a queue. The loop runs one task, and while that task waits for I/O, the loop pauses it and runs another. When the first task's I/O completes, the loop resumes it and eventually returns its result. Thus two or more functions can run together cooperatively. This is the main goal of an event loop.
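The switching described above can be sketched with two small tasks that record when they start and finish. This is a minimal illustration, not a benchmark: the worker function, its names, and the delays are made up for the example, and asyncio.sleep stands in for real I/O.

```python
import asyncio

events = []  # records the order in which the tasks make progress

async def worker(name, delay):
    events.append(f"{name} start")
    # await hands control back to the event loop while this task "waits for I/O"
    await asyncio.sleep(delay)
    events.append(f"{name} end")

async def main():
    # Both workers are scheduled on the same loop and take turns running.
    await asyncio.gather(worker("slow", 0.2), worker("fast", 0.1))

asyncio.run(main())
print(events)  # ['slow start', 'fast start', 'fast end', 'slow end']
```

The slow worker starts first, but the loop does not sit idle while it waits: the fast worker starts and finishes in the meantime.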

To be registered with an event loop, we need something called an awaitable. There are three main types of awaitables:

  • Coroutines
  • Tasks
  • Futures

A coroutine is a specialised version of a Python generator function, so let’s take a quick look at generators before coming back to coroutines.

Generators are functions that generate values (obvious, isn’t it!). Normally, when a function returns a value, its underlying scope is destroyed, and when we call it again, a new scope is built. A generator function, however, can yield a value and pause its execution. Control is then passed back to the calling scope. When the generator is resumed, it continues from where it paused and produces the next value in line.

def gen():
    yield "Hi"
    yield "You"

g = gen()
print(next(g))  # Hi
print(next(g))  # You

When we call the generator function, we get a generator object (g above). It is an iterator whose values can be retrieved by calling next().

We can also retrieve the values with a for loop. A generator can only be consumed once, so we create a fresh one here:

for value in gen():
    print(value)
# Hi
# You

A coroutine is an extension of a generator.

There are two types of coroutines:

  • generator-based coroutines: the legacy asyncio implementation, built on generators and yield from.
  • native coroutines: the newer asyncio implementation, built on the async/await syntax.

A coroutine can pause its execution with the yield, yield from, or await keywords; the expression evaluates to a value once the coroutine is resumed. We can also send a value into a generator-based coroutine by calling the send() method on the generator object.

Coroutines are well suited to concurrent work because they can pause and resume execution naturally, so the main thread can easily switch from one task to another.

def coro():
    hi = yield "Hi"  # pauses here; send() supplies the value assigned to hi
    yield hi

coro_obj = coro()
print(next(coro_obj))        # Hi
print(coro_obj.send("You"))  # You

In the above example we first define a coroutine function coro, then call it to get a coroutine object coro_obj. Calling next() runs it up to the first yield and gives us Hi; calling send() passes the value You in, which the coroutine then yields back.

Generator based coroutines

Starting with Python 3.4, we had generator-based coroutines, meaning asyncio code used yield from to wait for Futures or other coroutines to finish.

# Legacy style: @asyncio.coroutine was removed in Python 3.11, so this only runs on older versions.
import asyncio

@asyncio.coroutine
def sleep_5():
    yield from asyncio.sleep(5)

@asyncio.coroutine
def sleep_7():
    yield from asyncio.sleep(7)

loop = asyncio.get_event_loop()
asyncio.ensure_future(sleep_5())
asyncio.ensure_future(sleep_7())
loop.run_forever()

In the above example, we use the yield from keyword to await the result of the asyncio.sleep() call. asyncio.ensure_future() then schedules the coroutine for execution on the current event loop.

The loop runs sleep_5, and when it reaches the yield from, the event loop knows it has to wait for a while, so it pauses that coroutine and runs sleep_7. Thus the two functions run concurrently.

Note that generator-based coroutine functions (i.e. those defined by decorating a function with @asyncio.coroutine) have been superseded by the async/await syntax. The decorator was deprecated in Python 3.8 and removed in Python 3.11.

Async/await based coroutines

Native coroutines, created with async def, were introduced in Python 3.5 and are built on the more recent __await__ dunder method. To rewrite the above code:

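import asyncio

async def sleep_5():
    await asyncio.sleep(5)

async def sleep_7():
    await asyncio.sleep(7)

# Create the loop explicitly; calling get_event_loop() with no running loop is deprecated.
loop = asyncio.new_event_loop()
asyncio.ensure_future(sleep_5(), loop=loop)
asyncio.ensure_future(sleep_7(), loop=loop)
loop.run_forever()  # keeps running until interrupted (Ctrl+C)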

We simply replace the decorator with the async keyword before the def keyword and use the await keyword instead of yield from.
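As an aside on the __await__ method mentioned earlier: any object that implements it can be used with await, not only coroutines. The sketch below is a hypothetical example (the Delay class is invented for illustration) that simply delegates to the awaitable returned by asyncio.sleep.

```python
import asyncio

class Delay:
    """Hypothetical awaitable: waits `seconds`, then produces `value`."""

    def __init__(self, seconds, value):
        self.seconds = seconds
        self.value = value

    def __await__(self):
        # Delegate to the coroutine returned by asyncio.sleep(delay, result),
        # which evaluates to `result` once the delay has passed.
        return asyncio.sleep(self.seconds, self.value).__await__()

async def main():
    return await Delay(0.01, "done")

result = asyncio.run(main())
print(result)  # done
```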

Again, this is nothing new if you have used Promises in JavaScript. In Python, the similar concepts are called Future and Task.

A Future is an object whose result is pending now but will be resolved at some point in the future.

A Task is a special kind of Future that wraps a coroutine. When the coroutine finishes, the result of the Task is materialised.
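To make the distinction concrete, here is a small sketch that creates a bare Future and resolves it by hand, then wraps a coroutine in a Task. The compute coroutine and the values used are placeholders chosen for the example.

```python
import asyncio

async def compute():
    await asyncio.sleep(0.01)
    return 42

async def main():
    loop = asyncio.get_running_loop()

    # A bare Future: pending until something sets its result.
    fut = loop.create_future()
    pending_before = fut.done()
    loop.call_later(0.01, fut.set_result, "ready")
    fut_value = await fut

    # A Task wraps a coroutine and is itself a Future.
    task = asyncio.create_task(compute())
    task_is_future = isinstance(task, asyncio.Future)
    task_value = await task
    return pending_before, fut_value, task_is_future, task_value

result = asyncio.run(main())
print(result)  # (False, 'ready', True, 42)
```

In everyday code you rarely create a Future yourself; you mostly meet it indirectly through Tasks.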

All asyncio applications have at least one entry-point task that is scheduled to run immediately on the event loop by the asyncio.run function, which expects a coroutine object.

Internally, asyncio checks whether the argument is a coroutine using coroutines.iscoroutine.

If so, the coroutine is passed to loop.run_until_complete, which expects a Future.

run_until_complete in turn checks its argument using futures.isfuture.

If it is not a Future, ensure_future is called to convert the coroutine into one.
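The same checks are available through the public API, so the steps above can be reproduced by hand. This is only a sketch of the idea using asyncio.iscoroutine, asyncio.isfuture, and asyncio.ensure_future; it is not a copy of the library's internal code.

```python
import asyncio
import inspect

async def hi():
    return "Hi"

async def main():
    coro = hi()  # calling the function only creates a coroutine object
    is_coro = asyncio.iscoroutine(coro)
    coro_is_future = asyncio.isfuture(coro)
    task = asyncio.ensure_future(coro)  # wraps the coroutine in a Task
    task_is_future = asyncio.isfuture(task)
    value = await task
    return is_coro, coro_is_future, task_is_future, value

checks = asyncio.run(main())
print(inspect.iscoroutinefunction(hi))  # True
print(checks)  # (True, False, True, 'Hi')
```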

Note that before Python 3.7, you had to schedule coroutines through the lower-level asyncio.ensure_future or loop.create_task. Since Python 3.7, the preferred way is asyncio.create_task.

ensure_future() is only useful if you have something that could be either a coroutine or a Future, and you want to call a method on it that is only defined on Future. That being said, if you know that you have a coroutine and you want it to be scheduled, the correct API to use is create_task().

asyncio.create_task simply calls create_task(coro) on the running event loop.

ensure_future also calls event_loop.create_task(coro) if it is given a coroutine; otherwise it simply ensures that the returned object is an asyncio.Future.
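A quick way to see the difference is to pass both kinds of argument to ensure_future. In this small sketch (the hi coroutine is just a placeholder), an existing Task comes back unchanged, while a coroutine comes back wrapped in a new Task.

```python
import asyncio

async def hi():
    return "Hi"

async def main():
    task = asyncio.create_task(hi())
    same_object = asyncio.ensure_future(task) is task  # Future in, same object out
    wrapped = asyncio.ensure_future(hi())              # coroutine in, new Task out
    is_task = isinstance(wrapped, asyncio.Task)
    await asyncio.gather(task, wrapped)
    return same_object, is_task

result = asyncio.run(main())
print(result)  # (True, True)
```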

Also, the process of setting up the event loop, creating a task and passing it to the event loop has been abstracted by asyncio.run.

So instead of:

import asyncio

async def hi():
    print("Hi!")

async def coro():
    await hi()
    print("You")

loop = asyncio.new_event_loop()  # get_event_loop() is deprecated for this use on recent Python versions
loop.run_until_complete(coro())
loop.close()

You can simply replace the last three lines with:

asyncio.run(coro())

asyncio.run always creates a new event loop and closes it at the end; think of it like a context manager.
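This behaviour is easy to check. The sketch below returns the running loop from two separate asyncio.run calls (the current_loop helper is invented for the example) and confirms that the loops are different objects and that both are closed afterwards.

```python
import asyncio

async def current_loop():
    return asyncio.get_running_loop()

loop_a = asyncio.run(current_loop())
loop_b = asyncio.run(current_loop())

print(loop_a is loop_b)                        # False: each call built its own loop
print(loop_a.is_closed(), loop_b.is_closed())  # True True
```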

We can use create_task to wrap a coroutine in a Task. This automatically schedules the Task to run at the next opportunity on the event loop:

import asyncio

async def sub():
    await asyncio.sleep(10)
    print("Sub task!")

async def main():
    # 2. create the sub task; it is still pending because it has not run yet
    sub_task = asyncio.create_task(sub())
    print(sub_task)
    # 3. main waits for 5 sec, so the loop switches to sub, which starts its 10 sec wait
    await asyncio.sleep(5)
    # 4. main resumes after 5 sec while sub is still waiting
    print("Main task!")
    # 5. main waits for another 10 sec; sub finishes its remaining 5 sec wait and prints
    await asyncio.sleep(10)
    print(sub_task)

asyncio.run(main())  # 1. event loop starts with main registered as the entry task

Output (the exact Task repr varies by Python version):
<Task pending coro=<sub() running at create_task.py:4>>
Main task!
Sub task!
<Task finished coro=<sub() done, defined at create_task.py:4> result=None>

There are also some other functions that help manage concurrent tasks:

  • asyncio.gather: takes any number of awaitables and returns an aggregate list of their results, in the order the awaitables were passed (like Promise.all()).
import asyncio
async def hi(n):
    await asyncio.sleep(n)
    return n
async def main():
    tasks = [hi(1), hi(2), hi(3)]
    results = await asyncio.gather(*tasks)
    print(results)  # [1, 2, 3]
asyncio.run(main())
  • asyncio.shield: protects an awaitable from being cancelled.
  • asyncio.wait: waits for a collection of tasks or futures until a given condition is met.
  • asyncio.wait_for: waits for a single awaitable until the given timeout is reached.
import asyncio
async def hi(n):
    await asyncio.sleep(n)
async def main():
    try:
        # hi(10) needs 10 sec, so the 2 sec timeout expires first
        await asyncio.wait_for(hi(10), timeout=2)
    except asyncio.TimeoutError:
        print("timeout exceeded!")
asyncio.run(main())
  • asyncio.as_completed: similar to gather, but returns an iterator of awaitables that yield results as they become ready.
  • future.add_done_callback: adds a callback to be run when the Future is done. This is a rough equivalent of the then() method of pre-async/await JavaScript promises. Adding this to the create_task example above:
import asyncio
async def sub():
    await asyncio.sleep(10)
    return "Sub task!"
def callback_result(future):
    print(f"{future.result()} completed")
async def main():
    sub_task = asyncio.create_task(sub())
    sub_task.add_done_callback(callback_result)
    print(sub_task)
    await asyncio.sleep(5)
    print("Main task!")
    await asyncio.sleep(10)
    print(sub_task)
asyncio.run(main())

Output (Task reprs abbreviated; the exact form varies by Python version):
<Task pending coro=<sub() running at create_task.py:4>>
Main task!
Sub task! completed
<Task finished coro=<sub() done, defined at create_task.py:4> result='Sub task!'>
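The helpers in the list that have no example of their own (wait, as_completed, and shield) can be sketched together as follows. The delays are arbitrary values picked for the illustration, and hi is the same kind of placeholder coroutine used in the gather example.

```python
import asyncio

async def hi(n):
    await asyncio.sleep(n)
    return n

async def main():
    # asyncio.wait: return as soon as the first task has finished.
    tasks = [asyncio.create_task(hi(n)) for n in (0.6, 0.1, 0.4)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]

    # asyncio.as_completed: results arrive in completion order, not argument order.
    order = [await fut for fut in asyncio.as_completed([hi(0.03), hi(0.01), hi(0.02)])]

    # asyncio.shield: the timeout cancels only the shield, not the inner task.
    inner = asyncio.create_task(hi(0.05))
    try:
        await asyncio.wait_for(asyncio.shield(inner), timeout=0.01)
    except asyncio.TimeoutError:
        pass
    survived = not inner.cancelled()

    await asyncio.gather(inner, *pending)  # let the unfinished tasks complete
    return first, len(pending), order, survived

result = asyncio.run(main())
print(result)  # ([0.1], 2, [0.01, 0.02, 0.03], True)
```

Note that asyncio.wait is given Task objects rather than bare coroutines; recent Python versions no longer accept coroutines there.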

Note that the following are deprecated or removed and should be avoided:

  • @asyncio.coroutine: deprecated in favour of async def and removed in Python 3.11
  • the loop parameter of asyncio.sleep: removed in Python 3.10

In real life, we normally write a coroutine function, call it, and wrap the resulting coroutine in a Task with create_task. When the coroutine returns a value, the Task completes with that value and any scheduled callbacks are run. The event loop keeps this cycle going until there are no more tasks in the pipeline.

import asyncio
async def coro():
    await asyncio.sleep(1)
    return 'Coroutine is done!'
def callback_add(future):
    print(future.result())
    # loop.stop()  # only needed with loop.run_forever(); run_until_complete(task) stops by itself
loop = asyncio.new_event_loop()
task = loop.create_task(coro())
task.add_done_callback(callback_add)
loop.run_until_complete(task)  # or: loop.run_forever()
loop.close()
