Go Advent Day 24 - Channel Buffering Patterns
Introduction
One common method for message processing in Go is to receive from an input channel and write to an output channel, often using intermediate channels for message transformation and filtering. Multiple goroutines may perform these functions concurrently and independently, making for code that is easily parallelized and tested.
Message buffering is one kind of transformation that is sometimes useful in these systems. Some programs don’t need to process each message immediately, and can more efficiently process several messages at once. Other programs receive bursty input but are able to coalesce groups of messages.
An example of the latter case is software built with an inotify/kqueue library such as github.com/howeyc/fsnotify. This library provides the user with a channel of file events. Messages from fsnotify often come in bursts; for instance, some text editors save files by writing out a temporary file and moving it into place – a create event, one or more modifications, and a rename.
I observed such behavior when writing github.com/cespare/reflex, a tool for running commands when files change, and the code below is similar to real code in that program.
In this post I will explore how to handle bursty messages in channel pipelines.
Producing messages
We’ll represent messages with `Event`s, which are just `int`s. Multiple `Event`s are allowed to be merged into a single event using addition. (In an fsnotify-based program, for example, you might be able to coalesce multiple events by preserving only the unique file names, or only the fact that some file was changed at all.) Our producer intermittently emits a burst of random-valued `Event`s.
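```go
import (
	"math/rand"
	"time"
)

// Event is a message; in this example it is just an int.
type Event int

// NewEvent returns an empty Event.
func NewEvent() Event { return 0 }

// Merge combines two Events into one by adding them.
func (e Event) Merge(other Event) Event { return e + other }

// produce sleeps for 1-5 seconds, then emits a burst of 1-10 random Events.
func produce(out chan<- Event) {
	for {
		delay := time.Duration(rand.Intn(5)+1) * time.Second
		nMessages := rand.Intn(10) + 1
		time.Sleep(delay)
		for i := 0; i < nMessages; i++ {
			out <- Event(rand.Intn(10))
		}
	}
}
```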
If we print out all the messages created by `produce`, we can see that they come out in small batches. See the runnable playground example (you may have to scroll down to see the output).
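The parenthetical above mentions coalescing fsnotify-style events by keeping only the unique file names. As a minimal sketch of that idea, an event could be a set of names whose merge is a set union. The names `FileEvent`, `NewFileEvent`, and `Names` are hypothetical, invented here for illustration; they are not part of fsnotify or reflex.

```go
package main

import "sort"

// FileEvent is a hypothetical event type: the set of file names that changed.
type FileEvent map[string]struct{}

// NewFileEvent returns an event containing the given file names.
func NewFileEvent(names ...string) FileEvent {
	e := make(FileEvent, len(names))
	for _, n := range names {
		e[n] = struct{}{}
	}
	return e
}

// Merge returns a new event holding the union of both sets of names,
// so repeated events for the same file collapse into one entry.
func (e FileEvent) Merge(other FileEvent) FileEvent {
	merged := make(FileEvent, len(e)+len(other))
	for n := range e {
		merged[n] = struct{}{}
	}
	for n := range other {
		merged[n] = struct{}{}
	}
	return merged
}

// Names returns the file names in sorted order.
func (e FileEvent) Names() []string {
	names := make([]string, 0, len(e))
	for n := range e {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}
```

The rest of this post keeps the simpler `int`-based `Event`, but the pipelines only rely on a constructor and a `Merge` method, so a set-based type like this one could be dropped in.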
Batching by time
Let’s suppose we are willing to wait for a short time after receiving a message to see if other messages arrive that we can coalesce with the first. One simple way of doing this is to send every N milliseconds and merge all messages received in the meantime. This is easy with a `time.Ticker`:
```go
func coalesce500(in <-chan Event, out chan<- Event) {
	ticker := time.NewTicker(500 * time.Millisecond)
	event := NewEvent()
	for {
		select {
		case <-ticker.C:
			out <- event
			event = NewEvent()
		case e := <-in:
			event = event.Merge(e)
		}
	}
}
```
See the full runnable code here. This works, but it has a few problems:
- If no messages are arriving, `coalesce500` still sends an empty message every 500ms.
- If a message arrives shortly before the ticker fires, we’ll send it by itself without waiting for more messages. (More generally, a burst falling across a tick will create two messages rather than one.)
- If the receiver doesn’t handle the message immediately, the coalescing goroutine will be blocked, which in turn blocks the upstream producer. (In real code, that producer may be constructed to discard any messages it cannot immediately send.)
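A producer that discards messages it cannot send immediately, as mentioned in the last item, is usually written with a `select` that has a `default` case. The following is only a sketch; `trySend` is a hypothetical helper name and is not taken from fsnotify or reflex.

```go
package main

// Event mirrors the int-based Event type used in this post.
type Event int

// trySend attempts to deliver e without blocking. It reports whether the
// send happened; false means no receiver (or buffer space) was available
// and the event was dropped.
func trySend(out chan<- Event, e Event) bool {
	select {
	case out <- e:
		return true
	default:
		return false
	}
}
```

With a producer like this, a blocked coalescing goroutine does not stall the producer; it loses events instead, which is another reason to keep the coalescer responsive.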
Let’s tackle the third problem first.
Handling slow receivers
One obvious way to avoid blocking on a slow receiver is to change the output channel to be buffered. This only helps until the buffer fills up, though, and furthermore we won’t merge multiple buffered messages, potentially causing a downstream burst as well. A better way of handling the slow receiver is to implement the desired semantics directly: keep coalescing messages until the receiver is ready to accept more input.
```go
func coalesceSlow(in <-chan Event, out chan<- Event) {
	event := NewEvent()
	for e := range in {
		event = event.Merge(e)
	loop:
		for {
			select {
			case e := <-in:
				event = event.Merge(e)
			case out <- event:
				event = NewEvent()
				break loop
			}
		}
	}
}
```
You can run the code with a slow receiver and see that this has the desired effect: we only create a single coalesced message while the receiver blocks. `coalesceSlow` still suffers from the same problem as `coalesce500`: it does not wait for a follow-up burst after receiving a message. Further, it introduces a bit of ugly code: we receive from `in` in two different places (the `range` and inside the `select`) and we have to bounce between the inner and outer loops depending on whether we have anything to send.
Putting it together
To review, we would like for our final version to do the following:
- Wait for 500ms after receiving a message before sending
- Wait until the receiver is ready before sending
- Keep coalescing incoming messages while waiting
- Have a single channel receive statement and a single send statement, with no duplicates
Our state machine will need to be a little more complex to meet these goals. To solve the empty message problem from `coalesce500`, we’ll start a timer running only after receiving an event. Then we’ll coalesce messages until the timer fires. Finally, we’ll continue coalescing messages until the output channel is ready, after which we can send and go back to the first state again.
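Here is a diagram to show these transitions:
ready -> (event received, timer started) -> waiting -> (timer fires) -> ready to send -> (event sent) -> ready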
Here is the code:
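```go
func coalesce(in <-chan Event, out chan<- Event) {
	event := NewEvent()
	timer := time.NewTimer(0)
	// Drain the initial firing so that no stale value is left in timer.C
	// after Reset (Reset does not clear it before Go 1.23).
	<-timer.C

	// A nil channel is never ready, so both cases start out disabled.
	var timerCh <-chan time.Time
	var outCh chan<- Event

	for {
		select {
		case e := <-in:
			event = event.Merge(e)
			// Start the timer only in the initial state; restarting it while
			// waiting to send would later emit an empty event.
			if timerCh == nil && outCh == nil {
				timer.Reset(500 * time.Millisecond)
				timerCh = timer.C
			}
		case <-timerCh:
			// The wait is over: enable the send case.
			outCh = out
			timerCh = nil
		case outCh <- event:
			// Sent: go back to the initial state.
			event = NewEvent()
			outCh = nil
		}
	}
}
```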
There are a few techniques used here that deserve explanation. Most notably, we’re exploiting the fact that any cases inside a select which receive from or send to a nil channel are skipped (read more about this in the language spec). We use this in the second and third cases: `timerCh` and `outCh` are toggled between nil and the `timer.C` / `out` channels to selectively remove them from the select.
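The nil-channel behavior can be seen in isolation with a small sketch. `recvEither` is a hypothetical function written for this illustration: when both channels hold a value the choice is random, but passing nil for one of them removes that case from consideration.

```go
package main

// recvEither returns the first value available from a or b, prefixed with
// the name of the channel it came from. A nil channel is never ready, so
// passing nil for one argument disables the corresponding case.
func recvEither(a, b <-chan string) string {
	select {
	case v := <-a:
		return "a:" + v
	case v := <-b:
		return "b:" + v
	}
}
```

For example, with buffered channels `a` and `b` that each hold one value, `recvEither(nil, b)` always returns the value from `b`, even though `a` is also ready.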
In the initial state (labeled “ready” in the graph), both `timerCh` and `outCh` are nil, so we simply block on `in`, waiting for input. After we receive a message, the timer is set for 500ms in the future and `timerCh` is set to the timer’s output channel; further messages are coalesced while we wait for the timer to fire (the second state, “waiting”). Once this happens, we toggle `outCh` to `out`, so that we can detect when the output channel is ready to receive (this is the last state, “ready to send”). Once we have successfully sent, we return to the initial state, `outCh` and `timerCh` having previously been reset to nil.
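The `timer` variable initialized with `time.NewTimer(0)` fires immediately, but we don’t start listening to its output channel until after we have `Reset` it. (There is no way to create a `time.Timer` without scheduling it.) One caveat: before Go 1.23, `Reset` does not clear a value that is already buffered in `timer.C`, so on those versions the initial firing should be drained once (`<-timer.C`) right after the timer is created; otherwise the first event may be sent without the 500ms wait.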
If you run this on the playground you can verify that it behaves as advertised.
The end
If you have questions, you can find me on Google+ or Twitter. I am also cespare on IRC; if you haven’t checked it out, we have a great community at #go-nuts on Freenode.
Special thanks to Tommi Virtanen and Dominik Honnef for the excellent diagram and valuable feedback as I was editing this post.