confluo

Minimalist scalable microservice framework for distributed systems using AMQP/RabbitMQ


Keywords
confluo, service, amqp, rabbitmq, scalable, distributed, microservice, framework, library, package, module
License
MIT
Install
pip install confluo==0.0.3

Documentation

confluo

From con- (“with; together”) + fluō (“flow”).
cōnfluō ‎(present infinitive cōnfluere, perfect active cōnfluxī); third conjugation, no passive
    1. (intransitive) I flow or run together, meet.
    2. (intransitive, figuratively) I flock or crowd together, throng, assemble.

⇢ confluo is a minimalist scalable microservice framework for distributed systems using AMQP/RabbitMQ as a broker.

Note: this project is under heavy development! Contributions are more than appreciated!

import asyncio
from confluo.core import Service

a = Service("Service-A")
b = Service("Service-B")


@b.subscribe("/foo/bar")
async def foo_bar_event(path, headers, body):
    print("Got /foo/bar event with body: '{0}'".format(body))
    response = await b.call("Service-A", "/bar", {"data": "Some body data"})
    print("Got response: '{0}'".format(response))


@a.route("/bar")
async def bar_command(path, headers, body):
    print("Got /bar command with body: '{0}'".format(body))
    return {"value": 1}


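if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    # Connect both services to the broker.
    # asyncio.gather is used because asyncio.wait no longer accepts bare coroutines.
    loop.run_until_complete(asyncio.gather(a.connect(), b.connect()))

    # Publish an event from Service-A; the subscriber on Service-B handles it
    loop.run_until_complete(a.publish("/foo/bar", "wtf"))

    try:
        # Keep processing messages until interrupted with Ctrl+C
        loop.run_forever()
    except KeyboardInterrupt:
        # Shut both services down before closing the loop
        loop.run_until_complete(asyncio.gather(a.shutdown(), b.shutdown()))
        loop.stop()
        loop.close()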

Features

These are the key features of confluo:

  • Pythonic and minimalist API
  • Based on Python's asyncio event loop
  • Based on the AMQP protocol
  • HTTP-like protocol on top of AMQP
  • Send and receive events
  • Call commands and receive responses

Usage

This section walks through the essentials needed to run a confluo service.

Run RabbitMQ as a broker

The simplest way to run RabbitMQ is probably to pull the Docker image from Docker Hub and start it:

sudo docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
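
Before connecting a service, it can help to confirm that the broker accepts TCP connections. The following is a minimal sketch using only the standard library. The helper name broker_reachable is hypothetical, and the host and port are assumptions: 5672 is RabbitMQ's default AMQP port, but the docker run command above does not publish it to the host, so you may need to add -p 5672:5672 or use the container's IP address.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened.

    This only checks that something is listening on the port; it does
    not perform an AMQP handshake. Name and defaults are illustrative.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution errors.
        return False
```

Calling broker_reachable() with no arguments checks localhost:5672; pass a different host or port if your broker runs elsewhere.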

Import and create a service

Let's create our first service, named My-First-Service, and bind it to the main thread's asyncio event loop:

import asyncio
from confluo import Service

loop = asyncio.get_event_loop()
service = Service("My-First-Service", loop=loop)

Publish event

Every confluo service is able to publish events. Every other service that has subscribed to an event receives it.

Outside of a coroutine:

loop.run_until_complete(service.publish("MyFancyEvent", {"name": "Peter", "nickname": "Spider Man"}))

Inside a coroutine using the same event loop:

await service.publish("MyFancyEvent", {"name": "Peter", "nickname": "Spider Man"})

The first argument of the Service.publish method is the name of the event. The name can also be a path such as /my/cool/path//event, or whatever you like. The second argument is the data sent as the event body.

Subscribe event

Use the Service.subscribe decorator to subscribe to an event and register a handler:

@service.subscribe("MyFancyEvent")
async def handle_my_fancy_event(path, headers, body):
    print("Got MyFancyEvent with body: {0}".format(body))
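
To illustrate the fan-out behaviour described above (one published event reaches every subscriber), here is a small in-process sketch built on plain asyncio. It is not confluo's implementation and involves no broker: InMemoryBus, demo_fanout and the empty headers dict are hypothetical stand-ins chosen to mirror the (path, headers, body) handler signature.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Toy stand-in for a broker: delivers each event to all subscribers."""

    def __init__(self):
        self._handlers = defaultdict(list)  # event name -> list of handlers

    def subscribe(self, path):
        """Decorator that registers a coroutine as a handler for `path`."""
        def register(handler):
            self._handlers[path].append(handler)
            return handler
        return register

    async def publish(self, path, body):
        """Await every handler registered for `path`, passing the event body."""
        handlers = self._handlers.get(path, [])
        await asyncio.gather(*(h(path, {}, body) for h in handlers))


async def demo_fanout():
    bus = InMemoryBus()
    received = []

    @bus.subscribe("MyFancyEvent")
    async def first(path, headers, body):
        received.append(("first", body["name"]))

    @bus.subscribe("MyFancyEvent")
    async def second(path, headers, body):
        received.append(("second", body["name"]))

    await bus.publish("MyFancyEvent", {"name": "Peter", "nickname": "Spider Man"})
    return received


if __name__ == "__main__":
    print(asyncio.run(demo_fanout()))
```

Both handlers see the same event body, which is the property that distinguishes an event from a command sent to one specific service.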

Call command

You can also call a command on another specific service and retrieve its response:

Outside of a coroutine:

response = loop.run_until_complete(
    service.call("My-Other-Service", "MyFancyCommand", {"name": "Bruce", "nickname": "Batman"}))

Inside a coroutine using the same event loop:

response = await service.call("My-Other-Service", "MyFancyCommand", {"name": "Bruce", "nickname": "Batman"})
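
Unlike an event, a call waits for exactly one reply. A common way to build this on top of a message broker is to tag each request with a correlation ID and resolve a pending future when the matching reply arrives. The sketch below shows that general pattern with plain asyncio. It is not taken from confluo's source: ToyRpc, its methods, the timeout parameter and the in-process "transport" are all hypothetical.

```python
import asyncio
import uuid


class ToyRpc:
    """In-process illustration of request/reply matching by correlation ID."""

    def __init__(self):
        self._routes = {}   # command name -> handler coroutine
        self._pending = {}  # correlation ID -> future awaiting the reply

    def route(self, path):
        """Decorator that registers a coroutine as the handler for `path`."""
        def register(handler):
            self._routes[path] = handler
            return handler
        return register

    async def call(self, path, body, timeout=5.0):
        """Send a request and wait for the reply carrying the same ID."""
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        # Stand-in for publishing the request to the broker.
        task = asyncio.create_task(self._deliver(path, correlation_id, body))
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(correlation_id, None)
            task.cancel()  # no effect if delivery has already finished

    async def _deliver(self, path, correlation_id, body):
        """Stand-in for the remote side: run the handler, send the reply back."""
        response = await self._routes[path](path, {}, body)
        pending = self._pending.get(correlation_id)
        if pending is not None and not pending.done():
            pending.set_result(response)


async def demo_call():
    rpc = ToyRpc()

    @rpc.route("MyFancyCommand")
    async def handle(path, headers, body):
        return {"message": "Hello, {0}".format(body["nickname"])}

    return await rpc.call("MyFancyCommand", {"name": "Bruce", "nickname": "Batman"})


if __name__ == "__main__":
    print(asyncio.run(demo_call()))
```

The timeout matters in this pattern: if no handler ever replies, the caller would otherwise wait forever.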

Handle command

Use the Service.route decorator to register a handler for a specific command:

@service.route("MyFancyCommand")
async def handle_my_fancy_command(path, headers, body):
    print("Got MyFancyCommand with body: {0}".format(body))
    return {"message": "That's my awesome response"}
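
Because handlers are coroutines that take (path, headers, body), their logic can be exercised without a broker by awaiting an undecorated function directly. The sketch below assumes nothing about confluo itself: it defines a handler with the same signature and calls it with made-up arguments. The run_handler helper is hypothetical, and passing an empty dict for headers is an assumption. Whether a decorated handler stays directly callable depends on what Service.route returns, which this sketch does not cover.

```python
import asyncio


async def handle_my_fancy_command(path, headers, body):
    """Handler with the (path, headers, body) signature used above."""
    return {"message": "Hello, {0}".format(body["nickname"])}


def run_handler(handler, path, body, headers=None):
    """Await a handler outside of any service and return its response."""
    return asyncio.run(handler(path, headers or {}, body))


response = run_handler(
    handle_my_fancy_command,
    "MyFancyCommand",
    {"name": "Bruce", "nickname": "Batman"},
)
print(response)
```

Keeping the handler body free of service-specific state makes this kind of direct check possible.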