Consumer utility for AMQP


Keywords
python-library, rabbitmq, rabbitmq-consumer
License
MIT
Install
pip install aioamqp-consumer-best==2.2.1

Documentation

aioamqp-consumer-best

PyPI version PyPI - Python Version Build Status codecov

Usage

import asyncio
from typing import List

from aioamqp_consumer_best import (
    ConnectionParams,
    Consumer,
    Exchange,
    Message,
    ProcessBulk,
    Queue,
    QueueBinding,
    ToBulks,
    load_json,
)


async def callback(messages: List[Message]) -> None:
    print(messages)


consumer = Consumer(
    middleware=(
        load_json
        | ToBulks(max_bulk_size=10, bulk_timeout=3.0)
        | ProcessBulk(callback)
    ),
    prefetch_count=10,
    queue=Queue(
        name='test-queue',
        bindings=[
            QueueBinding(
                exchange=Exchange('test-exchange'),
                routing_key='test-routing-key',
            ),
        ],
    ),
    connection_params=[  # Round robin
        ConnectionParams(),
        ConnectionParams.from_string('amqp://user@rmq-host:5672/'),
    ],
)

asyncio.run(consumer.start())