Python binding for rendezvous
pip install pyrendezvous==1.0
A Python wrapper for rendezvous - a library providing infrastructure for writing applications around event loop(s).
The library currently focuses on exposing, in a consistent and simple way, the facilities provided by the epoll event loop on Linux.
Simply, using pip:
$ pip install pyrendezvous
The library is distributed as a source package, so a C++14 compiler must be available on the system. Only Linux is supported at the moment.
A Dispatcher is the core component of the framework, providing the binding point between framework facilities and event queue dispatch. Thus every facility object is always initialised with its dispatcher. There are two ways of dispatching events: (1) by calling Dispatcher.dispatch in a loop, or (2) by using a Loop convenience object, which does the same automatically.
from framework import Dispatcher, Loop
from signal import SIGTERM, SIGINT
if __name__ == '__main__':
    # Every facility of the framework is bound to a dispatcher.
    dispatcher = Dispatcher()

    # Loop drives the dispatcher, so Dispatcher.dispatch does not have to be
    # called by hand in a loop.
    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
Dispatchers can form a hierarchy: a dispatcher can be attached as a dispatchable to another dispatcher, allowing cascaded processing of a tree-like structure of event queues.
from framework import Dispatcher, Loop
from signal import SIGTERM, SIGINT
if __name__ == '__main__':
    parent = Dispatcher()
    # Passing the parent to the constructor attaches the child to it,
    # forming a two-level dispatcher hierarchy.
    child = Dispatcher(parent)

    # Only the root dispatcher is handed to the loop.
    loop = Loop(parent)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
Once started, a OneShotTimer timer will execute its callback function only once. It can be manually re-started, but will not automatically re-start itself.
from datetime import datetime, timedelta
from framework import Dispatcher, Loop, OneShotTimer
from signal import SIGTERM, SIGINT
def on_timer(timer):
    # type: (OneShotTimer) -> None
    """Report that *timer* elapsed and manually re-start it.

    A one-shot timer does not re-start itself, so the callback schedules
    the next expiry (five seconds from now) explicitly.
    """
    print(f'{datetime.now()} - {timer} elapsed!')
    timer.schedule(timedelta(seconds=5))
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # First expiry after two seconds; on_timer re-schedules the timer itself.
    timer = OneShotTimer(dispatcher, on_timer)
    timer.schedule(timedelta(seconds=2))

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
```
An **AutoReloadTimer** will automatically re-start itself after each execution of its callback function, resulting in periodic callback execution.
```python
from datetime import datetime, timedelta
from framework import AutoReloadTimer, Dispatcher, Loop
from signal import SIGTERM, SIGINT
def on_timer(timer, count):
    """Print the current time, the timer and the *count* passed to the callback."""
    print(f'{datetime.now()} - {timer} elapsed! number of times - {count}')
if __name__ == '__main__':
    dispatcher = Dispatcher()

    # Periodic timer with a two-second interval; arm() starts it.
    timer = AutoReloadTimer(dispatcher, timedelta(seconds=2), on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
An instance of SignalHandler can be used to handle signals targeted at the process using the dispatch mechanism.
import os
from signal import SIGUSR1, SIGTERM, SIGINT
from framework import Dispatcher, Loop, SignalHandler
def on_signal(signo):
    """Print the signal number *signo* handed to the callback."""
    print(f'received {signo}')
if __name__ == '__main__':
    # Print a command the user can run to send signal 10 to this process.
    print(f'kill -10 {os.getpid()}')

    dispatcher = Dispatcher()
    # Route SIGUSR1 to on_signal through the dispatcher.
    signal_handler = SignalHandler(dispatcher, SIGUSR1, on_signal)

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
A set of transports for a range of IPv4 network protocols:
The UdpListenerTransport and UdpPublisherTransport pair provides a mechanism for publishing and subscribing to UDP multicast.
publisher
from datetime import datetime, timedelta
from framework import AutoReloadTimer, Dispatcher, Loop
from framework.transports.ipv4 import UdpPublisherTransport
from signal import SIGTERM, SIGINT
if __name__ == '__main__':
    dispatcher = Dispatcher()
    transport = UdpPublisherTransport(dispatcher, '224.0.0.1:30000')

    def on_timer(*args):
        """Publish a timestamped greeting; the timer's arguments are ignored."""
        transport.send(f'Hallo world! It is {datetime.now()}')

    # Publish once per second.
    timer = AutoReloadTimer(dispatcher, timedelta(seconds=1), on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
listener
from datetime import datetime, timedelta
from framework import AutoReloadTimer, Dispatcher, Loop
from framework.transports.ipv4 import UdpListenerTransport
from signal import SIGTERM, SIGINT
def on_data(transport, data):
    """Print the received payload as bytes.

    *data* must support the buffer protocol, since it is wrapped in a
    memoryview before being copied into a bytes object. *transport* is
    not used.
    """
    print(f'received {bytes(memoryview(data))}')
if __name__ == '__main__':
    dispatcher = Dispatcher()
    # Same address as the one used by the publisher example.
    transport = UdpListenerTransport(dispatcher, '224.0.0.1:30000', on_data)

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
The UdpServerTransport and UdpClientTransport pair is designed to serve UDP unicast.
server
from datetime import datetime
from framework import Dispatcher, Loop
from framework.transports.ipv4 import UdpServerTransport
from signal import SIGTERM, SIGINT
class Pong(object):
    """Server-side handler that answers every received datagram."""

    def on_ping(self, transport, data):
        """Print the received payload and reply with a timestamped pong.

        *data* must support the buffer protocol; the answer is sent
        through ``transport.reply``.
        """
        print(bytes(memoryview(data)))
        transport.reply(f'pong at {datetime.now()}')
if __name__ == '__main__':
    dispatcher = Dispatcher()
    pong = Pong()
    # Incoming datagrams on 127.0.0.1:6000 are handed to pong.on_ping.
    transport = UdpServerTransport(dispatcher, '127.0.0.1:6000', pong.on_ping)

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
client
from datetime import datetime, timedelta
from framework import Dispatcher, Loop
from framework.timers import AutoReloadTimer
from framework.transports.ipv4 import UdpClientTransport
from signal import SIGTERM, SIGINT
class Ping(object):
    """Client-side handler that sends periodic pings and prints the answers."""

    def __init__(self):
        # Assigned by the caller once the client transport has been created.
        self.transport = None

    def on_timer(self, timer, count):
        """Send a timestamped ping through the stored transport."""
        self.transport.send(f'ping at {datetime.now()}')

    def on_pong(self, transport, data):
        """Print the received payload; *data* must support the buffer protocol."""
        print(bytes(memoryview(data)))
if __name__ == '__main__':
    dispatcher = Dispatcher()
    ping = Ping()
    # The transport needs ping.on_pong as its callback, so the handler is
    # created first and receives its transport afterwards.
    ping.transport = UdpClientTransport(dispatcher, '127.0.0.1:6000', ping.on_pong)

    # Send a ping once per second.
    timer = AutoReloadTimer(dispatcher, timedelta(seconds=1), ping.on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
TCP servers and clients are implemented as TcpServerTransport and TcpClientTransport respectively. TcpServerTransport creates a separate instance of TcpConnectedClientTransport for every client connected to the server. This is an individual object representing a unique connection between client and server. TcpConnectedClientTransport can store an instance of any object representing connection state in its closure.
server
from datetime import datetime
from framework import Dispatcher, Loop
from framework.transports.ipv4 import TcpConnectedClientTransport, TcpServerTransport
from signal import SIGTERM, SIGINT
class Pong(object):
    """Server-side handler that keeps a per-connection reply counter."""

    def on_connected(self, transport):
        # type: (TcpConnectedClientTransport) -> None
        """Initialise the connection's reply counter to zero and log it."""
        transport.closure = 0
        print(f'connected - {transport}')

    def on_disconnected(self, transport):
        """Log the disconnected transport."""
        print(f'disconnected - {transport}')

    def on_ping(self, transport, data):
        """Print the payload, answer with a numbered pong, bump the counter.

        The counter lives in ``transport.closure``, so every connection
        is numbered independently.
        """
        print(bytes(memoryview(data)))
        count = transport.closure
        transport.send(f'pong at {datetime.now()} #{count}')
        transport.closure += 1
if __name__ == '__main__':
    dispatcher = Dispatcher()
    pong = Pong()
    # Callbacks, in order: connected, disconnected, data received.
    transport = TcpServerTransport(
        dispatcher,
        '127.0.0.1:6000',
        pong.on_connected,
        pong.on_disconnected,
        pong.on_ping,
    )

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()
client
from datetime import datetime, timedelta
from framework import Dispatcher, Loop
from framework.timers import AutoReloadTimer
from framework.transports.ipv4 import TcpClientTransport
from signal import SIGTERM, SIGINT
class Ping(object):
    """Client-side handler that sends periodic pings over a TCP transport."""

    def __init__(self):
        # Assigned by the caller once the client transport has been created.
        self.transport = None

    def on_timer(self, timer, count):
        """Send a timestamped ping through the stored transport."""
        self.transport.send(f'ping at {datetime.now()}')

    def on_connected(self, transport):
        """Log the connected transport."""
        print(f'connected - {transport}')

    def on_disconnected(self, transport):
        """Log the disconnected transport."""
        print(f'disconnected - {transport}')

    def on_pong(self, transport, data):
        """Print the received payload; *data* must support the buffer protocol."""
        print(bytes(memoryview(data)))
if __name__ == '__main__':
    dispatcher = Dispatcher()
    ping = Ping()
    # The transport needs the handler's bound methods as callbacks, so the
    # handler is created first and receives its transport afterwards.
    ping.transport = TcpClientTransport(
        dispatcher,
        '127.0.0.1:6000',
        ping.on_connected,
        ping.on_disconnected,
        ping.on_pong,
    )

    # Send a ping once per second.
    timer = AutoReloadTimer(dispatcher, timedelta(seconds=1), ping.on_timer)
    timer.arm()

    loop = Loop(dispatcher)
    loop.interrupt_on(SIGTERM)
    loop.interrupt_on(SIGINT)
    loop.run_forever()