github.com/samuell/scipipe/proclib

Robust, flexible and resource-efficient pipelines using Go and the command line


Keywords
bioinformatics, bioinformatics-pipeline, cheminformatics, dataflow, fbp, go, pipeline, scientific-workflows, scipipe, workflow
License
MIT
Install
go get github.com/samuell/scipipe/proclib

Documentation

SciPipe


SciPipe is an experimental library for writing scientific workflows in vanilla Go(lang). The architecture of SciPipe is based on a flow-based-programming-like pattern in pure Go, as presented in two Gopher Academy blog posts.

Benefits

Some benefits of SciPipe that are not always available in other scientific workflow systems:

  • Easy-to-grasp behaviour: Data flowing through a network.
  • Parallel: Apart from the inherent pipeline parallelism, SciPipe processes also spawn multiple parallel tasks when the same process has multiple inputs.
  • Concurrent: Each process runs in its own lightweight thread (goroutine), and is not blocked by operations in other processes, except when waiting for inputs from upstream processes.
  • Inherently simple: Uses Go's concurrency primitives (goroutines and channels) to create an "implicit" scheduler, which means very little additional infrastructure code. As a result, the code is easy to modify and extend.
  • Resource efficient: You can choose to stream selected outputs via Unix FIFO files, to avoid temporary storage.
  • Flexible: Processes that wrap command-line programs and scripts can be combined with processes coded directly in Golang.
  • Custom file naming: SciPipe gives you full control over how file names are produced, making it easy to understand and find your way among the output files of your computations.
  • Highly debuggable(!): Since everything in SciPipe is plain Go(lang), you can easily use the gdb debugger (preferably with the cgdb interface for easier use) to step through your program in any detail, use the other excellent debugging tools for Go (see e.g. delve and godebug), or just put println() statements anywhere in your code. In addition, you can turn on very detailed debug output from SciPipe's execution itself by enabling debug-level logging with scipipe.InitLogDebug() in your main() function.
  • Efficient: Workflows are compiled into statically compiled code that runs fast.
  • Portable: Workflows can be distributed as Go code to be run with the go run command, or compiled into stand-alone binaries for basically any Unix-like operating system.

Known limitations

  • Comprehensive audit log generation is not yet available, but is currently being worked on.
  • The Common Workflow Language is not yet supported, but support is planned for the future.

Connection to flow-based programming

From Flow-based programming, SciPipe uses the ideas of separate network (workflow dependency graph) definition, named in- and out-ports, sub-networks/sub-workflows and bounded buffers (already available in Go's channels) to make writing workflows as easy as possible.
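
As a rough illustration of these ideas in plain Go (a sketch using only the standard library, not SciPipe code), the example below keeps the network definition separate from the processes and uses buffered channels as bounded buffers. The names produce, upper and runNetwork are made up for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// produce sends each item on its out-port and closes the port when done.
func produce(items []string, out chan<- string) {
	defer close(out)
	for _, it := range items {
		out <- it
	}
}

// upper reads from its in-port, transforms each item, and forwards it.
func upper(in <-chan string, out chan<- string) {
	defer close(out)
	for s := range in {
		out <- strings.ToUpper(s)
	}
}

// runNetwork is the network definition: it creates the bounded buffers
// (buffered channels), wires the ports together, and starts each process
// in its own goroutine. The processes know nothing about each other.
func runNetwork(items []string) []string {
	raw := make(chan string, 2)   // bounded buffer: produce -> upper
	shout := make(chan string, 2) // bounded buffer: upper -> collector

	go produce(items, raw)
	go upper(raw, shout)

	var results []string
	for s := range shout { // ends when upper closes its out-port
		results = append(results, s)
	}
	return results
}

func main() {
	fmt.Println(runNetwork([]string{"foo", "bar", "baz"}))
}
```

Because the channels have a fixed capacity, a fast upstream process blocks once the buffer is full, which gives back-pressure without any explicit scheduler.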

In addition, it adds convenience factory methods such as sp.Shell(), which creates ad hoc processes on the fly based on a shell command pattern. Inputs, outputs and parameters are defined in-line in the shell command, using the syntax {i:INPORT_NAME} for inports, {o:OUTPORT_NAME} for outports and {p:PARAM_NAME} for parameters.
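
To make the placeholder syntax concrete, here is a minimal sketch of how such a pattern could be expanded into a runnable shell command. It is not how SciPipe itself does it: expandCommand and the three lookup maps are hypothetical, and the sketch assumes that port and parameter names contain no curly braces.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches {i:NAME}, {o:NAME} and {p:NAME}.
var placeholder = regexp.MustCompile(`\{([iop]):([^{}]+)\}`)

// expandCommand is a hypothetical helper (not SciPipe's API). It replaces
// in-port, out-port and parameter placeholders with the values found in the
// corresponding map, and leaves unknown placeholders untouched.
func expandCommand(pattern string, in, out, params map[string]string) string {
	return placeholder.ReplaceAllStringFunc(pattern, func(m string) string {
		parts := placeholder.FindStringSubmatch(m)
		kind, name := parts[1], parts[2]

		var table map[string]string
		switch kind {
		case "i":
			table = in
		case "o":
			table = out
		default: // "p"
			table = params
		}
		if v, ok := table[name]; ok {
			return v
		}
		return m
	})
}

func main() {
	cmd := expandCommand(
		"sed 's/foo/{p:replacement}/g' {i:foo} > {o:bar}",
		map[string]string{"foo": "foo.txt"},
		map[string]string{"bar": "foo.txt.bar"},
		map[string]string{"replacement": "bar"},
	)
	fmt.Println(cmd)
}
```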

Getting started: Install

  1. Install Go by following the official installation instructions.

    • I typically install to a custom location (~/go for the go tools, and ~/code/go for my own go-projects).
    • If you want to install (which means, untar the go tarball) to ~/go just like me, you should put the following in your ~/.bashrc file:
    # Go stuff
    export GOROOT=~/go
    export GOPATH=~/code/go
    export PATH=$GOROOT/bin:$PATH
    export PATH=$GOPATH/bin:$PATH
  2. Then, install scipipe:

    go get github.com/scipipe/scipipe
  3. Now, you should be able to write code like in the examples below, in files ending with .go.

  4. To run a .go file, use go run:

    go run myfirstworkflow.go

An example workflow

Let's look at a toy-example workflow. First the full version:

package main

import (
    sp "github.com/scipipe/scipipe"
)

func main() {
    // Initialize processes
    fwt := sp.Shell("fooer", "echo 'foo' > {o:foo}")
    f2b := sp.Shell("foo2bar", "sed 's/foo/bar/g' {i:foo} > {o:bar}")
    snk := sp.NewSink() // Will just receive file targets, doing nothing

    // Add output file path formatters for the components created above
    fwt.SetPathFormatStatic("foo", "foo.txt")
    f2b.SetPathFormatExtend("foo", "bar", ".bar")

    // Connect network
    f2b.InPorts["foo"] = fwt.OutPorts["foo"]
    snk.In = f2b.OutPorts["bar"]

    // Add to a pipeline runner and run
    pl := sp.NewPipelineRunner()
    pl.AddProcesses(fwt, f2b, snk)
    pl.Run()
}

And to see what it does, let's put the code in a file test.go and run it:

[samuel test]$ go run test.go
AUDIT   2016/04/28 16:04:41 Task:fooer        Executing command: echo 'foo' > foo.txt.tmp
AUDIT   2016/04/28 16:04:41 Task:foo2bar      Executing command: sed 's/foo/bar/g' foo.txt > foo.txt.bar.tmp

Now, let's go through the code above in more detail, part by part:

Initializing processes

fwt := sp.Shell("fooer", "echo 'foo' > {o:foo}")
f2b := sp.Shell("foo2bar", "sed 's/foo/bar/g' {i:foo} > {o:bar}")
snk := sp.NewSink() // Will just receive file targets, doing nothing

Each Shell() call above creates a process from a shell command pattern, with inports and outports named by the {i:...} and {o:...} placeholders. For these inports and outports, channels for sending and receiving FileTargets are created automatically and stored in two maps, which are struct fields of the process named InPorts and OutPorts respectively. Each channel is keyed by its inport or outport name, so that it can be retrieved from the map by that name.

Connecting processes into a network

Connecting outports of one process to the inport of another process is then done by assigning the respective channels to the corresponding places in the hashmap:

f2b.InPorts["foo"] = fwt.OutPorts["foo"]
snk.In = f2b.OutPorts["bar"]

(Note that the sink has just one inport, as a static struct field).
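
The following stand-alone sketch shows why a plain assignment is enough to connect two processes. The Process and FileTarget types here are simplified stand-ins written for this example, not SciPipe's real types, and NewProcess is a hypothetical constructor.

```go
package main

import "fmt"

// FileTarget is a simplified stand-in for the targets sent between processes.
type FileTarget struct{ Path string }

// Process is a hypothetical, stripped-down process with named ports.
type Process struct {
	Name     string
	InPorts  map[string]chan *FileTarget
	OutPorts map[string]chan *FileTarget
}

// NewProcess creates one buffered channel per named port.
func NewProcess(name string, inNames, outNames []string) *Process {
	p := &Process{
		Name:     name,
		InPorts:  make(map[string]chan *FileTarget),
		OutPorts: make(map[string]chan *FileTarget),
	}
	for _, n := range inNames {
		p.InPorts[n] = make(chan *FileTarget, 16)
	}
	for _, n := range outNames {
		p.OutPorts[n] = make(chan *FileTarget, 16)
	}
	return p
}

func main() {
	fwt := NewProcess("fooer", nil, []string{"foo"})
	f2b := NewProcess("foo2bar", []string{"foo"}, []string{"bar"})

	// Connect: after this assignment both map entries refer to the same
	// channel, so whatever fwt sends is what f2b receives.
	f2b.InPorts["foo"] = fwt.OutPorts["foo"]

	fwt.OutPorts["foo"] <- &FileTarget{Path: "foo.txt"}
	close(fwt.OutPorts["foo"])

	for ft := range f2b.InPorts["foo"] {
		fmt.Println(f2b.Name, "received", ft.Path)
	}
}
```

Channels are reference-like values in Go, so copying one into another map entry shares the underlying queue rather than duplicating it.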

Formatting output file paths

The only thing remaining after this is to provide some way for the program to figure out a suitable file name for each of the files propagating through this little "network" of processes. This is done by adding a closure (function) to another special map, again keyed by the names of the outports of the processes. So, to define the output file names of the two processes above, we would add:

fwt.PathFormatters["foo"] = func(t *sp.SciTask) string {
    // Just statically create a file named foo.txt
    return "foo.txt"
}
f2b.PathFormatters["bar"] = func(t *sp.SciTask) string {
    // Here, we instead re-use the file name of the process we depend
    // on (which we get on the 'foo' inport), and just
    // pad '.bar' at the end:
    return f2b.GetInPath("foo") + ".bar"
}

Formatting output file paths: A nicer way

Now, the above way of defining path formats is a bit verbose, isn't it? Luckily, there's a shorter way, by using convenience methods for doing the same thing. So, the above two path formats can also be defined like so, with the exact same result:

// Create a static file name for the out-port 'foo':
fwt.SetPathFormatStatic("foo", "foo.txt")

// For out-port 'bar', extend the file names of files on in-port 'foo', with
// the suffix '.bar':
f2b.SetPathFormatExtend("foo", "bar", ".bar")
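
To see how convenience methods like these can be thin wrappers around the closure-based approach, consider the stand-alone sketch below. It reuses the method names from above for readability, but the Process and Task types and the method bodies are assumptions made for this example, not SciPipe's actual source.

```go
package main

import "fmt"

// Task is a simplified stand-in for a task that knows the paths of the
// files it received on its in-ports.
type Task struct {
	InPaths map[string]string
}

// Process holds one path formatter per out-port name.
type Process struct {
	PathFormatters map[string]func(t *Task) string
}

// NewProcess is a hypothetical constructor for this sketch.
func NewProcess() *Process {
	return &Process{PathFormatters: make(map[string]func(t *Task) string)}
}

// SetPathFormatStatic stores a closure that always returns the same path.
func (p *Process) SetPathFormatStatic(outPort, path string) {
	p.PathFormatters[outPort] = func(t *Task) string { return path }
}

// SetPathFormatExtend stores a closure that appends a suffix to the path
// of the file received on the given in-port.
func (p *Process) SetPathFormatExtend(inPort, outPort, suffix string) {
	p.PathFormatters[outPort] = func(t *Task) string {
		return t.InPaths[inPort] + suffix
	}
}

func main() {
	fwt := NewProcess()
	fwt.SetPathFormatStatic("foo", "foo.txt")

	f2b := NewProcess()
	f2b.SetPathFormatExtend("foo", "bar", ".bar")

	fooPath := fwt.PathFormatters["foo"](&Task{})
	barPath := f2b.PathFormatters["bar"](&Task{
		InPaths: map[string]string{"foo": fooPath},
	})
	fmt.Println(fooPath, barPath)
}
```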

Running the pipeline

The final part probably explains itself: the pipeline runner is a very simple component that starts every process except the last one in a separate goroutine, and runs the last process in the main goroutine, so that the program blocks until the pipeline has finished.

pl := sp.NewPipelineRunner()
pl.AddProcesses(fwt, f2b, snk)
pl.Run()
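
The behaviour described above (all processes but the last in goroutines, the last one blocking the main goroutine) can be sketched in a few lines of plain Go. This is an illustration under assumptions, not SciPipe's real runner: the Runnable interface, the source and sink types and runDemo are invented for the example.

```go
package main

import "fmt"

// Runnable is anything with a blocking Run method.
type Runnable interface{ Run() }

// PipelineRunner is a hypothetical minimal runner.
type PipelineRunner struct{ procs []Runnable }

// AddProcesses appends processes in the order they should be started.
func (pl *PipelineRunner) AddProcesses(ps ...Runnable) {
	pl.procs = append(pl.procs, ps...)
}

// Run starts every process except the last in its own goroutine, then runs
// the last one in the calling goroutine, so Run blocks until it returns.
func (pl *PipelineRunner) Run() {
	if len(pl.procs) == 0 {
		return
	}
	last := len(pl.procs) - 1
	for _, p := range pl.procs[:last] {
		go p.Run()
	}
	pl.procs[last].Run()
}

// source sends two items and then closes its out-port.
type source struct{ out chan<- string }

func (s source) Run() {
	defer close(s.out)
	for _, v := range []string{"foo", "bar"} {
		s.out <- v
	}
}

// sink collects everything it receives until its in-port is closed.
type sink struct {
	in   <-chan string
	seen *[]string
}

func (s sink) Run() {
	for v := range s.in {
		*s.seen = append(*s.seen, v)
	}
}

// runDemo wires a source to a sink and runs them through the runner.
func runDemo() []string {
	ch := make(chan string, 1)
	var seen []string
	pl := &PipelineRunner{}
	pl.AddProcesses(source{out: ch}, sink{in: ch, seen: &seen})
	pl.Run()
	return seen
}

func main() {
	fmt.Println(runDemo())
}
```

This design relies on the last process (typically a sink) finishing only when everything upstream has closed its out-ports, which is why the order passed to AddProcesses matters.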

Summary

So with this, we have done everything needed to set up a file-based batch workflow system.

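In summary, what we did was to:

  • Create processes from shell command patterns with named inports and outports.
  • Specify process dependencies by wiring outports of upstream processes to inports of downstream processes.
  • For each outport, provide a function that computes a suitable file name for the new file.
  • Add the processes to a pipeline runner and run it.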

For more examples, see the examples folder.

Related tools

Below are a few tools that are more or less similar to SciPipe and worth checking out before deciding which tool fits you best (in approximate order of similarity to SciPipe):

Acknowledgements