mongoprocessing

A simple library for developing workflows using MongoDB Change Streams.


Keywords
mongodb, changestream, changestreams, pipeline, workflow
License
Apache-2.0
Install
pip install mongoprocessing==0.5.5

Documentation

Mongo Processing

Readme out of date

A simple library for developing workflows using MongoDB Change Streams.

Usage

Set up a MongoDB repository
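# MongoRepository, WatchBuilder and ProcessRequirement are provided by mongoprocessing.
# Replace user, pw, ip and port with your deployment's values.
uri = 'mongodb://user:pw@ip:port,ip:port,ip:port/admin?replicaSet=rs1'
repo = MongoRepository(uri, 'db', 'collection')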
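MongoDB Change Streams are only available on replica sets and sharded clusters, which is why the example URI includes a replicaSet option. The helper below is a small stdlib-only sketch for checking a connection string for that option before building the repository. The name has_replica_set is hypothetical and is not part of mongoprocessing.

```python
from urllib.parse import parse_qs, urlsplit


def has_replica_set(uri: str) -> bool:
    """Return True if the MongoDB URI names a replica set via ?replicaSet=..."""
    # Only the query string is inspected, so the comma-separated
    # host list in the netloc is never parsed into hosts or ports.
    query = parse_qs(urlsplit(uri).query)
    return bool(query.get('replicaSet'))


print(has_replica_set('mongodb://user:pw@ip:port,ip:port,ip:port/admin?replicaSet=rs1'))  # True
print(has_replica_set('mongodb://localhost:27017/db'))  # False
```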
Listen to inserted documents

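The following code sets up a watch that triggers whenever a new document is inserted into the collection. It runs a process named 'one', which writes a new property 'a' (a random integer from 0 to 9) to the document; later processes read it as doc['one']['a'].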

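from random import randint


def acknowledge_one(doc):
    # Return True to let process 'one' handle this document.
    return True


def process_one(doc):
    # Return (True, results); the results end up under doc['one'].
    results = {'a': randint(0, 9)}
    return True, results


# Watch for inserts and run process 'one' on each new document.
created_watch = WatchBuilder(repo).listen_to('insert')
created_watch.start_worker('one', acknowledge_one, process_one)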
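To make the acknowledge/process contract concrete, here is an in-memory simulation that needs no MongoDB server. It assumes, based on the examples, that a worker first calls the acknowledge function, then the process function, and stores the returned results under the process name (which is why process 'two' can read doc['one']['a']). The run_worker helper and the _start/_end timestamp keys are hypothetical; the library's actual storage layout may differ.

```python
from datetime import datetime, timezone
from random import randint


def acknowledge_one(doc):
    # Accept every inserted document.
    return True


def process_one(doc):
    # Return a flag and the results to store.
    return True, {'a': randint(0, 9)}


def run_worker(name, doc, acknowledge, process):
    """Hypothetical stand-in for a worker handling one change event."""
    if not acknowledge(doc):
        return False  # skipped: the acknowledge function declined the document
    start = datetime.now(timezone.utc)
    ok, results = process(doc)
    if ok:
        # Store results under the process name, plus assumed timing fields.
        doc[name] = {**results, '_start': start, '_end': datetime.now(timezone.utc)}
    return ok


doc = {'_id': 1}
run_worker('one', doc, acknowledge_one, process_one)
print(doc['one']['a'])  # an integer from 0 to 9
```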
Listen to previous processes

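This example runs process 'two' after process 'one' has finished: the watch listens for update events and only passes on documents that satisfy a ProcessRequirement on process 'one'.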

def acknowledge_two(doc):
    return True


def process_two(doc):
    a = doc['one']['a']
    results = {'b': 2 * a}

    if a < 4:
        results['c'] = randint(0, 9)

    return True, results


one_requirement = ProcessRequirement('one')
one_watch = WatchBuilder(repo).listen_to('update').add_process_requirement(one_requirement)
one_watch.start_worker('two', acknowledge_two, process_two)
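A ProcessRequirement gates the watch on the output of an earlier process. The sketch below models that check in plain Python, assuming a requirement is met when the document already holds results for the named process and, optionally, a given key inside them. requirement_met is a hypothetical helper, not the library's API, and a fixed value replaces randint so the output is repeatable.

```python
def requirement_met(doc, process_name, required_key=None):
    """Return True if doc holds results for process_name (and required_key, if given)."""
    results = doc.get(process_name)
    if not isinstance(results, dict):
        return False
    return required_key is None or required_key in results


def process_two(doc):
    a = doc['one']['a']
    results = {'b': 2 * a}
    if a < 4:
        results['c'] = 7  # fixed value instead of randint, for a repeatable demo
    return True, results


doc = {'_id': 1, 'one': {'a': 3}}
if requirement_met(doc, 'one'):
    ok, results = process_two(doc)
    doc['two'] = results
print(doc['two'])  # {'b': 6, 'c': 7}
```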
Additional conditions

In this example, the update event is only received if the results of process 'two' contain the key 'c', which process_two sets only when a < 4. Additionally, process 'three' only executes if 'c' is greater than 2, as checked by acknowledge_three.

def acknowledge_three(doc):
    return doc['two']['c'] > 2


def process_three(doc):
    a = doc['one']['a']
    b = doc['two']['b']
    c = doc['two']['c']
    results = {'sum': a + b + c}

    return True, results


two_requirement = ProcessRequirement('two', True, 'c')
two_watch = WatchBuilder(repo).listen_to('update').add_process_requirement(two_requirement)
two_watch.start_worker('three', acknowledge_three, process_three)
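The two gates in this example act at different points: the requirement ProcessRequirement('two', True, 'c') decides whether the event reaches the worker at all, and acknowledge_three decides whether process 'three' runs. This in-memory sketch walks three sample documents through both gates; the should_run_three helper is hypothetical and only mirrors the behaviour described above.

```python
def acknowledge_three(doc):
    return doc['two']['c'] > 2


def process_three(doc):
    a, b, c = doc['one']['a'], doc['two']['b'], doc['two']['c']
    return True, {'sum': a + b + c}


def should_run_three(doc):
    # Gate 1: the requirement needs key 'c' in the results of process 'two'.
    if 'c' not in doc.get('two', {}):
        return False
    # Gate 2: the acknowledge function filters on the value of 'c'.
    return acknowledge_three(doc)


docs = [
    {'one': {'a': 5}, 'two': {'b': 10}},          # no 'c': event filtered out
    {'one': {'a': 3}, 'two': {'b': 6, 'c': 1}},   # c <= 2: not acknowledged
    {'one': {'a': 3}, 'two': {'b': 6, 'c': 7}},   # runs: 3 + 6 + 7
]
for doc in docs:
    if should_run_three(doc):
        _, results = process_three(doc)
        doc['three'] = results
print([d.get('three') for d in docs])  # [None, None, {'sum': 16}]
```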

MongoProcessing automatically records the start and end time of each process. In the example above, the trigger_if_rerun argument of ProcessRequirement (the second positional argument) is set to True. This means that if process 'two' runs again, process 'three' also re-runs.
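One way to picture trigger_if_rerun is through the recorded timestamps: if the upstream process finished after the downstream process last started, the downstream result is stale. The sketch below encodes that rule under stated assumptions; the needs_rerun helper and the start/end field names are illustrative only, not the library's documented logic.

```python
from datetime import datetime


def needs_rerun(doc, upstream, downstream, trigger_if_rerun=True):
    """Hypothetical rule: rerun downstream if upstream ended after downstream started."""
    if downstream not in doc:
        return True  # downstream never ran
    if not trigger_if_rerun:
        return False
    return doc[upstream]['end'] > doc[downstream]['start']


doc = {
    'two': {'start': datetime(2024, 1, 1, 12, 0), 'end': datetime(2024, 1, 1, 12, 5)},
    'three': {'start': datetime(2024, 1, 1, 12, 6), 'end': datetime(2024, 1, 1, 12, 7)},
}
print(needs_rerun(doc, 'two', 'three'))  # False: 'three' started after 'two' ended

# Simulate process 'two' running again later.
doc['two'] = {'start': datetime(2024, 1, 1, 13, 0), 'end': datetime(2024, 1, 1, 13, 2)}
print(needs_rerun(doc, 'two', 'three'))  # True
print(needs_rerun(doc, 'two', 'three', trigger_if_rerun=False))  # False
```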