Writing Concurrent Each, Map & Select Methods in Go

Ruby-style "Each", "Map" and "Select" methods in Go that run concurrently using the goroutines and channels provided by the language.

October 4, 2019

Target Audience

People who have never written a single line of code in Go but have programming experience and can tell the difference between dynamically typed languages such as JavaScript, Python or Ruby and statically typed languages such as Go, Java, or C#.

I will not be covering language basics because there’s an excellent Go Tour for that, but along the way, I will cover some characteristics of Go that inform the design decisions leading us to the final version of the methods we will introduce.

Preamble

Before diving in, let’s start with some facts about Go for those who are new to the language:

Go is a fast, statically typed, compiled language.

Go’s concurrency mechanisms make it easy to write programs that get the most out of multicore and networked machines.

Go does not have generics (yet).

Motivation for Writing the Methods

The nature of our business demands processing vast amounts of data, and processing it concurrently is a good way to do it fast.

Even though it is easy to write concurrent programs in Go, programmers need to be mindful of certain details, and we wanted to create a library that would let them stop worrying about those details.

Starting with “Each”

Having some Ruby background, we wanted to use familiar names for methods that apply actions to all elements of a collection, such as those that exist for Ruby’s arrays, so let’s start with the method each.

Go is statically typed, thus you cannot have arrays containing values of different types. What it does offer is the empty interface type, interface{}, which allows you to get around some type restrictions. However, this comes at a cost that I’ll discuss later in this article.

To mimic how an each method works in dynamically typed languages, we can declare a new type that is a slice of empty interfaces:

type Array []interface{}

Then make this type the receiver of an Each method that prints all of its values.

func (a Array) Each() {
	for _, anything := range a {
		fmt.Println(anything)
	}
}

The next step is to add the ability to tell it what to do with each of the values, instead of the hard-coded Println. So, we modify the method and allow it to receive a function as a parameter.

Yes, Go supports first-class functions.

The function that we will be passing as a parameter must be able to receive a value whose type matches the type of the items of the slice. This is because the function will be receiving one of those items when invoked.

In this case, we are still using interface{} as the type, which means pretty much any type will be accepted.

func (a Array) Each(fn func(interface{})) {
	for _, anything := range a {
		fn(anything)
	}
}

Usage:

arrayOfStrings.Each(func(value interface{}) {
	fmt.Printf("Say %v\n", value)
})

So far so good, right? We have just defined a type with a method that behaves just like Ruby’s each method for arrays — or have we?

It has some caveats:

  • We are not taking advantage of the main benefit of a statically typed language, which is detecting errors at compile time instead of at runtime.
  • Detecting the type at runtime to decide how to deal with a value has a performance cost; in the example, fmt.Println and fmt.Printf internally use something called reflection to do it.
  • A slice of a certain type cannot be used as a slice of interface{} without allocating the whole slice again. In other words, var i int can be used as an interface{}, but a []int cannot be used as a []interface{}: the assignment doesn’t even compile, and converting requires copying every element into a new slice.
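For illustration, the copy the compiler forces on us looks roughly like this (toInterfaces is a hypothetical helper, not part of any library):

```go
package main

// toInterfaces shows the conversion required to treat a []int as a
// []interface{}: a new slice must be allocated and every element
// copied (and boxed) into it one by one.
func toInterfaces(ints []int) []interface{} {
	out := make([]interface{}, len(ints))
	for i, v := range ints {
		out[i] = v
	}
	return out
}
```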

Examples like this are what make engineers ask for generics in Go.

In the meantime, until Go 2.0 arrives, and because we like to stick to the YAGNI principle, we’ll focus on our most immediate need: processing strings.

type Strings []string

func (s Strings) Each(fn func(string)) {
	for _, str := range s {
		fn(str)
	}
}
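Usage is the same shape as before, but a wrongly typed callback (say, a func(int)) is now rejected at compile time. A small self-contained sketch, repeating the definitions so it stands alone (collect is a hypothetical helper):

```go
package main

type Strings []string

// Each applies fn to every element of the slice.
func (s Strings) Each(fn func(string)) {
	for _, str := range s {
		fn(str)
	}
}

// collect is a hypothetical helper that gathers the visited values,
// demonstrating that the callback parameter is now a plain string.
func collect(s Strings) []string {
	var got []string
	s.Each(func(v string) { got = append(got, v) })
	return got
}
```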

And whenever the time comes that we need something similar for an int or any other type, then we could write a code generator.

Error Handling

So far we’re only printing strings and not expecting errors to happen, but what if the processing that we want to apply to the string could result in an error? We should handle it properly a la Go which means to return it. In Go, functions can return more than one value, so it is typical to return the normal value the function would return, plus an error value; in our example, we only need to return the error value.

The signature of the function that we receive changes to:

fn func(string) error

Note: for the sake of a simple example, we arbitrarily decided to exit the for loop and return the first error encountered. Alternatively, we could have wrapped all errors into a single value returned after the loop.

func (s Strings) Each(fn func(string) error) error {
	for _, str := range s {
		if err := fn(str); err != nil {
			return err
		}
	}
	return nil
}
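A quick usage sketch; the type and method are repeated so the snippet stands alone, and countUntilBoom is a hypothetical helper that stops at the first failing value:

```go
package main

import "fmt"

type Strings []string

// Each applies fn to every element, short-circuiting on the first error.
func (s Strings) Each(fn func(string) error) error {
	for _, str := range s {
		if err := fn(str); err != nil {
			return err
		}
	}
	return nil
}

// countUntilBoom counts processed values, erroring on the string "boom";
// it shows that Each stops as soon as the callback fails.
func countUntilBoom(values Strings) (int, error) {
	n := 0
	err := values.Each(func(str string) error {
		if str == "boom" {
			return fmt.Errorf("found %q", str)
		}
		n++
		return nil
	})
	return n, err
}
```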

Concurrency

Here is where the fun begins 😄

Go’s concurrency mechanism is composed of goroutines that communicate through channels.

“Don’t communicate by sharing memory, share memory by communicating.” – Rob Pike

Then if we wanted to concurrently apply certain logic to a big collection of string values, it makes sense to launch thousands of goroutines and have them all read their input from a channel.

Channelize

The first step is to change our type

type Strings <-chan string

The for loop also needs a minor adjustment: ranging over a channel yields a single value per iteration (there is no index), so it becomes:

for str := range s
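Putting the pieces together, the channel-backed Each reads the same as before; only the type of the receiver changed:

```go
package main

type Strings <-chan string

// Each drains the channel, applying fn to every value received and
// short-circuiting on the first error.
func (s Strings) Each(fn func(string) error) error {
	for str := range s {
		if err := fn(str); err != nil {
			return err
		}
	}
	return nil
}
```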

Finally, we need to feed the channel with values:

stringsCh := make(chan string)
go func() { // feed from a separate goroutine: sends on an unbuffered channel block until read
	for _, str := range arrayOfStrings {
		stringsCh <- str
	}
	close(stringsCh)
}()

And now we have a working version of the method that processes each item, this time reading from a channel of strings instead of a slice.

To make it more interesting (more data), let’s simulate a file reader by writing a generator function that creates, feeds and returns a strings channel:

func generator(lines int) Strings {
	c := make(chan string)
	go func() {
		for i := 1; i <= lines; i++ {
			// line i holds i tab-delimited copies of the number i
			var buf bytes.Buffer
			buf.WriteString(strconv.Itoa(i))
			for j := 0; j < i-1; j++ {
				buf.WriteString("\t")
				buf.WriteString(strconv.Itoa(i))
			}
			c <- buf.String()
		}
		close(c)
	}()
	return c
}

And let’s change the function passed so that instead of just printing the string, it prints how many tab-delimited values it holds, returning an error if the count equals 42.

err := generator(100).Each(func(str string) error {
	values := strings.Split(str, "\t")
	if len(values) == 42 {
		return fmt.Errorf("you already found the meaning of life, universe and everything: %d", len(values))
	}

	fmt.Println(len(values))
	return nil
})

Launch N Workers

Now, instead of processing each line sequentially, we will pass a new parameter to our Each method, the number of workers wanted. And we will use that parameter for starting N goroutines that read from the channel.

func (s Strings) Each(workers int, fn func(int, string) error) error {
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(wid int) {
			for str := range s {
				if err := fn(wid, str); err != nil {
					panic(err)
				}
			}
			wg.Done()
		}(i)
	}
	
	wg.Wait()
	
	return nil
}

As you can see, we use a WaitGroup from the standard library’s sync package to block the calling goroutine until all workers are done; otherwise, the method would return (and the program could exit) before the goroutines finish.

We also added a worker ID wid parameter to the function received, just in case we need to identify each of the workers (mostly for debugging purposes).

The last — but very important — thing to notice is that because the passed function is now running inside a goroutine, we can’t capture its return value. Therefore, in case of error, we just panic instead of returning it. We will be taking a more appropriate action next.

Errors Occurring in the Background (inside goroutines)

Because the program timeline isn’t a single line anymore, we don’t have a way of immediately bailing out to handle errors if they should arise. In this scenario, one way to handle errors is to put them into a channel and postpone dealing with them until later.

func (s Strings) Each(workers int, fn func(int, string) error) <-chan error {
	errCh := make(chan error, workers)

	go func() {
		defer close(errCh)
	
		var wg sync.WaitGroup
	
		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func(wid int) {
				for str := range s {
					if err := fn(wid, str); err != nil {
						errCh <- err
					}
				}
				wg.Done()
			}(i)
		}
	
		wg.Wait()
	}()
	
	return errCh
}

This technique also allows the body of Each to run in the background, turning it into a non-blocking method. Now, the way to wait for Each to finish is to read from the errors channel it returns, since the channel is closed as soon as all workers are done.

for err := range errors {
	fmt.Println("There was an error:", err)
}

Early Exit

In the current state of our code, an error message is printed when an offending line is found, but the method doesn’t give its caller any meaningful course of action. What if a certain error should bring every worker to a full stop?

We can use the context standard library package for sending a cancel signal to workers. But we will have to replace the for range over the channel with an infinite for loop and a select statement.

var ErrCancelled = errors.New("cancel signal received")

func (s Strings) Each(ctx context.Context, workers int, fn func(int, string) error) <-chan error {
	errCh := make(chan error, workers)

	go func() {
		defer close(errCh)
	
		var wg sync.WaitGroup
	
		for i := 0; i < workers; i++ {
			wg.Add(1)
	
			go func(wid int) {
				defer wg.Done()
				for {
	
					select {
					case <-ctx.Done():
						errCh <- ErrCancelled
						return
					case item, hasMore := <-s:
						if !hasMore {
							return
						}
						if err := fn(wid, item); err != nil {
							errCh <- err
						}
					}
				}
	
			}(i)
		}
	
		wg.Wait()
	}()
	
	return errCh
}

But we are still not doing anything meaningful with the error returned by the processing function and we don’t really need to. We can leave that responsibility to the user because they have to deal with it one way or another. Placing it in an errors channel isn’t saving them any lines of code; it is probably adding a burden instead.

func (s Strings) Each(ctx context.Context, workers int, fn func(int, string)) <-chan error {
	errCh := make(chan error, workers)

	go func() {
		defer close(errCh)
	
		var wg sync.WaitGroup
	
		for i := 0; i < workers; i++ {
			wg.Add(1)
	
			go func(wid int) {
				defer wg.Done()
				for {
	
					select {
					case <-ctx.Done():
						errCh <- ErrCancelled
						return
					case item, hasMore := <-s:
						if !hasMore {
							return
						}
						fn(wid, item)
					}
				}
	
			}(i)
		}
	
		wg.Wait()
	}()
	
	return errCh
}

And to call it:

ctx, cancel := context.WithCancel(context.Background())

errors := generator(100).Each(ctx, 10, func(wid int, str string) {
	values := strings.Split(str, "\t")
	if len(values) == 42 {
		log.Printf("you already found the meaning of life, universe and everything: %d", len(values))
		cancel()
		return
	}

	log.Println(wid, "->", len(values))
	time.Sleep(50 * time.Millisecond) // artificial delay
})

As a last minor improvement, we can reduce the chance of a worker picking a channel read over a pending cancellation: when several cases of a select are ready, Go chooses one at random, so we first check ctx.Done() in a select of its own, giving cancellation priority.

for {
	select {
	case <-ctx.Done():
		errCh <- ErrCancelled
		return
	default:
	}

	select {
	case <-ctx.Done():
		errCh <- ErrCancelled
		return
	case item, hasMore := <-s:
		if !hasMore {
			return
		}
		fn(wid, item)
	}
}

“Map” & “Select”

Having written the Each method, writing the Map and Select methods should be an easy task because they can be built on top of Each.

func (sch Strings) Map(
	ctx context.Context, workers int, mapFn func(int, string) string,
) (Strings, <-chan error) {
	outCh := make(chan string, workers)
	return outCh, sch.Each(ctx, workers, func(wid int, item string) {
		outCh <- mapFn(wid, item)
	})
}
 
func (sch Strings) Select(
	ctx context.Context, workers int, selectFn func(int, string) bool,
) (Strings, <-chan error) {
	outCh := make(chan string, workers)
	return outCh, sch.Each(ctx, workers, func(wid int, item string) {
		if selectFn(wid, item) {
			outCh <- item
		}
	})
}

Except for one issue: we aren’t closing the output channels.

When working with channels, the recommendation is that responsibility for closing a channel lies with the code that writes to it. This prevents the panic that occurs when attempting to write to a closed channel.
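The rule exists because a send on a closed channel always panics. A small sketch that recovers the panic to demonstrate it (sendToClosed is a hypothetical helper):

```go
package main

// sendToClosed demonstrates that sending on a closed channel panics;
// it recovers the panic and reports whether one occurred.
func sendToClosed() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch := make(chan int)
	close(ch)
	ch <- 1 // panic: send on closed channel
	return false
}
```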

So, we need to close the output channels and it needs to happen after we know the Each method has finished writing. However, Each works in the background, so how can we, as its users, know when it finishes? We can’t.

What we can do instead is to ask Each to perform some action right after all of its workers are done.

func (sch Strings) Each(ctx context.Context, workers int, fn func(int, string), doneFns ...func()) <-chan error {
	errCh := make(chan error, workers)
 
	go func() {
		defer close(errCh)
 
		var wg sync.WaitGroup

		for i := 0; i < workers; i++ {
			//...
		}
		wg.Wait()

		for _, doneFn := range doneFns {
			doneFn()
		}
	}()

	return errCh
}

Then we rewrite Map and Select:

func (sch Strings) Map(
	ctx context.Context, workers int, mapFn func(int, string) string,
) (Strings, <-chan error) {
	outCh := make(chan string, workers)
	return outCh, sch.Each(ctx, workers, func(wid int, item string) {
		outCh <- mapFn(wid, item)
	}, func() {
		close(outCh)
	})
}
 
func (sch Strings) Select(
	ctx context.Context, workers int, selectFn func(int, string) bool,
) (Strings, <-chan error) {
	outCh := make(chan string, workers)
	return outCh, sch.Each(ctx, workers, func(wid int, item string) {
		if selectFn(wid, item) {
			outCh <- item
		}
	}, func() {
		close(outCh)
	})
}

Conclusion

Coming from sequential programming models, it is sometimes difficult to mentally follow all the paths and states a concurrent Go program goes through. In fact, it is difficult not only mentally but even with the support of debugging tools.

Once you get over the first hurdles and some patterns start to repeat, it is time to turn the solutions into libraries that hide complexity from others. In the example presented in this blog post, we free programmers from worrying too much about:

  • Deadlocks caused by channel starvation
  • Panics caused by attempting to write on closed channels
  • Goroutine leaks
  • Having to come up with ways to handle errors that can occur in parallel timelines

It is also very important that these libraries are intuitive and easy to use. They will not be perfect in the first iterations, but as more people use them and contribute to them, they end up becoming solid pieces of software: a valuable asset for the organization or for a developer community.
