amqp-ko

Object-oriented AMQP layer for microservices communication.


Keywords
amqp-ko, amqp, microservice, rabbitmq, queue, tools, microservices-communication
License
MIT
Install
pip install amqp-ko==1.0.4

Documentation

AMQP Kø

Usage

The recommended way to use AMQP Kø is to create your own queue object. The simplest way to do this is with the create_queue function.

Create queue

from amqp_ko import create_queue, AsyncConnection, Message, MessageGate
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicFollow(Message):
    user_id: int
    topic_name: str


def unmarshal_topic_follow(data: dict) -> TopicFollow:
    return TopicFollow(
        user_id=data["user_id"],
        topic_name=data["topic_name"],
    )

message_gates = [
    MessageGate("topic_follow", TopicFollow, unmarshal_topic_follow),
]

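# "async with" and "await" only work inside a coroutine,
# for example one started with asyncio.run().
async with AsyncConnection("localhost", 5672, "rabbitmq", "rabbitmq") as connection:
    queue = await create_queue(connection, "exchange-name", message_gates)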
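
To see what the unmarshal function in a message gate is responsible for, the sketch below round-trips a TopicFollow through a plain dict without touching RabbitMQ. It makes two assumptions that are not confirmed by this README: the Message class here is a local stand-in for amqp_ko.Message so the snippet runs without the library installed, and the payload is taken to be a JSON object whose keys match the dataclass fields.

```python
import json
from dataclasses import FrozenInstanceError, asdict, dataclass


class Message:
    """Local stand-in for amqp_ko.Message (assumption, not the real class)."""


@dataclass(frozen=True)
class TopicFollow(Message):
    user_id: int
    topic_name: str


def unmarshal_topic_follow(data: dict) -> TopicFollow:
    # Rebuild the typed message from the decoded payload.
    return TopicFollow(
        user_id=data["user_id"],
        topic_name=data["topic_name"],
    )


# Assumed wire format: a JSON object with one key per dataclass field.
payload = json.dumps(asdict(TopicFollow(120, "entertainment")))
restored = unmarshal_topic_follow(json.loads(payload))

# frozen=True makes the message immutable once it has been built.
try:
    restored.user_id = 1
    is_immutable = False
except FrozenInstanceError:
    is_immutable = True
```

Keeping messages frozen means a consumer cannot accidentally modify a message it received, and two messages with the same field values compare equal.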

Consume messages

from amqp_ko import Consumer, Job


class ConnectUserWithTopic(Consumer):
    async def consume(self, job: Job[TopicFollow]):
        # Put here some code to connect user with a topic
        # using "job.message.user_id" and "job.message.topic_name"
        await job.ack()
        
await queue.consume(
    "queue-name",
    {TopicFollow: ConnectUserWithTopic()},
)
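
The mapping passed to queue.consume pairs each message type with the consumer that handles it. The sketch below illustrates that idea in memory so a consumer can be exercised without a broker. It is not the library's implementation: FakeJob and dispatch are hypothetical helpers invented for this example, and the consumer is a plain class rather than a subclass of amqp_ko.Consumer.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicFollow:
    user_id: int
    topic_name: str


class FakeJob:
    """Hypothetical stand-in for amqp_ko.Job: carries a message, records ack."""

    def __init__(self, message):
        self.message = message
        self.acked = False

    async def ack(self):
        self.acked = True


class ConnectUserWithTopic:
    def __init__(self):
        self.seen = []

    async def consume(self, job):
        # Record the pair instead of talking to a real data store.
        self.seen.append((job.message.user_id, job.message.topic_name))
        await job.ack()


async def dispatch(job, consumers):
    # Hypothetical helper: pick the consumer by the message's concrete type.
    consumer = consumers[type(job.message)]
    await consumer.consume(job)


consumer = ConnectUserWithTopic()
job = FakeJob(TopicFollow(120, "entertainment"))
asyncio.run(dispatch(job, {TopicFollow: consumer}))
```

A stand-in job like this is one way to unit test a consumer's logic, including whether it acknowledges the job, before wiring it to a real queue.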

Produce messages

message = TopicFollow(120, "entertainment")
await queue.produce(message)

Installation

pip install amqp-ko

Author: Michał Budziak