Ordered Streams
The stream package provides a way to process a stream of tasks concurrently while ensuring that the results are handled in the order the tasks were submitted.
This is useful in scenarios like processing messages from a queue where the work for each message can be done in parallel, but the final action (like committing an offset or writing to a database) must happen sequentially.
Core Concepts
The stream.Stream works with two main concepts:
- Task: A function that performs the concurrent work. It is submitted to the stream via Go(). The task must return a Callback.
- Callback: A function returned by a Task. All callbacks are executed serially by a single goroutine, in the same order that their corresponding tasks were submitted.
This design separates the concurrent, out-of-order execution (Task) from the serial, in-order finalization (Callback).
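To make the split between the two phases concrete, here is a minimal standard-library sketch of the same idea. It is not how the stream package is implemented: the names callback, task, runOrdered, and collectOrder are hypothetical stand-ins chosen for illustration, and the sketch assumes a fixed slice of tasks with one goroutine per task, so it leaves out bounded concurrency and streaming input.

```go
package main

import (
	"fmt"
	"time"
)

// callback and task are local stand-ins for the two concepts described
// above. They are illustrative names, not the library's types.
type callback func()
type task func() callback

// runOrdered starts every task in its own goroutine, then invokes the
// returned callbacks one at a time, in submission order, on the caller's
// goroutine.
func runOrdered(tasks []task) {
	// One buffered channel per task keeps each result tied to its position.
	slots := make([]chan callback, len(tasks))
	for i, t := range tasks {
		slots[i] = make(chan callback, 1)
		go func(t task, slot chan<- callback) {
			slot <- t() // concurrent phase: tasks may finish in any order
		}(t, slots[i])
	}
	// Serial phase: wait for slot 0, then slot 1, and so on.
	for _, slot := range slots {
		cb := <-slot
		cb()
	}
}

// collectOrder runs one sleeping task per delay and records the order in
// which the callbacks fire. The slice needs no mutex because callbacks
// never run concurrently with each other.
func collectOrder(delaysMillis []int) []int {
	var order []int
	tasks := make([]task, len(delaysMillis))
	for i, ms := range delaysMillis {
		i, ms := i, ms // copy loop variables (needed before Go 1.22)
		tasks[i] = func() callback {
			time.Sleep(time.Duration(ms) * time.Millisecond)
			return func() { order = append(order, i) }
		}
	}
	runOrdered(tasks)
	return order
}

func main() {
	// Task 1 has the shortest sleep, yet its callback still runs second.
	fmt.Println(collectOrder([]int{30, 5, 15})) // prints [0 1 2]
}
```

Because the callbacks run one after another, they can update shared state such as the order slice without extra locking; only the task bodies need to be safe to run concurrently.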
Standard Library vs. stream.Stream
Implementing this pattern manually requires complex coordination with multiple channels and wait groups.
// stdlib
import "sync"

func mapStream(in chan int, out chan int, f func(int) int) {
	tasks := make(chan func())
	taskResults := make(chan chan int)

	// Worker goroutines: run tasks concurrently, in whatever order they arrive.
	var workerWg sync.WaitGroup
	for i := 0; i < 10; i++ {
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			for task := range tasks {
				task()
			}
		}()
	}

	// Ordered reader goroutine: result channels are queued in submission
	// order, so reading them one by one preserves that order.
	var readerWg sync.WaitGroup
	readerWg.Add(1)
	go func() {
		defer readerWg.Done()
		for result := range taskResults {
			item := <-result
			out <- item
		}
	}()

	// Feed the workers with tasks
	for elem := range in {
		elem := elem // copy the loop variable (needed before Go 1.22)
		resultCh := make(chan int, 1)
		taskResults <- resultCh
		tasks <- func() {
			resultCh <- f(elem)
		}
	}

	// Shut down: stop the workers first, then the reader.
	close(tasks)
	workerWg.Wait()
	close(taskResults)
	readerWg.Wait()
}
conc drastically simplifies this.
// conc
import "github.com/sourcegraph/conc/stream"

func mapStream(in chan int, out chan int, f func(int) int) {
	s := stream.New().WithMaxGoroutines(10)
	for elem := range in {
		elem := elem
		s.Go(func() stream.Callback {
			res := f(elem)
			return func() { out <- res }
		})
	}
	s.Wait()
}
Usage
- Create a stream with stream.New().
- Optionally, configure it with WithMaxGoroutines(n).
- For each piece of work, call Go(task). The task function performs the work and returns a callback function.
- After submitting all tasks, call Wait() to wait for all tasks and their callbacks to complete.
Example
This example processes a slice of durations given in milliseconds. Each task sleeps for its own duration, so the tasks finish in a different order than they were submitted, but the callbacks still print the durations in their original order.
package main

import (
	"fmt"
	"time"

	"github.com/sourcegraph/conc/stream"
)

func main() {
	times := []int{20, 52, 16, 45, 4, 80}

	s := stream.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		// s.Go executes the task concurrently.
		s.Go(func() stream.Callback {
			// Simulate work
			time.Sleep(dur)
			// Return a callback to be executed in order.
			return func() { fmt.Println(dur) }
		})
	}

	// Wait for all tasks and callbacks to finish.
	s.Wait()
}
Expected Output
Even though the tasks complete out of order (e.g., the 4ms task finishes first), the output follows the order in which the tasks were submitted:
20ms
52ms
16ms
45ms
4ms
80ms