Custom consumer for celery integration

RabbitMQ, Celery, amqp, generic
pip install celery-skinos==1.0.4


Celery Skinos

Custom consumer for celery integration.


from skinos.custom_consumer import CustomConsumer

Define a new exchange

Define a new exchange with a name and a binding key. The exchange is always a topic exchange, and its name must be unique.

# add_exchange(str, str) -> Exchange
CustomConsumer.add_exchange('test', "test.*.*")
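Because the exchange is a topic exchange, the binding key is a dot-separated pattern. In AMQP topic patterns, `*` matches exactly one word and `#` matches zero or more words. The helper below is a minimal sketch of that matching rule, written only to show which routing keys a key such as "test.*.*" would accept. The name `topic_matches` is hypothetical and is not part of Skinos, Celery, or kombu; the broker performs the real matching.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        # Pattern exhausted: succeed only if every word was consumed too.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        # Any other pattern part needs one word to consume.
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


print(topic_matches("test.*.*", "test.test.foo"))  # True: three words
print(topic_matches("test.*.*", "test.test"))      # False: only two words
print(topic_matches("test.*.*", "other.a.b"))      # False: first word differs
```

With the binding key "test.*.*", only routing keys made of exactly three words and starting with "test" are accepted.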

Define a new task

Define a new message handler

The decorator takes 3 arguments:

  • exchange name (must be defined)
  • queue name (must be defined)
  • queue binding key

The decorated function must have this prototype: (str, Message) -> Any

  • body is the payload
  • msg is the message object (kombu.transport.pyamqp.Message)
# consumer(str, str, str) -> Callable[[str, Message], Any]
@CustomConsumer.consumer('test', 'test.test', 'test.test.*')
def coucou(body, msg):
    print('payload content : {}'.format(body))
    print('message object content : {}'.format(msg))

Build consumers for Celery integration

This step builds the consumers themselves. All previous methods only pre-configure this build. It takes one argument, which is the Celery app.

# build(Celery) -> None
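The "pre-configure first, build last" flow can be pictured with a toy registry. This is only a sketch of the general pattern under stated assumptions, not the Skinos implementation: the class `ToyConsumerRegistry`, its attributes, and the plain list used in place of the Celery app are all hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

Handler = Callable[[str, Any], Any]


class ToyConsumerRegistry:
    """Hypothetical stand-in showing deferred registration; not Skinos itself."""

    def __init__(self) -> None:
        self.exchanges: Dict[str, str] = {}
        self.pending: List[Tuple[str, str, str, Handler]] = []

    def add_exchange(self, name: str, binding_key: str) -> None:
        # Exchange names must be unique.
        if name in self.exchanges:
            raise ValueError(f"exchange {name!r} is already defined")
        self.exchanges[name] = binding_key

    def consumer(self, exchange: str, queue: str, binding_key: str) -> Callable[[Handler], Handler]:
        # The exchange must have been declared beforehand.
        if exchange not in self.exchanges:
            raise KeyError(f"exchange {exchange!r} is not defined")

        def decorator(func: Handler) -> Handler:
            # Only record the handler here; nothing is attached to the app yet.
            self.pending.append((exchange, queue, binding_key, func))
            return func

        return decorator

    def build(self, app: List[str]) -> None:
        # 'app' is a plain list standing in for the Celery app.
        for exchange, queue, binding_key, func in self.pending:
            app.append(f"{exchange}/{queue}/{binding_key} -> {func.__name__}")


registry = ToyConsumerRegistry()
registry.add_exchange("test", "test.*.*")


@registry.consumer("test", "test.test", "test.test.*")
def coucou(body: str, msg: Any) -> None:
    print("payload content : {}".format(body))


fake_app: List[str] = []
registry.build(fake_app)  # registration takes effect only at this point
print(fake_app)
```

The point of the pattern is that declaring exchanges and handlers has no effect on the application until the build step runs, which is why the build call comes last.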

Add Sentry handler

You must initialize Sentry as you normally would for a Celery project. Skinos is then able to catch exceptions and send them to Sentry.

Set sentry to True and raise to False to report errors to Sentry without re-raising them (i.e. if an error occurs, it is reported and then ignored). If you don't use Sentry, keep the default values, which are False and False.

# with_sentry(bool, bool) -> Tuple[bool, bool]
CustomConsumer.with_sentry(False, False)
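The effect of the two flags can be sketched with a small wrapper. This is an illustration of the behavior described above, not Skinos's actual code: `guarded`, `report`, and `reraise` are hypothetical names, and `report` stands in for a Sentry call such as `sentry_sdk.capture_exception`.

```python
from typing import Any, Callable, List


def guarded(
    handler: Callable[[str, Any], Any],
    report: Callable[[BaseException], None],
    sentry: bool = False,
    reraise: bool = False,
) -> Callable[[str, Any], Any]:
    """Wrap a message handler so that errors follow the two flags."""

    def wrapper(body: str, msg: Any) -> Any:
        try:
            return handler(body, msg)
        except Exception as exc:
            if sentry:
                report(exc)  # send the error to the reporting backend
            if reraise:
                raise        # propagate the error to the caller
            return None      # otherwise the error is ignored

    return wrapper


def failing_handler(body: str, msg: Any) -> None:
    raise ValueError("bad payload: {}".format(body))


reported: List[BaseException] = []
safe = guarded(failing_handler, reported.append, sentry=True, reraise=False)
safe("hello", None)   # the error is reported, then ignored
print(len(reported))  # 1
```

With both flags set to False, the wrapper in this sketch would swallow the error silently, so enabling at least one of them is usually what you want in production.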

Run celery

Run Celery as you normally would.