Concurrency in Go is an extensive topic, and we won’t cover all of it in this post. There are tons of articles and several books that cover concurrency in depth.
The goal of this post is to cover topics between basic and advanced. We’ll cover the usage of sync.WaitGroup and channels. Combined, those two can be powerful tools.
Let’s start with sync.WaitGroup. The idea is to count how many goroutines we have scheduled and wait for them to finish. There is a simple example on Go by Example that shows this in action:
package main

import (
	"fmt"
	"sync"
	"time"
)

const WORKERS = 5

func main() {
	var wg sync.WaitGroup // initialise the goroutine "counter"
	for i := 1; i <= WORKERS; i++ {
		wg.Add(1) // add +1 to the counter
		// wg is captured by the closure, so only the worker id is passed in.
		go func(id int) {
			defer wg.Done() // add -1 to the counter
			fmt.Printf("Worker %d starting\n", id)
			time.Sleep(time.Second * 1) // Sleep to simulate an expensive task.
			fmt.Printf("Worker %d done\n", id)
		}(i)
	}
	wg.Wait() // wait for the counter to be 0 (i.e. wait for all goroutines to finish)
}
This is very straightforward. Now let’s look at goroutines and channels:
package main

import (
	"fmt"
	"time"
)

func processAnimal(animal string) string {
	time.Sleep(time.Second * 1) // Sleep to simulate an expensive task.
	return animal
}

func main() {
	animals := []string{"dog", "cat", "bird"}
	results := []string{}
	resultStream := make(chan string)
	// Fire one goroutine per animal; each writes its result to the channel.
	for _, animal := range animals {
		go func(a string) {
			resultStream <- processAnimal(a)
		}(animal)
	}
	// Read exactly one result per animal.
	for i := 0; i < len(animals); i++ {
		result := <-resultStream
		results = append(results, result)
	}
	fmt.Println(results) // main cannot return a value, so print the results instead
}
A channel serves as a pipe for a stream of information. That’s why it’s a good idea to end chan variable names with the word “Stream”.
We have a slice of animals we want to process concurrently. We loop through them, fire a goroutine for each, and write the result to the channel.
In the end, we read from the channel. Keep in mind that result := <-resultStream is a blocking operation: it blocks until a value is available on the channel. This is covered in A Tour of Go.
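To make the blocking behaviour concrete, here is a minimal sketch. The helper receiveAfter is a made-up name for illustration and is not part of the examples above. It starts a goroutine that sends after a delay and measures how long the receive had to wait.

```go
package main

import (
	"fmt"
	"time"
)

// receiveAfter is a hypothetical helper for illustration only. It starts a
// goroutine that sends msg after delay, then reports the received value and
// how long the receive operation had to wait.
func receiveAfter(msg string, delay time.Duration) (string, time.Duration) {
	stream := make(chan string) // unbuffered channel
	start := time.Now()
	go func() {
		time.Sleep(delay) // simulate an expensive task
		stream <- msg
	}()
	got := <-stream // blocks here until the goroutine sends
	return got, time.Since(start)
}

func main() {
	got, waited := receiveAfter("dog", 200*time.Millisecond)
	fmt.Printf("received %q after waiting %v\n", got, waited)
}
```

The reported wait should be at least as long as the delay, because the receive cannot complete before the send happens.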
This is great, but what if you want to process 10k animals? This code will spawn 10k goroutines at once, one per animal, which can consume a lot of resources. We probably don’t want this. What we can do is combine sync.WaitGroup and channels:
package main

import (
	"fmt"
	"sync"
	"time"
)

const WORKERS = 3

func processAnimal(animal string, taskId int) string {
	fmt.Printf("Processing animal %s (taskID: %d)...\n", animal, taskId)
	time.Sleep(time.Second * 1) // Sleep to simulate an expensive task.
	return animal
}

func main() {
	animals := []string{"dog", "cat", "bird", "monkey", "fish", "snake", "whale"}
	resultStream := make(chan string, len(animals))
	animalStream := make(chan string)
	var wg sync.WaitGroup
	for i := 1; i <= WORKERS; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done() // On return, notify the WaitGroup that we're done.
			// Get animals from the channel. Don't forget to close the `animalStream`
			// when you are done sending the data in or else you will have a deadlock
			// here.
			for a := range animalStream {
				fmt.Printf("Worker %d starting\n", id)
				resultStream <- processAnimal(a, id)
				fmt.Printf("Worker %d done\n", id)
			}
		}(i)
	}
	// Send the animals to the workers.
	for _, animal := range animals {
		animalStream <- animal
	}
	// Close the `animalStream` chan to indicate that we are done sending the data.
	close(animalStream)
	wg.Wait() // Wait for all goroutines to finish their work.
	// Gather the data
	for i := 0; i < len(animals); i++ {
		result := <-resultStream
		fmt.Printf("We have the processed data ready: %s\n", result)
	}
}
In this example, we have 3 workers processing our animals. They all run concurrently, and when one finishes its job, it immediately starts processing another animal. We use sync.WaitGroup to make sure we wait for all goroutines to finish their job before we start reading the results. Keep in mind that if one animal gets stuck while being processed, no results will be read at all, because wg.Wait() waits for all workers to finish all their jobs.
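If waiting for every worker before reading the first result is not what you want, one common variation is to call wg.Wait() in a separate goroutine that closes the result channel, and to range over the results in the main flow. The sketch below assumes a simplified processAnimal and a made-up helper name, processConcurrently. It is one possible arrangement, not the only one.

```go
package main

import (
	"fmt"
	"sync"
)

// processAnimal stands in for the expensive task (simplified for this sketch).
func processAnimal(animal string) string {
	return "processed " + animal
}

// processConcurrently is a hypothetical helper: it runs a fixed number of
// workers and collects results as they arrive instead of after wg.Wait().
func processConcurrently(animals []string, workers int) []string {
	animalStream := make(chan string)
	resultStream := make(chan string) // unbuffered: results are read as they arrive

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for a := range animalStream {
				resultStream <- processAnimal(a)
			}
		}()
	}

	// Feed the workers from a separate goroutine so that the caller can
	// start reading results right away.
	go func() {
		for _, a := range animals {
			animalStream <- a
		}
		close(animalStream)
	}()

	// Close resultStream once every worker has returned, which ends the
	// range loop below.
	go func() {
		wg.Wait()
		close(resultStream)
	}()

	results := make([]string, 0, len(animals))
	for r := range resultStream {
		results = append(results, r)
	}
	return results
}

func main() {
	results := processConcurrently([]string{"dog", "cat", "bird", "monkey"}, 3)
	fmt.Printf("collected %d results\n", len(results))
}
```

With this arrangement the result channel no longer needs a buffer sized to the input, because a reader is active while the workers are writing. The order of the results is not guaranteed.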
We use the animalStream channel to send the data to the goroutines. It can be unbuffered because the receivers are already running (in the goroutines: for a := range animalStream). An unbuffered channel blocks the sender until a receiver has received the value. Here, you need to close the channel after you have pushed all the data you need to process. Otherwise, the goroutines won’t know when to exit from the for a := range animalStream loop. In other words, they will block forever, wg.Wait() will never return, and we will have a deadlock.
Another way of telling the goroutines we have finished sending the data is to create a dedicated channel, e.g. done := make(chan bool), and use a select statement in the worker loop that watches both animalStream and done, in place of for a := range animalStream.
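A minimal sketch of the done-channel approach could look like this. The names worker and processAll are made up for illustration, and the processing step is reduced to a string concatenation. The done channel is closed rather than written to, so that every worker sees the signal.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical helper: it processes animals until done is closed.
func worker(animalStream <-chan string, done <-chan bool, resultStream chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case a := <-animalStream:
			resultStream <- "processed " + a
		case <-done:
			// A closed channel is always ready to receive from, so every
			// worker gets here once done is closed.
			return
		}
	}
}

// processAll is a hypothetical helper that wires the channels together.
func processAll(animals []string, workers int) []string {
	animalStream := make(chan string) // unbuffered and never closed in this variant
	done := make(chan bool)
	resultStream := make(chan string, len(animals))

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go worker(animalStream, done, resultStream, &wg)
	}

	// Each send completes only once a worker has received the value, so all
	// animals have been handed over before done is closed.
	for _, a := range animals {
		animalStream <- a
	}
	close(done)
	wg.Wait()
	close(resultStream)

	var results []string
	for r := range resultStream {
		results = append(results, r)
	}
	return results
}

func main() {
	results := processAll([]string{"dog", "cat", "bird"}, 2)
	fmt.Printf("processed %d animals\n", len(results))
}
```

This relies on animalStream being unbuffered. With a buffered input channel, a worker could pick the done case while values are still queued, so closing the input channel is usually the simpler signal.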
For sending the data out of the goroutines, we have created a resultStream channel. Here we have to define the capacity of the channel (i.e., it needs to be buffered) because the workers first write all the results to the channel and only then, after wg.Wait(), do we read from it.
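The difference between the two kinds of channel can be seen with a non-blocking send. In the sketch below, trySend is a made-up helper that uses select with a default case to report whether a send could complete immediately.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send and reports whether
// the send could complete without blocking.
func trySend(stream chan<- string, value string) bool {
	select {
	case stream <- value:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 2)

	fmt.Println(trySend(unbuffered, "dog")) // false: no receiver is waiting
	fmt.Println(trySend(buffered, "dog"))   // true: the buffer has room
	fmt.Println(trySend(buffered, "cat"))   // true: the buffer has room
	fmt.Println(trySend(buffered, "bird"))  // false: the buffer is full

	fmt.Println(len(buffered), cap(buffered)) // 2 2
}
```

This is why resultStream is created with a capacity of len(animals): every worker can write its result without a reader being active yet.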
I hope you have learned something new about Go and concurrency. All the examples above and several more are in this repository.