haxe-concurrent

A haxelib with cross-platform concurrency functions and classes (thread-pool, task executor/scheduler, re-entrant lock, semaphore, atomic int/bool, thread-safe collections/queues)


License
Apache-2.0
Install
haxelib install haxe-concurrent 2.1.0

Documentation

haxe-concurrent - cross-platform concurrency support


  1. What is it?
  2. hx.concurrent.atomic package
  3. hx.concurrent.collection package
  4. hx.concurrent.executor package
  5. hx.concurrent.event package
  6. hx.concurrent.lock package
  7. hx.concurrent.thread package
  8. Installation
  9. Using the latest code
  10. License
  11. Alternatives

What is it?

A haxelib that provides basic, platform-agnostic concurrency support.

All classes are located in the package hx.concurrent or below.

The library has been extensively unit tested (over 400 individual test cases) on the targets C++, C#, Flash, HashLink, Java, JavaScript (Node.js and PhantomJS), Lua, Neko, PHP 7 and Python 3.

Note:

  • When targeting Flash, the option -swf-version 11.5 (or higher) must be specified; otherwise you will get the error "Class flash.concurrent::Condition could not be found".
  • When targeting C#, the option -D net-ver=45 must be specified; otherwise you may get the error "CS0234: The type or namespace name 'Volatile' does not exist in the namespace 'System.Threading'. Are you missing an assembly reference?"
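
The two options above could be placed in a build file roughly as follows. This is a minimal sketch, not taken from the project: the main class name Test is borrowed from the examples in this README, and the output paths are hypothetical.

```hxml
# Flash: requires SWF version 11.5 or higher
-lib haxe-concurrent
-main Test
-swf-version 11.5
-swf bin/Test.swf

--next

# C#: requires -D net-ver=45 so that System.Threading.Volatile is available
-lib haxe-concurrent
-main Test
-D net-ver=45
-cs bin/cs
```

The --next separator lets a single hxml file describe both builds; each section can also live in its own file.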

Haxe compatibility

haxe-concurrent    Haxe
1.0.0 to 1.2.0     3.2.1 or higher
2.0.0 to 2.1.3     3.4.2 or higher
3.0.0 or higher    4.0.5 or higher

The hx.concurrent.atomic package

The hx.concurrent.atomic package contains mutable value holder classes that allow for thread-safe manipulation:

The hx.concurrent.collection package

The hx.concurrent.collection package contains thread-safe implementations of different types of collections:

The hx.concurrent.executor package

The hx.concurrent.executor package contains Executor implementations that execute functions concurrently and schedule tasks for later or repeated execution.

On platforms with thread support (C++, C#, Eval, HashLink, Neko, Python, Java), threads are used to achieve truly concurrent execution; on other platforms, haxe.Timer is used to provide at least asynchronous execution.

import hx.concurrent.executor.Schedule;
import hx.concurrent.executor.Executor;

class Test {

   static function main() {
      var executor = Executor.create(3);  // <- 3 means to use a thread pool of 3 threads on platforms that support threads
      // depending on the platform either a thread-based or timer-based implementation is returned

      // define a function to be executed concurrently/async/scheduled (return type can also be Void)
      var myTask=function():Date {
         trace("Executing...");
         return Date.now();
      }

      // submit 10 tasks each to be executed once asynchronously/concurrently as soon as possible
      for(i in 0...10) {
         executor.submit(myTask);
      }

      executor.submit(myTask, ONCE(2000));            // async one-time execution with a delay of 2 seconds
      executor.submit(myTask, FIXED_RATE(200));       // repeated async execution every 200ms
      executor.submit(myTask, FIXED_DELAY(200));      // repeated async execution 200ms after the last execution
      executor.submit(myTask, HOURLY(30));            // async execution 30min after each full hour
      executor.submit(myTask, DAILY(3, 30));          // async execution daily at 3:30
      executor.submit(myTask, WEEKLY(SUNDAY, 3, 30)); // async execution sundays at 3:30

      // submit a task and keep a reference to it
      var future = executor.submit(myTask, FIXED_RATE(200));

      // check if a result is already available
      switch(future.result) {
         case SUCCESS(value, time, _): trace('Successful execution at ${Date.fromTime(time)} with result: $value');
         case FAILURE(ex, time, _):    trace('Execution failed at ${Date.fromTime(time)} with exception: $ex');
         case NONE(_):                 trace("No result yet...");
      }

      // check if the task is scheduled to be executed (again) in the future
      if(!future.isStopped) {
         trace('The task is scheduled for further executions with schedule: ${future.schedule}');
      }

      // cancel any future execution of the task
      future.cancel();
   }
}
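
The difference between FIXED_RATE and FIXED_DELAY in the example above can be illustrated with a small stand-alone sketch. It does not use the library; it only computes hypothetical start times under two assumptions: the first run starts at time 0, and "200ms after the last execution" means 200ms after the previous run has finished. The class and function names are made up for illustration.

```haxe
class ScheduleSketch {

   // FIXED_RATE-style: start times are evenly spaced, independent of how long the task runs
   public static function fixedRateStarts(intervalMS:Int, runs:Int):Array<Int> {
      return [for (i in 0...runs) i * intervalMS];
   }

   // FIXED_DELAY-style (assumed semantics): each run starts delayMS after the previous run finished
   public static function fixedDelayStarts(delayMS:Int, taskDurationMS:Int, runs:Int):Array<Int> {
      var starts = new Array<Int>();
      var nextStart = 0;
      for (i in 0...runs) {
         starts.push(nextStart);
         nextStart += taskDurationMS + delayMS;
      }
      return starts;
   }

   static function main() {
      // a task that takes 50ms, scheduled four times
      trace(fixedRateStarts(200, 4));      // start times 0, 200, 400, 600
      trace(fixedDelayStarts(200, 50, 4)); // start times 0, 250, 500, 750
   }
}
```

Under these assumptions a fixed-rate schedule keeps its rhythm regardless of task duration, while a fixed-delay schedule drifts by the task's run time on every iteration.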

The hx.concurrent.event package

The hx.concurrent.event package contains classes for type-safe event dispatching.

import hx.concurrent.event.AsyncEventDispatcher;
import hx.concurrent.event.SyncEventDispatcher;
import hx.concurrent.Future;
import hx.concurrent.executor.Executor;

class Test {

   static function main() {
      /**
       * create a dispatcher that notifies listeners/callbacks synchronously in the current thread
       */
      var syncDispatcher = new SyncEventDispatcher<String>(); // events are of type string

      // create event listener
      var onEvent = function(event:String):Void {
         trace('Received event: $event');
      }

      syncDispatcher.subscribe(onEvent);

      // notify all registered listeners synchronously,
      // meaning this method call blocks until all listeners are finished executing
      syncDispatcher.fire("Hey there");

      /**
       * create a dispatcher that notifies listeners asynchronously using an executor
       */
      var executor = Executor.create(5); // thread-pool with 5 threads
      var asyncDispatcher = new AsyncEventDispatcher<String>(executor);

      // create event listener
      var onAsyncEvent = function(event:String):Void {
         trace('Received event: $event');
      }

      // register the listener before firing, otherwise it will not receive the events
      asyncDispatcher.subscribe(onAsyncEvent);

      // notify all registered listeners asynchronously,
      // meaning this method call returns immediately
      asyncDispatcher.fire("Hey there");

      // fire another event and get notified when all listeners were notified
      var future = asyncDispatcher.fire("Boom");

      future.onResult = function(result:FutureResult<Dynamic>) {
         switch(result) {
            case SUCCESS(count, _, _): trace('$count listeners were successfully notified');
            case FAILURE(ex, _, _): trace('Event could not be delivered because of: $ex');
            case NONE(_): trace("Nothing is happening");
         }
      };

   }
}
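
To make the idea of type-safe, synchronous dispatching more concrete, here is a minimal stand-alone sketch. It is not the library's implementation: TinyDispatcher and DispatcherSketch are hypothetical names, and the sketch only assumes that fire() calls every listener in the current thread and reports how many were notified.

```haxe
// Hypothetical, simplified dispatcher; the type parameter fixes the event type at compile time
class TinyDispatcher<EVENT> {
   var listeners:Array<EVENT->Void> = [];

   public function new() {}

   // registers a listener for events of type EVENT
   public function subscribe(listener:EVENT->Void):Void {
      listeners.push(listener);
   }

   // calls every listener in the current thread and returns the number of notified listeners
   public function fire(event:EVENT):Int {
      var notified = 0;
      // iterate over a copy so listeners may subscribe others while being notified
      for (listener in listeners.copy()) {
         listener(event);
         notified++;
      }
      return notified;
   }
}

class DispatcherSketch {
   static function main() {
      var dispatcher = new TinyDispatcher<String>();
      var received = new Array<String>();

      dispatcher.subscribe(function(event:String) received.push(event));

      var count = dispatcher.fire("Hey there");
      trace('$count listener(s) notified, received: $received');

      // dispatcher.fire(42); // rejected by the compiler: Int should be String
   }
}
```

An asynchronous variant would hand each listener call to an executor instead of invoking it directly, which is why firing returns a future rather than a plain count in the example above.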

The hx.concurrent.lock package

The hx.concurrent.lock package contains lock implementations for different purposes:

The hx.concurrent.thread package

The hx.concurrent.thread package contains classes for platforms supporting threads:

  • ThreadPool - basic thread-pool implementation supporting C++, C#, HashLink, Neko, Java and Python. For advanced concurrency or cross-platform requirements use Executor instead.

    import hx.concurrent.thread.*;
    
    class Test {
    
       static function main() {
          var pool = new ThreadPool(4); // 4 concurrent threads
    
          pool.submit(function(ctx:ThreadContext) {
             // do some work here
          });
    
          pool.awaitCompletion(30 * 1000); // wait 30 seconds for all submitted tasks to be processed
    
          pool.cancelPending(); // cancels execution of all currently queued tasks
    
          // initiate graceful stop of all running threads, i.e. they finish the current tasks they process
          // execution of all other queued tasks is cancelled
          pool.stop();
       }
    }
  • Threads

Installation

  1. install the library via haxelib using the command:

    haxelib install haxe-concurrent
    
  2. use in your Haxe project

    • for OpenFL/Lime projects add <haxelib name="haxe-concurrent" /> to your project.xml
    • for free-style projects add -lib haxe-concurrent to your *.hxml file or as command line option when running the Haxe compiler
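
For a free-style project, a minimal build file might look like the sketch below. The file name build.hxml, the source folder src, the main class Test and the Neko output path are all assumptions chosen for illustration.

```hxml
# build.hxml (hypothetical file name)
-lib haxe-concurrent
-cp src
-main Test
-neko bin/Test.n
```

With such a file in place, the project would be compiled by running haxe build.hxml.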

Using the latest code

Using haxelib git

haxelib git haxe-concurrent https://github.com/vegardit/haxe-concurrent master D:\haxe-projects\haxe-concurrent

Using Git

  1. check-out the master branch

    git clone https://github.com/vegardit/haxe-concurrent --branch master --single-branch D:\haxe-projects\haxe-concurrent
    
  2. register the development release with haxe

    haxelib dev haxe-concurrent D:\haxe-projects\haxe-concurrent
    

Using Subversion

  1. check-out the trunk

    svn checkout https://github.com/vegardit/haxe-concurrent/trunk D:\haxe-projects\haxe-concurrent
    
  2. register the development release with haxe

    haxelib dev haxe-concurrent D:\haxe-projects\haxe-concurrent
    

License

All files are released under the Apache License 2.0.

Individual files contain the following tag instead of the full license text:

SPDX-License-Identifier: Apache-2.0

This enables machine processing of license information based on the SPDX License Identifiers that are available here: https://spdx.org/licenses/.

Alternatives

Other libraries addressing concurrency/parallelism: