avroconsumer
A set of Rejected classes that automatically encode and decode AMQP message bodies as Avro datums.
For applications that can share schema files, Avro datums provide a small, contract-based binary
serialization format. Leveraging AMQP's type message property to convey the Avro schema file for
decoding the datum, avroconsumer's classes extend Rejected's rejected.consumer.SmartPublishingConsumer.
Installation
avroconsumer is available on the Python Package Index.
Usage
avroconsumer has two base consumer types: LocalSchemaConsumer and RemoteSchemaConsumer.
The difference between the two lies in how they load the Avro schema file. The
LocalSchemaConsumer loads schema files from a local directory, while the RemoteSchemaConsumer
loads schema files from a remote location accessible via HTTP.
LocalSchemaConsumer
To use the LocalSchemaConsumer, you need to set the schema_path config value in the consumer
configuration of the rejected configuration file. The following snippet demonstrates an example
configuration:
Consumers:
  apns_push:
    consumer: app.Consumer
    connections: [rabbit1]
    qty: 1
    queue: datum
    qos_prefetch: 1
    ack: True
    max_errors: 5
    config:
      schema_path: /etc/avro-schemas/
In this example configuration, if messages are published with an AMQP type message property of
foo and a content-type property of application/vnd.apache.avro.datum, classes extending
LocalSchemaConsumer will use the Avro schema file located at /etc/avro-schemas/foo.avsc
to deserialize messages.
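The mapping from the type property to a schema file can be sketched as follows. This is a minimal illustration of the lookup described above, not avroconsumer's actual code; the helper name local_schema_file and the constant AVRO_SCHEMA_EXTENSION are hypothetical.

```python
import posixpath

# Assumption: schema files are named after the AMQP type value plus ".avsc",
# as in the /etc/avro-schemas/foo.avsc example above.
AVRO_SCHEMA_EXTENSION = '.avsc'


def local_schema_file(schema_path, message_type):
    """Return the schema file path for an AMQP type value.

    Hypothetical helper for illustration only; it is not part of avroconsumer.
    """
    return posixpath.join(schema_path, message_type + AVRO_SCHEMA_EXTENSION)


print(local_schema_file('/etc/avro-schemas/', 'foo'))
```

The join works whether or not schema_path ends with a trailing slash.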
RemoteSchemaConsumer
The RemoteSchemaConsumer loads schema files from a remote HTTP server. It expects the
schema_uri_format setting in the consumer configuration of the rejected configuration file.
The following snippet demonstrates an example configuration:
Consumers:
  apns_push:
    consumer: app.Consumer
    connections: [rabbit1]
    qty: 1
    queue: datum
    qos_prefetch: 1
    ack: True
    max_errors: 5
    config:
      schema_uri_format: http://schema-server/avro/{0}.avsc
In this example configuration, if messages are published with an AMQP type message property of
foo and a content-type property of application/vnd.apache.avro.datum, classes extending
RemoteSchemaConsumer will use the Avro schema file located at http://schema-server/avro/foo.avsc
to deserialize messages.
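The {0} placeholder in schema_uri_format matches Python's positional string formatting, so the substitution can be sketched as below. This assumes the type value is passed as the first positional argument to str.format; the helper name remote_schema_uri is hypothetical and not part of avroconsumer.

```python
def remote_schema_uri(schema_uri_format, message_type):
    """Fill the {0} placeholder with the AMQP type value.

    Hypothetical helper for illustration only; it is not part of avroconsumer.
    """
    return schema_uri_format.format(message_type)


print(remote_schema_uri('http://schema-server/avro/{0}.avsc', 'foo'))
```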
Example
The following example uses the RemoteSchemaConsumer class to receive a message and
deserialize it. It evaluates the content of the message and, if the field foo equals bar,
publishes a bar message.
import time

import avroconsumer


class FooConsumer(avroconsumer.RemoteSchemaConsumer):

    def process(self):
        # self.body holds the decoded Avro datum
        if self.body['foo'] == 'bar':
            self.publish('bar', 'amqp.direct', 'routing-key',
                         {'timestamp': time.time()}, {'bar': True})
Enforcing Message Type
As with any instance of rejected.consumer.Consumer, the avroconsumer classes
can automatically reject messages based upon the type message property.
Simply set the MESSAGE_TYPE attribute of your consumer and any messages
received that do not match that message type will be rejected.
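The check described above can be pictured with a small stand-alone sketch. It does not use rejected or avroconsumer, and the class and method names (TypeCheckSketch, accepts) are hypothetical; it also assumes that a consumer with no MESSAGE_TYPE set accepts every message.

```python
class TypeCheckSketch:
    """Stand-in for a consumer base class; not rejected's implementation."""

    # Assumption: None means "do not enforce a message type".
    MESSAGE_TYPE = None

    def accepts(self, message_type):
        """Return True if a message with this type property would be processed."""
        if self.MESSAGE_TYPE is None:
            return True
        return message_type == self.MESSAGE_TYPE


class FooOnly(TypeCheckSketch):
    # Only messages whose type property is "foo" pass the check.
    MESSAGE_TYPE = 'foo'


consumer = FooOnly()
print(consumer.accepts('foo'))
print(consumer.accepts('bar'))
```

In a real consumer the equivalent is setting MESSAGE_TYPE as a class attribute on the class that extends LocalSchemaConsumer or RemoteSchemaConsumer.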