Tools for flow-based programming


Keywords
etl, workflow, flow-based, pipe, pipeline, queue, data, processing
License
MIT
Install
pip install pipekit==0.3.5

Documentation

Pipekit

Pipekit is a flow-based programming toolkit, with a control layer.

Quick start

Pipekit connects message processors using pipes. Pipes are just a thin layer on top of Queue objects and 0mq sockets, wrapping them under a common API. The basic idea behind this abstraction is the possibility to transparently replace a pipe implementation with another one, with no code change needed in the producers/consumers.

Pipes simply have an input and an output channel; creating and using them is pretty straightforward:

from pipekit import ThreadPipe

# Pipes need to be given a name
mypipe = ThreadPipe('my-pipe')
mypipe.send('Hello world')

print(mypipe.receive())
# Hello world

Pipes are iterables, too:

for msg in mypipe:
    dosomething(msg)

Need a 0mq-based pipe instead?

from pipekit import ZMQPipe

my0mqpipe = ZMQPipe('my-0mq-pipe', address='tcp://*:5555')

Alternatively:

from pipekit import Pipe

my0mqpipe = Pipe('my-0mq-pipe', impl='zmq', address='tcp://*:5555')
print(my0mqpipe)
# <pipekit.ZMQPipe object at 0x7fe...>