Rabbit MQ Wrapper
Create a class to rapid implementation of rabbitmq on your project
If you use or like the project, click Star
and Watch
to generate metrics and i evaluate project continuity.
Obs: Notes this implementatio use pika as a basis.
Install:
pip install rabbitmq-wrapper
Usage:
-
Import RabbitWrapper Class:
from rabbitmq_wrapper import RabbitWrapper
-
Create a configuration dict:
{ 'ENABLED': True, 'USE_EXCHANGE': '', # QUEUES 'DEFAULT_QUEUE': 'my_queue', 'DEADLETTER_QUEUE': 'my_queue_deadletter', # default is setted to f"DEFAULT_QUEUE_deadletter" 'ALLOWED_QUEUES': ['my_queue', 'my_queue_deadletter'], # default is [DEFAULT_QUEUE, DEADLETTER_QUEUE] # CONNECTION 'CONNECTION': { # see all parameters on https://pika.readthedocs.io/en/stable/modules/parameters.html#connectionparameters, https://pika.readthedocs.io/en/stable/modules/credentials.html#module-pika.credentials 'HOST': '', 'PORT': '', 'USERNAME': '', 'PASSWORD': '' }, # if your prefer, you can use url connection 'CONNECTION': { 'URL': 'amqp://', } # Consumers 'CONSUMERS': [ # See all parameters on https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.basic_consume # Obs the param "on_message_callback" is replaced by callback and receive string with callback function path {'queue': 'email_service', 'callback': 'app.callbacks.MyReceiver', 'auto_ack': False} ] }
-
Instance with settings
RabbitWrapper(config={ ... }) # or if your use django, you can put config on settings.py, and pass a name to instance RabbitWrapper(name=MyCustomConfig) # in settings.py: RABBIT_MyCustomConfig = { ... }
-
Use Message Receive Wrapper Class:
from rabbitmq_wrapper import CallbackWrapper class MyReceiver(CallbackWrapper): NAME = '' # Name used to instance RabbitWrapper or None CONFIG = None # Config used to instance RabbitWrapper or None def consume_data(self, raw_msg): # Your logic here. self.msg_ack()
Others
-
See custom methods on CallbackWrapper:
- Message ACK
self.msg_ack()
- Parse data to json
# Obs: Name or Config in class params needed to use param deadletter_on_error self.msg_json_parser(data={}, deadletter_on_error=False, ack_on_error=False)
- Resend msg to queue
# Obs: Name or Config in class params needed to use this self.msg_resend(self, data, queue=None, json=True) # params: data (data to resend), queue (queue to resend), json (parse data to json).
- Move msg to deadletter
# Obs: Name or Config in class params needed to use this self.msg_deadletter(self, data, queue=None, json=True) # params: (data to move), queue (queue to move), json (parse data to json)