callnest

Yet another future/promise/task/asynchronous library


Keywords
cross
License
MIT
Install
haxelib install callnest 0.2.0

Documentation

Callnest

Callnest is, yet another, future/promise/task/asynchronous library for Haxe. However, the goal is to provide a consistent and unified interface for bringing your own async implementations and building on top of them.

Why

There are (too) many async Haxe libraries, such as Promhx, thx.promise, tink_core, and hxbolts, which provide implementations of an asynchronous pattern to various degrees. Some provide event loop and background execution support, while some provide exception handling. A good starting read is this blog post series.

They function well, but it becomes apparent that they can be lacking when once you have used a more powerful language supporting coroutines with async and await keywords (as known as the "Blub paradox").

async and await isn't in Haxe, but that does not mean it should stop us from incorporating good async design and patterns.

Using the library

Install using haxelib:

    haxelib install callnest

The library deals with two concerns: an interface to asynchronous results and scheduling the execution of asynchronous routines.

Future

The Future interface provides an API to asynchronous results.

To create a Future, create an instance which implements FutureSource. FutureSource is the write API ("promise") to the read API of Future. This pattern allows us to separate the producer and consumer concerns within a function:

function download(url:String):Future<Bytes> {
    var source:FutureSource<Bytes> = TaskDefaults.newFutureSource();

    var httpClient = new HTTPClient();
    httpClient.onFinish = function (event:Event) {
        source.setResult(event.data);
    };
    httpClient.onError = function (event:Event) {
        source.setException(new DownloadException("Failed to download"));
    }
    httpClient.get(url);

    return source.future;
}

In the above example, we attach our callbacks to the hypothetical async HTTP client which either sets the result or an exception to the future. The future from the future source is returned.

Now, we call our function and process the result:

download("http://example.com/data.png").onComplete(function (future:Future)) {
    var data:Bytes;

    try {
        data = future.getResult();
    } catch (exception:DownloadException) {
        reportDownloadFailure();
        return;
    }

    saveDownload(data);
});

The Future.onComplete() method allows us to attach our callback when the future has been resolved with a promise. Future.getResult() either returns the result or throws an exception. We don't have separate callback methods for success or exception to discourage implicit branching.

You can also access the result or exception on the Future.result or Future.exception property. The state of future can be checked by its isCompleted, hasResult, and hasException properties.

Before we continue, what happens if our callback to onComplete() throws an exception? That is up to the implementation to decide. It may let it propagate to the main thread, log the error and continue on, or crash silently. If we want to catch this error, we can wrap our code in a try-catch block, or better yet, add an error handling callback with handleException().

download()
    .onComplete(function (future) {
        1 / 0; // oops!
    })
    .handleException(function (info:ExceptionInfo) {
        trace('Internal error! Something has gone wrong: ${info.exception}');
        shutdown();
    });

In the example above, we add our handler which logs it and then quits the program gracefully with the hypothetical shutdown() method. If this becomes boilerplate, you can set the global error handler on TaskDefaults.handleException.

While it's simple to process a single Future, further processing of Futures will get messy as in the following example:

// Avoid doing this!

download("http://example.com/data.png").onComplete(function (future:Future)) {
    var data:Bytes;

    try {
        data = future.getResult();
    } catch (exception:DownloadException) {
        reportDownloadFailure();
        return;
    }

    reupload(data).onComplete(function (future) {
        var shareLink;

        try {
            shareLink = future.getResult();
        } catch (exception:DownloadException) {
            reportDownloadFailure();
            return;
        }

        sendLink(shareLink).onComplete(
            // [...]

            everythingIsDone();
        )
    });
});

We can avoid this callback hell by rewriting the anonymous functions into named functions, but there are still issues of result and exception propagation.

For example, we could wrap all the code into a single function and change everythingIsDone() to set a result to FutureSource:

// Avoid doing this!

function doAllTheThings():Future<Bool> {
    var futureSource<Bool> = TaskDefaults.newFutureSource();

    download("http://example.com/data.png").onComplete(
        // [...]
        try {
            // [...]
        } catch (exception:Dynamic) {
            futureSource.setException(exception);
            return;
        }

        // [...]

            try {
                // [...]
            } catch (exception:Dynamic) {
                futureSource.setException(exception);
                return;
            }

            futureSource.setResult(true);
        // [...]
    );

    return futureSource.future;
}

Ouch, that looks awful! There's a lot of boilerplate code just to ensure the result and exceptions are propagated to the FutureSource in doAllTheThings(). This can be very error prone. Let's look at using Tasks which will make propagation automatic.

Task

The Task interface extends from Future and provides an API to schedule the task and chain additional tasks.

The usage of Task is similar to a Future:

function download(url:String):Task<Bytes> {
    var source:TaskSource<Bytes> = TaskDefaults.newTaskSource();

    var httpClient = new HTTPClient();
    httpClient.onFinish = function (event:Event) {
        source.setResult(event.data);
    };
    // [...]

    return source.task;
}

download("http://example.com/data.png").onComplete(function (task:Task)) {
    var data:Bytes;

    try {
        data = task.getResult();
    } catch (exception:DownloadException) {
        reportDownloadFailure();
        return;
    }

    // [...]
});

However, doAllTheThings() can be written such that Tasks are chained together:

function doAllTheThings():Task<Bool> {
    return download("http://example.com/data.png")
        .continueWith(function (task:Task<Bytes>):Task<String> {
            var data = task.getResult();
            return reupload(data);
        })
        .continueWith(function (task:Task<String>):Task<Bool> {
            var shareLink = task.getResult();
            return sendLink(shareLink);
        });
}

doAllTheThings()
    .onComplete(function (task:Task<Bool>)) {
        var success;

        try {
            success = task.getResult();
        } catch (exception:DownloadException) {
            trace('Error downloading: $exception');
            return;
        }

        trace('Image shared success: $success');
    });

In the example above, Task.continueWith() accepts a callback that will process the task and return another task to be processed. The chain continues on until a final onComplete().

If an exception is thrown within continueWith(), the exception will be caught and a new Task with the exception will be passed along to a subsequent continueWith() or onSuccess() method calls.

Unused results

If you need to use tasks that don't have a useful return, you can use the VoidReturn enum:

import callnest.VoidReturn;

function doSomething():Task<VoidReturn> {
    // Do work
    source.setResult(Nothing);
}

doSomething()
    .onComplete(doFinishingWork);

Note that using Void may be allowed in some instances, but it may be a bug in Haxe and is not supported in some targets.

Convenience methods for chaining tasks

If you chain tasks that don't use the prior result in this processing, then TaskTools.continueNext may be used:

using callnest.TaskTools;

function doSomething():Task<VoidReturn> {
    // Do work
    source.setResult(VoidReturn);
}

function doSomethingElse():Task<VoidReturn> {
    // Do work
    source.setResult(VoidReturn);
}

doSomething()
    .continueNext(doSomethingElse)
    .onComplete(doFinishingWork);

Additionally, if you chain tasks that don't need fine grain control or exception handling, TaskTools provides "then" methods that skip task boilerplate code:

using callnest.TaskTools;

function doSomething():Task<Int> {
    // Do work
    source.setResult(123);
}


doSomething()
    .thenContinue(function (result:Int) { return result + 1; })
    .thenNext(function () { return 100; })
    .thenResult(200)
    .onComplete(function (task:Task<Int>) {
        trace(task.getResult()); // => 200
    });

Cancellation

Both Future and Task support cancellation. When the cancel() method is called, the instance goes into the cancelled state and further changes to result and exception values will throw an exception.

The source can check whether the task is cancelled before setting the result:

function download(url:String):Future<Bytes> {
    var source:FutureSource<Bytes> = TaskDefaults.newFutureSource();

    var httpClient = new HTTPClient();
    httpClient.onFinish = function (event:Event) {
        if (source.task.isCanceled) {
            return;
        }
        source.setResult(event.data);
    };
    httpClient.onError = function (event:Event) {
        if (source.task.isCanceled) {
            return;
        }
        source.setException(new DownloadException("Failed to download"));
    }
    httpClient.get(url);

    return source.future;
}

The consumer can check when the task completes:

var task = download().onComplete(function (task) {
    if (task.isCanceled) {
        // Skip doing extra work
        return;
    }

    // Do things..
});

task.cancel();

Within Task.completeWith(), implementations can propagate cancellation by two ways:

The first way is catching the exception and passing a canceled task. Each part of the chain can either handle the canceled task or let the exception be thrown again. The builtin implementation uses this way.

The second way is short-circuiting by skipping the execution of the callbacks and only returning a cancelled task. This may perform better, but may cause unintended side-effects if not used correctly.

Running Synchronous Code Asynchronously

Up until now, we have only been wrapping asynchronous routines that are natively supported on the target. If the target supports running synchronous routines (such as blocking IO or intensive computations) in the background (such as in a thread pool), then it would be useful to use the task pattern for this case.

Instead of calling TaskSource.setResult, call TaskSource.run with a callback that returns a value or throws an exception:

function intensiveCalculation():Task<Float> {
    var taskSource = TaskDefaults.newTaskSource();

    taskSource.run(function () {
        // Long running computation here
        // [...]

        return 123.456;
    });

    taskSource.task;
}

In the example above, we do our computation in the run() method. This method will schedule the callback for execution. If we return a value or throw an exception, it will automatically set the result or exception on the task source.

Scheduling Tasks

An implication of running synchronous routines within TaskSource.run is the need to schedule the running of them. When a task contains asynchronous routines, it will use the underling scheduler. When executing TaskSource.run, the library does not know how to execute it asynchronously.

This library aims to be flexible by allowing to specify a scheduler per task or globally.

A Scheduler will do the appropriate action to run the task. Running one or a few tasks on a frame in OpenFL, queueing them to haxe.MainLoop, or submitting to a thread pool executor are such examples.

By default, it will use a scheduler specified on the TaskDefaults class. The builtin default runs everything synchronously. You can change the scheduler described in the section below, or by providing a scheduler instance for methods that accept a scheduler argument.

Bringing your own implementation

You can bring your implementation by implementing the interfaces, using helper classes, or by using static method extensions. Implementing interfaces is suggested for wrapping an existing library or providing an optimized implementation. Helper classes and static extension is suggested for providing additional features not included in this library's API.

To change the scheduler, set the static property TaskDefaults.scheduler with an instance implementing Scheduler. Likewise, the TaskDefaults.newTaskSource() and TaskDefaults.newFutureSource() can be changed.

Extending

If you find that writing boilerplate, it may be useful to rewrite it as a static extension.

class CustomTaskTools {
    public static function trySetResult<T>(taskSource:TaskSource, result:T):Bool {
        if (!taskSource.task.hasResult) {
            taskSource.setResult(result);
            return true;
        } else {
            return false;
        }
    }

    public static function sum(taskTools:Class<TaskTools>, tasks:Array<Float>):Task<Float> {
        var currentSum = 0;
        var callback;

        callback = function (task:Task<Float>) {
            currentSum += task.getResult();

            if (tasks.length == 0) {
                return TaskTools.fromResult(currentSum);
            } else {
                return tasks.pop().continueWith(callback);
            }
        }

        return tasks.pop().continueWith(callback);
    }
}

Then, you can use it with the using keyword:

using CustomTaskTools;

// Static extension over TaskSource instance:
var taskSource = TaskDefaults.newTaskSource();
taskSource.trySetResult(123);

// Static extension over TaskTools class:
TaskTools.sum([task1, task2, task3])
    .onComplete(function (task) {
        var sum = task.getResult();
        trace('The result is $sum');
    });

Thread Safety

The builtin implementations are not thread-safe. That is, futures/tasks and their sources must not be mutated in different threads. Alternatively, you may bring your own thread-safe implementations.

Further reading

For details, see the API documentation at https://chfoo.github.io/callnest/api/

Development

Running tests:

    haxelib haxe hxml/test.neko.hxml && neko out/neko/test.n
    haxelib haxe hxml/test.cpp.hxml && ./out/cpp/TestAll-debug