GopherAcademy
Vladimir Vivien
Dec 20, 2015 8 min read

Automi: Stream Processing Over Go Channels

As a Go programmer, one of the features that attracted me to the language is the channel-based idiom used for goroutine communications. Channels, along with goroutines, form the basis for the Go concurrency primitives. If you are reading this post, you are likely familiar with the Go proverb:

Do not communicate by sharing memory; instead, share memory by communicating.

Using channels to communicate between concurrently running goroutines promotes data safety without the brittle choreography imposed by synchronization primitives when sharing memory directly.
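
To make the proverb concrete, here is a minimal sketch of my own (not from the pipeline code below) in which a single goroutine owns a counter and the rest of the program updates it by sending on a channel, so no mutex is needed:

package main

import "fmt"

func main() {
	inc := make(chan int)
	done := make(chan int)

	// One goroutine owns the counter; nothing else touches it directly.
	go func() {
		total := 0
		for n := range inc {
			total += n
		}
		done <- total
	}()

	// Other code communicates updates instead of sharing the variable.
	for i := 0; i < 10; i++ {
		inc <- 1
	}
	close(inc)
	fmt.Println(<-done) // 10
}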

Goroutines, Channels, and Pipelines

Earlier this year, while working at XOR, I wrote a lot of code that was used for data preparation in a multi-step analytic process. Most of the code I worked on used the pipeline patterns introduced by Sameer Ajmani and looked similar to the following listing (excuse the long code, but it is necessary to make the point).

package main

import (
	"fmt"
	"sync"
)

// ingest emits slices of words on the returned channel,
// closing it when done.
func ingest() <-chan []string {
	out := make(chan []string)
	go func() {
		out <- []string{"aaaa", "bbb"}
		out <- []string{"cccccc", "dddddd"}
		out <- []string{"e", "fffff", "g"}
		close(out)
	}()
	return out
}

// process launches concurrent workers that map each word to its length.
func process(concurrency int, in <-chan []string) <-chan int {
	var wg sync.WaitGroup
	wg.Add(concurrency)

	out := make(chan int)

	work := func() {
		defer wg.Done()
		for data := range in {
			for _, word := range data {
				out <- len(word)
			}
		}
	}

	for i := 0; i < concurrency; i++ {
		go work()
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// store drains the channel, printing each value, and signals completion on done.
func store(in <-chan int) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for data := range in {
			fmt.Println(data)
		}
	}()
	return done
}

func main() {
	// stage 1 - ingest data from source
	in := ingest()

	// stage 2 - process data
	reduced := process(4, in)

	// stage 3 - store
	<-store(reduced)
}

http://play.golang.org/p/LQJzC4X-MS

The pipeline patterns afford programmers a set of tools to create powerful and complex execution flows by dividing the work into stages. Each stage is wrapped in a goroutine and connected to the next using channels. In the previous code, for instance, the data is processed in three distinct stages: ingest, process, and store.

ingest() -> process() -> store()

The result of an upstream stage is used as the input of the downstream stage. However, the amount of noise generated by the communication primitives in the code always bothered me. This gets amplified as pipelines grow more complex, causing the synchronization logic to get lost in the business-related code. I started to look for a better way to encapsulate the communication/synchronization logic between the goroutines (as any lazy programmer would). That led me to create project Automi.

Automi - Stream Processing API Over Go Channels

I started project Automi as a way to encapsulate the pipeline patterns while hiding the nuts and bolts of channel communication between goroutines. While researching and playing around with different ideas for the project, I realized that the pipeline patterns (see above) are a basic implementation of stream processing. The channels provide the perfect conduit for the streaming data, and each stage can be modeled as an operation applied to the stream. By the time I got to the second rewrite of the project, I had ended up creating a stream processing API that satisfied my initial goals and much more.

Streaming Basics

Stream processing, as it turns out, is an extremely useful paradigm for processing data. A quick Google search reveals that the subject has been studied quite extensively, producing academic, commercial, and, more recently, open source systems. Many stream processing implementations use a functional model to represent data and flow processing. Data is fed into a function (a processing element) that processes it and produces an output.

f (in) -> out

The functional model facilitates the creation of larger processing flows using composition, where the output of one function is used as the input of the next (downstream) processing element. This chain can grow until a terminal function produces the desired result.

g (f (in)) -> out
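
To ground the notation in Go, here is a minimal sketch of my own (not Automi code) in which each processing element is a function from one channel to another, and composition wires them together:

package main

import "fmt"

// stage is a processing element: it consumes one channel and produces another.
type stage func(<-chan int) <-chan int

// compose chains f then g, mirroring g(f(in)) -> out.
func compose(f, g stage) stage {
	return func(in <-chan int) <-chan int {
		return g(f(in))
	}
}

// double is a sample processing element that multiplies each item by two.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()
	// g(f(in)) -> out: double applied twice
	for v := range compose(double, double)(in) {
		fmt.Println(v) // 4, 8, 12
	}
}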

Stream processing is a vast subject, and a proper treatment of the topic is well beyond the scope of this write-up. At the risk of butchering it, I will stop here.

The Automi API

Stream processing systems are implemented at a variety of scales, including as pure APIs, stand-alone services, and distributed systems. Automi, at the moment, is being developed as a pure API for creating stream processors in Go. The following code snippet shows how the Automi API is used to process the content of a file using multiple stages.

svc := someDataService.Create(context.Background())  // illustration, stand-in for some service

strm := stream.New()

// set stream source as csv file, emits []string
strm.From(file.CsvSource("./local/in-forms.csv"))

// Only allows record where col 0 starts with "CLEAR_"
strm.Filter(func(item interface{}) bool {
    row := item.([]string)
    return strings.HasPrefix(row[0], "CLEAR_")
})

// maps stream item from []string to struct Form
strm.Map(func(item interface{}) interface{} {
    row := item.([]string)
    return Form{Status: row[0], Id: row[1], Data: row[5]}
})

// Func to invoke some service call on data item
// Emits a []string for downstream
strm.Do(func(ctx context.Context, item interface{}) interface{} {
    form := item.(Form)
    resp, err := svc.Validate(form)
    if err != nil {
        return nil
    }
    return []string{resp.Id, resp.Code, resp.Content}
})

// Terminal step, sinks data into a csv flat file
strm.To(file.CsvSink("./local/resp-forms.txt"))

// open stream and wait for execution
err := <-strm.Open()
if err != nil {
    fmt.Println("Processing failed!")
}

The previous code sample creates a new stream to process data ingested from a csv file using several steps (see code comments). In the code, each method call on the stream (From(), Filter(), Map(), Do(), and To()) represents a stage in the pipeline, as illustrated in the following:

From(source) -> Filter(item) -> Map(item) -> Do(item) -> To(sink)

The From() method, for instance, starts the stream by ingesting the content of a csv file, emitting a []string for each row. Filter() does what you would expect: it filters csv rows out of the stream based on record content. Map() takes the []string from the previous stage and emits a Form{} struct for downstream consumption. The Do() method provides a place for arbitrary logic to be applied to the stream. Here it makes a call to a service (for illustrative purposes), then returns a []string for the next processing element. Lastly, the stream is terminated with a csv sink (the To() method) that writes the result to a file.

The code implements stream processing based on the pipeline patterns mentioned earlier. What is clearly absent, however, is the low-level channel communication code to coordinate and synchronize goroutines. The programmer is given a clean surface to express business code without the noisy infrastructure code. Under the covers, however, Automi uses patterns similar to the pipeline patterns discussed earlier to create safe and concurrent structures that execute the processing of the data stream.
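
As a rough illustration of what that encapsulation buys (a hypothetical sketch of my own, not Automi's actual internals), an operator like Filter() can hide the goroutine and channel plumbing behind a single function:

package main

import (
	"fmt"
	"strings"
)

// filterStage wraps the channel plumbing of a filtering pipeline
// stage behind one function call: it launches the goroutine,
// applies the predicate, and closes the output when done.
func filterStage(in <-chan interface{}, keep func(interface{}) bool) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for item := range in {
			if keep(item) {
				out <- item
			}
		}
	}()
	return out
}

func main() {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for _, rec := range []string{"CLEAR_001", "HOLD_002", "CLEAR_003"} {
			in <- rec
		}
	}()
	out := filterStage(in, func(item interface{}) bool {
		return strings.HasPrefix(item.(string), "CLEAR_")
	})
	for item := range out {
		fmt.Println(item) // CLEAR_001, CLEAR_003
	}
}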

The Automi Project

I am extremely pleased with the current direction of the project. Although the API is still taking shape, the overall design feels more natural than previous attempts. The Automi API design takes cues from popular open source stream processing projects (Apache Flink, Spark Streaming, etc.) and countless writings on the subject. Automi has plenty of room to grow, and there are numerous ideas and features that I plan to add in the coming year.

Automi is really two projects rolled into one. On one hand, the core of Automi is concerned with creating a superb API for expressing and realizing complex stream processing data flows. On the other hand, Automi will also be a collection of integration APIs that allow programmers to work with a variety of sources and sinks.

Features

Here are some stream processing features that are coming (or already implemented):

  • Support for processing functions (Map, Filter, FlatMap, grouping, etc)
  • Implementation of accumulators and reducers (see the sketch after this list)
  • Support for parallelism
  • Convenience functions for aggregating, calculating, and printing results
  • Ability to express stream joins
  • Timeout and retry policies
  • Etc
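
For a flavor of the reducer concept mentioned above, here is a generic sketch in plain Go (my own illustration, not Automi's planned API) that folds every item from an input channel into a single accumulated value:

package main

import "fmt"

// reduce folds every item from the input channel into one value,
// starting from seed and applying f to the running accumulator.
func reduce(in <-chan int, seed int, f func(acc, item int) int) int {
	acc := seed
	for v := range in {
		acc = f(acc, v)
	}
	return acc
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range []int{4, 3, 6} {
			in <- n
		}
	}()
	sum := reduce(in, 0, func(acc, item int) int { return acc + item })
	fmt.Println(sum) // 13
}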

As mentioned, Automi will also offer a collection of sources and sinks that will allow developers to integrate many systems with Automi. The following is a list of such systems (some may already be implemented, with more coming):

  • File system
  • Distributed file systems (HDFS, S3, etc)
  • Network socket
  • Databases
  • Messaging systems
  • Logging
  • Etc

Conclusion

Automi is a project that allows programmers to further unlock the practicality and usefulness of channels and goroutines. By hiding the low-level communication primitives between goroutines, Automi makes it possible to quickly assemble multi-stage pipelines for stream processing. While the project is still pre-beta, it has already proven itself and continues to evolve. In the coming year, the project will continue to grow and implement features that will make it an indispensable tool for people who primarily write Go to process data. Your PR is welcome!

Twitter - @vladimirvivien

Automi - https://github.com/vladimirvivien/automi

Special shout out to Brian Ketelsen for supporting and spreading the word about Automi when I presented the first draft of the project while at XOR.