Go Concurrency Pattern: Pipelines

Yulian
2 min read · Mar 6, 2021

The pipeline is one of many Go concurrency patterns. A pipeline is a series of goroutines connected by channels: each goroutine receives values from the previous one, does its work, and sends the results to the next. The goroutines run concurrently, so different stages can work on different values at the same time.

Here is a sample case. I want a function that returns the sum of several numbers. Each number is generated randomly and then multiplied by 2 before it is added to the sum.

These are the functions that generate a number and multiply it by 2. Each one sleeps for 20 ms, which stands in for real work.

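import (
	"math/rand"
	"time"
)

// newInt returns a random number in [0, 10000) after a 20 ms delay.
func newInt() int {
	time.Sleep(time.Millisecond * 20)
	return rand.Intn(10000)
}

// multiplyBy2 doubles x after a 20 ms delay.
func multiplyBy2(x int) int {
	time.Sleep(time.Millisecond * 20)
	return x * 2
}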

This is the function that computes the result without concurrency. arrSize is a package-level constant (not shown here) that sets how many numbers are processed.

func getSum() int {
	sum := 0
	for i := 0; i < arrSize; i++ {
		sum += multiplyBy2(newInt())
	}
	return sum
}

We can split this problem into 3 stages:

  1. Generate the number
  2. Multiply the number by 2
  3. Calculate sum

For stage 1, we can create a function that returns a channel. A goroutine sends each value from newInt to this channel and closes the channel once all arrSize values have been sent.

func generateNumber() chan int {
	out := make(chan int)

	go func() {
		for i := 0; i < arrSize; i++ {
			out <- newInt()
		}
		close(out)
	}()

	return out
}
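To see why the close(out) call matters, here is a minimal sketch of the same generator shape with predictable values. The names generate, counter, and collect are made up for this illustration and are not part of the code above; counter simply stands in for newInt. The range loop in collect ends only because the generator closes its channel.

```go
package main

import "fmt"

// generate is a hypothetical variant of generateNumber: it sends n values
// produced by next on a new channel, then closes the channel.
func generate(n int, next func() int) chan int {
	out := make(chan int)
	go func() {
		defer close(out) // tells the receiver that no more values are coming
		for i := 0; i < n; i++ {
			out <- next()
		}
	}()
	return out
}

// counter returns a function that yields 0, 1, 2, ... on successive calls.
// It stands in for newInt so the output is predictable.
func counter() func() int {
	i := 0
	return func() int {
		v := i
		i++
		return v
	}
}

// collect drains ch into a slice; the range loop ends when ch is closed.
func collect(ch chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := generate(5, counter())
	fmt.Println(collect(ch)) // [0 1 2 3 4]

	// Receiving from a closed, drained channel yields the zero value and ok == false.
	_, ok := <-ch
	fmt.Println(ok) // false
}
```

Without the close, the range loop would wait forever for a sixth value and the program would deadlock.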

For stage 2, we can create a function that takes the channel returned by stage 1. It creates a new channel, multiplies each number it receives by 2, sends the result to the new channel, and returns that new channel. Once the input channel is closed and drained, it closes its own output channel.

func multiplyNumber(ch chan int) chan int {
	out := make(chan int)

	go func() {
		for c := range ch {
			out <- multiplyBy2(c)
		}
		close(out)
	}()

	return out
}
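Because a stage like this takes a channel and returns a channel, stages can be chained freely. The sketch below illustrates that with a generalized stage; source, apply, double, and quadruple are hypothetical names introduced here, not functions from the code above. Chaining two doubling stages multiplies every value by 4.

```go
package main

import "fmt"

// source sends the given numbers on a channel and then closes it.
// It stands in for stage 1 so the output is predictable.
func source(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// apply is a hypothetical generalization of multiplyNumber: it runs f on
// every value received from in and sends the result on a new channel.
func apply(in <-chan int, f func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

func double(x int) int { return x * 2 }

// quadruple chains two doubling stages and collects the results.
func quadruple(nums ...int) []int {
	var got []int
	for v := range apply(apply(source(nums...), double), double) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(quadruple(1, 2, 3)) // [4 8 12]
}
```

The receive-only type <-chan int is a design choice in this sketch: it lets the compiler reject a consumer that tries to send on or close a channel it does not own.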

For stage 3, we only need to receive the values from the stage 2 channel and add them up.

func getSumConcurrent() int {
	// stage 1
	ch1 := generateNumber()

	// stage 2
	ch2 := multiplyNumber(ch1)

	// stage 3
	sum := 0
	for a := range ch2 {
		sum += a
	}
	return sum
}
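For anyone who wants to run the pipeline as a single file, the snippets can be assembled roughly like this. The value of arrSize is an assumption (it is kept small here so the program finishes quickly), and the main function with its timing output is added only for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// arrSize is an assumed value, chosen small so the example runs quickly.
const arrSize = 5

func newInt() int {
	time.Sleep(time.Millisecond * 20)
	return rand.Intn(10000)
}

func multiplyBy2(x int) int {
	time.Sleep(time.Millisecond * 20)
	return x * 2
}

// Stage 1: generate arrSize random numbers.
func generateNumber() chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < arrSize; i++ {
			out <- newInt()
		}
		close(out)
	}()
	return out
}

// Stage 2: double every number received from ch.
func multiplyNumber(ch chan int) chan int {
	out := make(chan int)
	go func() {
		for c := range ch {
			out <- multiplyBy2(c)
		}
		close(out)
	}()
	return out
}

// Stage 3: sum everything that comes out of stage 2.
func getSumConcurrent() int {
	sum := 0
	for a := range multiplyNumber(generateNumber()) {
		sum += a
	}
	return sum
}

func main() {
	start := time.Now()
	sum := getSumConcurrent()
	// The sum is random, but always even because every term was doubled.
	fmt.Println("sum:", sum, "elapsed:", time.Since(start))
}
```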

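These are the benchmark results:

BenchmarkGetSum-8         1   4042952525 ns/op    104 B/op    4 allocs/op
BenchmarkConcurrency-8    1   2044313477 ns/op   1792 B/op   17 allocs/op

The pipeline version takes about 2.04 s per run, compared with about 4.04 s for the sequential version. That is roughly half the time, because stage 1 can generate the next number while stage 2 is still multiplying the previous one. The price is more memory: 1792 B and 17 allocations per run instead of 104 B and 4.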
