Towards asyncio from threading in Python

26 Mar 2020

Undertaking a conversion of a multithreaded Python project to asyncio can be daunting, but your program can reap huge performance benefits with it.

basic asyncio code

Background

There’s ample information about what asyncio is about, and how it differs conceptually from multi-threading and multi-processing. This post doesn’t cover those differences, but rather aims to provide practical tips for solving situations that will likely come up while converting an existing project to asyncio, or while developing a new project using asyncio.

Motivation

I recently converted a large project which used heavy multithreading to constantly fetch and store I/O bound information. It was the perfect candidate for conversion, since the program mostly waited on resources – REST APIs, databases, and sockets. Whenever things weren’t going fast enough, I simply added more threads and called it a day. After a certain point, however, at roughly 50 threads, it was still taking too long between fetches, so I decided to try moving components of the project to asyncio over time and see how that would improve performance.

Since the conversion was motivated by application performance, I measured average CPU usage, and time to complete data reporting. Essentially, the program simply fetches data on a loop from many different sources, so I measured how long it took to do a full round of data reporting. Here are the results:

Environment CPU Usage Fetch Time
multithreaded 6% 3 minutes
asyncio 10% 30 seconds

This means a 6x speed-up against 2x CPU usage, meaning the effort was well worth the time. Additionally, there was less fluctuation in the rate of data collection – the 30 second fetch average in asyncio had a small variance, while the 3 minutes in the multithreaded ranged wildly, from 1 minute to 6 minutes.

Converting to asyncio can be extremely beneficial to your system! Again, note that this example was particularly well-suited for big performance gains because it was mostly I/O bound, and not CPU bound.

History

The asyncio package has been around since Python 3.4, and each new release has brought greater ease of use, along with usual improvements and refinements. Here are some highlights from each release:

Development in asyncio has gotten easier and more performant because of these enhancements along with greater 3rd party library support.

Getting started / top-level

Other languages, most notably ECMAScript / JavaScript and C#, integrate asynchronous programming in a non-intrusive way – you simply need to add async in the function declaration and some form of await within the function. For example, here is a simple C# program to read a file synchronously:

using System;
using System.IO;

class SyncTest
{
  public static void PrintFile()
  {
    string text = File.ReadAllText("./text.txt");
    Console.WriteLine(text);
  }

  public static void Main()
  {
    PrintFile();
  }
}

Using asynchronous development, this becomes:

using System;
using System.IO;
using System.Threading.Tasks;
using System.Text;

class AsyncTest
{
  public static async Task PrintFile()
  {
    byte[] result;
    using (FileStream SourceStream = File.Open("./text.txt", FileMode.Open))
    {
      result = new byte[SourceStream.Length];
      await SourceStream.ReadAsync(result, 0, (int)SourceStream.Length);
    }

    string text = Encoding.ASCII.GetString(result);
    Console.WriteLine(text);
  }

  public static async Task Main()
  {
    await PrintFile();
  }
}

Although we have to tag async and await everywhere, and all async functions need to return a Task or Task<T>, the general code is pretty similar, since Main() can be async.

For Python, asyncio works very differently from threading, so normal code changes a lot with the introduction of the event loop. Here is a sample program that uses threading to read and print a file synchronously:

import threading

def print_file():
    with open('./test.txt') as f:
        text = f.read()
        print(text)

t = threading.Thread(target=read_file)
t.start()

The asynchronous version requires an external package aiofiles and a few other bits which we will break down:

import asyncio
import aiofiles

async def print_file():
    async with aiofiles.open('./test.txt') as f:
        text = await f.read()
        print(text)

async def main():
    loop = asyncio.get_event_loop()
    task = loop.create_task(print_file())
    await task

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Note that we could use the asyncio.run() and asyncio.create_task() convenience functions to make the code look simpler, but that would mask the event loop, which is will be vital during the conversion. The official documentation does the best job of explaining everything in detail, but the most important functions for moving from threads are:

Those are the basics! With this background, and your effort, it’s only a matter of time before the whole project is converted to asyncio.

Guidelines for converting your code

General guidelines

These general guidelines were useful for my project. Some of them are more stylistic, so your mileage may vary on them. As with any large refactor, go component by component and be sure to maintain a working synchronous version that you can always use as a fallback.

import time

class Component(object):
    # __init__ and friends omitted for brevity
    def perform_long_job(self):
        # some long-running code here
        time.sleep(10)

You will copy it to ./component_async.py or (even better) ./async_support/component.py, which looks like:

import asyncio
from .component import Component as SyncComponent

class Component(SyncComponent):
    # __init__ and friends omitted for brevity
    async def perform_long_job(self):
        # some long-running code here
        await asyncio.sleep(10)

This way, the users of Component have a simple and clear conversion, from:

from .component import Component

def use_component():
    component = Component()
    component.perform_long_job()

Into the asynchronous version:

from .async_support.component import Component

async def use_component():
    component = Component()
    await component.perform_long_job()

Once a component is converted, you will also want to properly test it to make sure the functionality is maintained. As with any refactor, this is where having good tests already in place is extremely helpful. Just as with the component, you will copy the test file and then properly use async and await in the test code. There are many asyncio testing components out there, but if you’re using unittest, the easiest way to convert tests to asyncio is to use this decorator on tests:

import asyncio

def make_async(f):
    def wrapper(*args, **kwargs):
        coroutine = asyncio.coroutine(f)
        future = coroutine(*args, **kwargs)
        asyncio.get_event_loop().run_until_complete(future)
    return wrapper

For example, a sample synchronous TestCase:

from unittest import TestCase
from .component import Component

class SyncTestCase(TestCase):
    def test_feature(self):
        component = Component()
        component.perform_long_job()

becomes:

from unittest import TestCase
from .async_support.component import Component

class AsyncTestCase(TestCase):
    @make_async
    async def test_feature(self):
        component = Component()
        await component.perform_long_job()

For the first part of the conversion, you will feel like you’re just writing async and await all over the place, propagating through too many classes and function calls. And you will be right. There is always an end in sight though! Keep creating the async versions of components and test at every level possible.

Note that this approach does violate some DRY principles, but there will unfortunately be some inevitable repetition with some functions since one function cannot be both async and sync at the same time. This doesn’t mean you should neglect all code reuse. Do reuse functionality that can stay sync in the async version. To that end, you can have the async version inherit from the synchronous version.

Managing concurrent tasks

At the top-level, instead of handling Threads, you will have Tasks everywhere created using create_task(). Remember that the code starts executing right after the call to create_task(), and you do not need to await it to start.

Be sure to refer to the Task documentation to see what is available. You will probably be happy with done(), exception(), and result(), but be sure to explore around.

Converting dependencies

Before you’ve even started tackling the conversion of your own code, you’ll need to see how all your external dependencies support asyncio, which will strongly influence any design changes required in your application. Your dependencies will likely fall into a few different cases, from easiest to hardest:

Third-party libraries are understandably all over the map for async support. Thankfully, it’s only getting better as asyncio continues to gain traction all over the community. It can also be the perfect opportunity to do some useful and appreciated open-source work. For example, here is a pull request that I did for a popular financial data package.

Drop-in replacement

Some library providers simply give an async variant that operates exactly like its synchronous counterpart, albeit with async all over the place. This is the best possible case since you won’t have to make any big changes in your async implementation. For example, in the MongoDB world, motor is an extremely simple alternative to pymongo. Even better, there is extensive reuse of pymongo objects within motor, so most imports won’t even need to be updated.

Search around for the async variants and cross your fingers that it’s there! And if not, you can make a name for yourself in the open-source community.

Incompatible drop-in replacement

Some async libraries look similar to their sync versions but aren’t exactly the same. For example, first-party Python libraries unfortunately look a bit different. The socket library is mostly replaced by functions that exist directly on the event loop with some inconsistencies. The asyncio version of creating a connection requires a protocol_factory implementing some network protocol for handling data when it’s sent or received.

Note that some APIS will require the event loop to work. Passing the event loop down to client code used to be the stylistic standard, but this has mostly been replaced with getting the current event loop in most situations, unless your application requires multiple event loops. Beware that older libraries may still require the event loop directly. You can find the interesting discussion here.

No async library available, no dependency on async components

What do you do if you have mostly asynchronous replacements for all of your dependencies, but there’s still one or two holdouts? And most importantly, you don’t have the time or resources to port the library yourself?

Thankfully, you can use run_in_executor() to use an Executor and run your blocking synchronous code within an asynchronous context. This will look a lot like your old threaded setup.

This step is extremely important, because one blocking call can stall your entire asynchronous setup. Remember that those await calls tell the event loop to move onto other tasks. If a non-awaited call takes a long time, nothing else can be done in that time.

No async library available, need to call async components

What do you do if you are using a sync-only external library that maintains its own thread and needs to call async code? An example would be a listening socket not directly maintained by your application. It needs to keep listening on its socket in its own thread, and eventually call into async code when it receives data. This is a problem because the listening thread is calling into the event loop owned by another thread. If multiple threads are asking one event loop to do things, you can run into normal multi-threading race conditions.

Again, thankfully, asyncio has you covered! First, it will raise an exception if you try to use an event loop from another thread. Second, you have run_coroutine_threadsafe() to handle this situation without too much trouble. Note that this function gives you concurrent.futures.Future, which must be sychronously “awaited” using result(timeout) if you need the return value.

Testing

Unless your testing library has an async version also available, you can use the make_async() helper decorator described earlier to convert tests to async.

Testing on all components is critical to ensure that everything behaves as expected, and to double-check that there aren’t any missing awaits anywhere. Trust me, that will happen.

Wrapping up

This post was meant to clarify how to move over to the new world of asyncio in Python. Be sure to let me know if this worked for you or if you’ve encountered other cases not covered here during your asyncio transition. The official documentation even lists common pitfalls and solutions when developing with asyncio. It certainly helped me during the transition.

Once everything has been converted over, I hope you will find a much more performant and reliable Python application!