Bulk queue management in MongoDB


Keywords
queue, management, mongo, mongodb, bulk
License
GPL-3.0
Install
pip install pyqm==0.0.9

Documentation

pyqm: Python Queue Management

This package is designed for lightweight queue management on top of MongoDB. It is inspired by the mongoqueue package (https://github.com/kapilt/mongoqueue) by Kapil Thangavelu, but we wanted some more advanced 'batching' concepts (for managing a type of Bulk API) and support for bulk jobs (i.e., being able to check out more than one record at once), so we started from scratch.

Getting started

Queue is a class in pyqm that creates a connection to a new or existing queue. A new queue is created by specifying a unique, new queueName; an existing queue is accessed by passing its queueName.

Example: Using a single queue

from pyqm import Queue
from pymongo import MongoClient

client = MongoClient()
db = client['queues']

# this creates a new record in the queueList collection
newqueue = Queue(db = db, queueName = 'testqueue')

# add three records to our queue
records = [
  {'field': 'value1'},
  {'field': 'value2'},
  {'field': 'value3'}
]

x = newqueue.add(records)
print(x) # 3
print(db['testqueue'].count()) # 3

# get a job of 2 records
job1 = newqueue.next(job = 'job1', limit = 2)
print(len(job1)) # 2

# get a job of all remaining records
job2 = newqueue.next(job = 'job2', limit = 10) #we set the limit to 10, but there is only 1 unlocked record left
print(len(job2)) # 1

# mark job1 as complete; removes records from queue
y = newqueue.complete(job1)
print(y) # 2

# release job2 record due to timeout
z = newqueue.timeout(t=5) # pass # of seconds for timeout
print(z) # 1
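
After the timeout, the record from job2 goes back to being unlocked, so a new job can pick it up. A quick continuation of the example above (assuming the timed-out record simply returns to the unlocked pool):

# the timed-out record is available for checkout again
job3 = newqueue.next(job = 'job3', limit = 10)
print(len(job3)) # 1

# complete it to empty the queue
w = newqueue.complete(job3)
print(w) # 1
print(db['testqueue'].count()) # 0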

Example: Using multiple queues

Say you need Python to regularly perform three actions around a set of data:

  1. Export data from an operational data store (database, API, flat file, etc.)
  2. Apply some type of processing to the data
  3. Import data back into an operational data store - either the same as or different from #1

You could put all three in one script set to run via cron. But wait! There are risks if that script crashes:

  • Computational time is wasted during the next run on exporting records the script has already retrieved once
  • If there are API limits dictating number of calls, you've just wasted an export call due to an error in a different part of the script
  • If #2 is processing, say, 500 records, and it makes it through 450 before crashing, that computational time is wasted

Instead, you could use multiple scripts, and pass off records between them using multiple queues.

script1_GetDataFromSource.py:

# setup a new queue
from pyqm import Queue
from pymongo import MongoClient

client = MongoClient()
db = client['queues']

exportQueue = Queue(db = db, queueName = 'exportQueue')

# get data from an api
import requests

data = requests.get('http://mydatafake.com/api').json()

print(len(data)) # let's say this is 1,000

# add records to queue
exportQueue.add(data)

script2_ProcessData.py:

from pyqm import Queue
from pymongo import MongoClient

client = MongoClient()
db = client['queues']

# access last queue we started
exportQueue = Queue(db = db, queueName = 'exportQueue')

# open a new queue for records who have finished processing
processedQueue = Queue(db = db, queueName = 'processedQueue')

# our last script retrieved 1,000 records, but we only want to work with 500 at a time
# that way, if the script crashes, we only wasted computation time on a part of the data

job = exportQueue.next(job = 'script2_ProcessData', limit = 500)

dataNew = applySomeFunction(job) # placeholder for your own processing; see the sketch below

# Now that these records are processed, add to the processedQueue
x = processedQueue.add(dataNew)
print(x) # 500

# then mark as complete in exportQueue
y = exportQueue.complete(job)
print(y) # 500
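
applySomeFunction above is just a stand-in for whatever processing you need to do. A hypothetical sketch (the function and the 'processed' field are made up for illustration) might look like:

def applySomeFunction(records):
  # add a made-up 'processed' flag to each record
  for record in records:
    record['processed'] = True
  return records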

script3_UpdateSourceData.py:

from pyqm import Queue, clean
from pymongo import MongoClient

client = MongoClient()
db = client['queues']

# access the processedQueue
processedQueue = Queue(db = db, queueName = 'processedQueue')

# Let's assume our last script runs every minute and this script runs hourly; in that case, all 1,000 records should already be processed
job = processedQueue.next(job = 'script3_UpdateSourceData', limit = 1000)

# send the data back to the source
import requests

dataOut = clean(job) # the clean function removes pyqm system fields
res = requests.post('http://mydatafake.com/api', json = dataOut)

# only mark the job complete if the POST succeeded
if res.status_code == 200:
  x = processedQueue.complete(job)
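
If the POST fails (a non-200 response), the job stays locked in processedQueue under 'script3_UpdateSourceData'. One way to recover, sketched here using the timeout mechanism shown earlier, is to release stale locks at the top of the script so a later hourly run can retry those records:

# e.g. near the top of script3, before calling next():
# release any records that have been locked for more than an hour
processedQueue.timeout(t=3600)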