Goroutine Pools

The pool package provides powerful and flexible goroutine pools for managing concurrent tasks. Pools are useful for limiting concurrency, collecting results, handling errors, and managing task cancellation with contexts.

All pools are created using pool.New() or pool.NewWithResults[T]() and then configured using With... methods.

pool.Pool

The pool.Pool is the most basic pool. It is used to execute a number of tasks with a configurable limit on the number of concurrently running goroutines.

Usage

  1. Create a new pool with pool.New().
  2. Optionally, limit concurrency with WithMaxGoroutines(n).
  3. Submit tasks with Go(f func()).
  4. Wait for all tasks to complete with Wait().
import "github.com/sourcegraph/conc/pool"

// Create a pool with a limit of 4 concurrent goroutines.
p := pool.New().WithMaxGoroutines(4)

for i := 0; i < 10; i++ {
    p.Go(func() {
        // do work
        fmt.Println("Task running...")
    })
}

// Wait for all tasks to finish.
p.Wait()

pool.ResultPool[T]

Use ResultPool when your tasks produce a result and you want to collect all results in a slice. The results are returned in the same order that the tasks were submitted.

Usage

  1. Create a pool with pool.NewWithResults[T]() where T is the result type.
  2. Submit tasks with Go(f func() T).
  3. Call Wait() to get the slice of results []T.
import "github.com/sourcegraph/conc/pool"

p := pool.NewWithResults[int]()

for i := 0; i < 5; i++ {
    i := i
    p.Go(func() int {
        return i * 2
    })
}

results := p.Wait()
// results will be [0, 2, 4, 6, 8]
fmt.Println(results)

pool.ErrorPool

Use ErrorPool for tasks that can fail. It collects all non-nil errors returned by the tasks.

Usage

  1. Create a pool with pool.New().WithErrors().
  2. Submit tasks with Go(f func() error).
  3. Call Wait() to get a single combined error. You can use errors.Is to check for specific underlying errors.
import (
    "errors"
    "github.com/sourcegraph/conc/pool"
)

p := pool.New().WithErrors()

p.Go(func() error { return nil })
p.Go(func() error { return errors.New("task failed") })

err := p.Wait()
if err != nil {
    // err will contain "task failed"
    fmt.Println(err)
}

WithFirstError()

By default, ErrorPool combines all errors. If you only care about the first error that occurred, you can use WithFirstError().

p.WithFirstError()

pool.ResultErrorPool[T]

This pool is for tasks that return both a result and an error. It's a combination of ResultPool and ErrorPool.

By default, if a task returns an error, its result is discarded. You can change this behavior with WithCollectErrored().

Usage

  1. Create a pool with pool.NewWithResults[T]().WithErrors().
  2. Submit tasks with Go(f func() (T, error)).
  3. Call Wait() to get the results and the combined error.
p := pool.NewWithResults[string]().WithErrors()

p.Go(func() (string, error) {
    return "success", nil
})
p.Go(func() (string, error) {
    return "", errors.New("failure")
})

results, err := p.Wait()
// results will be ["success"]
// err will contain "failure"

WithCollectErrored()

To keep the results of tasks that returned an error, use WithCollectErrored().

p.WithCollectErrored()
// Now, results would be ["success", ""], with the zero value for the failed task's result.

Context-Aware Pools

For tasks that need to be cancellable, conc provides context-aware pools. These pools pass a context.Context to each task.

pool.ContextPool

A context-aware version of ErrorPool.

Usage

  1. Create a pool with pool.New().WithContext(ctx).
  2. Submit tasks with Go(f func(ctx context.Context) error).
  3. Call Wait() to get the combined error.
ctx, cancel := context.WithCancel(context.Background())
p := pool.New().WithContext(ctx)

p.Go(func(ctx context.Context) error {
    <-ctx.Done() // waits for cancellation
    return ctx.Err()
})

cancel() // Cancel the context
err := p.Wait() // err will be context.Canceled

pool.ResultContextPool[T]

A context-aware version of ResultErrorPool.

Usage

  1. Create with pool.NewWithResults[T]().WithContext(ctx).
  2. Submit tasks with Go(f func(context.Context) (T, error)).
  3. Call Wait() to get results and error.

Automatic Cancellation on Error

Context pools can be configured to automatically cancel all other running tasks as soon as one task returns an error. This is useful for fail-fast scenarios.

  • WithCancelOnError(): Cancels the context on the first error.
  • WithFailFast(): A convenient alias for WithCancelOnError().WithFirstError(). This ensures the pool stops on the first error and only returns that single error.
// This pool will cancel all other tasks if one returns an error.
p := pool.New().
    WithContext(context.Background()).
    WithCancelOnError()

p.Go(func(ctx context.Context) error {
    // This task will be canceled if the one below fails first.
    select {
    case <-time.After(100 * time.Millisecond):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
})
p.Go(func(ctx context.Context) error {
    return errors.New("fail fast!")
})

err := p.Wait()
// err will contain "fail fast!" and context.Canceled