1 package iter
2 3 import (
4 "runtime"
5 "sync/atomic"
6 7 "github.com/sourcegraph/conc"
8 )
// defaultMaxGoroutines reports the goroutine limit this package applies
// when the caller has not configured one explicitly.
func defaultMaxGoroutines() int {
	// Passing 0 queries the current GOMAXPROCS setting without changing it.
	procs := runtime.GOMAXPROCS(0)
	return procs
}
// Iterator carries the configuration used by its ForEach and ForEachIdx
// methods. Its zero value is ready to use and picks reasonable defaults.
//
// An Iterator may be reused across calls and shared between goroutines:
// its methods take the receiver by value and never write back to it.
type Iterator[T any] struct {
	// MaxGoroutines is the upper bound on the number of goroutines
	// started by this Iterator's methods.
	//
	// When left unset (zero), runtime.GOMAXPROCS(0) is used instead.
	MaxGoroutines int
}
26 27 // ForEach executes f in parallel over each element in input.
28 //
29 // It is safe to mutate the input parameter, which makes it
30 // possible to map in place.
31 //
32 // ForEach always uses at most runtime.GOMAXPROCS goroutines.
33 // It takes roughly 2µs to start up the goroutines and adds
34 // an overhead of roughly 50ns per element of input. For
35 // a configurable goroutine limit, use a custom Iterator.
36 func ForEach[T any](input []T, f func(*T)) { Iterator[T]{}.ForEach(input, f) }
37 38 // ForEach executes f in parallel over each element in input,
39 // using up to the Iterator's configured maximum number of
40 // goroutines.
41 //
42 // It is safe to mutate the input parameter, which makes it
43 // possible to map in place.
44 //
45 // It takes roughly 2µs to start up the goroutines and adds
46 // an overhead of roughly 50ns per element of input.
47 func (iter Iterator[T]) ForEach(input []T, f func(*T)) {
48 iter.ForEachIdx(input, func(_ int, t *T) {
49 f(t)
50 })
51 }
52 53 // ForEachIdx is the same as ForEach except it also provides the
54 // index of the element to the callback.
55 func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }
56 57 // ForEachIdx is the same as ForEach except it also provides the
58 // index of the element to the callback.
59 func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
60 if iter.MaxGoroutines == 0 {
61 // iter is a value receiver and is hence safe to mutate
62 iter.MaxGoroutines = defaultMaxGoroutines()
63 }
64 65 numInput := len(input)
66 if iter.MaxGoroutines > numInput {
67 // No more concurrent tasks than the number of input items.
68 iter.MaxGoroutines = numInput
69 }
70 71 var idx atomic.Int64
72 // Create the task outside the loop to avoid extra closure allocations.
73 task := func() {
74 i := int(idx.Add(1) - 1)
75 for ; i < numInput; i = int(idx.Add(1) - 1) {
76 f(i, &input[i])
77 }
78 }
79 80 var wg conc.WaitGroup
81 for i := 0; i < iter.MaxGoroutines; i++ {
82 wg.Go(task)
83 }
84 wg.Wait()
85 }
86