arque

Asyncio Reliable Queue (based on redis)


Keywords
asyncio, redis, reliable, queue, asynchronous, python, reliable-queue, work-queue, delay, delayed, jobs, delayed-queue, repeated, tasks, dead, letter, failed
License
MIT
Install
pip install arque==1.1.0

Documentation

arque

Asyncio Reliable Queue (based on redis)

Inspired by Tom DeWire's article "Reliable Queueing in Redis (Part 1)" [1] [2] and the "torrelque" Python module [3].

Features:

- Asynchronous: based on asyncio and aioredis
- Reliable: task data is kept in the Redis database at every moment of processing
- Throttling: limits the number of tasks being executed at the same time
- Delayed queue: defers task availability (see the sketch after this list)
- Dead letters: moves task data to a failed queue after a predefined number of retry attempts
- Tested on Python 3.7 and Redis server '>=3.0.6', '<=5.0.5'
- Used in containerized applications (managed by Kubernetes) in high-load environments
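
A minimal sketch of the cycle these features describe, assuming only a Redis server reachable at redis://localhost; it uses the same enqueue/dequeue/release calls as the full example in the Usage section below:

import asyncio
import aioredis
from arque import Arque


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost', encoding='utf8')
    queue = Arque(redis=redis)
    # Defer availability of the task by one second (delayed queue feature).
    await queue.enqueue({'value': 42}, task_id='demo_42', delay=1)
    while True:
        task_id, task_data = await queue.dequeue()
        if task_id in ('__not_found__', '__overloaded__'):
            # Nothing ready yet (or too many tasks in flight); poll again shortly.
            await asyncio.sleep(0.5)
            continue
        print('Got', task_id, task_data)
        # Acknowledge successful processing.
        await queue.release(task_id)
        break
    redis.close()
    await redis.wait_closed()


asyncio.run(main())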

Install:

pip install arque

Usage:

import signal
import random
import logging
import asyncio
import aioredis
import time
from functools import wraps
from arque import Arque

logger = logging.getLogger(__name__)


async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    for task in tasks:
        task.cancel()
    # return_exceptions=True keeps the CancelledError raised by the cancelled
    # tasks from propagating out of gather().
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()


def aioredis_pool(host='redis://localhost', encoding='utf8'):
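    # Open an aioredis connection pool, pass it to the wrapped coroutine as the
    # `redis` keyword argument and close it once the coroutine exits.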
    def wrapper(func):
        @wraps(func)
        async def wrapped():
            redis = await aioredis.create_redis_pool(host, encoding=encoding)
            try:
                return await func(redis=redis)
            finally:
                redis.close()
                await redis.wait_closed()

        return wrapped

    return wrapper


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def produce_task(redis=None):
    logger.info('Starting producing...')
    queue = Arque(redis=redis)
    while True:
        for _ in range(1):
            task = {'value': random.randint(0, 99)}
            task_id = f"custom_{task['value']}_{time.time()}"
            logger.debug('Produced task %s', task)
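            # delay defers availability of the task by one second; task_timeout is
            # presumably the per-task processing budget watched by the sweeper.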
            await queue.enqueue(task, task_id=task_id, task_timeout=10, delay=1)
        await asyncio.sleep(1)


async def process(task_data):
    logger.debug('Consumed task %s', task_data)
    await asyncio.sleep(1)


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def consume_task(redis=None):
    logger.info('Starting consuming...')
    queue = Arque(redis=redis, working_limit=3)
    while True:
        task_id, task_data = await queue.dequeue()
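        # dequeue() reports queue conditions through sentinel task ids rather than
        # exceptions: '__not_found__' when nothing is ready, '__overloaded__' when
        # working_limit tasks are already in flight (throttling), and a failed
        # marker when a task has been moved to the dead letter queue.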
        if task_id == '__not_found__':
            continue

        if task_id == '__overloaded__':
            print(f'TASK ID: {task_id}')
            await asyncio.sleep(1)
            continue

        if task_id == '__marked_as_failed__':
            print(f'FAILED  ID: {task_id}')
            continue

        try:
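            # release() acknowledges successful processing; on failure requeue()
            # schedules a retry after a delay, and after the predefined number of
            # retry attempts the task lands in the failed (dead letter) queue.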
            await process(task_data)
            await queue.release(task_id)
        except Exception:
            logger.exception('Job processing has failed')
            await queue.requeue(task_id, delay=5)
            stats = await queue.get_stats()
            logger.info(stats)


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def sweep_task(redis=None):
    logger.info('Starting sweeping...')
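    # schedule_sweep() runs every sweep_interval seconds and, following the
    # reliable queue pattern, returns tasks whose processing timed out to the queue.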
    queue = Arque(redis=redis, sweep_interval=5)
    await queue.schedule_sweep()


@aioredis_pool(host='redis://localhost', encoding='utf8')
async def stats_task(redis=None):
    logger.info('Starting stats...')
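    # Log queue statistics every 5 seconds.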
    queue = Arque(redis=redis)
    while True:
        stats = await queue.get_stats()
        logger.info(stats)
        await asyncio.sleep(5)


async def example():
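    # Run five consumers alongside one producer, the sweeper and the stats reporter.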
    tasks = []
    for _ in range(5):
        tasks.append(consume_task())
    tasks.append(produce_task())
    tasks.append(sweep_task())
    tasks.append(stats_task())
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGUSR1)
    for s in signals:
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    try:
        loop.run_until_complete(example())
    finally:
        loop.close()
        logging.info("Successfully shutdown...")

Reference

[1] Tom DeWire, "Reliable Queueing in Redis (Part 1)"
[2] Tom DeWire, "Redis as a Reliable Work Queue" (PDF)
[3] torrelque Python module