
Celery-like task manager for the new AsyncIO Python module

asyncio, celery, celery-task, coroutines, python, python-3
pip install aiotasks==1.0.0a2



aiotasks: A Celery-like task manager for the new AsyncIO Python module

Project site
Author: Daniel Garcia (cr0hn) - @ggdaniel
Latest Version: 1.0.0-alpha
Python versions: 3.5 or above

What's aiotasks

aiotasks is an asynchronous task queue/job queue based on distributed message passing and built on the Python asyncio framework. It is based on the ideas of the Celery task queue, but focuses on performance and on being non-blocking and event-driven.

aiotasks does not poll or actively wait for task jobs. Instead, it uses the asyncio framework to suspend execution until the broker delivers new data.
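
To make the difference from polling concrete, here is a minimal sketch that uses only the standard library, not aiotasks itself. An asyncio.Queue stands in for the broker, and the names worker and main are illustrative. The worker never loops on a sleep-and-check cycle; it is simply suspended on queue.get() until a job arrives.

```python
import asyncio


async def worker(queue, results):
    """Consume jobs until a None sentinel arrives."""
    while True:
        # The coroutine is suspended here until data is available;
        # there is no sleep-and-retry loop.
        job = await queue.get()
        if job is None:
            break
        results.append(job * 2)


async def main():
    queue = asyncio.Queue()  # stands in for the broker in this sketch
    results = []
    consumer = asyncio.ensure_future(worker(queue, results))
    for n in range(3):
        await queue.put(n)
    await queue.put(None)  # sentinel: tell the worker to stop
    await consumer
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```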

aiotasks is still under development. It is not as active as I would like (because of time limitations), but the project is actively developed.

If you want to contribute, take a look at the file.


You can find examples in the examples folder.

You can run aiotasks in two ways:

  • Launching an aiotasks manager in an independent console / process (like Celery does), and then sending tasks to aiotasks through the broker.
  • Running standalone: launching the client and the server from a single entry point and running both at the same time.

Running using the manager

Currently there is a limitation when launching tasks: the Python files that contain the tasks must be inside a package so that aiotasks can import them.

Fixing this limitation is on the TODO list, so that .py files can be imported directly without being inside a package.
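
The sketch below illustrates what importing tasks from a package by dotted path looks like. It assumes the worker resolves the value given to -A with a dotted-path import, which this README does not confirm. The helper load_tasks_module and the package name demo_pkg are hypothetical and exist only for this example.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def load_tasks_module(dotted_path):
    """Import a module by dotted path, e.g. 'demo_pkg.tasks' (hypothetical helper)."""
    return importlib.import_module(dotted_path)


def demo():
    """Build a throwaway package on disk and import a module from it."""
    with tempfile.TemporaryDirectory() as root:
        pkg = Path(root) / "demo_pkg"
        pkg.mkdir()
        (pkg / "__init__.py").write_text("")  # marks the folder as a package
        (pkg / "tasks.py").write_text("NAME = 'tasks loaded'\n")
        sys.path.insert(0, root)  # the package's parent must be importable
        try:
            importlib.invalidate_caches()
            module = load_tasks_module("demo_pkg.tasks")
            return module.NAME
        finally:
            sys.path.remove(root)


if __name__ == "__main__":
    print(demo())
```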

Run the manager

> aiotasks -vvvv worker -A examples.launch_manager_tasks_and_launch_in_console

Send the tasks

> python examples/

Running standalone

> python examples/

Defining tasks

This concept was ported from Celery. Defining a task is very simple: you only need to decorate a function with the task decorator.

import asyncio

from aiotasks import build_manager

manager = build_manager("redis://")

@manager.task()  # <-- DEFINITION OF TASK
async def task_01(num):  # <-- TASK SHOULD BE A **COROUTINE**
    print("Task 01 starting: {}".format(num))
    # Note: asyncio.sleep() lost its loop argument in Python 3.10; drop it there
    await asyncio.sleep(2, loop=manager.loop)
    print("Task 01 stopping")
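
For readers new to this pattern, the following is a rough sketch of how a task decorator can register coroutines. MiniManager is a hypothetical stand-in written for this example; it is not the aiotasks class and makes no claim about how aiotasks works internally.

```python
import asyncio


class MiniManager:
    """Hypothetical stand-in for a task manager; not the aiotasks class."""

    def __init__(self):
        self.tasks = {}  # task name -> coroutine function

    def task(self):
        def decorator(func):
            # Reject plain functions: a task has to be a coroutine.
            if not asyncio.iscoroutinefunction(func):
                raise TypeError("tasks must be coroutine functions")
            self.tasks[func.__name__] = func
            return func
        return decorator


manager = MiniManager()


@manager.task()
async def task_01(num):
    await asyncio.sleep(0)
    return num + 1


if __name__ == "__main__":
    print(sorted(manager.tasks))
```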

Sending info to tasks

Currently aiotasks only supports sending information through the delay(...) method, and the caller needs access to the task definition:

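import asyncio

from aiotasks import build_manager

manager = build_manager("redis://")

@manager.task()  # decorator as in "Defining tasks"; assumed to provide delay()
async def task_01(num):
    await asyncio.sleep(0, loop=manager.loop)

async def generate_tasks():
    # Generates 5 tasks
    for x in range(5):
        await task_01.delay(x)  # <-- METHOD DELAY SENDS A TASK

if __name__ == '__main__':
    # Assumed entry point; the original body is missing
    manager.loop.run_until_complete(generate_tasks())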
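
As a rough illustration of what sending a task through a broker involves, the sketch below serializes the task name and its arguments as a JSON message and lets a worker look the task up by name. Everything here is an assumption made for the example: REGISTRY, the task decorator, and a delay that takes the broker as its first argument are hypothetical, and an asyncio.Queue stands in for Redis. The real aiotasks message format is not described in this README.

```python
import asyncio
import json

REGISTRY = {}  # task name -> coroutine function


def task(func):
    """Register a coroutine and attach a hypothetical delay() to it."""
    REGISTRY[func.__name__] = func

    async def delay(broker, *args):
        # Only the task name and its arguments travel through the broker.
        await broker.put(json.dumps({"task": func.__name__, "args": args}))

    func.delay = delay
    return func


@task
async def task_01(num):
    return num * 10


async def worker(broker, count):
    """Decode `count` messages and run the matching registered task."""
    results = []
    for _ in range(count):
        message = json.loads(await broker.get())
        results.append(await REGISTRY[message["task"]](*message["args"]))
    return results


async def main():
    broker = asyncio.Queue()  # stands in for Redis in this sketch
    for x in range(5):
        await task_01.delay(broker, x)
    return await worker(broker, 5)


if __name__ == "__main__":
    print(asyncio.run(main()))
```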

Sending info to tasks & wait for response

We can also send a task job and wait for the response in non-blocking mode:

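import asyncio

from aiotasks import build_manager

manager = build_manager("redis://")

@manager.task()  # decorator as in "Defining tasks"; assumed to provide wait()
async def task_01(num):
    await asyncio.sleep(0, loop=manager.loop)

async def generate_tasks():
    # Generates 5 tasks
    for x in range(5):  # assumed loop; x was undefined in the original snippet
        async with task_01.wait(x) as f:  # <-- NON-BLOCKING WAITING FOR RESPONSE
            print(f)  # assumed body: f holds the task's response

if __name__ == '__main__':
    # Assumed entry point; the original body is missing
    manager.loop.run_until_complete(generate_tasks())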
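
The async with form can be approximated with the standard library alone. The sketch below is not the aiotasks implementation: wait here is a hypothetical free function built on contextlib.asynccontextmanager (Python 3.7+). It starts the task in the background and yields its result once it is ready, suspending only the calling coroutine in the meantime.

```python
import asyncio
from contextlib import asynccontextmanager


async def task_01(num):
    await asyncio.sleep(0)
    return num * 2


@asynccontextmanager
async def wait(coro_func, *args):
    """Hypothetical helper: run the task in the background and yield its result."""
    job = asyncio.ensure_future(coro_func(*args))
    try:
        # Awaiting suspends only this coroutine; the event loop stays free
        # to run other work until the result is ready.
        yield await job
    finally:
        if not job.done():
            job.cancel()


async def main():
    results = []
    for x in range(3):
        async with wait(task_01, x) as f:
            results.append(f)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```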


Currently only two backends are supported:

  • Redis: redis://HOST:PORT/DB
  • In memory: memory://
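
To show how the parts of such a URL map to connection settings, here is a small sketch built on urllib.parse. The function parse_backend is hypothetical and is not part of aiotasks. The defaults it fills in (localhost, port 6379, database 0) are assumptions based on common Redis conventions, and the host, port, and password in the usage line are made up for the example.

```python
from urllib.parse import urlsplit


def parse_backend(dsn):
    """Split a backend URL into its parts (hypothetical helper, not aiotasks API)."""
    parts = urlsplit(dsn)
    if parts.scheme not in ("redis", "memory"):
        raise ValueError("unsupported backend: {!r}".format(parts.scheme))
    if parts.scheme == "memory":
        return {"backend": "memory"}
    db = parts.path.lstrip("/")
    return {
        "backend": "redis",
        "host": parts.hostname or "localhost",  # assumed default host
        "port": parts.port or 6379,  # the standard Redis port
        "db": int(db) if db else 0,  # assumed default database
        "password": parts.password,
    }


if __name__ == "__main__":
    print(parse_backend("redis://:secret@localhost:6380/2"))
```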


Connect to localhost with the default Redis options:

from aiotasks import build_manager

manager = build_manager("redis://")


Custom Redis server:

from aiotasks import build_manager

manager = build_manager("redis://:mypassword@")


In memory

This execution mode is useful for small, local tasks. For example, if you're using aiohttp and want to send an email in the background, you can use standalone mode with the memory backend.

from aiotasks import build_manager

manager = build_manager("memory://")
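
The background-email scenario can be sketched with plain asyncio to show the idea of replying before the job finishes. This example uses neither aiohttp nor aiotasks. The names send_email and handle_request and the address are hypothetical, and the email is simulated with a short sleep.

```python
import asyncio


async def send_email(address, outbox):
    """Pretend to send an email; a real task would talk to a mail server."""
    await asyncio.sleep(0.01)
    outbox.append(address)


async def handle_request(address, outbox, background):
    """Hypothetical request handler: schedule the email, then reply at once."""
    job = asyncio.ensure_future(send_email(address, outbox))
    background.add(job)  # keep a reference so the job is not garbage-collected
    return "202 Accepted"


async def main():
    outbox, background = [], set()
    reply = await handle_request("user@example.com", outbox, background)
    sent_at_reply = len(outbox)  # the email has not been delivered yet
    await asyncio.gather(*background)  # let the background job finish
    return reply, sent_at_reply, outbox


if __name__ == "__main__":
    print(asyncio.run(main()))
```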
