Postgres Broker for Dramatiq Task Queue


Keywords
postgres, task, queue, dramatiq, python, task queue
License
PostgreSQL
Install
pip install dramatiq-pg==0.4.1

Documentation

dramatiq-pg − Postgres Broker for Dramatiq

Dramatiq is a simple task queue implementation for Python3. dramatiq-pg provides a Postgres-based implementation of a dramatiq broker.

Features

  • Super simple deployment: Single table, no ORM.
  • Stores message payload and results as native JSONb.
  • Uses LISTEN/NOTIFY to keep worker sync. No polling.
  • Implements delayed task.
  • Reliable thanks to Postgres MVCC.
  • Self-healing: automatic purge of old messages. Automatic recovery after crash.
  • Utility CLI for maintainance: flush, purge, stats, etc.

Note that dramatiq assumes tasks are idempotent. This broker makes the same assumptions for recovering after a crash.

Installation

  • Install dramatiq-pg package from PyPI:
    $ pip install dramatiq-pg psycopg2-binary
    
    Ensure you have either psycopg2 or psycopg2-binary installed.
  • Apply dramatiq_pg/schema.sql file in your database:
    $ psql -f dramatiq_pg/schema.sql
    
  • Before importing actors, define global broker with a connection pool:
    import dramatiq
    import psycopg2.pool
    from dramatiq_pg import PostgresBroker
    
    dramatiq.set_broker(PostgresBroker(i))
    
    @dramatiq.actor
    def myactor():
        ...
    

Now declare/import actors and manage worker just like any dramatiq setup. An example script is available, tested on CI.

The CLI tool dramatiq-pg allows you to requeue messages, purge old messages and show stats on the queue. See --help for details.

Dramatiq-pg documentation is hosted on GitLab and give you more details on deployment and operation of Postgres as a Dramatiq broker.

Support

If you encounter a bug or miss a feature, please open an issue on GitLab with as much information as possible.

dramatiq_pg is available under the PostgreSQL licence.