coroexecutor

A coroutine-based Executor implementation


Keywords
asyncio, executor, coroutines, taskgroup, taskgroups
License
MIT
Install
pip install coroexecutor==2021.10.1

Documentation


Warning

This is alpha. Please don't rely on this in a production setting yet. I will remove this warning when it is ready.

coroexecutor

Provides an Executor interface for running a group of coroutines together in asyncio-native applications.

Demo

import asyncio
from coroexecutor import CoroutineExecutor

async def f(dt, msg=''):
    await asyncio.sleep(dt)
    print(f'completion message: {msg}')

async def main():
    async with CoroutineExecutor(max_workers=10) as exe:
        t1 = await exe.submit(f, 0.01, msg="task 1")
        t2 = await exe.submit(f, 0.05, msg="task 2")

    assert t1.done()
    assert t2.done()

asyncio.run(main())

max_workers controls how many submitted jobs can run concurrently. These internal workers are lightweight: they're just asyncio.Task instances. Millions of jobs can be pushed through the executor. As is normal for asyncio, concurrency requires that these jobs be IO-bound, and the practical upper bound for max_workers will depend mainly on your CPU and RAM resources.

Discussion

The CoroutineExecutor context manager works very much like the Executor implementations in the concurrent.futures package in the standard library. This is the intention of this package. The basic components of the interface are:

  • The executor applies a context over the creation of jobs
  • Jobs are submitted to the executor
  • All jobs must be complete when the context manager for the executor exits.

After creating a context manager using CoroutineExecutor, the two main features are the submit() method, and the map() method.

It is impossible to exactly match the Executor interface in the concurrent.futures package because some functions in this interface need to be async functions. But we can get close; certainly close enough that a user with experience using the ThreadPoolExecutor or ProcessPoolExecutor should be able to figure things out pretty quickly.
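For comparison, here is roughly what the demo from the top of this README looks like with the standard library's ThreadPoolExecutor. This is only a sketch to show the parallel between the two interfaces: the job function is a plain blocking function, submit() is not awaited, and it returns a concurrent.futures.Future rather than an asyncio.Task.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def f(dt, msg=''):
    # Blocking sleep: this runs in a worker thread, not on an event loop
    time.sleep(dt)
    return f'completion message: {msg}'

def main():
    with ThreadPoolExecutor(max_workers=10) as exe:
        # submit() is a regular method here, so there is no await
        t1 = exe.submit(f, 0.01, msg="task 1")
        t2 = exe.submit(f, 0.05, msg="task 2")

    # Leaving the with-block waits for all submitted jobs to finish
    assert t1.done()
    assert t2.done()
    return t1.result(), t2.result()

if __name__ == "__main__":
    print(main())
```

The shape is the same in both cases: create the executor as a context manager, submit jobs inside it, and rely on the context exit to wait for completion.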

There is a great deal of complexity that can arise. The "happy path" is simple. You just submit jobs to the executor, and they will get executed accordingly. But there are many corner cases:

  • asyncio can execute thousands, or even tens of thousands, of (IO-bound) jobs concurrently. But how should we handle more: say, millions of jobs?
  • If one job raises an exception, how to terminate all the other jobs? In the CTRL-C case, this is desired, but what about other cases? Do you always want a single task failure (with an unexpected exception) to cancel the entire batch? And is there a difference between a job raising CancelledError versus raising some other kind of exception?
  • The CoroutineExecutor provides a context manager API: if some code within the body of the context manager (that is not a task) raises an exception, should all the submitted tasks also be cancelled?

Each of these will be discussed in more detail in the sections that follow.

Throttling, using max_workers

Even though it is possible to concurrently execute a much larger number of (IO bound) tasks with asyncio compared to threads or processes, there will still be an upper limit the machine can handle based on either:

  • memory limitations: many task object instances
  • CPU limitations: too many concurrent task objects and events for the event loop to process.

Thus, we also have a max_workers setting to limit concurrency. It might not be obvious how that limitation is applied, say, in the scenario of millions of jobs.

CoroutineExecutor.submit() is an async def method. This means that you have to await it, like so:

import asyncio
from coroexecutor import CoroutineExecutor

async def f():
    print('hi!')

async def main():
    async with CoroutineExecutor(max_workers=10) as exe:
        t1 = await exe.submit(f)

asyncio.run(main())

If the number of concurrently running jobs is less than max_workers, the call to await exe.submit() will return immediately: the job will begin executing, and submit() returns an asyncio.Task instance for that job. However, if the number of concurrently running jobs has already reached the max_workers setting, this call will wait until a running job finishes before adding the new one. This means that submit() applies back-pressure.
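To make the idea of back-pressure concrete, here is a minimal sketch built only on asyncio.Semaphore. It is not how coroexecutor is implemented; the BoundedSubmitter class, its join() method, and the peak counter are all hypothetical names made up for this illustration. The point is simply that an awaitable submit() can pause the caller until a slot frees up.

```python
import asyncio

class BoundedSubmitter:
    """Hypothetical illustration of back-pressure; not coroexecutor's code."""

    def __init__(self, max_workers):
        self._slots = asyncio.Semaphore(max_workers)
        self._tasks = set()
        self.running = 0
        self.peak = 0  # highest number of jobs seen running at once

    async def submit(self, fn, *args, **kwargs):
        # Waits here when max_workers jobs are already running
        await self._slots.acquire()
        task = asyncio.create_task(self._run(fn, *args, **kwargs))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _run(self, fn, *args, **kwargs):
        self.running += 1
        self.peak = max(self.peak, self.running)
        try:
            return await fn(*args, **kwargs)
        finally:
            self.running -= 1
            self._slots.release()  # frees a slot for a waiting submit()

    async def join(self):
        # Wait for whatever is still outstanding
        if self._tasks:
            await asyncio.gather(*self._tasks)

async def job(dt):
    await asyncio.sleep(dt)
    return dt

async def demo():
    exe = BoundedSubmitter(max_workers=2)
    tasks = [await exe.submit(job, 0.01) for _ in range(6)]
    await exe.join()
    return exe.peak, [t.result() for t in tasks]

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the producer loop itself is slowed down, only a bounded number of task objects exist at any moment, no matter how many jobs are waiting to be submitted.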

Say you have a file containing ten million URLs that you want to fetch using aiohttp. That program might look something like this:

import asyncio, aiohttp
from coroexecutor import CoroutineExecutor

async def fetch(url: str):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # text() is a coroutine method in aiohttp, so it must be awaited
                print('body:', await response.text())  # or whatever
    except Exception:
        print('Problem with url:', url)

async def main():
    async with CoroutineExecutor(max_workers=10000) as exe:
        with open('urls.txt') as urls:
            for line in urls:
                # strip the trailing newline before using the line as a URL
                await exe.submit(fetch, line.strip())

asyncio.run(main())

Assuming it takes 3 seconds to fetch a single URL, this program should take around (1e7 / 1e4) * 3 => 3000 seconds to fetch all of them. That's about 50 minutes: even though there are 10 million URLs, we're fetching 10k at a time. (In practice, some of the endpoints will be very slow to respond, if they respond at all. So for real code you're going to want to either use aiohttp facilities for timeouts on the .get(), or wrap the work inside an asyncio.wait_for() wrapper.)
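As a rough sketch of the asyncio.wait_for() option, the helper below puts a deadline on a single job. The names with_timeout and slow_job are made up for this example, and returning None on timeout is just one possible policy; you might prefer to log the failure or re-raise.

```python
import asyncio

async def slow_job():
    # Stand-in for an endpoint that takes far too long to respond
    await asyncio.sleep(10)
    return 'done'

async def with_timeout(coro_fn, *args, timeout):
    # wait_for() cancels the inner coroutine once the deadline passes
    try:
        return await asyncio.wait_for(coro_fn(*args), timeout=timeout)
    except asyncio.TimeoutError:
        return None

if __name__ == "__main__":
    print(asyncio.run(with_timeout(slow_job, timeout=0.01)))
```

A job function like fetch() could wrap its own body this way, so that one unresponsive endpoint cannot hold a worker slot indefinitely.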

Note that we're handling errors inside our job function fetch(). By default, if jobs raise exceptions these will cancel all pending jobs inside the executor, and shut it down. For long batch jobs, that may not be what we want, and this is discussed next.

Dealing with errors and cancellation

Generally, there are these kinds of error situations:

  • A job is cancelled, and you want the executor to be shut down
  • A job is cancelled, and the executor must NOT be shut down
  • A job raises an exception (not CancelledError), and you want the executor to shut down
  • A job raises an exception (not CancelledError), and the executor must NOT be shut down

Consider the previous example using aiohttp to fetch URLs: inside the fetch() function, we're handling Exception. (Note that on Python 3.8 and later, asyncio.CancelledError derives from BaseException rather than Exception, so it has to be caught separately if you want to handle cancellation inside the job.) In general, handling errors inside the job is the correct thing to do because you can control what happens in each of the scenarios presented above. But what happens if your code is not supplying the jobs and you don't control how error handling inside them is being managed? By default, if any job raises an exception (cancellation or otherwise) that will initiate "shutdown" of the executor instance, and all other pending jobs on that executor will be cancelled.
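The difference between cancellation and other failures can be seen with plain asyncio, without any executor involved. The sketch below uses a hypothetical careful_job function that treats the two cases separately. On Python 3.8 and later CancelledError is not a subclass of Exception, which is why it gets its own except clause here.

```python
import asyncio

async def careful_job(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cancellation: record it, then re-raise so the task ends as cancelled
        log.append('cancelled')
        raise
    except Exception:
        # Any other failure would be handled here instead
        log.append('error')

async def demo():
    log = []
    task = asyncio.create_task(careful_job(log))
    await asyncio.sleep(0)  # give the job a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append('propagated')
    return log

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Whether a job re-raises CancelledError or swallows it is exactly the kind of decision that determines which of the four scenarios above you end up in.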

If you have a situation where this is not desired, you can ask CoroutineExecutor to ignore all task errors for you:

import asyncio, aiohttp
from coroexecutor import CoroutineExecutor

async def naive_fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # text() is a coroutine method in aiohttp, so it must be awaited
            print('body:', await response.text())  # or whatever

async def main():
    async with CoroutineExecutor(
            max_workers=10000,
            suppress_task_errors=True,
    ) as exe:
        with open('urls.txt') as urls:
            for line in urls:
                # strip the trailing newline before using the line as a URL
                await exe.submit(naive_fetch, line.strip())

asyncio.run(main())

In this modified example, the job function naive_fetch has no error handling. No matter: the suppress_task_errors parameter will allow the executor to absorb them all. Be careful with this. I recommend avoiding it wherever possible, and handling exceptions and CancelledError explicitly within your job functions instead.
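If you don't control the job functions but still want explicit handling, one option is to wrap them. The sketch below is only an illustration using plain asyncio: guarded and flaky are hypothetical names, and recording failures in a list is just one possible policy. Cancellation is deliberately passed through, while other exceptions are recorded instead of being silently dropped.

```python
import asyncio

async def guarded(fn, *args, errors):
    # Run fn(*args); record ordinary failures, let cancellation through
    try:
        return await fn(*args)
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        errors.append((args, exc))
        return None

async def flaky(n):
    # Stand-in for a third-party job with no error handling of its own
    await asyncio.sleep(0)
    if n % 2:
        raise ValueError(f'bad input: {n}')
    return n * 10

async def demo():
    errors = []
    results = await asyncio.gather(
        *(guarded(flaky, n, errors=errors) for n in range(4))
    )
    return results, errors

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Compared with suppressing everything, this keeps a record of which inputs failed and why, which is usually what you want after a long batch run.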

Examples

Using map

The concurrent.futures.Executor interface also defines map(), which returns an iterator. However, it makes more sense for us to use an asynchronous generator for this purpose. Here's an example from the tests:

import asyncio
from coroexecutor import CoroutineExecutor

times = [0.01, 0.02, 0.03]

async def f(dt):
    await asyncio.sleep(dt)
    return dt

async def main():
    async with CoroutineExecutor() as exe:
        results = exe.map(f, times)
        assert [v async for v in results] == times

asyncio.run(main())

You can see how async for is used to asynchronously loop over the result from calling map.
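To show why an asynchronous generator is a natural fit, here is a small sketch of an order-preserving map written with plain asyncio. It is not the library's implementation, and ordered_map is a hypothetical name. All calls are started up front, and results are yielded in input order as each one becomes available.

```python
import asyncio

async def ordered_map(fn, items):
    # Start every call immediately so they run concurrently
    tasks = [asyncio.create_task(fn(x)) for x in items]
    try:
        for task in tasks:
            # Yield results in input order, waiting on each in turn
            yield await task
    finally:
        # If a call failed or the consumer stopped early, cancel the rest
        # (cancel() has no effect on tasks that already finished)
        for task in tasks:
            task.cancel()

async def f(dt):
    await asyncio.sleep(dt)
    return dt

async def collect():
    return [v async for v in ordered_map(f, [0.03, 0.01, 0.02])]

if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Because results arrive one at a time, the consumer can start processing early values while later calls are still running.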

If one of the function calls raises an error, all unfinished calls will be cancelled, but you may still have received partial results. Here's another example from the tests:

import asyncio
import pytest
from coroexecutor import CoroutineExecutor

times = [0.01, 0.02, 0.1, 0.2]
results = []

async def f(dt):
    await asyncio.sleep(dt)
    if dt == 0.1:
        raise Exception('oh noes')
    return dt

async def main():
    async with CoroutineExecutor() as exe:
        async for r in exe.map(f, times):
            results.append(r)

with pytest.raises(Exception):
    asyncio.run(main())

assert results == [0.01, 0.02]

The first two values of the batch finish quickly, and they are saved to the results list in the outer scope. Then, one of the jobs fails with an exception. This results in the other pending jobs being cancelled (i.e., the "0.2" case in this example) and the CoroutineExecutor instance re-raising the exception. In this example, the exception propagates all the way out to the invocation of the run() function itself. However, note that we still have the results from jobs that succeeded.

Nesting

You don't always have to submit tasks to the executor in a single function. The executor instance can be passed around and work can be added to it from several different places.

import asyncio
from random import random

from coroexecutor import CoroutineExecutor

async def f(dt):
    await asyncio.sleep(dt)

async def producer1(executor: CoroutineExecutor):
    # submit() is an async def method, so each call must be awaited
    await executor.submit(f, random())
    await executor.submit(f, random())
    await executor.submit(f, random())

async def producer2(executor: CoroutineExecutor):
    await executor.submit(f, random())
    await executor.submit(f, random())
    await executor.submit(f, random())

async def main():
    async with CoroutineExecutor(timeout=0.5) as executor:
        await executor.submit(f, random())
        await executor.submit(f, random())
        await executor.submit(f, random())

        # The producers are themselves jobs that add more jobs
        await executor.submit(producer1, executor)
        await executor.submit(producer2, executor)

asyncio.run(main())

You can not only submit jobs within the executor context manager, but also pass the instance around and collect jobs from other functions too. And the timeout set when creating the CoroutineExecutor instance will still be applied.