async-patterns

pattern for registering callbacks


License
MIT
Install
pip install async-patterns==0.3b14

Documentation

async_patterns

https://travis-ci.org/chuck1/async_patterns.svg?branch=master Documentation Status

Useful python patterns using async.

The module contains the following submodules:

Install

pip3 install async_patterns

Test Source

git clone git@github.com:chuck1/async_patterns
cd async_patterns
pip3 install -e .
pytest

Examples

.. testsetup::

    import asyncio
    import functools
    from async_patterns import Callbacks
    from async_patterns.coro_queue import CoroQueue

    loop = asyncio.get_event_loop()

Callbacks

Code:

.. testcode::

    cb = Callbacks()

    l = []

    def func(a): print(a)

    cb.add_callback(functools.partial(func, 1))
    cb.add_callback(functools.partial(func, 2))

    cb()

Output:

.. testoutput::

    1
    2

Code:

.. testcode::

    cb = Callbacks()

    l = []

    async def func(a): print(a)

    cb.add_callback(functools.partial(func, 1))
    cb.add_callback(functools.partial(func, 2))

    loop.run_until_complete(cb.acall())

Output:

.. testoutput::

    1
    2

CoroQueue

.. testcode::

    async def a(i):
        await asyncio.sleep(1)
        print(i)

    async def b(i):
        print(i)

    q = CoroQueue(loop)

    q.schedule_run_forever()

    q.put_nowait(a, 1)
    q.put_nowait(b, 2)

    loop.run_until_complete(q.join())

    loop.run_until_complete(q.close())

.. testoutput::

    1
    2