# zc_events

## Long term plan
The long-term plan is to simplify this library as much as possible. This will be done in several stages:

- Add HTTP verbs to `EventClient`.
- Deprecate all other methods on `EventClient`, except the `email`-related ones.
- Decouple `EventClient` from any particular backend used to send events.
- Move over to sending events through Django Rest Framework.
- Move over sending the specific `email` types of events.
- Create a `celery` backend, with sane defaults, to stop placing things directly on RabbitMQ.
- Release a major version that removes the deprecated methods and classes.

Each of the above represents a potential release.
## Vision
This library should be as simple as possible, with a standard interface that is decoupled from the transport mechanism underlying any event system (RabbitMQ, Celery, Redis, HTTPS, etc.).
## Example-based goals
```python
# Server side
def add(request):
    data = request.data
    return {
        'answer': data['x'] + data['y']
    }
```
### Fire and forget
```python
# Client side
zc_events.post_no_wait('add', data={'x': 1, 'y': 1})
```
### Wait for a response
```python
# Client side
response = zc_events.get('add', data={'x': 1, 'y': 1})
response.data        # {'answer': 2}
response.has_errors  # False
```
### Get errors
```python
# Client side
response = zc_events.get('add', data={'x': 1})
response.data        # None
response.has_errors  # True
response.errors      # [{ 'type': 'KeyError', 'message': 'y' }]
```
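Putting these together, callers will typically branch on `has_errors` before touching `data`. The helper below is only a sketch of that pattern, using a stand-in response type, since the real response object comes from `zc_events`:

```python
from collections import namedtuple

# Stand-in for the response returned by zc_events.get (illustrative only).
Response = namedtuple('Response', ['data', 'has_errors', 'errors'])


class EventError(Exception):
    """Raised when the remote handler reported errors."""


def unwrap(response):
    # Surface remote errors as a local exception; otherwise return the data.
    if response.has_errors:
        raise EventError(response.errors)
    return response.data


ok = Response(data={'answer': 2}, has_errors=False, errors=None)
assert unwrap(ok) == {'answer': 2}
```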
## Working Example
A working example can be found in the `examples` folder, and can be run via `docker-compose run client3` and `docker-compose run client2`.
## Installation
### Settings changes
In the `settings.py` file, add the following lines (note that `Exchange` and `Queue` come from `kombu`):

```python
from kombu import Exchange, Queue

EVENTS_EXCHANGE = 'microservice-events' + ('' if STAGING_NAME == 'production' else '-{}'.format(STAGING_NAME))
events_exchange = Exchange(EVENTS_EXCHANGE, type='fanout')

CELERY_QUEUES = (
    # Add this line
    Queue(SERVICE_NAME + '-events', events_exchange),
)

CELERY_ROUTES = ('zc_events.routers.TaskRouter', )
```
### BROKER_URL
Make sure that the `BROKER_URL`, whether in `.env.sample` or in `settings.py` if a default is defined there, is in the following format (`%2F` is the URL-encoded default vhost `/`). The old style with a trailing `//` won't work for development when emitting events.

```
BROKER_URL=amqp://guest:guest@rabbitmq:5672/%2F
```
### New requirements
Sending and receiving events requires the `pika` and `ujson` libraries to be installed via pip and added to the `requirements.txt` file.
### Set up service for listening to events
Create a file called `microservice_events.py` and place it somewhere like the `mp-slots-and-orders/slots_and_orders/tasks/` directory. For now this will be an empty file, but in the future this is where any events that this service listens to will go.
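As a sketch of what a handler in this file might eventually look like: the listener task (shown below in this section) dispatches by name, calling any function defined here whose name matches the incoming `event_type`. The event name `order_created` and the kwargs it reads are hypothetical, not part of the library:

```python
# microservice_events.py -- hypothetical example handler.
# The listener calls any function here whose name matches event_type.

def order_created(*args, **kwargs):
    # 'resource_id' and 'user_id' are illustrative kwargs; real events
    # may carry a different payload.
    return 'Order {} created by user {}'.format(
        kwargs.get('resource_id'), kwargs.get('user_id'))
```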
Next, create a file called `microservice_events_listener.py` and place it in the same directory. This file will have the following contents (replacing `slots_and_orders` with the current project name):
```python
import logging

from slots_and_orders import celery_app as app

from . import microservice_events

logger = logging.getLogger(__name__)


@app.task(name='microservice.event')
def microservice_event(event_type, *args, **kwargs):
    if hasattr(microservice_events, event_type):
        logger.info('MICROSERVICE_EVENT::PROCESSED: Received [{}:{}] event for object ({}:{}) and user {}'.format(
            event_type, kwargs.get('task_id'), kwargs.get('resource_type'), kwargs.get('resource_id'), kwargs.get('user_id')))
        getattr(microservice_events, event_type)(*args, **kwargs)
    else:
        logger.info('MICROSERVICE_EVENT::IGNORED: Received [{}:{}] event for object ({}:{}) and user {}'.format(
            event_type, kwargs.get('task_id'), kwargs.get('resource_type'), kwargs.get('resource_id'), kwargs.get('user_id')))
```
Note: feel free to use explicit imports here; just make sure you're importing the correct things. They are written as relative imports here to be as generic as possible.
Go back to `settings.py` and add the new event listener file to `CELERY_IMPORTS`, which you may need to create if it doesn't already exist:

```python
CELERY_IMPORTS = (
    'slots_and_orders.tasks.microservice_events_listener',
)
```
## How to configure a service to listen to events
In the `proj/proj/__init__.py` file, import and instantiate the `EventClient` from `zc_events`. This will serve as a singleton for all event requests, and it holds a Redis connection pool and a RabbitMQ connection pool open for events. This is a good time to add the Celery import here as well, and to make sure all parts of the project import Celery from here:
```python
from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app
from zc_events import EventClient

event_client = EventClient()

__all__ = ['celery_app']
```
When you need to make a service-to-service request, you can now do something similar to this:

```python
from slots_and_orders import event_client

order = event_client.get_remote_resource('Order', '23131024')
print('Order name: {}'.format(order.name))
```
## Util functions
You may need to save data to or read data from S3 as part of your event processing. In such cases, refer to the `zc_events.aws` module, which contains a few helper functions for these common routines.
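The exact helper names in `zc_events.aws` aren't listed here, so the sketch below only illustrates the general shape of such a routine — a deterministic S3 key plus a JSON-encoded body — with the upload itself (via e.g. `boto3`) left as a comment. `build_payload_key` and `serialize_payload` are hypothetical names, not part of the library:

```python
import json


def build_payload_key(resource_type, resource_id):
    # Hypothetical convention for a deterministic S3 key per payload.
    return 'event-payloads/{}/{}.json'.format(resource_type, resource_id)


def serialize_payload(data):
    # JSON-encode the payload to bytes before uploading.
    return json.dumps(data, sort_keys=True).encode('utf-8')


# The upload itself would then look something like (requires AWS credentials):
# import boto3
# boto3.client('s3').put_object(
#     Bucket='my-events-bucket',
#     Key=build_payload_key('Order', '23131024'),
#     Body=serialize_payload({'status': 'shipped'}),
# )
```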