tx-easy-pika

Wrapper around Pika's Twisted connection to make it simpler to work with.


Keywords
pika, rabbitmq, twisted, amqpy
License
Other
Install
pip install tx-easy-pika==1.0.1

Documentation

Description

A wrapper around pika's Twisted connection to make it simpler to use.

Main assumptions:

  • All messages are JSON. Incoming messages that are not valid JSON log an error and are acked immediately. You can still publish non-JSON messages: strings are sent as they are, and anything that is not a string is converted to a JSON string before publishing.
  • All you really want to do is consume and publish.
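
The first assumption can be pictured with a small sketch. This is not the library's own code: handle_delivery and FakeChannel are hypothetical names used only to illustrate the behaviour described above, and skipping the callback for a non-JSON body is an assumption.

```python
import json
import logging

log = logging.getLogger("consumer-sketch")


class FakeChannel(object):
    """Hypothetical stand-in for a channel; records acked delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, tag):
        self.acked.append(tag)


def handle_delivery(channel, tag, body, callback):
    """Pass JSON bodies to the callback; log and ack anything else."""
    try:
        message = json.loads(body)
    except ValueError:
        # Not JSON: log an error and ack so the broker does not redeliver it.
        log.error("Discarding non-JSON message: %r", body)
        channel.basic_ack(tag)
        return False
    # Valid JSON: the callback is responsible for acking.
    callback(channel, tag, message)
    return True
```

In this sketch a valid message is left for the callback to ack, which matches the callback in the usage example.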

Installation

Use pip:

pip install tx-easy-pika

Usage

Example

from datetime import datetime
from txeasypika import QueueConnection
from twisted.internet import reactor


def callback(channel, tag, message):
    ''' This function will be called whenever the server sends us a message. '''
    print("Received a message:")
    print(message)
    channel.basic_ack(tag)

queues = QueueConnection()

# Create an exchange
queues.exchange_declare(exchange="test-exchange")

# Set up a listener on a queue
queues.bind("my-queue", "test-exchange", "test-routing-key", callback)

# Now publish some messages to that queue
# Note that these messages will not be published until we have started the
# Twisted reactor and we have successfully connected to RabbitMQ.
for i in range(5):
    queues.basic_publish("test-exchange", "test-routing-key", {
        "value": i,
        "message_data": "This is a message",
        "nested_field": {
            "an_array": ["this has an array"],
            "a_date": datetime.now()
        }
    })

# Start up the reactor
reactor.run()

QueueConnection

Class to represent a connection to the queues.

__init__()

Construct a new connection to the queues. Note that this is based on Twisted, so no connection is actually made until the reactor is running.

Arguments:

  • host (str) - The server that RabbitMQ is on.
  • port (str) - The port that RabbitMQ is listening on.
  • username (str) - Username for RabbitMQ.
  • password (str) - Password for RabbitMQ.
  • heartbeat (int) - Heartbeat frequency in seconds for RabbitMQ.
  • prefetch_count (int) - How many messages to prefetch for this connection.
  • log_level (logging log level) - Logging level for Pika.
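
As a rough illustration of the arguments above, the settings can be collected in a dict and passed on as keywords. The argument names come from the list; the values are examples only, and passing them as keyword arguments is an assumption about the constructor's signature. make_connection is a hypothetical helper.

```python
import logging

# Argument names are taken from the list above; the values are examples only.
# localhost, 5672 and guest/guest are the stock RabbitMQ defaults.
connection_settings = {
    "host": "localhost",
    "port": "5672",            # listed above as a str; adjust if an int is expected
    "username": "guest",
    "password": "guest",
    "heartbeat": 30,           # seconds
    "prefetch_count": 10,
    "log_level": logging.WARNING,
}


def make_connection(settings):
    """Build a QueueConnection; assumes the arguments are accepted as keywords."""
    # Imported here so the settings can be defined without the package installed.
    from txeasypika import QueueConnection
    return QueueConnection(**settings)
```

Nothing connects when make_connection() returns; as noted above, the connection is only made once the reactor is running.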

basic_publish()

Publish a message to the queues. This is a deferred method: the actual call is made once the system has successfully connected to the AMQP server.
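
The idea of a call that is held back until the connection is up can be sketched in plain Python. This is only an illustration of the behaviour, not the library's mechanism (which builds on Twisted); PendingCalls and its method names are hypothetical.

```python
class PendingCalls(object):
    """Hypothetical helper: hold calls until connected, then run them in order."""

    def __init__(self):
        self.connected = False
        self._pending = []

    def call_when_connected(self, func, *args, **kwargs):
        """Run func now if connected, otherwise remember it for later."""
        if self.connected:
            func(*args, **kwargs)
        else:
            self._pending.append((func, args, kwargs))

    def on_connected(self):
        """Mark the connection as up and flush queued calls in arrival order."""
        self.connected = True
        pending, self._pending = self._pending, []
        for func, args, kwargs in pending:
            func(*args, **kwargs)
```

This is why the usage example can call basic_publish() before reactor.run(): the calls are recorded first and carried out after the connection succeeds.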

Arguments:

  • exchange (str or unicode) - The exchange to publish to. The name is a sequence of these characters: letters, digits, hyphen, underscore, period, or colon.
  • routing_key (str) - The routing key to publish the message with.
  • message (anything) - The message to send. If this is not a string it will be serialized as JSON, and the properties.content_type field will be forced to be application/json.
  • properties (dict) - Dict of AMQP message properties.
  • mandatory (bool) - AMQP Mandatory flag.
  • immediate (bool) - AMQP Immediate flag.
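
The handling of the message argument can be sketched as follows. This is a minimal Python 3 illustration, not the library's serializer: prepare_message and _json_default are hypothetical names, and rendering datetimes as ISO 8601 strings is an assumption made so that a value like the a_date field in the usage example can be encoded.

```python
import json
from datetime import datetime


def _json_default(value):
    """Assumed fallback: render datetimes as ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError("Cannot serialize %r" % (value,))


def prepare_message(message, properties=None):
    """Return (body, properties) as they might look just before publishing."""
    properties = dict(properties or {})
    if not isinstance(message, (str, bytes)):
        # Non-strings are serialized to JSON and the content type is forced.
        message = json.dumps(message, default=_json_default)
        properties["content_type"] = "application/json"
    return message, properties
```

Strings pass through untouched, so a caller who publishes plain text keeps whatever content_type they supplied.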

exchange_declare()

Declare an exchange. This is a deferred method: the actual call is made once the system has successfully connected to the AMQP server.

If passive is set, the server replies with Declare-Ok if an exchange with this name already exists. If the exchange does not exist, the server MUST raise a channel exception with reply code 404 (not found).

Parameters:

  • callback (method) – Call this method on Exchange.DeclareOk
  • exchange (str or unicode) – The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon.
  • exchange_type (str) – The exchange type to use
  • passive (bool) – Perform a declare or just check to see if it exists
  • durable (bool) – Survive a reboot of RabbitMQ
  • auto_delete (bool) – Remove when no more queues are bound to it
  • internal (bool) – Can only be published to by other exchanges
  • nowait (bool) – Do not expect an Exchange.DeclareOk response
  • arguments (dict) – Custom key/value pair arguments for the exchange
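
The character set quoted for the exchange argument can be checked before declaring anything. The helper below is a hypothetical sketch, not part of the library, and it assumes "letters" means ASCII letters; a broker may accept more than this in practice.

```python
import re

# Letters, digits, hyphen, underscore, period, or colon; at least one character.
EXCHANGE_NAME = re.compile(r"[A-Za-z0-9_.:-]+")


def is_valid_exchange_name(name):
    """Return True if name uses only the characters listed for exchange names."""
    return EXCHANGE_NAME.fullmatch(name) is not None
```

For example, is_valid_exchange_name("test-exchange") is True, while a name containing a space is rejected.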

bind()

Declare a queue and create a binding on it. Optionally attach a function to call when a message is received. Note that the function will be called once for each call to bind().

Arguments:

  • queue_name (str) - The name of the queue we want to declare.
  • exchange (str) - The exchange of the initial binding on the new queue.
  • routing_key (str) - The routing key of the initial binding on the new queue.
  • callback (function) - A function to call when a message is received on this queue. Arguments are (channel object, delivery tag, JSON-parsed message).
  • arguments (dict) - AMQP Arguments for declaring the queue.
  • no_ack (bool) - Whether this consumer will ack or not. Ignored if callback is None.
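
Since the callback receives (channel, delivery tag, parsed message), one convenient pattern is a wrapper that acks only after the handler has finished. This is a sketch under that assumption; acking and RecordingChannel are hypothetical names, and the recording channel exists only so the wrapper can be tried without a broker.

```python
def acking(handler):
    """Wrap handler(message) into a bind()-style callback that acks on success."""
    def callback(channel, tag, message):
        handler(message)          # if this raises, the message is not acked
        channel.basic_ack(tag)
    return callback


class RecordingChannel(object):
    """Hypothetical stand-in for a channel; records acked delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, tag):
        self.acked.append(tag)


received = []
channel = RecordingChannel()
acking(received.append)(channel, 7, {"value": 1})
```

With a real connection the wrapped function would be passed as the callback argument, for example queues.bind("my-queue", "test-exchange", "test-routing-key", acking(received.append)).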

close()

Close the connection to the AMQP server.