confluo
From con- ‎(“with; together”) + fluō ‎(“flow”).
cōnfluō ‎(present infinitive cōnfluere, perfect active cōnfluxī); third conjugation, no passive
    1. (intransitive) I flow or run together, meet.
    2. (intransitive, figuratively) I flock or crowd together, throng, assemble.
⇢ confluo is a minimalist scalable microservice framework for distributed systems using AMQP/RabbitMQ as a broker.
Note: this project is under heavy development! Contributions are more than appreciated!
import asyncio
from confluo.core import Service
a = Service("Service-A")
b = Service("Service-B")
@b.subscribe("/foo/bar")
async def foo_bar_event(path, headers, body):
print("Got /foo/bar event with body: '{0}'".format(body))
response = await b.call("Service-A", "/bar", {"data": "Some body data"})
print("Got response: '{0}'".format(response))
@a.route("/bar")
async def bar_command(path, headers, body):
print("Got /bar command with body: '{0}'".format(body))
return {"value": 1}
if __name__ == "__main__":
loop = asyncio.get_event_loop()
# connect services
loop.run_until_complete(asyncio.wait([a.connect(), b.connect()]))
# Publish event
loop.run_until_complete(a.publish("/foo/bar", "wtf"))
try:
loop.run_forever()
except KeyboardInterrupt:
loop.run_until_complete(asyncio.wait([a.shutdown(), b.shutdown()]))
loop.stop()
loop.close()
Features
These are the key features of confluo:
- Pythonic and minimalist API
- Based on python's asyncio event loop
- Based on AMQP protocol
- HTTP like protocol on top of AMQP
- Send and Receive Events
- Call Commands and Receive Responses
Usage
This section walks through the essentials in order to run a confluo service.
Run RabbitMQ as a broker.
The simplest way to use RabbitMQ is probably to just pull the docker image from docker hub and run it:
sudo docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
Import and create a service
Let's create our first service named My-First-Service and bind it to the main threads asyncio event loop:
import asyncio
from confluo import Service
loop = asyncio.get_event_loop()
service = Service("My-First-Service", loop=loop)
Publish event
Every confluo service is able to publish events. All other services which subscribed to this event will receive it.
Outside of a coroutine:
loop.run_until_complete(service.publish("MyFancyEvent", {"name": "Peter", "nickname": "Spider Man"}))
Inside a coroutine using the same event loop:
await service.publish("MyFancyEvent", {"name": "Peter", "nickname": "Spider Man"})
The first argument of the Service.publish
method is the name of the event. You can also use a path like /my/cool/path//event
or whatever you like. The second argument is the data sent as event body.
Subscribe event
Use the Service.subscribe
decorator to subscribe for an event and register a handler:
@service.subscribe("MyFancyEvent")
async def handle_my_fancy_event(path, headers, body):
print("Got MyFancyEvent with body: {0}".format(body))
Call command
You can also call a command from another specific service and retrieve it's response:
Outside of a coroutine:
response = loop.run_until_complete(
service.call("My-Other-Service", "MyFancyCommand", {"name": "Bruce", "nickname": "Batman"}))
Iniside a coroutine using the same event loop:
response = await service.call("My-Other-Service", "MyFancyCommand", {"name": "Bruce", "nickname": "Batman"})
Handle command
Use the Service.route
decorator to register a handler for a specific command:
@service.route("MyFancyCommand")
async def handle_my_fancy_command(path, headers, body):
print("Got MyFancyCommand with body: {0}".format(body))
return {"message": "That's my awesome response"}