Taqu Task Queue system

task, queue, azure, service, bus
pip install taqu==1.0.0


https://travis-ci.org/lietu/taqu.svg?branch=master https://sonarcloud.io/api/project_badges/measure?project=lietu_taqu&metric=alert_status GitHub issues PyPI - Downloads PyPI PyPI - Python Version

Python Task Queue system built on Azure Service Bus queues and pydantic models.

What is this?

Lots of systems benefit from having a queue for background tasks, that run independently of e.g. your APIs or other processes. This can help e.g. by enhancing your API performance, limiting the effects of traffic peaks, as well as scaling out with parallel processing of various things.

The defacto standard for Python is Celery + RabbitMQ, but hosting RabbitMQ is another liability and not always the most fun experience, and Celery doesn't support asyncio yet. Fully hosted solutions such as the Azure Service Bus help you get off the ground faster, with less things to worry about, and can allow you to save on costs as well.

Primarily intended for use with asyncio (from taqu.aio module), but works with non-async code just as well (using imports from the taqu module).

Supports all the basic things you could need:

  • Fast insertion of tasks to queue
  • Async task processing
  • Easy to scale workers
  • Retry logic - if there's an uncaught exception the task will automatically be put back in the queue
  • Clean shutdown on Ctrl+C (waits until tasks finish processing)


Licensing is important. This project itself uses BSD 3-clause license, but e.g. Azure library for Storage Bus and other such libraries used by it may have their own licenses.

For more information check the LICENSE -file.

Getting started

In the Azure Portal set up a new Service Bus (any tier is fine), and then a queue in it. You probably want to enable partitioning, maybe also dead-lettering. Then you'll want to get the access credentials for your code. Ensure you've got the Azure CLI installed and then run:

az login  # Ensure you're logged in to Azure
az account list  # List subscriptions
az account set --subscription <subscriptionId>  # Set active subscription

az servicebus namespace authorization-rule keys list \
    --resource-group <rgName> \
    --namespace-name <namespace> \
    --name RootManageSharedAccessKey \
    --query primaryConnectionString \
    --output tsv

Also you'll need the Taqu library installed, e.g. for use with the Azure:

pip install taqu[azure]

Then, you can set up your worker, here's an example worker.py that you can just run with python worker.py:

import asyncio

from taqu.aio import TaquAzureWorker
from pydantic import BaseModel


worker = TaquAzureWorker(CONNECTION_STRING)

class CreateUser(BaseModel):
    username: str

async def create_user(user: CreateUser):


With the worker in place, you can create a client and send some tasks

from taqu import TaquAzureClient
from pydantic import BaseModel


taqu = TaquAzureClient(CONNECTION_STRING)

class CreateUser(BaseModel):
    username: str


You can also check out the examples.


This project is run on GitHub using the issue tracking and pull requests here. If you want to contribute, feel free to submit issues (incl. feature requests) or PRs here.

To test changes locally python setup.py develop is a good way to run this, and you can python setup.py develop --uninstall afterwards (you might want to also use the --user flag).