
CarbonTube is a ZeroMQ-based distributed pipeline framework.

pip install carbontube==0.1.2



This is an old experiment I made with ZeroMQ + Gevent to build a DSL for defining worker pipelines that scale easily across multiple boxes. It focuses on optimizing units of work that share common, intense I/O activities.

First, a tale

Tina is the CTO of a video app startup. She wants to optimize the utilization of cloud boxes and extend the longevity of her company's seed investment runway.

The choice is made: each part of the video-processing code will run on the boxes best suited to it. A cluster of GPU-optimized boxes will do the video processing, while small clusters of I/O-optimized boxes will parse files and store them in long-term persistence.

The requirements

  • Each step of the pipeline is responsible for one and only one job
  • A central pipeline must distribute jobs equally across workers
  • Individual steps are not responsible for persisting jobs in case of failure
  • In case of failure the pipeline should re-schedule the job
  • The pipeline must have a pluggable storage mechanism to back the fail-recovery
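
The requirements above can be illustrated with a small in-process sketch. It is not CarbonTube's implementation: `InMemoryStorage`, `Dispatcher` and `max_attempts` are hypothetical names chosen for this example, and plain function calls stand in for the network. It only shows the shape of the idea, which is to persist a job before handing it out, rotate through workers, and put a failed job back in the queue.

```python
from collections import deque
from itertools import cycle


class InMemoryStorage:
    """Hypothetical pluggable backend: holds jobs until they are acknowledged."""

    def __init__(self):
        self.pending = {}

    def save(self, job_id, job):
        self.pending[job_id] = job

    def acknowledge(self, job_id):
        self.pending.pop(job_id, None)


class Dispatcher:
    """Hands jobs to workers in round-robin order and re-schedules failures."""

    def __init__(self, workers, storage, max_attempts=3):
        self.workers = cycle(workers)
        self.storage = storage
        self.max_attempts = max_attempts

    def run(self, jobs):
        queue = deque((job_id, job, 1) for job_id, job in enumerate(jobs))
        results = {}
        while queue:
            job_id, job, attempt = queue.popleft()
            self.storage.save(job_id, job)  # persist before handing the job out
            worker = next(self.workers)
            try:
                results[job_id] = worker(job)
                self.storage.acknowledge(job_id)
            except Exception:
                # The step itself persists nothing; the dispatcher re-schedules.
                if attempt < self.max_attempts:
                    queue.append((job_id, job, attempt + 1))
        return results


def steady_worker(job):
    return {"video_uri": "stored:" + job["video_path"]}


def make_flaky_worker():
    """Build a worker that fails the first time it sees each video."""
    seen = set()

    def worker(job):
        if job["video_path"] not in seen:
            seen.add(job["video_path"])
            raise IOError("transient failure")
        return steady_worker(job)

    return worker
```

A job that exhausts its attempts stays in `storage.pending`, which is where a real backend would let an operator recover it.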

The Tools

  • ZeroMQ pull workers execute each step of the pipeline and publish events about their state, real-time logs, availability and success/fail score.
  • A ZeroMQ push server that is aware of the capabilities of its subject steps.
  • A web interface that listens for events published by steps and pipeline, showing real-time stats of pipeline health and availability.
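
To make the event flow between steps and the web interface more concrete, here is a rough sketch that uses an in-process event bus as a stand-in for ZeroMQ publish/subscribe sockets. Nothing here is CarbonTube's API: `EventBus`, `HealthBoard`, `run_step` and the `job-finished` topic are assumptions made for illustration only.

```python
from collections import Counter, defaultdict


class EventBus:
    """In-process stand-in for a publish/subscribe transport (not real ZeroMQ)."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, payload):
        for callback in self.subscribers[topic]:
            callback(payload)


class HealthBoard:
    """Aggregates success/failure counts per job type, as a dashboard might."""

    def __init__(self, bus):
        self.scores = defaultdict(Counter)
        bus.subscribe("job-finished", self.on_finished)

    def on_finished(self, event):
        outcome = "success" if event["ok"] else "failure"
        self.scores[event["job_type"]][outcome] += 1


def run_step(bus, job_type, handler, job):
    """Run one step and publish its outcome as an event."""
    try:
        handler(job)
        ok = True
    except Exception:
        ok = False
    bus.publish("job-finished", {"job_type": job_type, "ok": ok})
```

The point of the design is that a step never reports to the dashboard directly. It only publishes events, so any number of listeners can be attached without touching the step.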

Enter CarbonTube...

Defining pipeline phases

from carbontube import Phase

class EncodeVideo(Phase):
    job_type = 'encode-video'

    def execute(self, job):
        video_path = job.get('video_path')
        encoding_options = job.get('encoding_options')

        if not process_video(video_path, encoding_options):
            raise IOError('could not process video {video_title} with options {encoding_options}'.format(**job))

        video_uri = copy_video_to_cloud_storage(video_path)
        return {'video_uri': video_uri}

class SplitIntoSegments(Phase):
    job_type = 'split-video'

    def execute(self, job):
        # ... some I/O
        pass

Defining the execution order

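from carbontube import Pipeline
from carbontube.storage import RedisStorageBackend  # assumed import path

class VideoEncoderPipeline(Pipeline):
    name = 'video-encoder-pipeline'

    # the two phases defined above; their order here is an assumption
    phases = [SplitIntoSegments, EncodeVideo]

    def initialize(self):
        # passing self.name as the first argument is an assumption
        self.backend = RedisStorageBackend(self.name, redis_uri='redis://')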

Deploying the encoder step

$ export HOSTNAME=""
$ carbontube phase encode-video \
    --concurrency=4 \
    --pull-bind="tcp://${HOSTNAME}:3000" \
    --push-connect="tcp://${HOSTNAME}:4000"

Deploying the file-split step

$ export HOSTNAME=""
$ carbontube phase split-video \
    --concurrency=8 \
    --pull-bind="tcp://${HOSTNAME}:3000" \
    --push-connect="tcp://${HOSTNAME}:4000"

Deploying the pipeline manager server

$ export HOSTNAME=""
$ carbontube pipeline video-encoder-pipeline \
      --pull-bind="tcp://${HOSTNAME}:5050"

Feeding with jobs

In Python

from carbontube.clients import PipelineClient

properly_formatted = {
    "name": "video-encoder-pipeline",
    "instructions": {
        "video_path": "/tmp/video1.mp4",
    },
}

client = PipelineClient('tcp://')
ok, payload_sent = client.enqueue_job(properly_formatted)
if ok:
    print("PUBLISHED JOB {0}".format(payload_sent))
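
Hand-writing the JSON for a job is easy to get wrong, especially when the same payload is later pasted into a shell. As a minimal sketch using only the standard library, the instructions can be serialized with `json.dumps` and quoted with `shlex.quote`. The helper name `build_enqueue_command` is hypothetical, the argument order simply mirrors the command-line form in the next section, and the `tcp://` address is left as the same placeholder used throughout this document.

```python
import json
import shlex


def build_enqueue_command(address, pipeline_name, instructions):
    """Return the argv list for an enqueue call with a valid JSON payload."""
    payload = json.dumps(instructions)
    return ["carbontube", "enqueue", address, pipeline_name, payload]


argv = build_enqueue_command(
    "tcp://",  # placeholder address
    "video-encoder-pipeline",
    {"video_path": "/tmp/video1.mp4"},
)

# Render a copy-pasteable shell line; shlex.quote protects the JSON quotes.
command_line = " ".join(shlex.quote(arg) for arg in argv)
print(command_line)
```

Passing `argv` as a list to `subprocess.run` would avoid shell quoting altogether. The rendered string is only needed when the command is typed or pasted by hand.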

From the command-line

$ carbontube enqueue \
    tcp:// \
    video-encoder-pipeline \
    '{"video_path": "/tmp/video1.mp4"}'