Goroutine Pools
The pool package provides powerful and flexible goroutine pools for managing concurrent tasks. Pools are useful for limiting concurrency, collecting results, handling errors, and managing task cancellation with contexts.
All pools are created using pool.New() or pool.NewWithResults[T]() and then configured using With... methods.
pool.Pool
pool.Pool is the most basic pool. It executes tasks with a configurable limit on the number of concurrently running goroutines.
Usage
- Create a new pool with pool.New().
- Optionally, limit concurrency with WithMaxGoroutines(n).
- Submit tasks with Go(f func()).
- Wait for all tasks to complete with Wait().
import (
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

// Create a pool with a limit of 4 concurrent goroutines.
p := pool.New().WithMaxGoroutines(4)
for i := 0; i < 10; i++ {
	p.Go(func() {
		// do work
		fmt.Println("Task running...")
	})
}
// Wait for all tasks to finish.
p.Wait()
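To make the effect of a goroutine limit concrete, the sketch below reproduces a similar limit using only the standard library: a buffered channel acts as a counting semaphore and a sync.WaitGroup plays the role of Wait(). It illustrates the idea only and is not conc's implementation; runLimited and its peak-concurrency counter are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs n tasks with at most limit of them in flight at once and
// returns the number of completed tasks and the peak concurrency observed.
// It is a hand-rolled stand-in for illustration, not conc's code.
func runLimited(n, limit int) (completed, peak int64) {
	var (
		wg      sync.WaitGroup
		running int64
		done    int64
		maxSeen int64
	)
	sem := make(chan struct{}, limit) // counting semaphore with `limit` slots

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while `limit` tasks are already running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends

			cur := atomic.AddInt64(&running, 1)
			// Record the highest concurrency level seen so far.
			for {
				old := atomic.LoadInt64(&maxSeen)
				if cur <= old || atomic.CompareAndSwapInt64(&maxSeen, old, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulated work
			atomic.AddInt64(&running, -1)
			atomic.AddInt64(&done, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&done), atomic.LoadInt64(&maxSeen)
}

func main() {
	completed, peak := runLimited(10, 4)
	fmt.Println("completed:", completed, "peak concurrency:", peak)
}
```

All 10 tasks complete and the observed peak never exceeds 4. A pool with a goroutine limit spares you this bookkeeping.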
pool.ResultPool[T]
Use ResultPool when your tasks produce a result and you want to collect all results in a slice. The results are returned in the same order that the tasks were submitted.
Usage
- Create a pool with pool.NewWithResults[T](), where T is the result type.
- Submit tasks with Go(f func() T).
- Call Wait() to get the slice of results, []T.
import (
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

p := pool.NewWithResults[int]()
for i := 0; i < 5; i++ {
	i := i // capture the per-iteration value (needed before Go 1.22)
	p.Go(func() int {
		return i * 2
	})
}
results := p.Wait()
// results will be [0, 2, 4, 6, 8]
fmt.Println(results)
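Returning results in submission order while tasks finish in arbitrary order takes some care. One common way to get that property with the standard library is to give each task its own slot in a preallocated slice, as sketched below. This is an assumption-laden illustration, not necessarily how conc does it; collectOrdered and doubles are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// collectOrdered runs every task in its own goroutine and returns the results
// in submission order, regardless of which task finishes first.
func collectOrdered[T any](tasks ...func() T) []T {
	results := make([]T, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() T) {
			defer wg.Done()
			results[i] = task() // each goroutine writes only to its own slot
		}(i, task)
	}
	wg.Wait()
	return results
}

// doubles builds n tasks where task i returns i*2.
func doubles(n int) []func() int {
	tasks := make([]func() int, n)
	for i := range tasks {
		i := i // capture the per-iteration value (needed before Go 1.22)
		tasks[i] = func() int { return i * 2 }
	}
	return tasks
}

func main() {
	fmt.Println(collectOrdered(doubles(5)...)) // [0 2 4 6 8]
}
```

Because every goroutine writes to a distinct index, no mutex is needed and the output order is fixed by the submission order.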
pool.ErrorPool
Use ErrorPool for tasks that can fail. It collects all non-nil errors returned by the tasks.
Usage
- Create a pool with pool.New().WithErrors().
- Submit tasks with Go(f func() error).
- Call Wait() to get a single combined error. You can use errors.Is to check for specific underlying errors.
import (
	"errors"
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

p := pool.New().WithErrors()
p.Go(func() error { return nil })
p.Go(func() error { return errors.New("task failed") })
err := p.Wait()
if err != nil {
	// err will contain "task failed"
	fmt.Println(err)
}
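The errors.Is check on a combined error can be tried out without a pool. The sketch below assumes the combined error wraps each task error the way the standard library's errors.Join does (Go 1.20+); whether conc builds its error in exactly this way is an assumption. ErrNotFound, ErrTimeout, combine, and classify are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound and ErrTimeout are hypothetical sentinel errors for this sketch.
var (
	ErrNotFound = errors.New("not found")
	ErrTimeout  = errors.New("timeout")
)

// combine mimics what a caller receives from Wait: nil when no task failed,
// otherwise one error wrapping every task error. errors.Join is used as a
// stand-in; it drops nil values and returns nil if nothing is left.
func combine(taskErrs ...error) error {
	return errors.Join(taskErrs...)
}

// classify reports which sentinel errors are present in a combined error.
func classify(err error) (notFound, timeout bool) {
	return errors.Is(err, ErrNotFound), errors.Is(err, ErrTimeout)
}

func main() {
	err := combine(nil, fmt.Errorf("lookup user 42: %w", ErrNotFound), nil)
	notFound, timeout := classify(err)
	fmt.Println(notFound, timeout)        // true false
	fmt.Println(combine(nil, nil) == nil) // true
}
```

Checking with errors.Is rather than comparing with == keeps the check valid even when the task error is wrapped or combined with others.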
WithFirstError()
By default, ErrorPool combines all errors. If you only care about the first error that occurred, you can use WithFirstError().
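// Configure the pool before submitting tasks; Wait() then returns only the first error.
p := pool.New().WithErrors().WithFirstError()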
pool.ResultErrorPool[T]
This pool is for tasks that return both a result and an error. It's a combination of ResultPool and ErrorPool.
By default, if a task returns an error, its result is discarded. You can change this behavior with WithCollectErrored().
Usage
- Create a pool with pool.NewWithResults[T]().WithErrors().
- Submit tasks with Go(f func() (T, error)).
- Call Wait() to get the results and the combined error.
p := pool.NewWithResults[string]().WithErrors()
p.Go(func() (string, error) {
	return "success", nil
})
p.Go(func() (string, error) {
	return "", errors.New("failure")
})
results, err := p.Wait()
// results will be ["success"]
// err will contain "failure"
WithCollectErrored()
To keep the results of tasks that returned an error, use WithCollectErrored().
p := pool.NewWithResults[string]().WithErrors().WithCollectErrored()
// With the two tasks above, results would be ["success", ""]: the value returned by the failed task ("" here) is kept.
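The difference between the default behavior and WithCollectErrored() can be modeled in a few lines of plain Go. The sketch below is a simplified, sequential model of the collection rule described in this section, not the pool's actual code; outcome, gather, and sample are hypothetical names, and errors.Join (Go 1.20+) stands in for the combined error.

```go
package main

import (
	"errors"
	"fmt"
)

// outcome is a hypothetical record of what one finished task returned.
type outcome struct {
	value string
	err   error
}

// gather models the two collection modes: by default the value of a failed
// task is dropped; with collectErrored set it is kept. Errors are always
// collected and combined.
func gather(outcomes []outcome, collectErrored bool) ([]string, error) {
	var (
		results []string
		errs    []error
	)
	for _, o := range outcomes {
		if o.err != nil {
			errs = append(errs, o.err)
			if !collectErrored {
				continue // default: discard the failed task's value
			}
		}
		results = append(results, o.value)
	}
	return results, errors.Join(errs...)
}

// sample mirrors a pair of tasks: one succeeds, one fails with an empty value.
func sample() []outcome {
	return []outcome{
		{value: "success"},
		{value: "", err: errors.New("failure")},
	}
}

func main() {
	res, err := gather(sample(), false)
	fmt.Printf("%q %v\n", res, err) // ["success"] failure
	res, err = gather(sample(), true)
	fmt.Printf("%q %v\n", res, err) // ["success" ""] failure
}
```

In both modes the error is reported; only the treatment of the failed task's value changes.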
Context-Aware Pools
For tasks that need to be cancellable, conc provides context-aware pools. These pools pass a context.Context to each task.
pool.ContextPool
A context-aware version of ErrorPool.
Usage
- Create a pool with pool.New().WithContext(ctx).
- Submit tasks with Go(f func(ctx context.Context) error).
- Call Wait() to get the combined error.
ctx, cancel := context.WithCancel(context.Background())
p := pool.New().WithContext(ctx)
p.Go(func(ctx context.Context) error {
	<-ctx.Done() // block until the context is cancelled
	return ctx.Err()
})
cancel() // cancel the context
err := p.Wait() // errors.Is(err, context.Canceled) reports true
pool.ResultContextPool[T]
A context-aware version of ResultErrorPool.
Usage
- Create a pool with pool.NewWithResults[T]().WithContext(ctx).
- Submit tasks with Go(f func(context.Context) (T, error)).
- Call Wait() to get the results and the combined error.
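A task only stops early if it watches the context it is given. The sketch below shows one way to write a task with the func(context.Context) (T, error) shape, here with T = int. To keep it runnable with the standard library alone, the task is called directly rather than through a pool; slowLength and runWith are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowLength is a hypothetical task: it pretends to work for `delay` and then
// returns len(s), but gives up early if ctx is cancelled first.
func slowLength(s string, delay time.Duration) func(context.Context) (int, error) {
	return func(ctx context.Context) (int, error) {
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-timer.C:
			return len(s), nil
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	}
}

// runWith runs the task once against a live or an already-cancelled context.
func runWith(cancelled bool) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelled {
		cancel()
	}
	return slowLength("gopher", 10*time.Millisecond)(ctx)
}

func main() {
	fmt.Println(runWith(false)) // 6 <nil>
	fmt.Println(runWith(true))  // 0 context canceled
}
```

The function returned by slowLength already has the signature that Go expects on a context-aware result pool of int, so it could be submitted as-is.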
Automatic Cancellation on Error
Context pools can be configured to automatically cancel all other running tasks as soon as one task returns an error. This is useful for fail-fast scenarios.
- WithCancelOnError(): Cancels the context on the first error.
- WithFailFast(): A convenient alias for WithCancelOnError().WithFirstError(). This ensures the pool stops on the first error and only returns that single error.
// This pool will cancel all other tasks if one returns an error.
p := pool.New().
	WithContext(context.Background()).
	WithCancelOnError()
p.Go(func(ctx context.Context) error {
	// This task will be canceled if the one below fails first.
	select {
	case <-time.After(100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
})
p.Go(func(ctx context.Context) error {
	return errors.New("fail fast!")
})
err := p.Wait()
// err will contain "fail fast!" and context.Canceled
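For intuition about what cancel-on-error combined with first-error reporting amounts to, here is a standard-library sketch of the same pattern: all tasks share one cancellable context, the first failure cancels it, and only that first error is returned. It is a simplified model under these assumptions, not conc's implementation; runFailFast, errBoom, and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// runFailFast runs every task with a shared context, cancels that context as
// soon as one task fails, and returns only the first error recorded.
func runFailFast(parent context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // tell the remaining tasks to stop
				})
			}
		}(task)
	}
	wg.Wait()
	return firstErr
}

var errBoom = errors.New("fail fast!")

// demo reports whether the run finished well before the slow task's 2s delay,
// together with the error returned by the run.
func demo() (fast bool, err error) {
	start := time.Now()
	err = runFailFast(context.Background(),
		func(ctx context.Context) error {
			select {
			case <-time.After(2 * time.Second): // would take 2s if left alone
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		},
		func(ctx context.Context) error { return errBoom },
	)
	return time.Since(start) < time.Second, err
}

func main() {
	fast, err := demo()
	fmt.Println(err, fast)
}
```

The slow task returns as soon as the context is cancelled, so the whole run ends quickly, and the cancellation error it returns is dropped in favor of the error that triggered it.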