Concurrent Iteration

The iter package provides helpers to concurrently process elements of a slice. It simplifies the pattern of distributing slice elements among a pool of worker goroutines.

By default, the number of goroutines is capped at runtime.GOMAXPROCS(0). The cap can be changed through the MaxGoroutines field, as described under Custom Concurrency.

iter.ForEach

ForEach calls a function on each element of a slice concurrently. This is useful for side effects, such as updating elements in place.

Standard Library vs. iter.ForEach

Processing a slice concurrently with the standard library requires manual setup of worker pools and channels.

// stdlib
import "sync"

func process(values []int) {
    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)
    wg.Wait()
}

With conc, this becomes a one-liner.

// conc
import "github.com/sourcegraph/conc/iter"

func process(values []int) {
    // Note: handle needs to accept a pointer: func handle(val *int)
    iter.ForEach(values, handle)
}
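The one-liner hides roughly the worker-pool code from the standard-library version above. As a sketch of what such a helper has to do (not conc's actual implementation), here is a hypothetical generic forEach built on the same feeder-channel pattern. It sends indices rather than values, so each callback can receive a pointer into the original slice; the worker count is a plain parameter here for simplicity.

```go
package main

import "sync"

// forEach is a hypothetical generic helper: it calls f once for every element
// of input using at most `workers` goroutines and returns when all calls finish.
// f receives a pointer into input, so it can modify elements in place.
func forEach[T any](input []T, workers int, f func(*T)) {
	if workers < 1 {
		workers = 1
	}
	// The feeder carries indices, not copies of the elements.
	feeder := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range feeder {
				f(&input[i])
			}
		}()
	}

	for i := range input {
		feeder <- i
	}
	close(feeder) // lets the workers' range loops end
	wg.Wait()
}
```

Making the helper generic is what lets a single call site replace the hand-written pool for any element type.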

Usage

The function passed to ForEach receives a pointer to the element, allowing for in-place mutations.

input := []int{1, 2, 3, 4}

iter.ForEach(input, func(v *int) {
    if *v % 2 != 0 {
        *v = -1
    }
})

// input is now [-1 2 -1 4]
fmt.Println(input)
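Why a pointer? A callback that received each element by value would only get a copy, so its assignments would never reach the slice. The sequential sketch below (hypothetical helpers eachByValue and eachByPointer, no concurrency involved) makes the difference visible using the same odd-to-minus-one rule as the usage example above.

```go
package main

// eachByValue passes a copy of each element to f.
func eachByValue[T any](input []T, f func(T)) {
	for _, v := range input {
		f(v)
	}
}

// eachByPointer passes the address of each element to f.
func eachByPointer[T any](input []T, f func(*T)) {
	for i := range input {
		f(&input[i])
	}
}

// negateOdd mirrors the usage example: odd values become -1.
func negateOdd(v *int) {
	if *v%2 != 0 {
		*v = -1
	}
}
```

In the concurrent case, writing through the pointer is safe as long as each callback touches only the element it was given, since no two calls share an element.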

There is also ForEachIdx, which additionally passes the element's index to the callback: func(index int, val *T).
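Having the index makes it possible to write results to the matching position of another slice, which is one way to get an order-preserving map. The sketch below shows that idea with hypothetical helpers forEachIdx and mapSlice. It uses the atomic-counter approach from the standard-library Map example further down and is an illustration, not conc's source.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// forEachIdx is a hypothetical helper: it calls f(i, &input[i]) for every index,
// using `workers` goroutines that claim indices from a shared atomic counter.
func forEachIdx[T any](input []T, workers int, f func(int, *T)) {
	if workers < 1 {
		workers = 1
	}
	var next atomic.Int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				i := int(next.Add(1) - 1) // claim the next unprocessed index
				if i >= len(input) {
					return
				}
				f(i, &input[i])
			}
		}()
	}
	wg.Wait()
}

// mapSlice builds an order-preserving concurrent map on top of forEachIdx:
// each result is stored at its input's index, so completion order is irrelevant.
func mapSlice[T, R any](input []T, workers int, f func(*T) R) []R {
	res := make([]R, len(input))
	forEachIdx(input, workers, func(i int, v *T) {
		res[i] = f(v) // distinct indices, so no two goroutines write the same slot
	})
	return res
}
```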

iter.Map

Map applies a function to each element of a slice and returns a new slice containing the results. The order of the results is the same as the input slice.

Standard Library vs. iter.Map

Concurrent mapping with the standard library is also verbose.

// stdlib
import (
    "sync"
    "sync/atomic"
)

func concMap(input []int, f func(int) int) []int {
    res := make([]int, len(input))
    var idx atomic.Int64

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for {
                i := int(idx.Add(1) - 1)
                if i >= len(input) {
                    return
                }
                res[i] = f(input[i])
            }
        }()
    }
    wg.Wait()
    return res
}

conc makes this trivial.

// conc
import "github.com/sourcegraph/conc/iter"

func concMap(input []int, f func(*int) int) []int {
    // Note: f receives a pointer to each element: func(*int) int
    return iter.Map(input, f)
}

Usage

input := []int{1, 2, 3, 4}

results := iter.Map(input, func(v *int) string {
    return fmt.Sprintf("val-%d", *v)
})

// results is ["val-1", "val-2", "val-3", "val-4"]
fmt.Println(results)

iter.MapErr

For mapping functions that can return an error, use MapErr. It returns the slice of results and a single combined error containing all errors that occurred.

input := []string{"1", "two", "3"}

results, err := iter.MapErr(input, func(v *string) (int, error) {
    return strconv.Atoi(*v)
})

// results contains [1, 0, 3] (zero value for the failed conversion)
// err contains the parsing error for "two"

Custom Concurrency

To control the number of goroutines, create a custom Iterator or Mapper and set its MaxGoroutines field. Leaving the field at zero keeps the runtime.GOMAXPROCS(0) default.

input := []int{1, 2, 3, 4, 5, 6}

// Use a maximum of 2 goroutines
iterator := iter.Iterator[int]{
    MaxGoroutines: 2,
}

iterator.ForEach(input, func(v *int) {
    // ...
})

// The same applies to Mapper
mapper := iter.Mapper[int, string]{
    MaxGoroutines: 2,
}
results := mapper.Map(input, func(v *int) string {
    return strconv.Itoa(*v)
})
fmt.Println(results) // [1 2 3 4 5 6]
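To make the effect of a limit like MaxGoroutines observable, the sketch below swaps in a different technique: a buffered channel used as a semaphore. The hypothetical forEachLimited starts a goroutine per element but lets at most limit of them run the callback at once, and it records the peak number of concurrent calls. This is an illustration of bounding concurrency in general, not a description of how conc applies MaxGoroutines internally.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// forEachLimited is a hypothetical helper: it calls f on every element but
// allows at most `limit` calls to run at the same time. It returns the highest
// number of calls observed running concurrently.
func forEachLimited[T any](input []T, limit int, f func(*T)) int {
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var running, peak atomic.Int64
	var wg sync.WaitGroup

	for i := range input {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while `limit` calls are active
			defer func() { <-sem }() // release the slot (runs before wg.Done)

			n := running.Add(1)
			// Raise peak to n if n is larger, using a compare-and-swap loop.
			for {
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			f(&input[i])
			running.Add(-1)
		}(i)
	}
	wg.Wait()
	return int(peak.Load())
}
```

A semaphore bounds how many callbacks run at once, while a fixed worker pool also bounds how many goroutines exist; either way the observed peak never exceeds the limit.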