Produce and consume events with any queue.


Keywords
event
License
MIT
Install
pip install eventcore==0.4.0rc5

Documentation

eventcore

Build Status

Produce and consume events with any queue.

Installation

This project is hosted on PyPI and can be installed with pip:

$ pip install eventcore

The default installation includes a DummyProducer and DummyConsumer that can be used for testing or development environments. These dummy objects store everything in local memory, meaning the events are lost whenever the main process ends or restarts.

For production systems you should use either Kafka or SQS included in this library.

Usage

Configure a Producer

Before you can dispatch events you need to register a default producer, this could be changed depending on the type of queue you want to use. The producer implements a persistence mechanism that is used whenever dispatch() gets called.

from eventcore.dummy import DummyProducer

producer = DummyProducer()
producer.register()

Configure a Consumer

In order to retrieve events you'll need to create and run a consumer instance. The consumer will stay active in a separate thread to keep listening for new events by calling thread() on it.

from eventcore.dummy import DummyConsumer

consumer = DummyConsumer()
consumer.thread()

Events

Create your own events using Event as base class. These custom events must set the properties topic and name as is shown in the example.

from eventcore import Event

class UserCreated(Event):
    topic = 'User'
    name = 'UserCreated'

Dispatching

Events can be instantiated and dispatched on the fly. Doing so requires you to pass the subject and data arguments. Call dispatch() on the event to have your default producer store it.

event = UserCreated(subject='cfce9306-3b33-4cb2-a51f-9dc4879cc7a2'
                    data={'name': 'John Doe'})
event.dispatch()

Alternatively, you can dispatch events whenever a method is called by using the dispatch_event decorator. This uses the return value of the decorated method to build the event instance. When the return value is a non-dictionary object, it uses it's __dict__ property to populate event data.

from eventcore import dispatch_event

@dispatch_event(UserCreated, subject='id')
def create_user():
    user = User(name='John Doe')
    return user

Subscribers

Whenever your consumer retrieves new events it will execute all subscriber methods registered for it, passing the event instance as the only argument.

from eventcore import event_subscriber

@event_subscriber(UserCreated)
def send_activation(event):
    pass

Utility

Context managers

Add a context manager that wraps around the execution of each event_subscriber. This could be used to manage database transactions.

import transaction
from eventcore.dummy import DummyConsumer

consumer = DummyConsumer()
consumer.set_context_manager(transaction.manager)
consumer.thread()

Alternative producers

You can bypass the default producer by passing a alternative one to the dispatch() method. This could allow you to use e.g. Kafa as a default, while still sending some specific events to SQS.

from eventcore.dummy import DummyProducer

alternative_producer = DummyProducer()
event.dispatch(alternative_producer)

Fallback method

You can add a fallback method to your producer. This method is executed whenever an uncaught exception occurs, passing in the event instance that it failed on as the only argument.

from eventcore.dummy import DummyProducer

def report_error(event):
    pass

producer = DummyProducer()
producer.set_fallback(report_error)
producer.register()

Feature - Kafka

Installation is only required if you want to use the eventcore_kafka package. This install includes the library confluent-kafka==0.11.*

$ pip install eventcore-kafka

The below examples show the different configuration needed for the producer and consumer when using Kafka.

from eventcore_kafka import KafkaProducer

producer = KafkaProducer(servers='localhost:9092')
producer.register()
from eventcore_kafka import KafkaConsumer

consumer = KafkaConsumer(servers='localhost:9092',
                         group_id='UserService',
                         topics='user')
consumer.thread()

Feature - SQS

Installation is only required if you want to use the eventcore.sqs package. This install includes the library boto3==1.9.*

$ pip install eventcore[sqs]

The below examples show the different configuration needed for your producer and consumer when using SQS.

from eventcore.sqs import SQSProducer

producer = SQSProducer(region_name='eu-west-1',
                       access_key_id='ACCESS_KEY_ID',
                       secret_access_key='SECRET_ACCESS_KEY',
                       queue_url='https://.../example.fifo')
producer.register()
from eventcore.sqs import SQSConsumer

consumer = SQSConsumer(region_name='eu-west-1',
                       access_key_id='ACCESS_KEY_ID',
                       secret_access_key='SECRET_ACCESS_KEY',
                       queue_url='https://.../example.fifo')
consumer.thread()