msgpackstream

A SAX-like MessagePack library in Python to deserialize messages from an input stream


Keywords
python
License
MIT
Install
pip install msgpackstream==2.0.5

Documentation

msgpack-pystream

A SAX-like MessagePack library in Python to deserialize messages from an input stream.

MessagePack

MessagePack is a binary serialization format comparable to JSON. A message consists of segments. Each segment contains at least 1 header byte, followed where needed by additional bytes that hold the length and the value of the data.
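To make the segment layout concrete, the sketch below classifies the first segment of a byte string for a handful of formats from the MessagePack specification (positive fixint, fixstr, nil, uint 8, str 8, str 16). The function name `describe_segment` is hypothetical and is not part of msgpackstream. It assumes Python 3, where indexing a `bytes` object yields an int.

```python
import struct


def describe_segment(data, pos=0):
    """Return (format_name, total_size_in_bytes) for the segment at data[pos].

    Illustrative only: covers a small subset of the MessagePack formats.
    """
    header = data[pos]
    if header <= 0x7F:                      # 0xxxxxxx: value stored in the header itself
        return ("positive fixint", 1)
    if 0xA0 <= header <= 0xBF:              # 101xxxxx: low 5 bits hold the string length
        return ("fixstr", 1 + (header & 0x1F))
    if header == 0xC0:                      # nil: header only
        return ("nil", 1)
    if header == 0xCC:                      # uint 8: header + 1 value byte
        return ("uint 8", 2)
    if header == 0xD9:                      # str 8: header + 1 length byte + payload
        return ("str 8", 2 + data[pos + 1])
    if header == 0xDA:                      # str 16: header + 2 big-endian length bytes + payload
        (length,) = struct.unpack_from(">H", data, pos + 1)
        return ("str 16", 3 + length)
    raise ValueError("format 0x%02x is not covered by this sketch" % header)


print(describe_segment(b"\x05"))        # header byte only
print(describe_segment(b"\xa3abc"))     # 1 header byte + 3 payload bytes
print(describe_segment(b"\xd9\x02hi"))  # header + length byte + 2 payload bytes
```

The header byte alone tells a parser how many more bytes it needs, which is what allows a stream to be processed piece by piece.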

Msgpack-python

The official u-msgpack-python library provides a simple API to serialize objects to, and deserialize them from, msgpack binary data. However, it does not provide a streaming API that can be used for big data.

About

This library provides a SAX-like MessagePack API to deserialize msgpack binary data from an input stream, which makes msgpack usable in big data environments. Msgpackstream generates events as it parses a binary stream and as it receives segments of the data. It does not require the complete data to be in memory, or even to be available yet: it processes the data it already has and buffers the segments that still need additional bytes. Furthermore, the library has an easy-to-read, state-based and template-based implementation.
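The buffering idea can be sketched with a toy push parser. This is not msgpackstream's implementation: `TinyIncrementalParser` and its `feed` method are hypothetical names, it understands only positive fixint and fixstr segments, and it returns plain values rather than events.

```python
class TinyIncrementalParser(object):
    """Toy push parser: handles only positive fixint and fixstr segments."""

    def __init__(self):
        self._pending = bytearray()   # bytes received but not yet parsed

    def feed(self, chunk):
        """Add a chunk and return the values of all segments completed so far."""
        self._pending.extend(chunk)
        values = []
        pos = 0
        while pos < len(self._pending):
            header = self._pending[pos]
            if header <= 0x7F:                    # positive fixint: no extra bytes
                values.append(header)
                pos += 1
            elif 0xA0 <= header <= 0xBF:          # fixstr: length in the low 5 bits
                end = pos + 1 + (header & 0x1F)
                if end > len(self._pending):      # payload incomplete: wait for more data
                    break
                values.append(self._pending[pos + 1:end].decode("utf-8"))
                pos = end
            else:
                raise ValueError("format not covered by this sketch")
        del self._pending[:pos]                   # keep only the unfinished segment
        return values


parser = TinyIncrementalParser()
first = parser.feed(b"\x01\xa5he")    # fixint 1, then a 5-byte string cut short
second = parser.feed(b"llo\x02")      # rest of the string, then fixint 2
print(first, second)
```

The first call can only report the integer; the partial string stays buffered until the second chunk completes it.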

Installation

Msgpackstream can be installed using pip and supports both Python 2.x and 3.x:

    pip install msgpackstream

Usage

The simplest way to use msgpackstream is to unpack an input stream:

    from msgpackstream.stream import unpack
    ..
    ..
    events = unpack(instream, buffersize)

Here instream is an input stream and buffersize is an int giving the number of bytes to read from the stream at a time. The buffersize argument is optional; its default value is 1000.

A more flexible way of using msgpackstream is the StreamUnpacker class, as shown below:

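    from msgpackstream.stream import StreamUnpacker
    ..
    ..
    unpacker = StreamUnpacker()
    data = read()                  # read() stands for any function returning the next chunk of bytes
    while data:
        unpacker.process(data)
        events = unpacker.generate_events()
        for event in events:
            print(event)
        data = read()              # fetch the next chunk; an empty result ends the loop
    ...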
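The `read()` call in the loop above is left undefined. One way to supply chunks, shown here with the standard library only, is a small generator over a file-like object. `read_chunks` is a hypothetical helper, not part of msgpackstream, and `io.BytesIO` stands in for any input stream. The default of 1000 simply mirrors the default buffersize mentioned earlier.

```python
import io


def read_chunks(instream, buffersize=1000):
    """Yield successive chunks of at most buffersize bytes until the stream is exhausted."""
    while True:
        chunk = instream.read(buffersize)
        if not chunk:          # an empty bytes object signals end of stream
            break
        yield chunk


instream = io.BytesIO(b"\x93\x01\x02\x03" * 3)     # 12 bytes of sample data
sizes = [len(chunk) for chunk in read_chunks(instream, buffersize=5)]
print(sizes)
```

Each chunk produced this way could then be handed to `unpacker.process` in place of the value returned by `read()`.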

Each event is a tuple of 4 items, inspired by the ijson JSON library. Besides being accessed by index, it can be unpacked as follows:

    for prefix, eventtype, formattype, value in events:
        print(eventtype)

The prefix is an array of strings giving the recursive path to the current segment. For example, in a map the prefix of a property value segment contains the name of the property. Eventtype is an enum holding the type of the deserialization event. Formattype corresponds to the type of the segment as defined in the official documentation. Finally, value holds the value if the event type is VALUE, MAP_PROPERTY_NAME or EXT.
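As an illustration of working with the 4-tuple, the sketch below selects the values stored under a given property name by looking at the last element of the prefix. It runs without the library: `EventType` is a local stand-in for the library's enum, `values_under` is a hypothetical helper, and the events are hand-written, so the exact prefixes and format types that msgpackstream emits may differ.

```python
from enum import Enum


class EventType(Enum):          # local stand-in for the library's EventType enum
    VALUE = 1
    MAP_START = 4
    MAP_END = 5
    MAP_PROPERTY_NAME = 6


def values_under(events, name):
    """Yield the value of every VALUE event whose prefix ends with the given property name."""
    for prefix, eventtype, formattype, value in events:
        if eventtype is EventType.VALUE and prefix and prefix[-1] == name:
            yield value


# Hand-written events for the map {"id": 7, "name": "x"} (an assumption, not
# captured library output); formattype is left as None because it is not used here.
events = [
    ([], EventType.MAP_START, None, None),
    ([], EventType.MAP_PROPERTY_NAME, None, "id"),
    (["id"], EventType.VALUE, None, 7),
    ([], EventType.MAP_PROPERTY_NAME, None, "name"),
    (["name"], EventType.VALUE, None, "x"),
    ([], EventType.MAP_END, None, None),
]
print(list(values_under(events, "id")))
```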

For more details and working examples read here

Event Type

Msgpackstream generates various events based on the data being parsed. The different event types are recorded in an Enum as follows:

    from enum import Enum

    class EventType(Enum):

        VALUE = 1                   # value event
        ARRAY_START = 2             # event that indicates start of an array
        ARRAY_END = 3               # event that indicates end of an array
        MAP_START = 4               # event that indicates start of a map
        MAP_END = 5                 # event that indicates end of a map
        MAP_PROPERTY_NAME = 6       # event that indicates property name
        EXT = 7                     # event that indicates ext value
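A typical consumer of SAX-like events keeps a stack of open containers. The sketch below rebuilds a Python object from a sequence of event tuples. It is a minimal sketch, not library code: `build` is a hypothetical helper, the enum is copied locally so the example runs standalone, and it assumes that each MAP_PROPERTY_NAME event is followed by the events of its value and that every start event has a matching end event.

```python
from enum import Enum


class EventType(Enum):          # local copy of the enum, so the sketch runs standalone
    VALUE = 1
    ARRAY_START = 2
    ARRAY_END = 3
    MAP_START = 4
    MAP_END = 5
    MAP_PROPERTY_NAME = 6
    EXT = 7


def build(events):
    """Rebuild a Python object from (prefix, eventtype, formattype, value) tuples."""
    root = []                   # holder list, so the top-level value is attached like any other
    stack = [root]              # containers that are currently open
    keys = []                   # pending property name per open container (None for arrays)

    def attach(obj):
        parent = stack[-1]
        if isinstance(parent, dict):
            parent[keys[-1]] = obj
        else:
            parent.append(obj)

    for _prefix, eventtype, _formattype, value in events:
        if eventtype in (EventType.ARRAY_START, EventType.MAP_START):
            container = [] if eventtype is EventType.ARRAY_START else {}
            attach(container)
            stack.append(container)
            keys.append(None)
        elif eventtype in (EventType.ARRAY_END, EventType.MAP_END):
            stack.pop()
            keys.pop()
        elif eventtype is EventType.MAP_PROPERTY_NAME:
            keys[-1] = value
        else:                   # VALUE or EXT: a leaf value
            attach(value)
    return root[0]


# Hand-written events for {"a": [1, 2]}; prefix and formattype are ignored by build().
sample = [
    ((), EventType.MAP_START, None, None),
    ((), EventType.MAP_PROPERTY_NAME, None, "a"),
    ((), EventType.ARRAY_START, None, None),
    ((), EventType.VALUE, None, 1),
    ((), EventType.VALUE, None, 2),
    ((), EventType.ARRAY_END, None, None),
    ((), EventType.MAP_END, None, None),
]
print(build(sample))
```

Building the whole object gives up the memory advantage of streaming, so in practice this pattern is more useful for reconstructing small sub-objects selected from a large stream.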

FormatType

Currently, msgpackstream supports all format types in the official msgpack specification as of Sep 18 2017. The full list can be found here

However, if the event type is EXT, the formattype will be an instance of the class ExtType, which holds an additional ext code indicating the type of the extension:

    class ExtType():
        '''
        Class for Extension Type including format type, template and length in the header
        '''

        def __init__(self, formattype, extcode):
            self._formattype = formattype
            self._extcode = extcode

        ...
        ...

        formattype = property(get_formattype, set_formattype)
        extcode = property(get_extcode, set_extcode)


Extension Type

Msgpackstream allows handling of extension types in two different ways:

  • Receiving a byte[] within an EXT event that can be processed
  • Registering a custom ExtTypeParser class that can be used to parse the byte[] within the buffer

The latter is slightly more efficient, as it avoids the extra step of copying the byte[] into an event:

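    from abc import ABCMeta, abstractmethod

    class ExtTypeParser():
        __metaclass__ = ABCMeta

        @abstractmethod
        def deserialize(self, exttype, buff, start, end):
            '''
            Should be implemented for every user defined extension type
            :param exttype: ExtType of the segment being parsed
            :param buff: buffer that contains the value
            :param start: start of the range in buff that holds the value
            :param end: end of the range in buff that holds the value
            '''

        @abstractmethod
        def handled_extcode(self):
            pass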

The handled_extcode method should simply return the extcode number handled by the parser implementation. The main method, however, is deserialize, which receives the exttype explained in the previous section, the buffer, and the range in which the value can be found. Finally, an instance of an ExtTypeParser can be registered in either of the following ways:

    parser = MyExtParser()
    unpacker = StreamUnpacker()
    unpacker.register(parser)

or

    parser = MyExtParser()
    unpack(instream, buffersize, [parser])
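A concrete parser might look like the sketch below. Everything specific in it is an assumption made for illustration: the `PointExtParser` class, ext code 1, the payload layout of two big-endian int32 values, the returned tuple, and the reading of `end` as an exclusive index. The abstract base class is redefined locally (Python 3 syntax) so the example runs without the library.

```python
import struct
from abc import ABC, abstractmethod


class ExtTypeParser(ABC):
    """Local stand-in for the library's interface, so the sketch runs standalone."""

    @abstractmethod
    def deserialize(self, exttype, buff, start, end):
        pass

    @abstractmethod
    def handled_extcode(self):
        pass


class PointExtParser(ExtTypeParser):
    """Hypothetical parser for ext code 1, whose payload is two big-endian int32 values."""

    def handled_extcode(self):
        return 1

    def deserialize(self, exttype, buff, start, end):
        # Assumption: the payload occupies buff[start:end], with end exclusive.
        if end - start != 8:
            raise ValueError("expected an 8-byte payload")
        x, y = struct.unpack_from(">ii", buff, start)   # read in place, no copy of the payload
        return (x, y)


# fixext 8 segment: header 0xd7, ext code 1, then 8 payload bytes.
buff = b"\xd7\x01" + struct.pack(">ii", 3, -4)
parser = PointExtParser()
point = parser.deserialize(None, buff, 2, 10)
print(parser.handled_extcode(), point)
```

Reading directly from the buffer with an offset is what makes this route cheaper than receiving a copied byte[] in an EXT event.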

TODO

  • Optimize byte scanner
  • Add additional unit tests for more complicated messages

Related Projects and Links

  • Check out my parallel project to create some sort of easy to use Python Object Parser (POP)
  • Check out the official msgpack package u-msgpack-python
  • Another third-party msgpack package for Python: msgpack-python
  • Latest msgpack specification link

Credits:

Thanks to everyone who contributed to msgpack. Additionally, the interface of the API was inspired by the simple interface of the ijson library.