Using beanstalkd made easy
azuki makes it easy to create tools that use beanstalkd tubes for asynchronous execution. From queueing and parallelizing quick shell hacks to integrating asynchronous execution with your Django application, azuki lets you do it all.
CLI beanstalk inspection
The main utility,
azuki, makes it easy to inspect and modify beanstalk queues
from the command line.
$ azuki tubes [-v]
$ azuki stats
$ azuki stats default
$ azuki stats 123
The stats command will give you basic statistics, such as the total number of
connections, jobs and commands. If you specify a tube, such as default, you
will see the statistics of that tube, and if you specify a job ID, you will see
its details and content.
The tubes command shows a list of tubes. Add the -v parameter to also get a summary of all processes and jobs for all queues.
Peeking at tubes
$ azuki peek-delayed default
$ azuki peek-ready default
$ azuki peek-buried default
$ azuki peek 123
With the first three commands above you can look at the first delayed, ready or
buried job in a tube. If you use the
--ask option as well, you will be asked
whether you want to delete, bury or kick the job. The author is fond of
peek-buried --ask to do cleanup of failed jobs.
The last command will peek at a specific job, given its id number.
Manipulating jobs and tubes
azuki kick 100 default
This would kick 100 jobs in the default queue, moving them from buried back to ready.
azuki bury 123
azuki kick 123
If you want to bury or kick a specific job, use the commands above.
Queueing shell commands
Azuki was originally written to easily parallelize shell jobs. Of course GNU parallel can be used if you just want parallelization, but I also needed to be able to adjust parallelism, pause everything and add jobs while the application was running. In short, I needed a queue, and beanstalk is my queue of choice.
The original task was to update the iLO firmware of thousands of HP servers in an automated, yet controlled way. To do this, first one puts jobs in the tube:
for ilo in $(<ilos.txt); do
    echo $ilo | azuki put --ttr 600 ilo-firmware
done
This reads a list of items and schedules them one by one in the ilo-firmware
tube with a ttr (time to run) of 600 seconds. You can also specify a delay.
To consume the items in the tube, use:
azuki foreach ilo-firmware -- xargs do_firmware_update
The foreach command executes the command you give it and feeds it the
job body on stdin. do_firmware_update wants this on the command line instead,
hence the use of xargs. To parallelize this, we can simply run as many
azuki foreach processes as we want in parallel, possibly using screen.
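As a rough illustration of running several workers side by side without screen, the sketch below starts a number of identical processes and waits for them. The helper names start_workers and wait_for_workers are hypothetical and not part of azuki; the command to run is passed in unchanged.

```python
import subprocess


def start_workers(command, count):
    """Start `count` copies of `command` and return their Popen handles."""
    return [subprocess.Popen(command) for _ in range(count)]


def wait_for_workers(workers):
    """Block until every worker exits; return exit codes in start order."""
    return [worker.wait() for worker in workers]
```

For the firmware example this could be called as start_workers(["azuki", "foreach", "ilo-firmware", "--", "xargs", "do_firmware_update"], 4). Since all workers consume from the same tube, adding or removing workers changes the parallelism without touching the queued jobs.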
The foreach command looks at the exit code of the commands it runs. If the
exit code is 0, the job is considered successful and gets deleted. If the
exit code is nonzero, the job gets buried.
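The delete-or-bury rule can be sketched in a few lines of Python. This is not azuki's own code: run_job is a hypothetical helper, and the job argument is assumed to behave like a beanstalkc job, with a body attribute and delete() and bury() methods.

```python
import subprocess


def run_job(job, command):
    """Run `command` with the job body on stdin, then delete or bury the job.

    `job` is assumed to offer `body`, `delete()` and `bury()`, as a
    beanstalkc job does. Returns the exit code of the command.
    """
    result = subprocess.run(command, input=job.body.encode("utf-8"))
    if result.returncode == 0:
        job.delete()  # success: the job is done and leaves the tube
    else:
        job.bury()    # failure: set the job aside for manual inspection
    return result.returncode
```

Buried jobs stay in the tube, so they can be inspected with peek-buried and kicked back to ready once the cause of the failure is fixed.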
At the end of my working day I wanted to pause the upgrading, but using
ctrl-C in the middle of a firmware upgrade is not a very good idea. Azuki can pause the tube instead:
azuki pause 8640000 ilo-firmware
This pauses the tube for 100 days (8640000 seconds). Firmware updates that were already running would complete, but no new ones would be started. Ten minutes later everything was paused. To unpause the tube, simply pause it again, but for 0 seconds.
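The pause length is given in seconds, which is easy to get wrong by hand. A small sketch of the arithmetic, using a hypothetical pause_command helper that only builds the command line shown above:

```python
from datetime import timedelta


def pause_command(tube, duration):
    """Build the `azuki pause` command line for pausing `tube` for `duration`."""
    seconds = int(duration.total_seconds())
    return ["azuki", "pause", str(seconds), tube]
```

Passing timedelta(days=100) gives the 8640000 used above, and timedelta(0) gives the 0 that unpauses the tube.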
Beanstalkd tubes are already fairly easy to use from Python with the beanstalkc library. But fairly easy isn't easy enough, so azuki makes it easier still: you can simply decorate your functions to make them asynchronous.
# example.py
from azuki import beanstalk

@beanstalk('example-tube')
def hello(who):
    print("Hello, %s" % who)

# main.py
import example
example.hello("world")
If you now run python main.py, you will notice that it does not output
anything. Instead it has serialized the arguments to hello and scheduled them
as a job in the example-tube tube.
To process the queued jobs, you run azuki daemon example-tube. This will take
items from the tube, import the example module and call the hello function.
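To make the idea concrete, here is a minimal sketch of how such a decorator could work. It is not azuki's implementation: an in-memory deque stands in for the beanstalkd tube, a registry dictionary stands in for importing the module by name, and process_one is a hypothetical stand-in for the daemon.

```python
import collections
import json

# In-memory stand-ins: one deque per tube, and a name -> function registry
# instead of importing the module that defines the function.
tubes = collections.defaultdict(collections.deque)
registry = {}


def beanstalk(tube):
    """Decorator: calling the function queues its arguments instead of running it."""
    def decorator(func):
        registry[func.__name__] = func

        def enqueue(*args, **kwargs):
            body = json.dumps(
                {"function": func.__name__, "args": args, "kwargs": kwargs}
            )
            tubes[tube].append(body)

        return enqueue
    return decorator


def process_one(tube):
    """Take one job from the tube and call the function it names."""
    job = json.loads(tubes[tube].popleft())
    return registry[job["function"]](*job["args"], **job["kwargs"])


@beanstalk("example-tube")
def hello(who):
    return "Hello, %s" % who
```

Calling hello("world") only appends a JSON body to the tube; the greeting is produced later, when process_one("example-tube") picks the job up.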
With beanstalk you can bury a job to tell beanstalk not to attempt the job
again until manually told to do so, but if you just want to delay execution of
a job, you can raise a Reschedule exception:
from azuki import beanstalk, Reschedule

@beanstalk('example-tube')
def process(task):
    if not am_ready_for(task):
        raise Reschedule(120)
    do_task(task)
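On the worker side, handling such an exception could look roughly like the sketch below. Everything in it is an assumption made for illustration: the Reschedule class is a local stand-in for azuki's, call_with_reschedule is a hypothetical helper, and the job is assumed to offer release(delay=...) and delete() as a beanstalkc job does. Other exceptions simply propagate in this sketch.

```python
class Reschedule(Exception):
    """Local stand-in for azuki's Reschedule; carries the requested delay."""

    def __init__(self, delay):
        super().__init__(delay)
        self.delay = delay


def call_with_reschedule(job, func, *args):
    """Call `func`; release the job with a delay if it asks to be rescheduled."""
    try:
        func(*args)
    except Reschedule as exc:
        # Put the job back in the tube; it becomes ready again after the delay.
        job.release(delay=exc.delay)
        return "released"
    job.delete()  # the function finished, so the job is done
    return "deleted"
```

Unlike burying, releasing with a delay needs no manual kick: the job returns to the ready state on its own.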
The downside of scheduling things in beanstalk queues is that arguments to function calls must be serialized. Azuki uses JSON serialization, so anything that is not JSON-serializable cannot be used as an argument.
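A quick way to see which values qualify is to try serializing them with the standard json module. The helper name is_json_serializable is made up for this sketch and is not part of azuki.

```python
import json


def is_json_serializable(value):
    """Return True if `value` can be encoded by the standard json module."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True
```

Strings, numbers, booleans, None, lists and dictionaries with string keys pass; sets and arbitrary objects do not.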
Django model instances are the exception: azuki recognizes them and handles them specially. That means that, for example, queueing mails instead of sending them directly works:
from django.contrib.auth.models import User
from django.core.mail import send_mail
from django.db import models
from azuki import beanstalk

class Message(models.Model):
    recipient = models.ForeignKey(User, on_delete=models.CASCADE)
    subject = models.CharField("Subject", max_length=128)
    text = models.TextField("Message text")

    @beanstalk('send-mail')
    def send(self):
        # send_mail expects a list of address strings, not User objects
        send_mail(self.subject, self.text, 'webmaster@localhost', [self.recipient.email])
If you now create a message and call
send(), the message is not sent, but
only added to the
send-mail tube. You can again use
azuki daemon to process
this tube and actually send the mails, possibly even on a different machine.
(c) 2014, Dennis Kaarsemaker firstname.lastname@example.org
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see http://www.gnu.org/licenses/.