Lightweight thread and asyncio task library


Keywords
asyncio, event, interval, python3, queue, thread-pool, threadpoolexecutor, workers
License
MIT
Install
pip install neotasker==0.0.13

Documentation

neotasker

Lightweight Python library for modern thread / multiprocessing pooling and task processing via asyncio.

Neotasker is lightweight variation of atasker library: tasks don't have priorities, go directly to ThreadPoolExecutor and are standard Python future objects. This library is useful for the high-load projects with lightweight tasks as majority tasks are directly proxied to pool.

Neotasker works on top of ThreadPoolExecutor and asyncio and provides additional features:

  • Easy thread pool and asyncio loops initialization
  • Interval, queue and event-based workers
  • Built-in integration with aiosched

Install

pip3 install neotasker

Sources: https://github.com/alttch/neotasker

Documentation: https://neotasker.readthedocs.io/

Code examples

Start/stop

from neotasker import task_supervisor

# set pool size
# min_size='max' means pre-spawn all pool threads
task_supervisor.set_thread_pool(min_size='max', max_size=20)
task_supervisor.start()
# ...
# start workers, other threads etc.
# ...
# optionally block current thread
task_supervisor.block()

# stop from any thread
task_supervisor.stop()

Executing future

You may work with neotasker.thread_pool object directly or use task_supervisor.spawn function, which's directly mapped to thread_pool.submit)

from neotasker import thread_pool

thread_pool.start()

def mytask(a, b, c):
    print(f'I am working in the background! {a} {b} {c}')
    return 777

task = task_supervisor.spawn(mytask, 1, 2, c=3)

# get future result
result = task.result()

Creating async io loop

from neotasker import thread_pool

thread_pool.start()
task_supervisor.create_aloop('default', default=True)

# The loop will until supervisor is stopped
# Spawn coroutine from another thread:

task_supervisor.get_aloop().spawn_coroutine_threadsafe(coro)

Worker examples

from neotasker import background_worker, task_supervisor

task_supervisor.start()
# we need to create at least one aloop to start workers
task_supervisor.create_aloop('default', default=True)
# create one more async loop
task_supervisor.create_aloop('loop2')

@background_worker
def worker1(**kwargs):
    print('I am a simple background worker')

@background_worker
async def worker_async(**kwargs):
    print('I am async background worker')

@background_worker(interval=1, loop='loop2')
def worker2(**kwargs):
    print('I run every second!')

@background_worker(queue=True)
def worker3(task, **kwargs):
    print('I run when there is a task in my queue')

@background_worker(event=True)
def worker4(**kwargs):
    print('I run when triggered')

worker1.start()
worker_async.start()
worker2.start()
worker3.start()
worker4.start()

worker3.put_threadsafe('todo1')
worker4.trigger_threadsafe()

from neotasker import BackgroundIntervalWorker

class MyWorker(BackgroundIntervalWorker):

    def run(self, **kwargs):
        print('I am custom worker class')

worker5 = MyWorker(interval=0.1, name='worker5')
worker5.start()