Threading and multiprocessing eye-candy.


Keywords
thread, process, pool, decorator, asyncio, decorators, multiprocessing, python, threading
License
Other
Install
pip install Pebble==5.0.7

Documentation

Pebble

Pebble provides a neat API to manage threads and processes within an application.

Source: https://github.com/noxdafox/pebble
Documentation: https://pebble.readthedocs.io
Download: https://pypi.org/project/Pebble/

Build Status Documentation Status PyPI - Downloads

Examples

Run a job in a separate thread and wait for its results.

from pebble import concurrent

@concurrent.thread
def function(foo, bar=0):
    return foo + bar

future = function(1, bar=2)

result = future.result()  # blocks until results are ready

Same code with AsyncIO support.

import asyncio

from pebble import asynchronous

@asynchronous.thread
def function(foo, bar=0):
    return foo + bar

async def asynchronous_function():
    result = await function(1, bar=2)  # blocks until results are ready
    print(result)

asyncio.run(asynchronous_function())

Run a function with a timeout of ten seconds and deal with errors.

from pebble import concurrent
from concurrent.futures import TimeoutError

@concurrent.process(timeout=10)
def function(foo, bar=0):
    return foo + bar

future = function(1, bar=2)

try:
    result = future.result()  # blocks until results are ready
except TimeoutError as error:
    print("Function took longer than %d seconds" % error.args[1])
except Exception as error:
    print("Function raised %s" % error)
    print(error.traceback)  # traceback of the function

Pools support workers restart, timeout for long running tasks and more.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

TIMEOUT_SECONDS = 3

def function(foo, bar=0):
    return foo + bar

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for index in range(0, 10):
        future = pool.schedule(function, index, bar=1, timeout=TIMEOUT_SECONDS)
        future.add_done_callback(task_done)