distributed-worker

A Python wrapper around multiprocessing for easy cross-machine computation


License
BSD-2-Clause
Install
pip install distributed-worker==1.3.7

Documentation

distributed-worker

A Python 3.7 wrapper around multiprocessing for easy cross-machine computation

What is this?

Distributed Worker is a wrapper around the multiprocessing package allowing you to focus on the distributed computations instead of managing nodes/communication. You can use regular python primitives to communicate just as in multiprocessing, but you can also use remote machines as workers.

How does it work?

You create a DistributedManager which creates a central server for your workers to connect to. You can create some logic around it to manage tasks/messages.

Then you implement one or multiple DistributedWorkers to handle those tasks.

And now you can run your distributed computations.

Example Setup

Check /example.py for a fully functional version

Let's say we want a distributed prime checker, here is a very naive implementation:

def is_prime(x):
  for num in range(2, x):
    if (x % num) == 0:
      return False
  return True

If we want to check a large amount of primes that would take ages, so we want to distribute the task.

To be clear this task is likely overkill for a distributed setup, but it's a good example

First we will create a task server:

from distributed_worker import DistributedManager
class PrimeManager(DistributedManager):
  def __init__(self):
    super().__init__()
    self.pending = list(range(150000))
    self.results = {}
    self.tasked = set()
    self.chunks = 1000 # Task size

  def loop(self):
    # No tasks pending, wait on exit
    if len(self.pending) < 1:
        return

    # Assign tasks to workers
    active = set(self.get_active_workers())
    available_workers = active - self.tasked
    for x in available_workers:
      chunk = min(len(self.pending), self.chunks)
      # Send chunks (1000) of numbers for processing
      task = self.pending[:chunk]
      self.pending = self.pending[chunk:]
      print('Send task %d-%d to worker %d' % (task[0], task[-1], x))
      self.send(x, task)
      self.tasked.add(x)

  def on_new_worker(self, worker: int):
    print('New worker added %d' % worker)

  def on_worker_disconnect(self, worker: int):
    print('Worker disconnected %d' % worker)

  def handle_msg(self, worker: int, msg: Any):
    # Worker finished it's task
    for num in msg:
      self.results[num] = msg[num]
    self.tasked.remove(worker)

if __name__ == "__main__":
    manager = PrimeManager()

    # For adding remote workers
    print('client creds:', manager.get_client_args())

    while manager.tasked or manager.pending:
      manager.run_once()
      # Can do other tasks here as well

    # Print the results
    print(manager.results)

    # Stop all workers
    manager.stop()

Then we'll create a worker:

from distributed_worker import DistributedWorker
class PrimeWorker(DistributedWorker):
  def __init__(self, pipe, *args, **kwargs):
    super().__init__(pipe)
    self.task = []
    self.results = {}

  def loop(self):
    # Ideally keep the executing here within 1 hour or adjust TTL on the server
    if len(self.task):
      # Tasks available
      ctask = self.task.pop()
      self.results[ctask] = is_prime(ctask)
    else:
      # Finished tasks
      if len(self.results):
        self.send(self.results)
        # Clear results so we don't resend
        self.results = {}
  
  def handle_msg(self, msg):
    if type(msg) == list:
      self.task = msg

Now we can create distributed workers gallore:

from distributed_worker import create_remote_worker
# import PrimeWorker as well

# Example, check console of server
client_args = (('localhost', 6000), 'AF_INET', b'secret password') 

# Create 10 workers (10 threads)
for x in range(10):
  # Add PrimeWorker constructor args if needed
  create_remote_worker(PrimeWorker, client_args)

If we just want to utilize our local CPU cores (and not deal with the authentication) we can create workers through the manager:

manager = DistributedManager()
# Again, add PrimeWorker constructor args if needed
manager.create_local_worker(PrimeWorker)

The total execution time will generally improve as tasks need more time to complete

Running example.py vs a single-threaded version:

$ time example.py
...
real    0m11.631s
user    0m57.648s
sys     0m0.395s

$ time single.py
real    0m43.581s
user    0m43.550s
sys     0m0.018s