classic-messaging-kombu

Provides implementation for publisher and consumer with Kombu library


License
MIT
Install
pip install classic-messaging-kombu==0.0.1

Documentation

Classic Messaging Kombu

This package provides implementation of interfaces in classic-messaging and base for consumers and messages handling.

Usage with publishing:

from classic.components import component
from classic.messaging import Message, Publisher
from classic.messaging_kombu import BrokerScheme, KombuPublisher
from kombu import Exchange, Queue, Connection


@component
class SomeService:
    publisher: Publisher

    def do_some_work(self):
        message = Message('some', 'Some very useful info')
        self.publisher.publish(message)


broker_scheme = BrokerScheme(
    Queue('queue1', Exchange('some')),
)
        
connection = Connection('amqp://localhost:5672/')
publisher = KombuPublisher(
    connection=connection,
    scheme=broker_scheme
)

service = SomeService(publisher=publisher)

service.do_some_work()

Message have 2 arguments - target and body. Target is a str with destination. In simple case it is an exchange name, in complex case - entry in mapping.

Body can be any serializable object.

Customization of target:

from classic.messaging import Message
from classic.messaging_kombu import BrokerScheme, KombuPublisher
from kombu import Exchange, Queue, Connection


targets = {
    'FIRST': {
        'exchange': 'exchange1',
    },
    'SECOND': {
        'exchange': 'exchange2',
        'timeout': 10,
    }
}

broker_scheme = BrokerScheme(
    Queue('queue1', Exchange('exchange1')),
    Queue('queue2', Exchange('exchange2')),
)
        
connection = Connection('amqp://localhost:5672/')
publisher = KombuPublisher(
    connection=connection,
    scheme=broker_scheme,
    messages_params=targets,
)

publisher = KombuPublisher(
    connection=connection,
    scheme=broker_scheme
)

publisher.publish(
    Message('FIRST', 'Hello'),  # Will be published to exchange1 and queue1
    Message('SECOND', 'By'),  # Will be published to exchange2, queue2 and timeout=10
)

Usage with consuming:

from classic.messaging_kombu import BrokerScheme, KombuConsumer, MessageHandler
from kombu import Exchange, Queue, Connection


class SomeSerice:
    def handle_message(self, message):
        print(message)

        
class CustomHandler(MessageHandler):
    
    def handle(self, message, body):
        print(body)
        
        message.ack()


broker_scheme = BrokerScheme(
    Queue('queue1', Exchange('exchange1')),
    Queue('queue2', Exchange('exchange2')),
)

connection = Connection('amqp://localhost:5672/')

consumer = KombuConsumer(
    connection=connection,
    scheme=broker_scheme,
)

service = SomeSerice()
handler = CustomHandler()

consumer.register_function(service.handle_message, 'queue1')
consumer.register_handler(handler, 'queue2')