Ordered Streams

The stream package provides a way to process a stream of tasks concurrently while ensuring that the results are handled in the order the tasks were submitted.

This is useful in scenarios like processing messages from a queue where the work for each message can be done in parallel, but the final action (like committing an offset or writing to a database) must happen sequentially.

Core Concepts

A stream.Stream is built around two concepts:

  • Task: A function that performs the concurrent work. It is submitted to the stream via Go(). The task must return a Callback.
  • Callback: A function returned by a Task. All callbacks are executed serially by a single goroutine, in the same order that their corresponding tasks were submitted.

This design separates the concurrent, out-of-order execution (Task) from the serial, in-order finalization (Callback).
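The split can be illustrated with the standard library alone. The following is a minimal sketch, not how conc implements it: the Callback and Task types and the runOrdered and squaresInOrder helpers are hypothetical names defined locally for this illustration. Each task gets its own buffered channel, and reading those channels in submission order restores the ordering no matter which task finishes first.

```go
package main

import (
	"fmt"
	"time"
)

// Callback and Task are local stand-ins for the two concepts above;
// they are illustrative types, not the conc definitions.
type Callback func()
type Task func() Callback

// runOrdered starts every task in its own goroutine, then runs the
// returned callbacks one at a time, in submission order, on the
// caller's goroutine.
func runOrdered(tasks []Task) {
	// One buffered channel per task, kept in submission order.
	results := make([]chan Callback, len(tasks))
	for i, task := range tasks {
		ch := make(chan Callback, 1)
		results[i] = ch
		go func(t Task) { ch <- t() }(task)
	}
	// Receiving in slice order restores submission order, whatever
	// order the tasks actually finished in.
	for _, ch := range results {
		cb := <-ch
		cb()
	}
}

// squaresInOrder squares each input concurrently and collects the
// results from the callbacks. No mutex is needed around out because
// the callbacks never overlap.
func squaresInOrder(inputs []int) []int {
	var out []int
	tasks := make([]Task, 0, len(inputs))
	for _, n := range inputs {
		n := n // per-iteration copy for Go versions before 1.22
		tasks = append(tasks, func() Callback {
			// Larger inputs sleep less, so tasks finish in reverse order.
			time.Sleep(time.Duration(50-10*n) * time.Millisecond)
			sq := n * n
			return func() { out = append(out, sq) }
		})
	}
	runOrdered(tasks)
	return out
}

func main() {
	fmt.Println(squaresInOrder([]int{1, 2, 3, 4})) // prints [1 4 9 16]
}
```

Unlike a real stream, this sketch starts one goroutine per task with no upper bound and needs all tasks up front; it is only meant to show why the callbacks can touch shared state without locking.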

Standard Library vs. stream.Stream

Implementing this pattern with only the standard library requires coordinating a worker pool, a dedicated reader goroutine, several channels, and two wait groups.

// stdlib
import "sync"

func mapStream(in chan int, out chan int, f func(int) int) {
    tasks := make(chan func())
    taskResults := make(chan chan int)

    // Worker goroutines
    var workerWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    // Ordered reader goroutine
    var readerWg sync.WaitGroup
    readerWg.Add(1)
    go func() {
        defer readerWg.Done()
        for result := range taskResults {
            item := <-result
            out <- item
        }
    }()

    // Feed the workers with tasks
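    for elem := range in {
        elem := elem // per-iteration copy; required before Go 1.22
        // Buffered so the worker never blocks on the reader.
        resultCh := make(chan int, 1)
        // Enqueue the result channel first to fix the output order.
        taskResults <- resultCh
        tasks <- func() {
            resultCh <- f(elem)
        }
    }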

    close(tasks)
    workerWg.Wait()
    close(taskResults)
    readerWg.Wait()
}

The stream package from conc reduces this to a few lines.

// conc
import "github.com/sourcegraph/conc/stream"

func mapStream(in chan int, out chan int, f func(int) int) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range in {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { out <- res }
        })
    }
    s.Wait()
}

Usage

  1. Create a stream with stream.New().
  2. Optionally, configure it with WithMaxGoroutines(n).
  3. For each piece of work, call Go(task). The task function performs the work and returns a callback function.
  4. After submitting all tasks, call Wait() to wait for all tasks and their callbacks to complete.

Example

This example processes a slice of durations given in milliseconds. Each task sleeps for its own duration, so the tasks finish at different times, but the callbacks print the durations in their original order.

package main

import (
    "fmt"
    "time"

    "github.com/sourcegraph/conc/stream"
)

func main() {
    times := []int{20, 52, 16, 45, 4, 80}

    s := stream.New()
    for _, millis := range times {
        dur := time.Duration(millis) * time.Millisecond

        // s.Go executes the task concurrently.
        s.Go(func() stream.Callback {
            // Simulate work
            time.Sleep(dur)

            // Return a callback to be executed in order.
            return func() { fmt.Println(dur) }
        })
    }

    // Wait for all tasks and callbacks to finish.
    s.Wait()
}

Expected Output

Even though the tasks complete out of order (e.g., the 4ms task finishes first), the output follows the order in which the tasks were submitted:

20ms
52ms
16ms
45ms
4ms
80ms