singleflight.go raw

   1  // Copyright 2013 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  // Package singleflight provides a duplicate function call suppression
   6  // mechanism.
   7  package singleflight // import "golang.org/x/sync/singleflight"
   8  
   9  import (
  10  	"bytes"
  11  	"errors"
  12  	"fmt"
  13  	"runtime"
  14  	"runtime/debug"
  15  	"sync"
  16  )
  17  
  18  // errGoexit indicates the runtime.Goexit was called in
  19  // the user given function.
  20  var errGoexit = errors.New("runtime.Goexit was called")
  21  
  22  // A panicError is an arbitrary value recovered from a panic
  23  // with the stack trace during the execution of given function.
  24  type panicError struct {
  25  	value interface{}
  26  	stack []byte
  27  }
  28  
  29  // Error implements error interface.
  30  func (p *panicError) Error() string {
  31  	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
  32  }
  33  
  34  func (p *panicError) Unwrap() error {
  35  	err, ok := p.value.(error)
  36  	if !ok {
  37  		return nil
  38  	}
  39  
  40  	return err
  41  }
  42  
  43  func newPanicError(v interface{}) error {
  44  	stack := debug.Stack()
  45  
  46  	// The first line of the stack trace is of the form "goroutine N [status]:"
  47  	// but by the time the panic reaches Do the goroutine may no longer exist
  48  	// and its status will have changed. Trim out the misleading line.
  49  	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
  50  		stack = stack[line+1:]
  51  	}
  52  	return &panicError{value: v, stack: stack}
  53  }
  54  
  55  // call is an in-flight or completed singleflight.Do call
  56  type call struct {
  57  	wg sync.WaitGroup
  58  
  59  	// These fields are written once before the WaitGroup is done
  60  	// and are only read after the WaitGroup is done.
  61  	val interface{}
  62  	err error
  63  
  64  	// These fields are read and written with the singleflight
  65  	// mutex held before the WaitGroup is done, and are read but
  66  	// not written after the WaitGroup is done.
  67  	dups  int
  68  	chans []chan<- Result
  69  }
  70  
  71  // Group represents a class of work and forms a namespace in
  72  // which units of work can be executed with duplicate suppression.
  73  type Group struct {
  74  	mu sync.Mutex       // protects m
  75  	m  map[string]*call // lazily initialized
  76  }
  77  
  78  // Result holds the results of Do, so they can be passed
  79  // on a channel.
  80  type Result struct {
  81  	Val    interface{}
  82  	Err    error
  83  	Shared bool
  84  }
  85  
  86  // Do executes and returns the results of the given function, making
  87  // sure that only one execution is in-flight for a given key at a
  88  // time. If a duplicate comes in, the duplicate caller waits for the
  89  // original to complete and receives the same results.
  90  // The return value shared indicates whether v was given to multiple callers.
  91  func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  92  	g.mu.Lock()
  93  	if g.m == nil {
  94  		g.m = make(map[string]*call)
  95  	}
  96  	if c, ok := g.m[key]; ok {
  97  		c.dups++
  98  		g.mu.Unlock()
  99  		c.wg.Wait()
 100  
 101  		if e, ok := c.err.(*panicError); ok {
 102  			panic(e)
 103  		} else if c.err == errGoexit {
 104  			runtime.Goexit()
 105  		}
 106  		return c.val, c.err, true
 107  	}
 108  	c := new(call)
 109  	c.wg.Add(1)
 110  	g.m[key] = c
 111  	g.mu.Unlock()
 112  
 113  	g.doCall(c, key, fn)
 114  	return c.val, c.err, c.dups > 0
 115  }
 116  
 117  // DoChan is like Do but returns a channel that will receive the
 118  // results when they are ready.
 119  //
 120  // The returned channel will not be closed.
 121  func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 122  	ch := make(chan Result, 1)
 123  	g.mu.Lock()
 124  	if g.m == nil {
 125  		g.m = make(map[string]*call)
 126  	}
 127  	if c, ok := g.m[key]; ok {
 128  		c.dups++
 129  		c.chans = append(c.chans, ch)
 130  		g.mu.Unlock()
 131  		return ch
 132  	}
 133  	c := &call{chans: []chan<- Result{ch}}
 134  	c.wg.Add(1)
 135  	g.m[key] = c
 136  	g.mu.Unlock()
 137  
 138  	go g.doCall(c, key, fn)
 139  
 140  	return ch
 141  }
 142  
 143  // doCall handles the single call for a key.
 144  func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
 145  	normalReturn := false
 146  	recovered := false
 147  
 148  	// use double-defer to distinguish panic from runtime.Goexit,
 149  	// more details see https://golang.org/cl/134395
 150  	defer func() {
 151  		// the given function invoked runtime.Goexit
 152  		if !normalReturn && !recovered {
 153  			c.err = errGoexit
 154  		}
 155  
 156  		g.mu.Lock()
 157  		defer g.mu.Unlock()
 158  		c.wg.Done()
 159  		if g.m[key] == c {
 160  			delete(g.m, key)
 161  		}
 162  
 163  		if e, ok := c.err.(*panicError); ok {
 164  			// In order to prevent the waiting channels from being blocked forever,
 165  			// needs to ensure that this panic cannot be recovered.
 166  			if len(c.chans) > 0 {
 167  				go panic(e)
 168  				select {} // Keep this goroutine around so that it will appear in the crash dump.
 169  			} else {
 170  				panic(e)
 171  			}
 172  		} else if c.err == errGoexit {
 173  			// Already in the process of goexit, no need to call again
 174  		} else {
 175  			// Normal return
 176  			for _, ch := range c.chans {
 177  				ch <- Result{c.val, c.err, c.dups > 0}
 178  			}
 179  		}
 180  	}()
 181  
 182  	func() {
 183  		defer func() {
 184  			if !normalReturn {
 185  				// Ideally, we would wait to take a stack trace until we've determined
 186  				// whether this is a panic or a runtime.Goexit.
 187  				//
 188  				// Unfortunately, the only way we can distinguish the two is to see
 189  				// whether the recover stopped the goroutine from terminating, and by
 190  				// the time we know that, the part of the stack trace relevant to the
 191  				// panic has been discarded.
 192  				if r := recover(); r != nil {
 193  					c.err = newPanicError(r)
 194  				}
 195  			}
 196  		}()
 197  
 198  		c.val, c.err = fn()
 199  		normalReturn = true
 200  	}()
 201  
 202  	if !normalReturn {
 203  		recovered = true
 204  	}
 205  }
 206  
 207  // Forget tells the singleflight to forget about a key.  Future calls
 208  // to Do for this key will call the function rather than waiting for
 209  // an earlier call to complete.
 210  func (g *Group) Forget(key string) {
 211  	g.mu.Lock()
 212  	delete(g.m, key)
 213  	g.mu.Unlock()
 214  }
 215