pyrendezvous

Python binding for rendezvous


License
MIT
Install
pip install pyrendezvous==1.0

Documentation

Sources image

Introduction

A Python wrapper for rendezvous - a library providing infrastructure for writing applications around event loop(s).

The library currently focuses on exposing, in a consistent and simple way, the facilities provided by the epoll event loop on Linux.

Installation

Simply, using pip:

$ pip install pyrendezvous

The library is distributed as a source package, so a C++14 compiler must be available on the system. Only Linux is supported at the moment.

Usage

Dispatcher and Loop

A Dispatcher is the core component of the framework, providing the binding point between framework facilities and event queue dispatch. Every facility object is therefore always initialised with its dispatcher. There are two ways of dispatching events: (1) by calling Dispatcher.dispatch in a loop, or (2) by using a Loop convenience object, which does the same automatically.

from framework import Dispatcher, Loop
from signal import SIGTERM, SIGINT


if __name__ == '__main__':
    # Every facility binds to a dispatcher; here it is used on its own.
    dispatcher = Dispatcher()

    # Loop is the convenience object that performs the dispatching for us.
    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

Dispatchers can form a hierarchy: a dispatcher can be attached as a dispatchable to another dispatcher, allowing cascaded processing of a tree-like structure of event queues.

from framework import Dispatcher, Loop
from signal import SIGTERM, SIGINT
        
if __name__ == '__main__':
    # The child is constructed with its parent dispatcher, forming a
    # two-level hierarchy.
    parent = Dispatcher()
    child = Dispatcher(parent)

    # Only the root of the hierarchy is handed to the loop.
    loop = Loop(parent)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

Timers

Once started, a OneShotTimer will execute its callback function only once. It can be manually re-started, but will not automatically re-start itself.

from datetime import datetime, timedelta
from framework import Dispatcher, Loop, OneShotTimer
from signal import SIGTERM, SIGINT
    

def on_timer(timer):
    # type: (OneShotTimer) -> None
    """Report that *timer* elapsed, then schedule it again in five seconds.

    The timer is re-scheduled by hand on every expiry, so the callback keeps
    being invoked for as long as the event loop runs.
    """
    fired_at = datetime.now()
    print(f'{fired_at} - {timer} elapsed!')

    next_delay = timedelta(seconds=5)
    timer.schedule(next_delay)
    
    
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # First expiry is scheduled two seconds out; on_timer re-schedules it.
    initial_delay = timedelta(seconds=2)
    timer = OneShotTimer(dispatcher, on_timer)
    timer.schedule(initial_delay)

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

An AutoReloadTimer will automatically re-start itself after each execution of its callback function, resulting in periodic callback execution.

from datetime import datetime, timedelta
from framework import AutoReloadTimer, Dispatcher, Loop
from signal import SIGTERM, SIGINT
    
    
def on_timer(timer, count):
    """Print a timestamped line naming *timer* and the *count* it was given."""
    stamp = datetime.now()
    print(f'{stamp} - {timer} elapsed! number of times - {count}')
    
    
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # The timer is given a two-second interval and must be armed explicitly.
    period = timedelta(seconds=2)
    timer = AutoReloadTimer(dispatcher, period, on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

Signals

An instance of SignalHandler can be used to handle signals targeted at the process through the dispatch mechanism.

import os
from signal import SIGUSR1, SIGTERM, SIGINT
from framework import Dispatcher, Loop, SignalHandler
    
    
def on_signal(signo):
    """Print the signal number handed to the handler."""
    message = f'received {signo}'
    print(message)
    
    
if __name__ == '__main__':
    # Print a kill command for this process.
    # NOTE(review): presumably 10 is SIGUSR1's number on the target
    # platform - confirm.
    pid = os.getpid()
    print(f'kill -10 {pid}')

    dispatcher = Dispatcher()
    # Keep a reference to the handler for the lifetime of the loop.
    signal_handler = SignalHandler(dispatcher, SIGUSR1, on_signal)

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

IPv4 Transports

A set of transports for a range of IPv4 network protocols:

Udp multicast

A pair of UdpListenerTransport and UdpPublisherTransport provides a mechanism for publishing and subscribing to UDP multicast.

publisher

from datetime import datetime, timedelta
from framework import AutoReloadTimer, Dispatcher, Loop
from framework.transports.ipv4 import UdpPublisherTransport
from signal import SIGTERM, SIGINT
        
        
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # Publisher bound to the multicast group and port the listener example
    # subscribes to.
    transport = UdpPublisherTransport(dispatcher, '224.0.0.1:30000')

    def on_timer(*args):
        # Timer callback: publish one timestamped message per expiry.
        # The timer's own arguments are not needed here.
        transport.send(f'Hallo world! It is {datetime.now()}')

    # The timer and the loop must be set up at script level, not inside the
    # callback; otherwise nothing is ever armed or run.
    timer = AutoReloadTimer(dispatcher, timedelta(seconds=1), on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
         

listener

from datetime import datetime, timedelta
from framework import AutoReloadTimer, Dispatcher, Loop
from framework.transports.ipv4 import UdpListenerTransport
from signal import SIGTERM, SIGINT
        
        
def on_data(transport, data):
    """Print the received buffer as a bytes object.

    *data* is read through a memoryview and copied into bytes for printing;
    *transport* is not used.
    """
    payload = bytes(memoryview(data))
    print(f'received {payload}')
        
        
if __name__ == '__main__':
    dispatcher = Dispatcher()
    # Listener on the same group and port the publisher example sends to;
    # on_data is passed as the callback.
    group = '224.0.0.1:30000'
    transport = UdpListenerTransport(dispatcher, group, on_data)

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

Udp unicast

The UdpServerTransport and UdpClientTransport pair serves UDP unicast.

server

from datetime import datetime
from framework import Dispatcher, Loop
from framework.transports.ipv4 import UdpServerTransport
from signal import SIGTERM, SIGINT
        
        
class Pong(object):
    """Server-side handler that answers every ping with a timestamped pong."""

    def on_ping(self, transport, data):
        """Print the received buffer and reply through *transport*."""
        payload = bytes(memoryview(data))
        print(payload)

        answer = f'pong at {datetime.now()}'
        transport.reply(answer)
        
        
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # The bound method pong.on_ping is passed as the server's callback.
    pong = Pong()
    address = '127.0.0.1:6000'
    transport = UdpServerTransport(dispatcher, address, pong.on_ping)

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

client

from datetime import datetime, timedelta
from framework import Dispatcher, Loop
from framework.timers import AutoReloadTimer
from framework.transports.ipv4 import UdpClientTransport
from signal import SIGTERM, SIGINT
        
        
class Ping(object):
    """Client-side handler that sends timestamped pings and prints replies.

    ``transport`` starts out as None and must be assigned by the caller
    before the timer first fires, because on_timer sends through it.
    """

    def __init__(self):
        self.transport = None

    def on_timer(self, timer, count):
        """Timer callback: send one timestamped ping over the transport."""
        self.transport.send(f'ping at {datetime.now()}')

    def on_pong(self, transport, data):
        """Data callback: print the received buffer as bytes."""
        print(bytes(memoryview(data)))
        
        
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # The handler exists first so its bound method can be given to the
    # transport; the transport is then stored back on the handler.
    ping = Ping()
    server_address = '127.0.0.1:6000'
    ping.transport = UdpClientTransport(dispatcher, server_address, ping.on_pong)

    # ping.on_timer is the callback of a timer with a one-second interval.
    period = timedelta(seconds=1)
    timer = AutoReloadTimer(dispatcher, period, ping.on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

Tcp

TCP servers and clients are implemented as TcpServerTransport and TcpClientTransport respectively. TcpServerTransport creates a separate instance of TcpConnectedClientTransport for every client connected to the server. Each instance is an individual object representing a unique connection between a client and the server. A TcpConnectedClientTransport can store an instance of any object representing connection state in its closure.

server

from datetime import datetime
from framework import Dispatcher, Loop
from framework.transports.ipv4 import TcpConnectedClientTransport, TcpServerTransport
from signal import SIGTERM, SIGINT
      
      
class Pong(object):
    """Server-side handler that numbers its replies per connection.

    The running reply count is kept in each connection's ``closure``
    attribute, so it is tracked per transport rather than on this object.
    """

    def on_connected(self, transport):
        # type: (TcpConnectedClientTransport) -> None
        """Reset the connection's reply counter and announce the connection."""
        transport.closure = 0
        print(f'connected - {transport}')

    def on_disconnected(self, transport):
        """Announce that the connection went away."""
        print(f'disconnected - {transport}')

    def on_ping(self, transport, data):
        """Print the received buffer, reply with a numbered pong, then count it."""
        payload = bytes(memoryview(data))
        print(payload)

        reply_number = transport.closure
        reply = f'pong at {datetime.now()} #{reply_number}'
        transport.send(reply)
        transport.closure += 1
      
      
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # The handler's bound methods are passed in the order
    # on_connected, on_disconnected, on_ping.
    pong = Pong()
    transport = TcpServerTransport(
        dispatcher,
        '127.0.0.1:6000',
        pong.on_connected,
        pong.on_disconnected,
        pong.on_ping,
    )

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()

client

from datetime import datetime, timedelta
from framework import Dispatcher, Loop
from framework.timers import AutoReloadTimer
from framework.transports.ipv4 import TcpClientTransport
from signal import SIGTERM, SIGINT
      
      
class Ping(object):
    """Client-side handler that sends timestamped pings and prints replies.

    ``transport`` is None until the caller assigns the client transport;
    on_timer sends through it.
    """

    def __init__(self):
        self.transport = None

    def on_timer(self, timer, count):
        """Timer callback: send one timestamped ping over the transport."""
        message = f'ping at {datetime.now()}'
        self.transport.send(message)

    def on_connected(self, transport):
        """Announce that the connection was established."""
        print(f'connected - {transport}')

    def on_disconnected(self, transport):
        """Announce that the connection went away."""
        print(f'disconnected - {transport}')

    def on_pong(self, transport, data):
        """Data callback: print the received buffer as bytes."""
        payload = bytes(memoryview(data))
        print(payload)
      
      
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # The handler exists first so its bound methods can be given to the
    # transport; the transport is then stored back on the handler.
    ping = Ping()
    ping.transport = TcpClientTransport(
        dispatcher,
        '127.0.0.1:6000',
        ping.on_connected,
        ping.on_disconnected,
        ping.on_pong,
    )

    # ping.on_timer is the callback of a timer with a one-second interval.
    period = timedelta(seconds=1)
    timer = AutoReloadTimer(dispatcher, period, ping.on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    # Register both termination signals with the loop, SIGTERM first.
    for signo in (SIGTERM, SIGINT):
        loop.interrupt_on(signo)
    loop.run_forever()