Polycephaly

Easily create system daemons (and programs) that use an email-like syntax for communicating between threaded and forked processes.


License
MIT
Install
pip install Polycephaly==2019.11a5

Documentation

Polycephaly

Polycephaly is a Python module that allows you to easily create programs that are capable of parallel operations at the I/O and CPU levels:

  • Email-like syntax for inter-process communications, without external dependencies such as SQLite or Redis.
  • Message routing (e.g. between processes operating under Polycephaly, and other platforms such as Unix Domain Socketfile, D-Bus, MQTT, USB, et al.)
  • Message filters for assigning callbacks to receive messages on different routes.
  • JSON encoding and parsing for external messaging (e.g. Unix Domain Socketfile or MQTT).
  • FIFO or Priority message queuing.

Message relay example - short

Overview

Originally, Polycephaly was created as a shared framework for building a Linux-based, embedded system for Robot Operating System-based robotics with a large touchscreen monitor with Python. This framework is shared between 2 separate processes running in tandem as a server and a client:

  1. System service
    • Shared messaging library (e.g. Unix Domain Socketfile Server).
    • System and network management:
      • Persistent Internet connection for remote monitoring and management.
      • Applying system updates with opportunistic downtime (e.g. rebooting for a UEFI, Grub, and/or Kernel update outside of peak usage and business hours).
    • Software and hardware communications, including but not limited to:
      • MQTT
      • IPC
        • Unix domain socket
        • Message queue
        • Pipe
      • USB
      • TTL (via USB)
  2. Touchscreen application
    • Shared messaging library (e.g. Unix Domain Socketfile Client).
    • Administrative functions and hardware diagnostics.
    • Interfacing to mobile devices via QR Codes.

Please note: this module should not be considered production-ready, and many improvements are still underway.

Since friends have indicated that they could use this for some of their projects, I'm releasing this far earlier than I probably should. If you find an error or have a suggestion, please consider opening a bug report, or better yet, submitting a pull request.


Usage

Get

To download Polycephaly:

Install from PyPI via pip:

$ pip install polycephaly

or

Install from this repository via pip:

$ pip install git+ssh://git@gitlab.com/ltgiv/polycephaly.git

or

Clone this repository:

$ git clone https://gitlab.com/ltgiv/polycephaly.git

Grok

Polycephaly is 2 components that are the same derived class for consistency. Thus, every process is interchangeable.

Components

At the very least, there is the main process, and then there are sub-processes.

  1. Main process Since many TUI/GUI frameworks (e.g. Urwid, curses, Kivy, pyglet, PyQt, Tkinter, et al.) work better as (or have to be) the main process, this easily allows for that. If this process ends, all of the sub-processes are shutdown.

  2. Sub-processes These answer to the main process. When setting up your application, each sub-process has a mode that can be toggled between threaded or forked, with the former being the default, and the latter better used for CPU-intensive tasks (e.g. training a Machine Learning model) or where required (e.g. rospy requires this.)


For the sub-processes whose operating modes are set to forked, they will operate independently, under a separate Python process, and cannot share existing objects (e.g. dictionaries) like they can if they were threaded. This is where the email-like communications of Polycephaly shine, as it allows you to pass serializable objects.

If you're wanting to share an object such as a Network or Database connection, you'd simply devote a sub-process to this task, and then use Polycephaly's communications from other processes for carrying out their requests, including the ability to wait for replies.


Methods

The lifetime of a process has 3 stages:

  1. birth( self ) - You can think of this as a constructor. This carries out preliminary actions (e.g. initializing hardware or connecting to a database) before looping the method, life().
    • args - a tuple of arguments that are passed from the application's build side to the process.
    • kwargs - a dictionary of keyword arguments that are passed from the application's build side to the process.
  2. life( self ) - This is the entirety of the process. When building a Polycephaly-based application, you can adjust a global and/or local frequency value, which dictates how often this method should be run. For example, if you set this value to 30, Polycephaly would attempt to run this method 30 times per second with a sleep time after each run of life() defined by time.sleep( 1 / 30 )
  3. death( self ) - You can think of this as a destructor. This carries out the cleanup portion of a process (e.g. finalizing database transactions and then closing the database connection) before notifying the main process that it has reached its end of life, and has fully shutdown.

Many helper methods are inherited from the parent class for use in each process definition. The API documentation provides more information, but some of the common ones to pay attention to, are:

  • frequency( i=None ) - Without an argument provided, this will return the current integer for the local process' frequency. With an argument, this will write an integer to be used for a frequency. An example for increasing and decreasing frequency if an instance boolean is toggled:

    if self.stayAlert and self.frequency() < 60:
        self.frequency( 60 )
    
    elif self.frequency() != 30:
        self.frequency( 30 )
    
  • mailman() - With no arguments provided, this checks for new messages destined for the process on the internal message bus from the main process or a sub-process.

    • If a filter matches a message, the callback is then executed. This is normally a blocking event, but callbacks can be spun off into threaded or forked processes if so desired.
    • Only one message is read from the process' queue per run.
    • Only run once per loop of life().
    • This can be easily extended to cover other message queues and routes, such as checking for (JSON-based) messages received from MQTT, XMPP, or USB, for example.
  • die() - Used by a process to shut itself down, and make its way towards death() as the final step.

    During the build phase of your application, a poison pill is generated, which is simply a UUID saved as a string. A message filter is then automatically added for each process, that looks for this value in the subject line on the internal route. When a process receives this value from the main process or a sub-process, the receiving process will run its die() method.

  • ebrake( reason=None ) - As the name implies, this Emergency Brake method allows any process within the application to send a request to the main process, requesting an immediate shutdown of the application, and allows for an optional reason that will be shown as a part of the shutdown message.

  • send() - Send a message from one process to another. An example:

    self.send(
        recipient   =   "main",
        subject     =   "salutation",
        body        =   "Hello, World!"
    )
    
  • waitForReply() - Blocking event with an option for timing out, that will wait for a response from a recipient, before proceeding. An example:

    message =   self.send(
                    recipient   =   "main",
                    subject     =   "salutation",
                    body        =   "Hello, World!"
                )
    
    reply   =   self.waitForReply( message, timeout=10 )
    

Go!

This is a high-level overview of the "Hello, World!" example. The examples directory is usually the best place to start from, for fully functioning code:

Build

launch.py:

#!/usr/bin/env python -u
# -*- coding: utf-8 -*-

class Application( polycephaly.Application ):

    def build( self ):

        # Update global frequency
        self.globalFrequency( 15 )              # Run fifteen times per second.

        # Add process : Hello, World!
        self.addProcess(

            processes.helloWorld,               # If the default class name of `Process` is used, it doesn't need to be specified here.

            # Arguments to pass through to the process.
            'Arg1',
            'Arg2',

            # Keyword arguments to pass through to the process.
            abc             =   123,
            xyz             =   789,

            # Process Parameters
            name            =   'Hello',        # Override the default name of `helloWorld` with a shorter name of `hello`.
            mode            =   'Thread',       # Run the process as a thread.
            frequency       =   1 / 5,          # Update the local frequency to run once every 5 seconds.
            autostart       =   True,           # This is default behavior, with the alternative being to setup a process, and then start it at a later time.
            boundShutdown   =   False,          # Run independently without binding to main process.

        )

        pass # END METHOD : Build

    pass # END CLASS : Application

if __name__ == '__main__':

    logger.notice( "Start : 'Hello, World!'." )

    Application(

        # Add process : Main
        processes.main,
        name            =   'Main',         # Set the name of the process that we refer to, or specify an added process as main.
        ppill           =   'STOP!',        # Case-sensitive poison pill.
        queueSize       =   25,             # Maximum number of messages to keep in each queue.
        queueType       =   'FIFO',         # FIFO or Priority message queue.
        frequency       =   5,              # Update the local frequency to run five times per second.
        forceStop       =   False,          # Allow the process to ignore repeated shutdown requests.
        threadsTimeout  =   30,             # Application will wait on threads for this long.

    ).run()

    logger.notice( "Stop : 'Hello, World!'." )

    pass # END MAIN

helloWorld.py

#!/usr/bin/env python -u
# -*- coding: utf-8 -*-

class Process( polycephaly.Process ):

    def life( self ):

        # Send a message to the Main process.
        message     =   self.send(

                            # Message parameters
                            recipient   =   'main',
                            subject     =   'salutation',
                            body        =   'Hello, World!',

                            # Extra message headers
                            args        =   self.args,
                            kwargs      =   self.kwargs,

                        )
        logger.debug( f"'{ self.name }' sent a message to '{ message[ 'recipient' ] }':\n{ pf( message ) }" )

        # Wait for reply from the Main process.
        reply       =   self.waitForReply( message, timeout=10 )
        logger.debug( f"'{ self.name }' received a reply from '{ reply.get( 'sender' ) }':\n{ pf( reply ) }" )

        # Check for new messages, and run appropriate callbacks.
        self.mailman()

        pass # END METHOD : Life

    pass # END CLASS : PROCESS : Hello, World!

main.py

#!/usr/bin/env python -u
# -*- coding: utf-8 -*-

class Process( polycephaly.Process ):

    # This is a callback method that replies back to a message.
    def salutation( self, message ):

        logger.debug( f"'{ self.name }' received a salutation message from '{ message[ 'sender' ] }':\n{ pf( message ) }" )

        # Respond to a received message.
        reply   =   self.reply(

                        # Message parameters
                        message,
                        body    =   "Hi, thanks for writing.",

                        # Extra message headers
                        acme    =   123,

                    )
        logger.debug( f"'{ self.name }' sent a reply to '{ reply[ 'recipient' ] }':\n{ pf( reply ) }" )

        pass # END CALLBACK : Salutation

    def birth( self ):

        # Add a message filter for case-insensitive matching of "salutation" in the subject line, with self.salutation() set as the callback method.
        self.addFilter(
            subject     =   r'(?i)^SALUTATION$',
            callback    =   self.salutation,
        )

        pass # END METHOD : Birth

    pass # END CLASS : PROCESS : Main

Run

Running this application is as simple as ~/helloWorld/launch.py

Now, every 5-6 seconds, you'll see output on your console:

[1970-01-01 00:00:00.00] DEBUG: processes.helloWorld: 'hello' sent a message to 'main':
{
    'args': ('Arg1', 'Arg2'),
    'body': 'Hello, World!',
    'kwargs': {'abc': 123, 'xyz': 789},
    'messageid': '9da03bcf-af3d-49ad-a9d4-17617c974de6',
    'recipient': 'main',
    'sender': 'hello',
    'subject': 'salutation',
    'threadid': '05cc7e85-f291-4c63-af34-6d47b8fc2594',
    'threadindex': 1,
    'time': 123456789.0000
}

[1970-01-01 00:00:00.05] DEBUG: processes.main: 'main' received a salutation message from 'hello':
{
    'args': ('Arg1', 'Arg2'),
    'body': 'Hello, World!',
    'kwargs': {'abc': 123, 'xyz': 789},
    'messageid': '52c7cca0-0f0a-4791-a1d8-7de96d5b2c68',
    'recipient': 'main',
    'sender': 'hello',
    'subject': 'salutation',
    'threadid': '05cc7e85-f291-4c63-af34-6d47b8fc2594',
    'threadindex': 1,
    'time': 123456789.0005
}

[1970-01-01 00:00:00.10] DEBUG: processes.main: 'main' sent a reply to 'hello':
{
    'acme': 123,
    'args': ('Arg1', 'Arg2'),
    'body': 'Hi, thanks for writing.',
    'kwargs': {'abc': 123, 'xyz': 789},
    'messageid': '270c8371-0467-4648-9630-5376e33ababa',
    'recipient': 'hello',
    'sender': 'main',
    'subject': 'reply',
    'threadid': '05cc7e85-f291-4c63-af34-6d47b8fc2594',
    'threadindex': 2,
    'time': 123456789.0010
}

[1970-01-01 00:00:00.15] DEBUG: processes.helloWorld: 'hello' received a reply from 'main':
{
    'acme': 123,
    'args': ('Arg1', 'Arg2'),
    'body': 'Hi, thanks for writing.',
    'kwargs': {'abc': 123, 'xyz': 789},
    'messageid': '270c8371-0467-4648-9630-5376e33ababa',
    'recipient': 'hello',
    'sender': 'main',
    'subject': 'reply',
    'threadid': '05cc7e85-f291-4c63-af34-6d47b8fc2594',
    'threadindex': 2,
    'time': 123456789.0015
}

Explanation

Due to the way that we set the frequencies above, the main process is checking for messages five times per second, while as the "Hello, World!" sub-process is sending one message every 5 seconds.

Besides arbitrary headers that you can add (e.g. args and kwargs in the call to send() above), additional headers are added to this message (e.g. Sender, Time in UTC, Message ID, and Thread ID) which can be used for other purposes, such as waiting for a reply. Just like e-mail, the Message ID is different, but the Thread ID is the same, and the Thread Index has incremented.

Contributors

Have something to contribute to this project? Please see the document, How to contribute.

Statistics

Pipeline Status

Downloads

Downloads

Downloads

Links

License

MIT License

Copyright (c) 2019 Louis T. Getterman IV

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Written with StackEdit.