Composable Pipelines Improved
I wrote a post here on GopherAcademy earlier this year about an idea for a “framework-less” pattern for Flow-Based Programming style programs in Go, or, let’s just call it, “composable concurrent pipelines”. Over the course of the year I have experimented further and made some minor modifications to the pattern, which I describe below.
Please note that the code examples below are kept short and thus incomplete, for readability. For a full working example of the presented pattern in action, please see this code example on the Go playground, or this gist!
The old way
The basic idea in that earlier post was to expand on the generator pattern described in a slide by Rob Pike, by storing the concurrent processes in structs rather than just functions. This allows representing in- and out-ports as struct fields, which can be used to connect the in- and out-ports of multiple processes in a more fluent way, as described in that post.
I have since realized that this pattern can be simplified further. One unnecessary thing suggested in that post was to use a method for each out-port, which would ensure that a channel is created for that out-port before returning it.
The pattern proposed that a process would be defined like so:
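The exact snippet from the original post is not preserved here, but a minimal sketch of such a process, with illustrative names like `hiSayer` (an assumption, not necessarily the original code), could look like this:

```go
package main

import "fmt"

const BUFSIZE = 16

// hiSayer is a process that emits a few greetings on its out-port.
type hiSayer struct {
	Out chan string
}

// OutChan makes sure the out-port channel exists before returning it
// (the per-out-port accessor method that turned out to be unnecessary).
func (p *hiSayer) OutChan() chan string {
	if p.Out == nil {
		p.Out = make(chan string, BUFSIZE)
	}
	return p.Out
}

// Init fires up the process's goroutine.
func (p *hiSayer) Init() {
	out := p.OutChan() // ensure the channel exists before spawning
	go func() {
		defer close(out)
		for i := 1; i <= 3; i++ {
			out <- fmt.Sprintf("hi nr %d", i)
		}
	}()
}

func main() {
	p := &hiSayer{}
	p.Init()
	for s := range p.OutChan() {
		fmt.Println(s)
	}
}
```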
Connecting such processes together would go something like:
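Again as a reconstructed sketch (the process names are assumptions), connecting two such processes meant calling the accessor method of the upstream process to get hold of its channel:

```go
package main

import (
	"fmt"
	"strings"
)

const BUFSIZE = 16

type hiSayer struct{ Out chan string }

func (p *hiSayer) OutChan() chan string {
	if p.Out == nil {
		p.Out = make(chan string, BUFSIZE)
	}
	return p.Out
}

func (p *hiSayer) Init() {
	out := p.OutChan()
	go func() {
		defer close(out)
		for i := 1; i <= 3; i++ {
			out <- fmt.Sprintf("hi nr %d", i)
		}
	}()
}

type upperCaser struct {
	In  chan string
	Out chan string
}

func (p *upperCaser) OutChan() chan string {
	if p.Out == nil {
		p.Out = make(chan string, BUFSIZE)
	}
	return p.Out
}

func (p *upperCaser) Init() {
	out := p.OutChan()
	go func() {
		defer close(out)
		for s := range p.In {
			out <- strings.ToUpper(s)
		}
	}()
}

func main() {
	hisay := &hiSayer{}
	upper := &upperCaser{}

	// Connecting: the downstream process asks the upstream one for
	// its out-channel via the accessor method.
	upper.In = hisay.OutChan()

	hisay.Init()
	upper.Init()

	for s := range upper.OutChan() {
		fmt.Println(s)
	}
}
```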
A better way
What I realized, though, is that if we use a “factory function” to create new processes and pre-populate the channel fields, we just need to assign one such field to the corresponding field of another process to connect the two.
We will also let all the processes be based on a base interface named `process`, for reasons we will see later on.
So, a task would be defined like so, including its factory function:
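A sketch of such a definition, under the assumption that the process is the same `hiSayer` used above (the original code is not reproduced here), might look like:

```go
package main

import "fmt"

const BUFSIZE = 16

// hiSayer emits a few greetings on its out-port.
type hiSayer struct {
	Out chan string
}

// newHiSayer is the factory function: it pre-populates the channel
// field, so ports can later be connected by plain assignment.
func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

// Init fires up the process's goroutine.
func (p *hiSayer) Init() {
	go func() {
		defer close(p.Out)
		for i := 1; i <= 3; i++ {
			p.Out <- fmt.Sprintf("hi nr %d", i)
		}
	}()
}

func main() {
	p := newHiSayer()
	p.Init()
	for s := range p.Out {
		fmt.Println(s)
	}
}
```

Note that there is no longer any accessor method per out-port: the factory guarantees the channel exists.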
Then, to connect two such processes together, we would go:
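With pre-populated channel fields, connecting becomes a plain field assignment. A sketch with assumed process names:

```go
package main

import (
	"fmt"
	"strings"
)

const BUFSIZE = 16

type hiSayer struct{ Out chan string }

func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

func (p *hiSayer) Init() {
	go func() {
		defer close(p.Out)
		for i := 1; i <= 3; i++ {
			p.Out <- fmt.Sprintf("hi nr %d", i)
		}
	}()
}

type upperCaser struct {
	In  chan string
	Out chan string
}

func newUpperCaser() *upperCaser {
	return &upperCaser{
		In:  make(chan string, BUFSIZE),
		Out: make(chan string, BUFSIZE),
	}
}

func (p *upperCaser) Init() {
	go func() {
		defer close(p.Out)
		for s := range p.In {
			p.Out <- strings.ToUpper(s)
		}
	}()
}

func main() {
	hisay := newHiSayer()
	upper := newUpperCaser()

	// Connect by plain assignment: both ports now share one channel.
	upper.In = hisay.Out

	hisay.Init()
	upper.Init()

	for s := range upper.Out {
		fmt.Println(s)
	}
}
```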
… and we could even connect the other way around, since both the in- and out-port fields are initialized with channels, so it doesn’t matter which of these channels we use, as long as the same channel is used on the corresponding out- and in-port:
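That is, the connection line can just as well be flipped, so the upstream process adopts the downstream process's channel instead. A sketch (same assumed processes as above):

```go
package main

import (
	"fmt"
	"strings"
)

const BUFSIZE = 16

type hiSayer struct{ Out chan string }

func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

func (p *hiSayer) Init() {
	go func() {
		defer close(p.Out)
		for i := 1; i <= 3; i++ {
			p.Out <- fmt.Sprintf("hi nr %d", i)
		}
	}()
}

type upperCaser struct {
	In  chan string
	Out chan string
}

func newUpperCaser() *upperCaser {
	return &upperCaser{
		In:  make(chan string, BUFSIZE),
		Out: make(chan string, BUFSIZE),
	}
}

func (p *upperCaser) Init() {
	go func() {
		defer close(p.Out)
		for s := range p.In {
			p.Out <- strings.ToUpper(s)
		}
	}()
}

func main() {
	hisay := newHiSayer()
	upper := newUpperCaser()

	// Connecting the other way around: hiSayer's out-port now uses
	// upperCaser's pre-made in-channel. The effect is identical.
	hisay.Out = upper.In

	hisay.Init()
	upper.Init()

	for s := range upper.Out {
		fmt.Println(s)
	}
}
```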
Let’s create a pipeline component too
There was one other ugly part of the previously blogged example: in order to drive the execution of a set of connected processes, we were looping over the out-port of the last component, right in the program’s main function.
That is, the following part in the previous post:
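The offending part was the range loop at the end of main. A minimal sketch of what it amounted to (a reconstruction, not the verbatim code from the previous post):

```go
package main

import "fmt"

const BUFSIZE = 16

type upperCaser struct{ Out chan string }

func (p *upperCaser) OutChan() chan string {
	if p.Out == nil {
		p.Out = make(chan string, BUFSIZE)
	}
	return p.Out
}

func (p *upperCaser) Init() {
	out := p.OutChan()
	go func() {
		defer close(out)
		out <- "HI"
	}()
}

func main() {
	upper := &upperCaser{}
	upper.Init()
	// The ugly part: driving the whole network by ranging over the
	// last process's out-port right here in main.
	for s := range upper.OutChan() {
		fmt.Println(s)
	}
}
```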
This was due to the fact that the main goroutine has to block on something, such as receiving on a channel, until the other goroutines are done, or the program will exit before they have finished their work.
I couldn’t come up with a better suggestion for how to drive such a chain of processes, until Egon Elbre elaborated on some tips on how to enhance the pattern.
While I did not use Egon’s whole suggestion, since it included a fair bit of reflection and departed from my idea of a framework-less pattern, his code examples did a nice trick: rather than letting the processes fire up goroutines themselves (i.e. use the go keyword) inside the Init() methods, as my pattern did, he had Run() methods without any go statements in them, and instead applied the go keyword outside of the processes.
So, if we replace the Init() method in the code examples above with the following Run() method:
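A sketch of such a Run() method, for the assumed upperCaser process from earlier:

```go
package main

import (
	"fmt"
	"strings"
)

const BUFSIZE = 16

type upperCaser struct {
	In  chan string
	Out chan string
}

func newUpperCaser() *upperCaser {
	return &upperCaser{
		In:  make(chan string, BUFSIZE),
		Out: make(chan string, BUFSIZE),
	}
}

// Run holds the process body but contains no go statement: whether it
// runs concurrently is now the caller's decision.
func (p *upperCaser) Run() {
	defer close(p.Out)
	for s := range p.In {
		p.Out <- strings.ToUpper(s)
	}
}

func main() {
	upper := newUpperCaser()
	upper.In <- "hi there"
	close(upper.In)
	upper.Run() // runs to completion right here in the main goroutine
	fmt.Println(<-upper.Out)
}
```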
… then we would execute these Run() methods like so (adapting it to our example code here):
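Roughly like this (a reconstruction using the assumed process names from earlier): the go keyword now sits outside the processes, in the caller.

```go
package main

import (
	"fmt"
	"strings"
)

const BUFSIZE = 16

type hiSayer struct{ Out chan string }

func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

func (p *hiSayer) Run() {
	defer close(p.Out)
	for i := 1; i <= 3; i++ {
		p.Out <- fmt.Sprintf("hi nr %d", i)
	}
}

type upperCaser struct {
	In  chan string
	Out chan string
}

func newUpperCaser() *upperCaser {
	return &upperCaser{Out: make(chan string, BUFSIZE)}
}

func (p *upperCaser) Run() {
	defer close(p.Out)
	for s := range p.In {
		p.Out <- strings.ToUpper(s)
	}
}

func main() {
	hisay := newHiSayer()
	upper := newUpperCaser()
	upper.In = hisay.Out

	// The go keyword is applied outside of the processes:
	go hisay.Run()
	go upper.Run()

	for s := range upper.Out {
		fmt.Println(s)
	}
}
```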
This suggests an elegant solution to the problem of driving a chain of goroutines from the main thread: skip the go keyword for the last process in the chain! For example, like so:
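A sketch of this trick, where an assumed printer process sits last in the chain and runs in the main goroutine, blocking until the whole network is done:

```go
package main

import (
	"fmt"
	"strings"
)

const BUFSIZE = 16

type hiSayer struct{ Out chan string }

func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

func (p *hiSayer) Run() {
	defer close(p.Out)
	for i := 1; i <= 3; i++ {
		p.Out <- fmt.Sprintf("hi nr %d", i)
	}
}

type upperCaser struct {
	In  chan string
	Out chan string
}

func newUpperCaser() *upperCaser {
	return &upperCaser{Out: make(chan string, BUFSIZE)}
}

func (p *upperCaser) Run() {
	defer close(p.Out)
	for s := range p.In {
		p.Out <- strings.ToUpper(s)
	}
}

type printer struct{ In chan string }

func newPrinter() *printer { return &printer{} }

func (p *printer) Run() {
	for s := range p.In {
		fmt.Println(s)
	}
}

func main() {
	hisay := newHiSayer()
	upper := newUpperCaser()
	prnt := newPrinter()

	upper.In = hisay.Out
	prnt.In = upper.Out

	go hisay.Run()
	go upper.Run()
	prnt.Run() // no go keyword: blocks main until the chain is done
}
```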
Now, this can be packaged into a convenient pipeline component (and this is where we have use of the process interface, to tell the pipeline component to take processes of type process):
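A sketch of such a pipeline component (names like pipeline and AddProcess are assumptions based on the description, not necessarily the original API), shown together with a tiny demo network:

```go
package main

import "fmt"

// process is the common interface that lets the pipeline component
// hold any kind of process.
type process interface {
	Run()
}

// pipeline runs every added process in its own goroutine, except the
// last one, which runs in the calling goroutine and drives the chain.
type pipeline struct {
	processes []process
}

func newPipeline() *pipeline { return &pipeline{} }

func (pl *pipeline) AddProcess(p process) {
	pl.processes = append(pl.processes, p)
}

func (pl *pipeline) Run() {
	for i, p := range pl.processes {
		if i < len(pl.processes)-1 {
			go p.Run()
		} else {
			p.Run()
		}
	}
}

// Two trivial processes, just to demonstrate the pipeline:
type greeter struct{ Out chan string }

func (p *greeter) Run() {
	defer close(p.Out)
	p.Out <- "hello"
}

type printer struct{ In chan string }

func (p *printer) Run() {
	for s := range p.In {
		fmt.Println(s)
	}
}

func main() {
	g := &greeter{Out: make(chan string, 16)}
	pr := &printer{In: g.Out}

	pl := newPipeline()
	pl.AddProcess(g)
	pl.AddProcess(pr)
	pl.Run()
}
```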
… that we can use like this (assuming we have already initialized and connected proc1 and proc2):
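The usage boils down to adding the processes in order and calling Run once. A sketch, with proc1 and proc2 standing in for any two connected processes (their concrete types here are illustrative assumptions):

```go
package main

import "fmt"

const BUFSIZE = 16

type process interface{ Run() }

type pipeline struct{ processes []process }

func newPipeline() *pipeline { return &pipeline{} }

func (pl *pipeline) AddProcess(p process) {
	pl.processes = append(pl.processes, p)
}

func (pl *pipeline) Run() {
	for i, p := range pl.processes {
		if i < len(pl.processes)-1 {
			go p.Run()
		} else {
			p.Run()
		}
	}
}

type hiSayer struct{ Out chan string }

func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

func (p *hiSayer) Run() {
	defer close(p.Out)
	p.Out <- "hi there"
}

type printer struct{ In chan string }

func newPrinter() *printer { return &printer{} }

func (p *printer) Run() {
	for s := range p.In {
		fmt.Println(s)
	}
}

func main() {
	// Assume proc1 and proc2 are already created and connected:
	proc1 := newHiSayer()
	proc2 := newPrinter()
	proc2.In = proc1.Out

	pl := newPipeline()
	pl.AddProcess(proc1)
	pl.AddProcess(proc2)
	pl.Run()
}
```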
So a full code example of using the refined “framework-less, flow-based-programming-inspired” pattern could look like this:
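Since the original full example is only available via the playground and gist links, here is a reconstructed sketch of what such a complete program might look like, with illustrative process implementations filled in:

```go
package main

import (
	"fmt"
	"strings"
)

const BUFSIZE = 16

type process interface{ Run() }

type pipeline struct{ processes []process }

func newPipeline() *pipeline { return &pipeline{} }

func (pl *pipeline) AddProcess(p process) {
	pl.processes = append(pl.processes, p)
}

func (pl *pipeline) Run() {
	for i, p := range pl.processes {
		if i < len(pl.processes)-1 {
			go p.Run()
		} else {
			p.Run()
		}
	}
}

type hiSayer struct{ Out chan string }

func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

func (p *hiSayer) Run() {
	defer close(p.Out)
	for i := 1; i <= 3; i++ {
		p.Out <- fmt.Sprintf("hi nr %d", i)
	}
}

type upperCaser struct {
	In  chan string
	Out chan string
}

func newUpperCaser() *upperCaser {
	return &upperCaser{Out: make(chan string, BUFSIZE)}
}

func (p *upperCaser) Run() {
	defer close(p.Out)
	for s := range p.In {
		p.Out <- strings.ToUpper(s)
	}
}

type printer struct{ In chan string }

func newPrinter() *printer { return &printer{} }

func (p *printer) Run() {
	for s := range p.In {
		fmt.Println(s)
	}
}

func main() {
	// Create the processes via their factory functions:
	hisay := newHiSayer()
	upper := newUpperCaser()
	prnt := newPrinter()

	// Connect the network by plain channel-field assignment:
	upper.In = hisay.Out
	prnt.In = upper.Out

	// Add to a pipeline and run:
	pl := newPipeline()
	pl.AddProcess(hisay)
	pl.AddProcess(upper)
	pl.AddProcess(prnt)
	pl.Run()
}
```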
Just note that if the last process also sends output on a channel, we need another process at the end that just receives these outputs as inputs, doing nothing with them.
We could for example implement a special “sink” process for that:
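A sketch of such a sink process (the name and fields are assumptions for illustration):

```go
package main

import "fmt"

const BUFSIZE = 16

type hiSayer struct{ Out chan string }

func newHiSayer() *hiSayer {
	return &hiSayer{Out: make(chan string, BUFSIZE)}
}

func (p *hiSayer) Run() {
	defer close(p.Out)
	for i := 1; i <= 3; i++ {
		p.Out <- fmt.Sprintf("hi nr %d", i)
	}
}

// sink just receives and discards everything on its in-port, so it
// can be placed last after a process that produces output on a channel.
type sink struct{ In chan string }

func newSink() *sink { return &sink{} }

func (p *sink) Run() {
	for range p.In {
		// Do nothing with the received values.
	}
}

func main() {
	hisay := newHiSayer()
	snk := newSink()
	snk.In = hisay.Out

	go hisay.Run()
	snk.Run() // drains the chain until hiSayer closes its out-channel
}
```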
Again, make sure to check this code example on the Go playground, or this gist if you want to test this pattern out in practice!
SciPipe
This pattern is now serving as the basis for a scientific workflow library that I’m experimenting with, which I call SciPipe.
Very briefly, the implementation of SciPipe consists of the pattern above with the addition of a specialized process type (ShellProcess) that can take a shell command pattern and generate a component out of it, with one in-port per input file and one out-port per output file, and that will fire off tasks executing a formatted shell command for every full set of inputs received on the in-ports (read more in the SciPipe README).
SciPipe is still in prototype phase but there are a fair number of fully working toy examples in the examples folder, so the basic idea seems to be working.
Feedback and suggestions for improvement of the idea and the code are most welcome!
Twitter: @smllmp