A simple task queue system optimized for very long-running tasks.
Queue: A list of tasks, of a given type, to be run. Multiple queues may be used to execute different types of tasks on different nodes.
Task: A piece of work to be done, including its parameters, current status, result (if done), etc.
Node: A computer running tasks from the queue.
Listener: A process running on a node, listening for new tasks on a single queue. The system does not support multiple listeners on the same queue on the same node.
Process: A process, running on a node and created by a listener, that executes a task.
The Blueque.Client object stores the connection to the Blueque system, and constructs the other objects necessary for interacting with the system.
client = Blueque.Client("redis://hostname:port/db")
The Blueque.queue.Queue object provides the interface to a named queue of tasks.
queue = client.get_queue("some.queue")
task_id = queue.enqueue(parameters)
Returns the Task ID (a string) of the newly enqueued task.
The task object provides a basic, read-only view of all the attributes of a task.
task = client.get_task(task_id)
There is a read-only attribute for all of the task attributes stored in Redis (see below).
The listener object provides the interface used to listen for new tasks.
listener = client.get_listener("some.queue")
task = listener.listen(on_new_task)
Blocks, listening for a new task to execute, until one is ready, then returns a Task object describing that task.
A Processor object provides the interface used to update a task while it is being processed.
processor = client.get_processor(task)
parameters = processor.start(pid)
Marks a task as being started, by the specified pid, and returns the parameters of that task.
Marks a task as being complete, and stores the result.
Marks a task as having failed, and stores the error.
Currently, the backend is Redis. Keys are prefixed with a namespace (e.g. "blueque_foo"), so that other systems can use the same Redis DB for other things.
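Stored in a List, blueque_pending_tasks_[queue name], accessed as a queue: new tasks are added via LPUSH, and tasks are removed for execution via RPOP (or RPOPLPUSH, see below). Because tasks enter at the head and leave from the tail, they run in first-in, first-out order. There is one such List for each task queue (channel). All that is stored in the List is the task ID, which is used to retrieve the task data.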
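To make the ordering concrete, here is a minimal in-memory sketch that uses Python's collections.deque as a stand-in for the Redis List (no Redis is involved). The helper names lpush and rpop are hypothetical and only mimic the Redis commands of the same name; the task IDs are made up.

```python
from collections import deque

# Stand-in for the Redis List blueque_pending_tasks_[QUEUE].
pending = deque()

def lpush(lst, task_id):
    """Like Redis LPUSH: insert at the head (left end)."""
    lst.appendleft(task_id)

def rpop(lst):
    """Like Redis RPOP: remove from the tail (right end); None if empty."""
    return lst.pop() if lst else None

for task_id in ["t1", "t2", "t3"]:
    lpush(pending, task_id)

# The oldest task comes out first; an empty list yields None, like RPOP's nil.
order = [rpop(pending) for _ in range(4)]
print(order)  # ['t1', 't2', 't3', None]
```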
Listener Task List
blueque_reserved_tasks_[queue name]_[listener ID]
Stored in a List, this is used to keep track of which listeners are running which tasks. Tasks should be atomically moved from the Task Queue to the Listener Task List via RPOPLPUSH, so that they don't get lost if a listener dies between taking a task off the queue and recording that it holds it.
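The data movement RPOPLPUSH performs can be sketched in memory as below. This is only a model: in Redis the pop and the push happen as one atomic command, so a task is always in exactly one of the two lists, whereas this Python function (a hypothetical helper) simply shows which end each list is touched at.

```python
from collections import deque
from typing import Optional

def rpoplpush(src: deque, dst: deque) -> Optional[str]:
    """Mimic Redis RPOPLPUSH: pop from src's tail, push onto dst's head.

    Returns the moved item, or None when src is empty (Redis returns nil).
    """
    if not src:
        return None
    item = src.pop()
    dst.appendleft(item)
    return item

pending = deque(["t3", "t2", "t1"])  # t1 is the oldest, at the tail
reserved = deque()                   # blueque_reserved_tasks_[QUEUE]_[LISTENER ID]

got = rpoplpush(pending, reserved)
print(got, list(pending), list(reserved))  # t1 ['t3', 't2'] ['t1']
```

As of Redis 6.2, RPOPLPUSH is documented as deprecated in favor of LMOVE with the RIGHT and LEFT arguments, which performs the same move.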
Note: this will not be implemented in the first pass.
There is a Pub/Sub Channel for each task queue (channel). Listeners can use it to wait for new tasks when the task queue they are interested in was empty the last time they checked for a task.
Messages should not include the task ID of the newly created task, because listeners must instead try to pop the task off the task queue themselves (with RPOPLPUSH, as above), so that only one worker runs each task.
There is a list of all the tasks in a queue, regardless of their state. This is mostly used for introspection/management purposes.
The actual data associated with a task is stored as a hash, where the key is built from the task ID (e.g. "blueque_task_NNNN"). Each subsystem can add its own fields to the hash.
Current fields are:
The queue that the task is in (mostly just for debugging purposes).
String containing data to be passed to the process. Totally task specific.
String containing the result of the process; totally task-specific. Will not be set if the task hasn't completed yet.
String containing the error generated by the process. Set only if the task has failed.
The listener ID of the node running the task. Will not be set if the task has not been started yet.
The PID of the process running the task on the node. Will not be set if the task has not been started yet.
A floating point value containing the Python timestamp (time.time()) of the time when the task was first created (enqueued).
A floating point value containing the Python timestamp (time.time()) of the last time any value in the task changed.
The timestamp when the task is scheduled to be executed. Will not be set if the task was not scheduled with an ETA.
In order for the system to be easily introspected, the currently
active listeners will be stored in a Redis
Set. Listeners are stored
LISTENER ID, which must be
Note that this means that all hosts in the system must have unique names.
There is a Sorted Set, blueque_queues, containing the names of all the queues, where the score of each member is the number of nodes listening to that queue.
When a node comes online, it increments the score by 1; when a node goes offline (cleanly), it increments the score by -1. Every time a task is enqueued, the score should be incremented by 0, so that a queue with tasks, but no listeners, still shows up in the set.
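The increment-by-0 trick works because ZINCRBY on a member that does not exist yet adds it, treating its previous score as 0. The sketch below models blueque_queues with a Python dict; the queue names "render" and "encode" are hypothetical. With redis-py, the equivalent call would be client.zincrby("blueque_queues", amount, queue).

```python
def zincrby(zset: dict, increment: float, member: str) -> float:
    """Mimic Redis ZINCRBY: a missing member starts from 0 before the increment."""
    zset[member] = zset.get(member, 0) + increment
    return zset[member]

queues = {}  # stand-in for the blueque_queues Sorted Set

zincrby(queues, 1, "render")   # a listener for "render" comes online
zincrby(queues, 0, "encode")   # a task is enqueued on "encode"; nobody listens yet
zincrby(queues, -1, "render")  # the "render" listener shuts down cleanly

# Both queues remain visible, each with zero listeners.
print(queues)  # {'render': 0, 'encode': 0}
```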
Tasks are executed via the workflow below. Note that any HMSET call which modifies blueque_task_[TASK ID] should be assumed to also set the updated field of the task.
Tasks should be submitted by creating a UUID, [TASK ID], JSON-encoding the parameters, [PARAMS], and then executing:
MULTI
HMSET blueque_task_[TASK ID] status pending queue [QUEUE] parameters [PARAMS]
ZINCRBY blueque_queues 0 [QUEUE]
LPUSH blueque_pending_tasks_[QUEUE] [TASK ID]
EXEC
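A minimal sketch of submission in Python, assuming a redis-py style client. submit_commands and run_transaction are hypothetical helpers, not part of the Blueque API: the first builds the commands above as argument tuples (adding the updated field, per the workflow note), and the second sends them inside MULTI/EXEC using redis-py's pipeline(transaction=True) and Pipeline.execute_command.

```python
import json
import time
import uuid

def submit_commands(queue, parameters, task_id=None, now=None):
    """Build the Task Submission commands; returns (task_id, commands)."""
    task_id = task_id or str(uuid.uuid4())
    now = time.time() if now is None else now
    task_key = f"blueque_task_{task_id}"
    commands = [
        # Every HMSET on a task also sets "updated" (see the workflow note).
        ("HMSET", task_key,
         "status", "pending",
         "queue", queue,
         "parameters", json.dumps(parameters),
         "updated", now),
        # Increment by 0 so the queue is listed even with no listeners.
        ("ZINCRBY", "blueque_queues", 0, queue),
        ("LPUSH", f"blueque_pending_tasks_{queue}", task_id),
    ]
    return task_id, commands

def run_transaction(client, commands):
    """Send commands inside MULTI/EXEC via a redis-py client's pipeline."""
    pipe = client.pipeline(transaction=True)
    for command in commands:
        pipe.execute_command(*command)
    return pipe.execute()
```

For example, task_id, cmds = submit_commands("encode", {"file": "a.mp4"}) followed by run_transaction(redis.Redis(), cmds); the queue name and parameters here are made up. Redis has documented HMSET as deprecated since 4.0 in favor of HSET with multiple field/value pairs, so "HSET" can be substituted without other changes.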
Node Task Pop
Nodes should pop a task off the queue and then set the status of the task to reserved, and set the node field of the task.
If no task is popped off the queue, the Node should wait for a new task notification. Ideally, this will be via Pub/Sub, but, at first, we can do it by polling.
Tasks are popped using the following commands.
RPOPLPUSH blueque_pending_tasks_[QUEUE] blueque_reserved_tasks_[QUEUE]_[LISTENER ID]
HMSET blueque_task_[TASK ID] status reserved node [LISTENER ID]
Note that these two commands cannot be executed atomically, because the second depends on the result of the first, and even with Lua scripting that cannot be done both atomically and safely. Therefore, there is a chance that a task is popped off the pending queue but its record is not updated. This can be detected if a task is in a node's queue but still has a status of pending.
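A sketch of the two-step pop and the consistency check, assuming a redis-py client created with decode_responses=True (otherwise task IDs come back as bytes and the f-string key would be wrong). pop_task and find_unmarked_reservations are hypothetical helpers; the second step uses redis-py's hset(..., mapping=...) in place of the deprecated HMSET.

```python
import time

def pop_task(client, queue, listener_id):
    """Node Task Pop: move one task to this listener's list, then mark it.

    Assumes a redis-py client with decode_responses=True.
    """
    reserved_key = f"blueque_reserved_tasks_{queue}_{listener_id}"
    task_id = client.rpoplpush(f"blueque_pending_tasks_{queue}", reserved_key)
    if task_id is None:
        return None  # queue empty: poll again later (or wait for Pub/Sub)
    # Separate, non-atomic step: a crash here leaves the task reserved
    # but with status "pending".
    client.hset(f"blueque_task_{task_id}",
                mapping={"status": "reserved", "node": listener_id,
                         "updated": time.time()})
    return task_id

def find_unmarked_reservations(reserved_ids, statuses):
    """Return task IDs in a reserved list whose hash still says "pending".

    reserved_ids: contents of blueque_reserved_tasks_[QUEUE]_[LISTENER ID]
    statuses: mapping of task ID -> the task hash's "status" field
    """
    return [task_id for task_id in reserved_ids
            if statuses.get(task_id) == "pending"]
```

A monitoring process could read a listener's reserved list with LRANGE, fetch each status with HGET, and pass the results to find_unmarked_reservations to find tasks that need their record repaired.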
Task Queue Channel Message
Note: We will not implement this in the first pass. Listeners will just poll once every few seconds.
If a listener receives a message via a Pub/Sub channel that a queue has a task in it, it should try to atomically pop a task off that channel (see above). If it does get a task, it should unsubscribe from the channel; if it does not (i.e. another listener got the task) it should remain subscribed.
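Because the message carries no task ID, several listeners may wake up for one task, and only the one whose pop succeeds runs it. The in-memory simulation below illustrates that race and the unsubscribe rule; the listener names are hypothetical, and the deque operations stand in for RPOPLPUSH. A real listener would receive the message through redis-py's client.pubsub() object instead.

```python
from collections import deque

def on_queue_message(listener_id, pending, reserved, subscribed):
    """Handle one "queue has a task" message for a single listener.

    Returns the task ID this listener got, or None if another listener won.
    """
    task_id = pending.pop() if pending else None  # stands in for RPOPLPUSH
    if task_id is not None:
        reserved.appendleft(task_id)
        subscribed.discard(listener_id)  # got a task: unsubscribe
    return task_id  # None: stay subscribed for the next message

pending = deque(["t1"])
subscribed = {"host1", "host2", "host3"}
reserved = {name: deque() for name in subscribed}

# All three listeners receive the same message; sorted() copies the set,
# so removing members inside the loop is safe.
results = {}
for name in sorted(subscribed):
    results[name] = on_queue_message(name, pending, reserved[name], subscribed)

print(results)  # {'host1': 't1', 'host2': None, 'host3': None}
```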
When a process starts executing a task on a node, it should update the task to indicate that, and also add itself to a set of all active tasks:
MULTI
SADD blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status started pid [PID]
EXEC
Note that this assumes that the process is told what task to execute, rather than pulling it off the node's task list.
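A small sketch of the Task Started transaction as command tuples; start_commands is a hypothetical helper, and the tuples can be sent inside MULTI/EXEC with a redis-py pipeline(transaction=True) and Pipeline.execute_command. Inside the process itself, the PID would typically come from os.getpid().

```python
import time

def start_commands(queue, listener_id, pid, task_id, now=None):
    """Build the Task Started commands (to run inside MULTI/EXEC)."""
    now = time.time() if now is None else now
    # The set member packs listener, PID and task into one string.
    member = f"{listener_id} {pid} {task_id}"
    return [
        ("SADD", f"blueque_started_tasks_{queue}", member),
        ("HMSET", f"blueque_task_{task_id}",
         "status", "started", "pid", pid, "updated", now),
    ]
```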
Task Completed Successfully
If a task completes successfully, it should set the status field of the task to complete and set the result field to the JSON-serialized result of the task, as a single atomic transaction.
MULTI
LREM blueque_reserved_tasks_[QUEUE]_[LISTENER ID] 1 [TASK ID]
SREM blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status complete result [RESULT]
LPUSH blueque_complete_tasks_[QUEUE] [TASK ID]
EXEC
If a task fails for any reason (the process function raises an exception, or some monitoring process determines that the process failed more catastrophically), the process that detects the error should set the status field of the task to failed and the error field to a JSON-serialized description of the error (see above).
MULTI
LREM blueque_reserved_tasks_[QUEUE]_[LISTENER ID] 1 [TASK ID]
SREM blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status failed error [ERROR]
LPUSH blueque_failed_tasks_[QUEUE] [TASK ID]
EXEC
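The success and failure transactions differ only in the status value, the hash field, and the destination list, so one hypothetical helper can build both. It uses SREM for blueque_started_tasks_[QUEUE] because that key is created with SADD in the Task Started step and therefore holds a plain Set (ZREM against it would fail with a WRONGTYPE error). This is a sketch under those assumptions, not Blueque's own code.

```python
import json
import time

def finish_commands(queue, listener_id, pid, task_id, outcome, payload, now=None):
    """Build the Task Completed / Task Failed commands.

    outcome is "complete" or "failed"; payload is the result or the error
    description, JSON-serialized here.
    """
    if outcome not in ("complete", "failed"):
        raise ValueError(f"unknown outcome: {outcome!r}")
    now = time.time() if now is None else now
    field = "result" if outcome == "complete" else "error"
    return [
        ("LREM", f"blueque_reserved_tasks_{queue}_{listener_id}", 1, task_id),
        # The started-task key is a Set (filled by SADD), hence SREM.
        ("SREM", f"blueque_started_tasks_{queue}", f"{listener_id} {pid} {task_id}"),
        ("HMSET", f"blueque_task_{task_id}",
         "status", outcome, field, json.dumps(payload), "updated", now),
        ("LPUSH", f"blueque_{outcome}_tasks_{queue}", task_id),
    ]
```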
Delete Finished Task
Once everybody interested in a task's result (or error) has been notified, the task data needs to be deleted from Redis, so that it does not leak data.
The commands differ slightly depending on whether the task is complete or failed; [status] below is either complete or failed:
MULTI
DEL blueque_task_[TASK ID]
LREM blueque_[status]_tasks_[QUEUE] 1 [TASK ID]
EXEC
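A short sketch showing how the [status] placeholder selects the list to clean up; delete_commands is a hypothetical helper whose output can be sent inside MULTI/EXEC the same way as the other transactions.

```python
def delete_commands(queue, task_id, status):
    """Build the Delete Finished Task commands; status is "complete" or "failed"."""
    if status not in ("complete", "failed"):
        raise ValueError(f"only finished tasks can be deleted, got {status!r}")
    return [
        ("DEL", f"blueque_task_{task_id}"),
        ("LREM", f"blueque_{status}_tasks_{queue}", 1, task_id),
    ]
```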
A task can be scheduled for execution at a later time by adding it to a sorted set, where the score is the timestamp when the task should be executed.
MULTI
HMSET blueque_task_[TASK ID] status scheduled queue [QUEUE] parameters [PARAMS] eta [TIMESTAMP]
ZINCRBY blueque_queues 0 [QUEUE]
ZADD blueque_scheduled_tasks_[QUEUE] [TIMESTAMP] [TASK ID]
EXEC
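A sketch of building the scheduling commands, assuming the same tuple-per-command convention; schedule_commands is a hypothetical helper. The ETA doubles as the sorted-set score, which is what lets the scheduler later find due tasks with a single range query.

```python
import json
import time
import uuid

def schedule_commands(queue, parameters, eta, task_id=None, now=None):
    """Build the scheduling commands; eta is a time.time()-style timestamp."""
    task_id = task_id or str(uuid.uuid4())
    now = time.time() if now is None else now
    commands = [
        ("HMSET", f"blueque_task_{task_id}",
         "status", "scheduled", "queue", queue,
         "parameters", json.dumps(parameters),
         "eta", eta, "updated", now),
        ("ZINCRBY", "blueque_queues", 0, queue),
        # Score = ETA, so ZRANGEBYSCORE ... 0 [CURRENT TIME] returns due tasks.
        ("ZADD", f"blueque_scheduled_tasks_{queue}", eta, task_id),
    ]
    return task_id, commands
```

For instance, scheduling a task one hour ahead would pass eta=time.time() + 3600.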
Enqueue Scheduled Tasks
A process must be running which periodically checks the scheduled task list for each queue and moves any tasks whose scheduled time has arrived onto that queue's pending list.
WATCH blueque_scheduled_tasks_[QUEUE]
to_run = ZRANGEBYSCORE blueque_scheduled_tasks_[QUEUE] 0 [CURRENT TIME]
MULTI
ZREMRANGEBYSCORE blueque_scheduled_tasks_[QUEUE] 0 [CURRENT TIME]
LPUSH blueque_pending_tasks_[QUEUE] to_run[0] ... to_run[n]
for task in to_run:
    HMSET blueque_task_[TASK ID] status pending
EXEC
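[CURRENT TIME] should be read only once, before the ZRANGEBYSCORE, and the same value used in the ZREMRANGEBYSCORE, so that the tasks which are removed are exactly the ones that were fetched. If another client modifies the WATCHed key before EXEC, the transaction is aborted and should be retried.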
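A sketch of the transactional part, assuming to_run was already read with ZRANGEBYSCORE after WATCH. due_task_commands is a hypothetical helper; it returns nothing when no task is due, since LPUSH requires at least one value. With redis-py, the surrounding steps would be pipe = client.pipeline(), pipe.watch(key), to_run = pipe.zrangebyscore(key, 0, now), pipe.multi(), queueing these commands with pipe.execute_command, then pipe.execute(), retrying on redis.exceptions.WatchError.

```python
def due_task_commands(queue, to_run, now):
    """Build the MULTI/EXEC body of Enqueue Scheduled Tasks.

    to_run: task IDs returned by ZRANGEBYSCORE blueque_scheduled_tasks_[QUEUE] 0 now.
    now: the single [CURRENT TIME] value used for both range commands.
    """
    if not to_run:
        return []  # nothing due; LPUSH with no values would be an error
    commands = [
        ("ZREMRANGEBYSCORE", f"blueque_scheduled_tasks_{queue}", 0, now),
        # ZRANGEBYSCORE returns earliest ETA first; LPUSH of that order
        # means RPOP later hands out the earliest-due task first.
        ("LPUSH", f"blueque_pending_tasks_{queue}", *to_run),
    ]
    for task_id in to_run:
        commands.append(("HMSET", f"blueque_task_{task_id}",
                         "status", "pending", "updated", now))
    return commands
```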