Simple job queuing for very long tasks

pip install blueque==0.3.2



A simple task queue system optimized for very long running tasks.


  • Queue

    A list of tasks, of a given type, to be run. Multiple queues may be used to execute different types of tasks on different nodes.

  • Task

    A piece of work to be done, including its parameters, current status, result (if done), etc.

  • Node

    A computer running tasks from the queue.

  • Listener

    A process running on a node, listening for new tasks on a single queue. The system does not support multiple listeners listening to the same queue on the same node.

  • Process

    A process, running on a node, created by a listener, executing a task.


Client Connection

The Blueque.Client object stores the connection to the Blueque system, and constructs the other objects necessary for interacting with the system

client = Blueque.Client("redis://hostname:port/db")


The Blueque.queue.Queue object provides the interface to a named queue of tasks.

queue = client.get_queue("some.queue")


task_id = queue.enqueue(parameters)

Returns the Task ID (a string) of the newly enqueued task


The task object provides a basic, read-only view of all the attributes of a task.

task = client.get_task(task_id)

There is a read-only attribute for all of the task attributes stored in Redis (see below).


The listener object provides the interface used to listen for new tasks.

listener = client.get_listener("some.queue")


task = listener.listen(on_new_task)

Blocks listening for a new task to execute until one is ready, then returns a Task object describing that task.


A Processor object provides the interface used to update a task while it is being processed.

processor = client.get_processor(task)


parameters = processor.start(pid)

Marks a task as being started, by the specified pid, and returns the parameters of that task.


processor.complete("some result")

Marks a task as being complete, and stores the result."some error")

Marks a task as having failed, and stores the error.

Data Storage

Currently, the backend structure is Redis. Keys are prefixed with a namespace, i.e. named "bluequeue_foo", so that other systems can use the Redis DB for other things.

Task Queue

blueque_pending_tasks_[queue name]

Stored in a List, accessed as a queue: new tasks are added via LPUSH, tasks are removed for execution via RPOP. There is a List for each task queue (channel). All that is stored in the List is a task ID, which will be used to retrieve the task data.

Listener Task List

blueque_reserved_tasks_[queue name]_[listener ID]

Stored in a List, this is used to keep track of which listeners are running which tasks. Tasks should be atomically moved from the Task Queue to the Listener Task List via RPOPLPUSH, so that they don't get lost.

Task Channel

Note: this will not be implemented in the first pass.

There is a Pub/Sub Channel for each task queue (channel). This can be used by the listener client to listen for new tasks if the task queue they were interested in was empty when they last checked for a task.

Messages should not include the task ID of the newly created task, because listeners must be required to manually try to LPOP the task off the task queue, so that only one work runs each task.

Task List

blueque_tasks_[queue name]

There is a list of all the tasks in a queue, regardless of their state. This is mostly used for introspection/management purposes.

Task Data

blueque_task_[task id]

The actual data associated with a task will be stored as a hash, where the key is built from the task ID (i.e. "bluequeue_task_NNNN"). Each subsystem will be able to add its own fields to the hash.

Current fields are

  • status

    One of: scheduled, pending, reserved, started, complete or failed.

  • queue

    The queue that the task is in (mostly just for debugging purposes).

  • parameters

    String containing data to be passed to the process. Totally task specific.

  • result

    String containing the result of the process; totally task-specific. Will not be set if the task hasn't completed yet.

  • error

    String containing the error generated by the process. Set only if status is failed.

  • node

    The listener ID of the node running the task. Will not be set if the task has not been started yet.

  • pid

    The PID of the process running the task on the node. Will not be set if the task has not been started yet.

  • created

    A floating point value containing the Python timestampe (time.time()) of the time when the task was first created (enqueued).

  • updated

    A floating point value containing the Python timestamp (time.time()) of the last time any value in the task changed.

  • eta

    The timestamp when the task is scheduled to be executed. Will not be set if the task was not scheduled with an ETA.


blueque_listeners_[queue name]

In order for the system to be easily introspected, the currently active listeners will be stored in a Redis Set. Listeners are stored by their LISTENER ID, which must be [hostname]_[pid].

Note that this means that all hosts in the system must have unique names.



There is a Sorted Set containing the names of all the queues, where the score of the set is the number of nodes listening to that set.

When a node comes online, it increments the score by 1; when a node goes offline (cleanly) it increments the score by -1. Every time a task is enqueued, the score should be incremented by 0, so that a queue with tasks, but no listeners, still shows up in the set.

Task Workflow

Tasks are executed via this workflow. Note that any HMSET call which modifies a blueque_task_[TASK ID] should be assumed to also set the updated field of the task.


Tasks should be submitted by creating a UUID, [TID], JSON encoding the parameters, [PARAMS], and then executing:

HMSET blueque_task_[TASK ID] status pending queue [QUEUE] parameters [PARAMS]
ZINCRBY blueque_queues 0 [QUEUE]
LPUSH blueque_pending_tasks_[QUEUE] [TASK ID]

Node Task Pop

Nodes should pop a task off the queue and then set the status of the task to started, and set the node field of the task.

If no task is popped off the queue, the Node should wait for a new task notification. Ideally, this will be via Pub/Sub, but, at first, we can do it by polling.

Tasks are popped using the following commands.

RPOPLPUSH blueque_pending_tasks_[QUEUE] [NODE TASKS]
HMSET blueque_task_[TASK ID] status reserved node [NODE]

Note that these two commands cannot be executed atomically because the second depends on the first, and, even with Lua scripting, that cannot be done atomically and safely. Therefore, there is a chance that a task is popped off the pending queue, but its record is not updated. This can be detected if a task is in a node's queue, but has a status of pending.

Task Queue Channel Message

Note: We will not implement this in the first pass. Listeners will just poll once every few seconds.

If a listener receives a message via a Pub/Sub channel that a queue has a task in it, it should try to atomically pop a task off that channel (see above). If it does get a task, it should unsubscribe from the channel; if it does not (i.e. another listener got the task) it should remain subscribed.

Task Started

When a process starts executing a task on a node, it should update the task to indicate that, and also add itself to a set of all active tasks:

SADD blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status started pid [PID]

Note that this assumes that the process is told what task to execute, rather than pulling it off the node's task list.

Task Completed Successfully

If a task completes successfully, it should set the status field of the task to succeeded and set the result field to the JSON-serialized result of the task, as a single atomic transaction.

LREM blueque_reserved_tasks_[QUEUE]_[LISTENER ID] 1 [TASK ID]
ZREM blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status complete result [RESULT]
LPUSH blueque_complete_tasks_[QUEUE] [TASK ID]

Task Failed

If a task fails for any reason (process function raises an exception, or some monitoring process determines that the process fails more catastrophically), the process that detects the error should set the status field of the task to failed and the error field to a JSON-serialized description of the error (see above).

LREM blueque_reserved_tasks_[QUEUE]_[LISTENER ID] 1 [TASK ID]
ZREM blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status failed error [ERROR]
LPUSH blueque_failed_tasks_[QUEUE] [TASK ID]

Delete Finished Task

Once everybody interested in a task's result (or error) has been notified, the task data needs to be deleted from Redis, so that it does not leak data.

The command to do this is slightly different, depending on whether it is complete or failed:

DEL blueque_task_[TASK ID]
LREM blueque_[status]_tasks_[QUEUE] 1 [TASK ID]

Schedule Task

A task can be scheduled for execution at a later time by adding it to a sorted set, where the score is the timestamp when the task should be executed.

HMSET blueque_task_[TASK ID] status scheduled queue [QUEUE] parameters [PARAMS] eta [TIMESTAMP]
ZINCRBY blueque_queues 0 [QUEUE]
ZADD blueque_scheduled_tasks_[QUEUE] [TIMESTAMP] [TASK ID]

Enqueue Scheduled Tasks

A process must be running which periodically checks the scheduled task list for each queue and adds them to the queue to be run at the scheduled time.

WATCH blueque_scheduled_tasks_[QUEUE]
to_run = ZRANGEBYSCORE blueque_scheduled_tasks_[QUEUE] 0 [CURRENT TIME]
ZREMRANGEBYSCORE blueque_scheduled_tasks_[QUEUE] 0 [CURRENT TIME]
LPUSH blueque_pending_tasks_[QUEUE] to_run[0] ... to_run[n]
for task in to_run:
    HMSET blueque_task_[TASK ID] status pending

Note that [CURRENT TIME] should only be read once, before executing the transaction, so that the same tasks which were fetched are the ones that are removed.