sharded-queue

Sharded queue implementation


Keywords
jobqueue, priority, priority-queue, python, queue, redis, sharded, sharding, task-queue
License
MIT
Install
pip install sharded-queue==0.2.10

Documentation

Sharded queue

CI pypi codecov downloads license

Introduction

Imagine your job queue operates at very high rps and needs distribution over multiple workers. But you need to keep context-sensitive requests in same thread and manage thread request processing priority. In other words, sharded queue is a queue with sub-queues inside. Tasks are executed in FIFO order and you define how to route them correctly per handler basis.

Installation

Install using pip

pip install sharded-queue

Getting started

There are some roles that you need to understand:

  • request a simple message that should be delivered to a handler
  • handler request handler that performs the job
  • route defines internal queue that is used for request distribution
    • thread a group of context-sensitive requests
    • priority can be used to sort requests inside the thread

Let's start with a simple notification task that is shared by 3 threads and there are no priorities. Notice, that handler methods are written using batch approach to reduce io latency per each message.

from sharded_queue import Handler, Queue, Route

class NotifyRequest(NamedTuple):
    '''
    In this example we have simple notify request containing user identifier
    In addition, the value is used to shard requests over worker threads
    '''
    user_id: int

class NotifyHandler(Handler):
    @classmethod
    async def route(cls, *requests: NotifyRequest) -> list[Route]:
        '''
        Spread requests by 3 threads that can be concurrently processed
        '''
        return [
            Route(thread=request.user_id % 3)
            for request in requests
        ]

    async def perform(self, *requests: NotifyRequest) -> None:
        '''
        Perform is called using configurable batch size
        This allows you to reduce io per single request
        '''
        # users = await UserRepository.find_all([r.user_id for r in requests])
        # await mailer.send_all([construct_message(user) for user in users])

    def batch_size(self) -> Optional[int]:
        '''
        Optionally, you can ovveride global batch size per handler
        '''
        return 128

Usage example

When a handler is described you can use queue and worker api to manage and process tasks. Let's describe runtime components:

  • lock helps worker bind to the queue
  • queue is used to register requests
  • storage a database containing queue data
  • worker performs requests using handler
from asyncio import gather
from notifications import NotifyHandler, NotifyRequest
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage


async def example():
    '''
    let's register notification for first 9 users
    '''
    queue: Queue = Queue(RuntimeStorage())
    await queue.register(NotifyHandler, *[NotifyRequest(n) for n in range(1, 9)])
    '''
    now all requests are waiting for workers on 3 notify handler tubes
    they were distributed using route handler method
    first tube contains notify request for users 1, 4, 7
    second tube contains requests for 2, 5, 8 and other goes to third tube
    '''
    futures = [
        Worker(RuntimeLock(), queue).loop(3),
        Worker(RuntimeLock(), queue).loop(3),
        Worker(RuntimeLock(), queue).loop(3),
    ]
    '''
    we've just run three coroutines that will process messages
    workers will bind to each thread and process all messages
    '''
    await gather(*futures)
    '''
    now all emails were send in 3 threads
    '''

Drivers

There are several implementations of components:

  • RedisLock persist locks in redis using setnx api
  • RedisStorage persist msgs using lists and lrange/lpop/rpush api
  • RuntimeLock persist locks in memory (process in-memory distribution)
  • RuntimeStorage persist msgs in dict (process in-memory distribution)

Handler lifecycle

As you can notice, routing is made using static method, but perform is an instance method. When a worker start processing requests it can bootstrap and tear down the handler using start and stop methods

class ParseEventRequest(NamedTuple):
    '''
    Event identifier should be enough to get it contents from storage
    '''
    event: int

class ParseEventHandler(Handler):
    @classmethod
    async def create(cls) -> Self:
        '''
        define your own handler and dependency factory
        '''
        return cls()

    @classmethod
    async def route(cls, *requests: ParseEventRequest) -> list[Route]:
        '''
        override default single thread tube
        '''
        return [Route(0, 0) for request in requests]

    async def start(self):
        '''
        run any code on worker is bind to the queue
        '''

    async def perform(self, *requests: ParseEventRequest):
        '''
        the handler
        '''

    async def handle(self, *requests: ParseEventRequest) -> None:
        '''
        process requests batch
        '''

    async def stop(self):
        '''
        run any code when queue is empty and worker stops processing thread
        '''

Deduplication

There is an optional if_not_exists flag. If it is set, request will be registered only if not persent in a queue

await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)

Deferred handlers

You can use built-in deferred request handler to defer call

await queue.register(Housekeep, Room(402), defer=5)  # numeric means seconds
await queue.register(Housekeep, Room(324), defer=timedelta(minutes=15))

Recurrent handlers

You can use built-in recurrent request handler for regular request registration

await queue.register(BalanceCheck, Company('basis'), recurrent=timedelta(hours=1))

Performance

Performance dependends on many factors, we can only measure clean library overhead with in-memory storages. You can run performance on your hardware with pytest -s, with this option performance test will print result for different cases. Perfomance test on intel i5-4670K, Ubuntu 23.04 LTS using Python 3.11.4 gives us about 200_000 rps for batch request registration with sharding and about 600_000 requests for request handling in concurrent mode.

Advanced queue configuration

You can configure sharded queue using env

  • QUEUE_LOCK_PREFIX = 'lock_'
    Lock key prefix
  • QUEUE_LOCK_TIMEOUT = 24 * 60 * 60
    Lock key ttl
  • QUEUE_STORAGE_PREFIX = 'tube_'
    Default queue prefix
  • QUEUE_WORKER_ACQUIRE_DELAY = 1
    Worker acquire delay in seconds on empty queues
  • QUEUE_WORKER_BATCH_SIZE = 128
    Worker batch processing size
  • QUEUE_WORKER_DEFERRED_RETRY_DELAY = 1
    Deferred tasks retry delay
  • QUEUE_WORKER_EMPTY_LIMIT = 16
    Worker empty queue attempt limit berfore queue rebind
  • QUEUE_WORKER_EMPTY_PAUSE = 0.1
    Worker pause in seconds on empty queue
  • QUEUE_WORKER_RECURRENT_CHECK_INTERVAL = 30
    Recurrent interval check in seconds
  • QUEUE_WORKER_RECURRENT_TASKS_LIMIT = 1024
    Recurrent tasks limit count

You can change runtime settings

from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage


worker = Worker(RuntimeLock(), Queue(RuntimeStorage()))
worker.lock.settings.timeout = 5 * 60
worker.settings.acquire_delay = 5
worker.settings.batch_size = 64
await worker.loop()