workload

Task distribution


Keywords
task, distribution, distributed, python, task-scheduler, workload
License
MIT
Install
pip install workload==0.1

Documentation

Workload

Simple, clean, lightweight Python task distribution. No magic, no configuration, no broker abstraction.

The library uses Redis as a broker.
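Conceptually, distribution over a broker comes down to producers pushing tasks onto a shared queue and workers popping them off. The sketch below is illustrative only (it is not the library's internals): a plain deque stands in for the Redis list, with appendleft/pop playing the roles of LPUSH/RPOP.

```python
# Illustrative sketch of broker-based task distribution.
# A deque stands in for a Redis list; this is NOT workload's actual code.
from collections import deque

broker = deque()  # stands in for a Redis list


def distribute(workload):
    """Producer side: push each task onto the queue (~ LPUSH)."""
    for task in workload:
        broker.appendleft(task)


def process(handler):
    """Worker side: pop tasks until the queue is empty (~ RPOP)."""
    results = []
    while broker:
        task = broker.pop()
        results.append(handler(task))
    return results


distribute(['task1', 'task2'])
print(process(str.upper))  # ['TASK1', 'TASK2']
```

In the real library, Redis makes this queue visible to worker processes on other machines, which a local deque obviously cannot do.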

Usage:

Create distributed job

import redis
from workload import distributed

REDIS_POOL = redis.ConnectionPool()

@distributed('worker', redis_pool=REDIS_POOL)
def worker(job, country):
    # do stuff
    job.result('result') # add unique result
    job.fanout(['task3', 'task4']) # schedule additional tasks if needed
    job.error('oops') # report an error for this job

Add workload to process

from jobs import worker

worker.distribute(['task1', 'task2'])
worker.wait_results()

Start worker

from jobs import worker

worker.start_processing(concurrency=10)

Cycle

workload.cycle is a loop that starts deferred tasks at a specified time or interval.

Usage:

from workload.cycle import cycle
from jobs import do_work

cycle([
    # Run job every hour
    (cycle.interval(hours=1), do_work),
    # Run job every day at midnight
    (cycle.at(hour=0), do_work),
])
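The at/interval semantics above can be pictured as computing the delay until the next fire time and sleeping until then. The helper below is a rough model of the "run at a fixed hour" case, not the library's actual implementation; `seconds_until` is a hypothetical name used only for illustration.

```python
# Rough model of fixed-hour scheduling; workload.cycle's internals may differ.
from datetime import datetime, timedelta


def seconds_until(hour, now=None):
    """Seconds from `now` until the next occurrence of `hour`:00."""
    now = now or datetime.now()
    target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # today's slot already passed -> tomorrow
    return (target - now).total_seconds()


# At 23:30, the next midnight run is 30 minutes (1800 seconds) away
print(seconds_until(0, now=datetime(2024, 1, 1, 23, 30)))  # 1800.0
```

An interval trigger is simpler still: sleep for the fixed interval between runs.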

Admin

To monitor deferred and distributed jobs, an autogenerated admin can be used.

The frontend is written using jQuery and Bootstrap, both served from their official CDNs.

Since we want to keep the default install as clean as possible, the following dependencies must be installed manually to run the admin: falcon and jinja2.

The result of create_admin_app is a WSGI app, which can be served by any WSGI application server (uWSGI, gunicorn, and werkzeug, to name a few).

Admin table fields description:

Deferred section

Field        Description
Name         Fully qualified Python function name
Description  Function docstring
Workload     Number of queued deferred jobs

Distributed section

Field        Description
Name         Fully qualified Python function name
Description  Function docstring
Workload     Amount of workload left to process
Duration     Duration of the current and last runs of the job (the smaller one)
Workers      Number of active workers processing the distributed job

Usage:

from workload.admin import create_admin_app, START_DEFER

app = create_admin_app(
    '/admin',  # URL prefix
    [
        # START_DEFER is a shortcut for lambda job: job.defer()
        (deferred_worker, START_DEFER),
        # without a starting function the job is displayed without a Start button
        deferred_readonly_worker,
        # a starting function can be used for debugging as well
        (distributed_worker, lambda job: job.distribute(test_workload)),
    ],
    title='My admin',  # the name displayed at the top of the admin
    redis_pool=REDIS_POOL,  # redis pool used to display server info; can be omitted
    credentials=('user', 'secret'),  # add credentials to enable basic auth
)


# to start local dev server
if __name__ == '__main__':
    from werkzeug.serving import run_simple
    run_simple('localhost', 8000, app, use_reloader=True)
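For background, the basic auth that the credentials option enables works over a single HTTP header: the client sends "Authorization: Basic base64(user:password)", and the server decodes and compares. The sketch below illustrates that check in general; it is not the admin's actual code, and `check_basic_auth` is a hypothetical name.

```python
# Generic HTTP Basic auth check, for illustration only.
import base64


def check_basic_auth(header, user, password):
    """Return True if `header` carries valid Basic credentials."""
    scheme, _, payload = header.partition(' ')
    if scheme != 'Basic':
        return False
    try:
        decoded = base64.b64decode(payload).decode()
    except Exception:
        return False  # malformed base64 payload
    return decoded == f'{user}:{password}'


token = base64.b64encode(b'user:secret').decode()
print(check_basic_auth('Basic ' + token, 'user', 'secret'))  # True
```

Note that basic auth sends credentials effectively in plaintext, so the admin should be served over HTTPS when exposed beyond localhost.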