puppy-pubsub

Pub/Sub that makes your project simpler, not more complicated


Keywords
easy, elegant, pub-sub, publish-subscribe, publisher-subscriber, pubsub, python, python3, simple, topic
License
MIT
Install
pip install puppy-pubsub==0.1

Documentation

puppy

Build Status
Codecov

A featherweight pub/sub architecture genetically engineered to make your project simpler, not more complicated. And to be adorable.

What's in a name?

Publisher–Subscriber + Python → Pub/Sub + .py → Pub + py → puppy

Hello World

import puppy as pup

puppy = pup.Puppy()

pub = puppy.Pub('topic1')
sub = puppy.SubPull('topic1')

pub.send('hello')
pub.send('world')

print(sub.recvAll())
['hello', 'world']

For those unfamiliar with the Pub/Sub pattern, Google and Wikipedia have pretty useful overviews.

Installation

pip install puppy # not yet

Basic

Publishers

Publishers are created by calling .Pub() on the Puppy object. A topics argument is passed, either a string or list of strings, specifying which topics the Publisher should publish to:

import puppy as pup

puppy = pup.Puppy()

pub1 = puppy.Pub('topic1')
pub2 = puppy.Pub(['topic1','topic2'])

Subscribers (Push)

To register a push subscriber, call .SubPush() on the Puppy object, with two arguments. The first is the topics argument, either a string or list of strings, specifying which topics the Subscriber should be subscribed to. The second is a callback function that is called for each message published to each of the subscribed topics.

import puppy as pup

puppy = pup.Puppy()
pub = puppy.Pub('topic1')


def exampleSubscriber(data):
    print('Subscriber received: {0}'.format(repr(data))

puppy.SubPush('topic1',exampleSubscriber)

pub.send('hello')
Subscriber received: 'hello'

It's important to note that the function is called once for each message on each topic, regardless of the message origin:

import puppy as pup

puppy = pup.Puppy()
pub = puppy.Pub(['topic1','topic2'])


def exampleSubscriber(data):
    print('Subscriber received: {0}'.format(repr(data))

puppy.SubPush(['topic1','topic2'],exampleSubscriber)

pub.send('hello')
Subscriber received: 'hello'   # pub -> topic1 -> exampleSubscriber
Subscriber received: 'hello'   # pub -> topic2 -> exampleSubscriber

Subscribers (Pull)

Sometimes it's useful to have messages wait in the queue until requested. Puppy offers pull subscriptions as well. To register a pull subscriber, call .SubPull() on the Puppy object, with a topics argument. This returns a subscriber object. To check for messages, this object provides two methods:

  • .recv() is a non-blocking query that:

    • Returns None if no unread messages exist
    • Returns the oldest unread message if any exist
  • .recvAll() is a non-blocking query that:

    • Returns [] if no unread messages exist
    • Returns a list of all unread messages if any exist, ordered from oldest to newest.

Each subscriber has its own message queue, which can be consumed without affecting any other subscribers.

import puppy as pup

puppy = pup.Puppy()
pub = puppy.Pub('topic1')

sub1 = puppy.SubPull('topic1')
sub2 = puppy.SubPull('topic1')

print(sub1.recv())
print(sub2.recvAll())

pub.send('hello')
pub.send('world')

print(sub1.recv())
print(sub1.recv())
print(sub1.recv())

print(sub2.recvAll())
print(sub2.recvAll())
None
[]

'hello'
'world'
None

['hello', 'world']
[]

Advanced

Complex Architectures

import puppy as pup

def s1(data):
    print("subscriber1 received: {0}".format(repr(data)))

def s2(data):
    print("subscriber2 received: {0}".format(repr(data)))

puppy = pup.Puppy()

pub1 = puppy.Pub('topic1')
pub2 = puppy.Pub(['topic2','topic3'])
puppy.SubPush(['topic1','topic2'],s1)
puppy.SubPush('topic3',s2)

pub1.send(1)
pub2.send(2)

subscriber1 received: 1   # pub1 -> topic1 -> sub1
subscriber1 received: 2   # pub2 -> topic2 -> sub1
subscriber2 received: 2   # pub2 -> topic3 -> sub2

Parent/Child topics

An example is worth a thousand words:

import puppy as pup

puppy = pup.Puppy()
pub1 = puppy.Pub('topic1/subtopic1')
pub2 = puppy.Pub('topic1/subtopic2')

sub1 = puppy.SubPull('topic1/subtopic1')
sub2 = puppy.SubPull('topic1/subtopic2')
sub = puppy.SubPull('topic1')

pub1.send('hello')
pub2.send('world')

print(sub1.recvAll())
print(sub2.recvAll())
print(sub.recvAll())
['hello']
['world']
['hello', 'world']

The default delimiter is /, but arbitrary delimiters may be passed when constructing the Puppy object, eg, .Puppy(delim='-'). Behavior for passed delimiters of length greater than one is currently undefined, and may be quite surprising.

import puppy as pup

puppy = pup.Puppy(delim='.')
pub = puppy.Pub('topic1.subtopic1')

sub1 = puppy.SubPull('topic1.subtopic1')
sub2 = puppy.SubPull('topic1')
sub3 = puppy.SubPull('')

pub1.send(42)

print(sub1.recvAll())
print(sub2.recvAll())
print(sub3.recvAll())
42
42
42

There is actually a root topic of '', which receives every single message from any topic, and can be subscribed to:

import puppy as pup

puppy = pup.Puppy()
pub1 = puppy.Pub('topic1')
pub2 = puppy.Pub('topic2')

sub1 = puppy.SubPull('topic1')
sub2 = puppy.SubPull('topic2')
sub = puppy.SubPull('')

pub1.send('hello')
pub2.send('world')

print(sub1.recvAll())
print(sub2.recvAll())
print(sub.recvAll())
['hello']
['world']
['hello', 'world']

Subscribing to '' will often be useful for logging, for example.

Filters

It might be useful to have incoming messages filtered by some arbitrary criteria above and beyond topics:

import puppy as pup

puppy = pup.Puppy()
pub1 = puppy.Pub('topic1')
pub2 = puppy.Pub('topic2')

sub = puppy.SubPull('topic1',
                    filter=lambda i : 'e' in i)

pub1.send('hello')
pub2.send('world')
pub2.send('hey')

print(sub.recvAll())
['hello', 'hey']

The function passed as filter will be called on each potential message. If the function returns True, the message will be queued for the subscriber. If the function errors or returns any other value, the message will not be queued.

import puppy as pup

puppy = pup.Puppy()
pub = puppy.Pub('topic1')

sub = puppy.SubPull('topic1',
                    filter=lambda i : i[2] == 'c')

pub.send('ab')    # will cause a silent caught error in filter function
pub.send('abc')
pub.send('abd')   # will return False from filter function
pub.send('abcde')

print(sub.recvAll())
['abc', 'abcde']

Non-lambda functions are of course also usable as filter functions:

import puppy as pup

puppy = pup.Puppy()
pub = puppy.Pub('topic1')

def f(n):
    if n%3 != 0:
        return False
    
    if n%5 != 0:
        return False
    
    return True

sub = puppy.SubPull('topic1',
                    filter=f)

for n in range(50):
	pub.send(n)

print(sub.recvAll())
[0, 15, 30, 45]

Examples

Project Euler #1:

import time
import puppy as pup

class Filter(object):
    def __init__(self,puppy):
        # publisher to the 'filtered' topic
        self.pub = puppy.Pub('filtered')

    # will receive push from the 'all' topic    
    def recv(self,n):
        if n%3 == 0:
            self.pub.send(n)
        elif n%5 == 0:
            self.pub.send(n)        


puppy = pup.Puppy()

# raw numbers go into the 'all' topic
pub = puppy.Pub('all')

f = Filter(puppy)
puppy.SubPush('all',f.recv)

# subscribe to the filtered results
result = puppy.SubPull('filtered')

# send some data into the 'all' topic
for n in range(1000):
	pub.send(n)

# make sure everything settles down
time.sleep(1) 

# receive everything from the 'filtered' topic and sum it
print(sum(result.recvAll()))