software engineering Jul 13, 2017

Buffered Channels In Go — What Are They Good For?

Post 3 in a Series on Go

In a previous blog post we looked at how to build an unbounded channel, where writes to the channel would never block. It was built on top of two unbuffered channels, a goroutine, a slice, and a couple of closures.

Notably missing was a buffered channel. A buffered channel's capacity is fixed when it is created, so its buffer can never be unlimited. Proper use of a buffered channel means that you must handle the case where the buffer is full and your writing goroutine blocks waiting for a reading goroutine. So what is the proper use of a buffered channel? Why were they included in the design of Go?
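Here is a small sketch of what handling the full buffer can look like. A `select` with a `default` case turns a send that would block into a check the writer can act on. `trySend` is a hypothetical helper, not code from this series.

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It returns false when the buffer is
// full, which is exactly the case where a plain send would block the caller.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 2) // capacity is fixed when the channel is made
	for i := 1; i <= 3; i++ {
		ok := trySend(ch, i)
		fmt.Println(i, ok, len(ch), cap(ch))
	}
	// Output:
	// 1 true 1 2
	// 2 true 2 2
	// 3 false 2 2
}
```

What the writer does when `trySend` returns false (drop the value, report an error, try again later) is a design decision. A plain `ch <- v` simply blocks until a reader makes room.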

The case for buffered channels is subtle. To sum it up in a single sentence:

Buffered channels are useful when you know how many goroutines you have launched, want to limit the number of goroutines you will launch, or want to limit the amount of work that is queued up.

Buffered channels work great when you want to either gather data back from a set of goroutines that you have launched or when you want to limit concurrent usage. They are also helpful for managing the amount of work a system has queued up, preventing your services from falling behind and becoming overwhelmed. Here are a couple of examples to show how they can be used.

Parallel Processing

One common pattern for goroutines is fan-out. When you want to run the same data through multiple algorithms, you can launch a goroutine for each subtask and then gather the results when they are done. For example, you might want to process the same data with multiple scoring algorithms and return all of the scores, or pull data from multiple microservices to compose a single page. A buffered channel is an ideal way to gather the data back from your subtasks.

As noted in the previous blog post, Go doesn’t have generics, so we are going to use interface{} as a placeholder type.

The code is straightforward. First, we define a new type, Evaluator, which is a function that takes in a value and returns a value and an error. This isn’t necessary, but it makes it easier to understand the code.

Our function DivideAndConquer takes in a slice of Evaluators and the data that’s going to be sent to each of them. We create two buffered channels, one to hold valid responses and the other to hold errors. Next, we launch a new goroutine for each Evaluator.

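(Note that we pass v in to the goroutine so that each goroutine refers to a different Evaluator. A common gotcha when launching goroutines in a for loop is to use the key or value variable from the for declaration directly inside the goroutines. Before Go 1.22, that variable was shared across iterations, so all of the goroutines could end up seeing the same value! Go 1.22 gives each iteration its own variable, but passing the value in explicitly works in every version.)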

In the goroutine, we execute the Evaluator and write the output or the error to the appropriate channel.

Once all the goroutines are launched, we set up the out and errs slices to hold the results, loop len(evaluators) times to pull back either the result or the error from each goroutine, and then return out and errs when we are done.

By using buffered channels to return the values and errors, we make sure that none of the goroutines are paused waiting for the main task to read their output. The buffer allows the goroutines to write and then exit. What’s also interesting is that using the buffered channels removes the need for any further synchronization tools. We know that we will get exactly len(evaluators) writes across both channels, so all that our main task has to do is wait for a value to be written on either channel and put it on the appropriate slice. When all of the Evaluators are done, the function returns.

This simple test case demonstrates how we can use DivideAndConquer:

The results come back in a random order, which is what you’d expect if you ran them in parallel:

Where Does the Time Go?

Our parallel processor does what it is supposed to do, but it has one glaring limitation. What if you don’t want to wait forever for all of the subtasks to complete? Let’s think about a case where we are aggregating data from multiple microservices. If one microservice is slow, or (even worse) hanging, we don’t want to freeze the front-end waiting for it to respond. Even if we are just running scoring algorithms, one of them might have a bug that triggers slow behavior or an infinite loop. It’d be better to limit each subtask to a certain amount of time and return an error for the ones that take too long.

To implement this new feature, we are going to make our Evaluator type a bit more complex. Rather than just being a name for a func(interface{}) (interface{}, error), it is now an interface with two methods, Evaluate and Name:

We have moved the function to a different type, EvaluatorFunc, which implements the Evaluator interface. We want to have the Name method so that we can report back which subtask isn’t completing. EvaluatorFunc takes advantage of a trick in Go to get the name of a function given a variable that refers to it. If the name returned using runtime.Func.Name() is not to your taste, you can embed EvaluatorFunc in a struct to customize it:
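Here is a minimal sketch of these types that follows the description above. The `Name` method uses `reflect.ValueOf(f).Pointer()` together with `runtime.FuncForPC`, which is one standard way to obtain a `runtime.Func` for a function value. `namedEvaluator` and `double` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// Evaluator is now an interface so that each subtask can report its name.
type Evaluator interface {
	Evaluate(data interface{}) (interface{}, error)
	Name() string
}

// EvaluatorFunc adapts a plain function to the Evaluator interface.
type EvaluatorFunc func(interface{}) (interface{}, error)

func (ef EvaluatorFunc) Evaluate(in interface{}) (interface{}, error) {
	return ef(in)
}

// Name looks up the function's symbol name from its entry address.
func (ef EvaluatorFunc) Name() string {
	return runtime.FuncForPC(reflect.ValueOf(ef).Pointer()).Name()
}

// namedEvaluator (hypothetical) embeds EvaluatorFunc to reuse Evaluate,
// and defines its own Name to replace the runtime-derived one.
type namedEvaluator struct {
	EvaluatorFunc
	name string
}

func (ne namedEvaluator) Name() string { return ne.name }

// double is a hypothetical example Evaluator.
func double(in interface{}) (interface{}, error) { return in.(int) * 2, nil }

func main() {
	var e Evaluator = EvaluatorFunc(double)
	fmt.Println(e.Name()) // prints "main.double"
	e = namedEvaluator{EvaluatorFunc: double, name: "doubler"}
	fmt.Println(e.Name()) // prints "doubler"
}
```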

Here’s our updated DivideAndConquer function:

While the code to gather the results remains the same, the goroutine logic gets a bit more complicated. Each goroutine now creates two buffered channels of size 1, ch and ech, and launches a second goroutine. The second goroutine runs Evaluator.Evaluate and writes to either ch (in case of success) or ech (in case an error is returned).

The surrounding goroutine has a select statement that checks what comes back first: a result, an error, or a timeout. If the result or error arrives first, we write it to the gather or errors channel, respectively. If we time out, we instead create a new error to indicate the timeout and send that on the errors channel.

In order to take advantage of time.After, we need to launch a second goroutine and communicate with it over channels. But what isn’t obvious is why the ch and ech channels are buffered. Why not just use an unbuffered channel? The answer is that we don’t want to leak any goroutines. While the Go runtime is capable of handling thousands or hundreds of thousands of goroutines at a time, each goroutine does use some resources, so you don’t want to leave them hanging around when you don’t have to. If you do, a long-running Go program will start performing poorly.

Remember that an unbuffered channel pauses the writing goroutine until there’s a read by another goroutine. If the timeout triggers before the Evaluator finishes executing, the read will never happen because the only place those channels are read is in the outer goroutine’s select statement, and the outer goroutine exited after the timeout triggered. This means that using an unbuffered channel will cause the inner goroutine to wait forever whenever there is a timeout, leaking the goroutine. Again, the buffered channel proves useful because we know exactly how many writes we can expect.

Here’s a simple test that shows off timeouts:

This returns the output:

Creating a Pool

Another situation where buffered channels are useful is creating a pool of objects. When you have objects that are expensive to create (like large memory buffers), or when you want to limit simultaneous execution (such as not overwhelming a service with too many requests), a pool is a good fit.

The idea here is to use the buffered channel as the pool. The NewPool function takes in a Factory and uses it to populate the pool with count identical items (if the items aren’t functionally identical, then the behavior of your pool’s clients will depend on which item they get, which is a bad idea). When the Borrow method is called, a value is read off the channel and returned. If all of the items in the pool have been borrowed, the channel read blocks until a value is put back into the pool via the Return method.

If you don’t want to wait forever (and in production code, you probably never want to wait forever for something to happen), BorrowWithTimeout allows you to specify a time.Duration to wait for an item to return to the pool. By not waiting forever, you give your client processes valuable information: your current process is doing too much work. This is called backpressure, and it’s a very important tool to use when building scalable systems. Backpressure lets you know that one part of your system is under too much load, or is being called too often. When you see load increasing, one of the best ways to prevent a complete meltdown is to simply refuse to queue up work that you won’t be able to process in a reasonable amount of time.

The pool works as you’d expect:

As you watch the output, you’ll see that three entries are printed quickly, and then there are pauses as the number is returned to the pool and picked up by another goroutine.

Plugging a Leak in the Pool

While this simple pool will work, it requires a lot of trust. It functions like a public library. While most people who check out books do the right thing and return them promptly, some people forget to return them. Poorly-written pool clients are the same. They will leak items from the pool by not calling Return. Eventually, the pool will be empty, and there will be no values to be borrowed.

Another possibility is that the wrong value is put back into the pool. This will break the expected contract and cause nondeterministic problems. It’d be like returning a different book to the library than the one you checked out and hoping the librarian doesn’t notice.

We can turn this model around. Rather than acting as a lending library, we can treat our pool items like special collections: people can only access these items within the library.

Translating this to Go, we’re not going to return values from the pool via Borrow and hope they come back in Return; we are going to take in a closure and run it.

By structuring the pool this way, we not only make it harder to corrupt the pool, we also make it easier to write a client, as they no longer have to do bookkeeping for the pool:

As you can see, buffered channels do have their place in systems that are designed to handle load intelligently. They can help you manage the load in your systems, which in turn helps you to keep your services up and running.

Jon Bodner
Lead Software Engineer - Mobile Identity
@JonathanBodner1

DISCLOSURE STATEMENT: These opinions are those of the author. Unless noted otherwise in this post, Capital One is not affiliated with, nor is it endorsed by, any of the companies mentioned. All trademarks and other intellectual property used or displayed are the ownership of their respective owners. This article is © 2018 Capital One.