iter.go raw

   1  package iter
   2  
   3  import (
   4  	"runtime"
   5  	"sync/atomic"
   6  
   7  	"github.com/sourcegraph/conc"
   8  )
   9  
  10  // defaultMaxGoroutines returns the default maximum number of
  11  // goroutines to use within this package.
  12  func defaultMaxGoroutines() int { return runtime.GOMAXPROCS(0) }
  13  
// Iterator configures the behaviour of ForEach and ForEachIdx.
// The zero value is ready to use and applies reasonable defaults.
//
// Iterator's methods use value receivers, so each call works on
// its own copy of the configuration; an Iterator is therefore
// safe for reuse and for concurrent use.
type Iterator[T any] struct {
	// MaxGoroutines is the upper bound on the number of goroutines
	// started by a single call to one of this Iterator's methods.
	// A call never starts more goroutines than there are input
	// elements, regardless of this setting.
	//
	// If unset (zero), MaxGoroutines defaults to runtime.GOMAXPROCS(0).
	MaxGoroutines int
}
  26  
  27  // ForEach executes f in parallel over each element in input.
  28  //
  29  // It is safe to mutate the input parameter, which makes it
  30  // possible to map in place.
  31  //
  32  // ForEach always uses at most runtime.GOMAXPROCS goroutines.
  33  // It takes roughly 2µs to start up the goroutines and adds
  34  // an overhead of roughly 50ns per element of input. For
  35  // a configurable goroutine limit, use a custom Iterator.
  36  func ForEach[T any](input []T, f func(*T)) { Iterator[T]{}.ForEach(input, f) }
  37  
  38  // ForEach executes f in parallel over each element in input,
  39  // using up to the Iterator's configured maximum number of
  40  // goroutines.
  41  //
  42  // It is safe to mutate the input parameter, which makes it
  43  // possible to map in place.
  44  //
  45  // It takes roughly 2µs to start up the goroutines and adds
  46  // an overhead of roughly 50ns per element of input.
  47  func (iter Iterator[T]) ForEach(input []T, f func(*T)) {
  48  	iter.ForEachIdx(input, func(_ int, t *T) {
  49  		f(t)
  50  	})
  51  }
  52  
  53  // ForEachIdx is the same as ForEach except it also provides the
  54  // index of the element to the callback.
  55  func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }
  56  
  57  // ForEachIdx is the same as ForEach except it also provides the
  58  // index of the element to the callback.
  59  func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
  60  	if iter.MaxGoroutines == 0 {
  61  		// iter is a value receiver and is hence safe to mutate
  62  		iter.MaxGoroutines = defaultMaxGoroutines()
  63  	}
  64  
  65  	numInput := len(input)
  66  	if iter.MaxGoroutines > numInput {
  67  		// No more concurrent tasks than the number of input items.
  68  		iter.MaxGoroutines = numInput
  69  	}
  70  
  71  	var idx atomic.Int64
  72  	// Create the task outside the loop to avoid extra closure allocations.
  73  	task := func() {
  74  		i := int(idx.Add(1) - 1)
  75  		for ; i < numInput; i = int(idx.Add(1) - 1) {
  76  			f(i, &input[i])
  77  		}
  78  	}
  79  
  80  	var wg conc.WaitGroup
  81  	for i := 0; i < iter.MaxGoroutines; i++ {
  82  		wg.Go(task)
  83  	}
  84  	wg.Wait()
  85  }
  86