errgroup.go raw

   1  // Copyright 2016 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  // Package errgroup provides synchronization, error propagation, and Context
   6  // cancellation for groups of goroutines working on subtasks of a common task.
   7  //
   8  // [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
   9  // returning errors.
  10  package errgroup
  11  
  12  import (
  13  	"context"
  14  	"fmt"
  15  	"sync"
  16  )
  17  
  18  type token struct{}
  19  
  20  // A Group is a collection of goroutines working on subtasks that are part of
  21  // the same overall task. A Group should not be reused for different tasks.
  22  //
  23  // A zero Group is valid, has no limit on the number of active goroutines,
  24  // and does not cancel on error.
  25  type Group struct {
  26  	cancel func(error)
  27  
  28  	wg sync.WaitGroup
  29  
  30  	sem chan token
  31  
  32  	errOnce sync.Once
  33  	err     error
  34  }
  35  
  36  func (g *Group) done() {
  37  	if g.sem != nil {
  38  		<-g.sem
  39  	}
  40  	g.wg.Done()
  41  }
  42  
  43  // WithContext returns a new Group and an associated Context derived from ctx.
  44  //
  45  // The derived Context is canceled the first time a function passed to Go
  46  // returns a non-nil error or the first time Wait returns, whichever occurs
  47  // first.
  48  func WithContext(ctx context.Context) (*Group, context.Context) {
  49  	ctx, cancel := context.WithCancelCause(ctx)
  50  	return &Group{cancel: cancel}, ctx
  51  }
  52  
  53  // Wait blocks until all function calls from the Go method have returned, then
  54  // returns the first non-nil error (if any) from them.
  55  func (g *Group) Wait() error {
  56  	g.wg.Wait()
  57  	if g.cancel != nil {
  58  		g.cancel(g.err)
  59  	}
  60  	return g.err
  61  }
  62  
  63  // Go calls the given function in a new goroutine.
  64  //
  65  // The first call to Go must happen before a Wait.
  66  // It blocks until the new goroutine can be added without the number of
  67  // goroutines in the group exceeding the configured limit.
  68  //
  69  // The first goroutine in the group that returns a non-nil error will
  70  // cancel the associated Context, if any. The error will be returned
  71  // by Wait.
  72  func (g *Group) Go(f func() error) {
  73  	if g.sem != nil {
  74  		g.sem <- token{}
  75  	}
  76  
  77  	g.wg.Add(1)
  78  	go func() {
  79  		defer g.done()
  80  
  81  		// It is tempting to propagate panics from f()
  82  		// up to the goroutine that calls Wait, but
  83  		// it creates more problems than it solves:
  84  		// - it delays panics arbitrarily,
  85  		//   making bugs harder to detect;
  86  		// - it turns f's panic stack into a mere value,
  87  		//   hiding it from crash-monitoring tools;
  88  		// - it risks deadlocks that hide the panic entirely,
  89  		//   if f's panic leaves the program in a state
  90  		//   that prevents the Wait call from being reached.
  91  		// See #53757, #74275, #74304, #74306.
  92  
  93  		if err := f(); err != nil {
  94  			g.errOnce.Do(func() {
  95  				g.err = err
  96  				if g.cancel != nil {
  97  					g.cancel(g.err)
  98  				}
  99  			})
 100  		}
 101  	}()
 102  }
 103  
 104  // TryGo calls the given function in a new goroutine only if the number of
 105  // active goroutines in the group is currently below the configured limit.
 106  //
 107  // The return value reports whether the goroutine was started.
 108  func (g *Group) TryGo(f func() error) bool {
 109  	if g.sem != nil {
 110  		select {
 111  		case g.sem <- token{}:
 112  			// Note: this allows barging iff channels in general allow barging.
 113  		default:
 114  			return false
 115  		}
 116  	}
 117  
 118  	g.wg.Add(1)
 119  	go func() {
 120  		defer g.done()
 121  
 122  		if err := f(); err != nil {
 123  			g.errOnce.Do(func() {
 124  				g.err = err
 125  				if g.cancel != nil {
 126  					g.cancel(g.err)
 127  				}
 128  			})
 129  		}
 130  	}()
 131  	return true
 132  }
 133  
 134  // SetLimit limits the number of active goroutines in this group to at most n.
 135  // A negative value indicates no limit.
 136  // A limit of zero will prevent any new goroutines from being added.
 137  //
 138  // Any subsequent call to the Go method will block until it can add an active
 139  // goroutine without exceeding the configured limit.
 140  //
 141  // The limit must not be modified while any goroutines in the group are active.
 142  func (g *Group) SetLimit(n int) {
 143  	if n < 0 {
 144  		g.sem = nil
 145  		return
 146  	}
 147  	if active := len(g.sem); active != 0 {
 148  		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", active))
 149  	}
 150  	g.sem = make(chan token, n)
 151  }
 152