Concurrent Iteration
The iter package provides helpers to concurrently process elements of a slice. It simplifies the pattern of distributing slice elements among a pool of worker goroutines.
By default, the number of goroutines is capped at runtime.GOMAXPROCS(0). You can override this limit, as described under Custom Concurrency.
iter.ForEach
ForEach calls a function on every element of a slice, running the calls in parallel. This is useful for performing side effects, such as updating elements in place.
Standard Library vs. iter.ForEach
Processing a slice concurrently with the standard library requires manual setup of worker pools and channels.
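// stdlib
import "sync"

func process(values []int) {
	feeder := make(chan int, 8)
	var wg sync.WaitGroup
	// Start a fixed pool of 10 workers that drain the channel.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range feeder {
				handle(elem)
			}
		}()
	}
	// Feed every value to the workers, then signal that no more are coming.
	for _, value := range values {
		feeder <- value
	}
	close(feeder)
	wg.Wait()
}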
With conc, this becomes a one-liner.
// conc
import "github.com/sourcegraph/conc/iter"
func process(values []int) {
	// Note: handle needs to accept a pointer: func handle(val *int)
	iter.ForEach(values, handle)
}
Usage
The function passed to ForEach receives a pointer to the element, so it can modify the element in place.
input := []int{1, 2, 3, 4}
iter.ForEach(input, func(v *int) {
	if *v%2 != 0 {
		*v = -1
	}
})
// input is now [-1 2 -1 4]
fmt.Println(input)
There is also ForEachIdx, which additionally passes the element's index to the callback: func(index int, val *T).
iter.Map
Map applies a function to each element of a slice and returns a new slice containing the results, in the same order as the input.
Standard Library vs. iter.Map
Concurrent mapping with the standard library is also verbose.
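// stdlib
import (
	"sync"
	"sync/atomic"
)

func concMap(input []int, f func(int) int) []int {
	res := make([]int, len(input))
	var idx atomic.Int64 // next unclaimed index (atomic.Int64 needs Go 1.19+)
	var wg sync.WaitGroup
	// Start 10 workers; each claims the next index until the input is exhausted.
	for w := 0; w < 10; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				i := int(idx.Add(1) - 1)
				if i >= len(input) {
					return
				}
				// Writing by index keeps the results in input order.
				res[i] = f(input[i])
			}
		}()
	}
	wg.Wait()
	return res
}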
conc makes this trivial.
// conc
import "github.com/sourcegraph/conc/iter"
func concMap(input []int, f func(*int) int) []int {
	// Unlike the stdlib version, f receives a pointer to each element.
	return iter.Map(input, f)
}
Usage
input := []int{1, 2, 3, 4}
results := iter.Map(input, func(v *int) string {
	return fmt.Sprintf("val-%d", *v)
})
// results is ["val-1", "val-2", "val-3", "val-4"]
fmt.Println(results)
iter.MapErr
For mapping functions that can fail, use MapErr. It returns the slice of results together with a single error that combines every error that occurred (nil if all calls succeeded).
input := []string{"1", "two", "3"}
results, err := iter.MapErr(input, func(v *string) (int, error) {
	return strconv.Atoi(*v)
})
// results contains [1, 0, 3] (zero value for the failed conversion)
// err contains the parsing error for "two"
Custom Concurrency
To control the number of goroutines used, create a custom Iterator or Mapper and set its MaxGoroutines field.
input := []int{1, 2, 3, 4, 5, 6}
// Use a maximum of 2 goroutines
iterator := iter.Iterator[int]{
	MaxGoroutines: 2,
}
iterator.ForEach(input, func(v *int) {
	// ...
})
// The same applies to Mapper
mapper := iter.Mapper[int, string]{
	MaxGoroutines: 2,
}
results := mapper.Map(input, func(v *int) string {
	// A Map callback must return a value of the result type.
	return strconv.Itoa(*v)
})