queue map reduce for python
Queues for batch-jobs distribute your compute-tasks over multiple machines in parallel. This pool maps your tasks onto a queue and reduces the results.
```python
import queue_map_reduce as qmr

pool = qmr.Pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5]])
```
A drop-in replacement for the builtin `map()`, and for `multiprocessing.Pool`'s `map()`.
Requirements
- The programs `qsub`, `qstat`, and `qdel` are required to submit, monitor, and delete queue-jobs.
- Your `func(task)` must be part of an importable python module (see the sketch below).
- Your `tasks` and their `results` must be serializable with `pickle`.
- Both worker-nodes and the process-node must be able to read and write from and to a common path in the file-system.
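A minimal sketch of an importable `func`; the module-name `my_module` is hypothetical and must also be importable on the worker-nodes:

```python
# my_module.py -- hypothetical module; a lambda or a function defined in an
# interactive session would not be importable on the worker-nodes.
def run(task):
    # A task is a valid input to this function; here, a list of numbers.
    return sum(task)
```

```python
import queue_map_reduce as qmr
import my_module  # the hypothetical module from above

pool = qmr.Pool()
results = pool.map(my_module.run, [[1, 2], [2, 3], [4, 5]])
```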
Queue flavor
Tested flavors are:
- Sun Grid Engine (SGE) 8.1.9
Features
- Respects fair-share, i.e. queue-slots are only occupied while the compute is actually being done.
- No spawning of additional threads, neither on the process-node nor on the worker-nodes.
- No need for databases or web-servers.
- Queue-jobs with error-state `'E'` can be deleted and resubmitted until your predefined upper limit of resubmissions is reached.
- Can bundle tasks into chunks on the worker-nodes to avoid start-up-overhead with many small tasks (see the sketch below).
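Bundling might look like the following sketch; the `num_chunks` argument is an assumption about the `Pool` constructor, not a confirmed part of the API:

```python
import queue_map_reduce as qmr

# Hypothetical num_chunks: bundle 1000 small tasks into at most 100
# chunks, i.e. at most 100 queue-jobs, to reduce start-up-overhead.
pool = qmr.Pool(num_chunks=100)
results = pool.map(sum, [[i, i + 1] for i in range(1000)])
```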
Alternatives
When you do not share resources with other users, and when you have some administrative power, you might want to use one of these:

- Dask has a `job_queue` which also supports other flavors such as PBS and SLURM.
- pyABC.sge has an `sge.map()` very much like ours.
- ipyparallel
Inner workings
- `map()` makes a `work_dir` because the mapping and reducing takes place in the file-system. You can set `work_dir` manually to make sure both worker-nodes and the process-node can reach it.
- `map()` serializes your `tasks` using `pickle` into separate files in `work_dir/{ichunk:09d}.pkl`.
- `map()` reads all environment-variables in its process.
- `map()` creates the worker-node-script in `work_dir/worker_node_script.py`. It contains and exports the process' environment-variables into the batch-job's context. It reads the chunk of tasks in `work_dir/{ichunk:09d}.pkl`, imports and runs your `func(task)`, and finally writes the results back to `work_dir/{ichunk:09d}.pkl.out` (see the sketch after this list).
- `map()` submits queue-jobs. The `stdout` and `stderr` of the tasks are written to `work_dir/{ichunk:09d}.pkl.o` and `work_dir/{ichunk:09d}.pkl.e` respectively. By default, `shutil.which("python")` is used to process the worker-node-script.
- When all queue-jobs are submitted, `map()` monitors their progress. In case a queue-job runs into an error-state (`'E'` by default), the job will be deleted and resubmitted until a maximum number of resubmissions is reached.
- When no more queue-jobs are running or pending, `map()` reduces the results from `work_dir/{ichunk:09d}.pkl.out`.
- In case of non-zero `stderr` in any task, a missing result, or on the user's request, the `work_dir` is kept for inspection. Otherwise it is removed.
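Conceptually, the generated worker-node-script boils down to the following sketch; the real script is generated by `map()`, and the paths and module-name here are hypothetical:

```python
# Simplified sketch of a generated worker_node_script.py.
import os
import pickle

# 1) Export the environment-variables recorded on the process-node,
#    one assignment per variable.
os.environ["PYTHONPATH"] = "/path/seen/on/process/node"

# 2) Read this queue-job's chunk of tasks.
with open("work_dir/000000000.pkl", "rb") as fin:
    chunk = pickle.load(fin)

# 3) Import the user's func(task) and run it on every task in series.
from my_module import run as func  # hypothetical importable module
results = [func(task) for task in chunk]

# 4) Write the results back next to the chunk.
with open("work_dir/000000000.pkl.out", "wb") as fout:
    pickle.dump(results, fout)
```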
Wording
- `task` is a valid input to `func`. The `tasks` are the actual payload to be processed.
- `iterable` is an iterable (list) of `tasks`. The naming is adopted from `multiprocessing.Pool.map`.
- `itask` is the index of a `task` in `iterable`.
- `chunk` is a chunk of `tasks` which is processed on a worker-node in series.
- `ichunk` is the index of a chunk. It is used to create the chunk's filenames such as `work_dir/{ichunk:09d}.pkl`.
- queue-job is what we submit into the queue. Each queue-job processes the tasks in a single chunk in series.
- `JB_job_number` is assigned to a queue-job by the queue-system for its own book-keeping.
- `JB_name` is assigned to a queue-job by our `map()`. It is composed of our `map()`'s session-id and `ichunk`, e.g. `"q%Y-%m-%dT%H:%M:%S#{ichunk:09d}"` (see the sketch below).
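A short sketch of how the chunk-filenames and `JB_name`s are composed; the `work_dir` and `ichunk` values are illustrative:

```python
import os
import time

work_dir = "/common/path/qmr_work_dir"  # hypothetical work_dir
ichunk = 7

# Chunk-filenames zero-pad ichunk to nine digits.
chunk_path = os.path.join(work_dir, "{:09d}.pkl".format(ichunk))
# -> '/common/path/qmr_work_dir/000000007.pkl'

# JB_name combines map()'s session-id (a timestamp) with ichunk.
session_id = time.strftime("%Y-%m-%dT%H:%M:%S")
jb_name = "q{:s}#{:09d}".format(session_id, ichunk)
# -> e.g. 'q2024-01-01T12:00:00#000000007'
```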
Environment Variables
All the user's environment-variables in the process where `map()` is called will be exported into the queue-job's context.

The worker-node-script sets the environment-variables itself. We do not use `qsub`'s argument `-V` because on some clusters this does not set all variables. Apparently some administrators fear security issues when using `qsub -V` to set `LD_LIBRARY_PATH`.
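A sketch of how such an export can be rendered into the worker-node-script; the helper-name is hypothetical:

```python
import os

def render_environ_exports():
    # Hypothetical helper: render one assignment per environment-variable
    # of the calling process, to be written into the worker-node-script.
    lines = ["import os"]
    for key, value in os.environ.items():
        lines.append("os.environ[{!r}] = {!r}".format(key, value))
    return "\n".join(lines)
```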
Testing
```bash
py.test -s .
```
dummy queue
To test our `map()` we provide a dummy `qsub`, `qstat`, and `qdel`.
These are individual python-scripts which all act on a common state-file in `tests/resources/dummy_queue_state.json` in order to fake the Sun-Grid-Engine's queue.
- `dummy_qsub.py` only appends queue-jobs to the list of pending jobs in the state-file.
- `dummy_qdel.py` only removes queue-jobs from the state-file.
- `dummy_qstat.py` moves queue-jobs from the pending to the running list, and triggers the actual processing of the jobs. Each time `dummy_qstat.py` is called it performs a single action on the state-file, so it must be called multiple times to process all jobs. It can intentionally bring jobs into the error-state when this is set in the state-file.
Before running the dummy-queue, its state-file must be initialized:
```python
from queue_map_reduce import dummy_queue

dummy_queue.init_queue_state(
    path="tests/resources/dummy_queue_state.json"
)
```
When testing our `map()` you set its arguments `qsub_path`, `qdel_path`, and `qstat_path` to point to the dummy-queue (see the sketch below).
See `tests/test_full_chain_with_dummy_qsub.py`.
Because of the global state-file, only one instance of the dummy-queue can run at a time.
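A sketch of such a test-setup; whether `qsub_path`, `qstat_path`, and `qdel_path` are arguments of the `Pool` constructor or of `map()` itself may depend on the version, and the dummy-script paths are illustrative:

```python
import queue_map_reduce as qmr

# Point the pool at the dummy-queue instead of the real SGE programs.
pool = qmr.Pool(
    qsub_path="tests/resources/dummy_qsub.py",
    qstat_path="tests/resources/dummy_qstat.py",
    qdel_path="tests/resources/dummy_qdel.py",
)
results = pool.map(sum, [[1, 2], [2, 3]])
```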