singleflight.go raw

   1  // Copyright 2013 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package singleflight
   6  
   7  import (
   8  	"bytes"
   9  	"errors"
  10  	"fmt"
  11  	"runtime"
  12  	"runtime/debug"
  13  	"sync"
  14  )
  15  
// errGoexit is the sentinel stored in call.err when the user-supplied
// function ended its goroutine by calling runtime.Goexit rather than by
// returning or panicking.
var errGoexit = errors.New("runtime.Goexit was called")
  19  
  20  // A panicError is an arbitrary value recovered from a panic
  21  // with the stack trace during the execution of given function.
  22  type panicError struct {
  23  	value interface{}
  24  	stack []byte
  25  }
  26  
  27  // Error implements error interface.
  28  func (p *panicError) Error() string {
  29  	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
  30  }
  31  
  32  func newPanicError(v interface{}) error {
  33  	stack := debug.Stack()
  34  
  35  	// The first line of the stack trace is of the form "goroutine N [status]:"
  36  	// but by the time the panic reaches Do the goroutine may no longer exist
  37  	// and its status will have changed. Trim out the misleading line.
  38  	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
  39  		stack = stack[line+1:]
  40  	}
  41  	return &panicError{value: v, stack: stack}
  42  }
  43  
// call is an in-flight or completed singleflight.Do call.
type call struct {
	// wg is marked done by doCall once the function has finished;
	// duplicate Do callers block on it until then.
	wg sync.WaitGroup

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
	val interface{}
	err error

	// forgotten indicates whether Forget was called with this call's key
	// while the call was still in flight. It is read and written with the
	// Group mutex held; when set, doCall does not delete the map entry for
	// the key on completion.
	forgotten bool

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	//
	// NOTE(review): doCall calls wg.Done before it takes the mutex and
	// removes the map entry, so a Do or DoChan arriving in that window can
	// still update these fields (under the mutex) after the WaitGroup is
	// done. Confirm whether that window is intended.
	dups  int             // number of duplicate callers that joined this call
	chans []chan<- Result // one channel per DoChan caller waiting for the result
}
  63  
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
//
// The zero value is ready to use: the internal map is allocated by the
// first Do or DoChan call. A Group holds a sync.Mutex and therefore must
// not be copied after first use.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}
  70  
// Result holds the results of Do, so they can be passed
// on a channel. DoChan delivers one Result per caller.
type Result struct {
	Val    interface{} // value returned by the executed function
	Err    error       // error returned by the executed function
	Shared bool        // true when the call had at least one duplicate caller
}
  78  
  79  // Do executes and returns the results of the given function, making
  80  // sure that only one execution is in-flight for a given key at a
  81  // time. If a duplicate comes in, the duplicate caller waits for the
  82  // original to complete and receives the same results.
  83  // The return value shared indicates whether v was given to multiple callers.
  84  func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  85  	g.mu.Lock()
  86  	if g.m == nil {
  87  		g.m = make(map[string]*call)
  88  	}
  89  	if c, ok := g.m[key]; ok {
  90  		c.dups++
  91  		g.mu.Unlock()
  92  		c.wg.Wait()
  93  
  94  		if e, ok := c.err.(*panicError); ok {
  95  			panic(e)
  96  		} else if c.err == errGoexit {
  97  			runtime.Goexit()
  98  		}
  99  		return c.val, c.err, true
 100  	}
 101  	c := new(call)
 102  	c.wg.Add(1)
 103  	g.m[key] = c
 104  	g.mu.Unlock()
 105  
 106  	g.doCall(c, key, fn)
 107  	return c.val, c.err, c.dups > 0
 108  }
 109  
 110  // DoChan is like Do but returns a channel that will receive the
 111  // results when they are ready.
 112  //
 113  // The returned channel will not be closed.
 114  func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 115  	ch := make(chan Result, 1)
 116  	g.mu.Lock()
 117  	if g.m == nil {
 118  		g.m = make(map[string]*call)
 119  	}
 120  	if c, ok := g.m[key]; ok {
 121  		c.dups++
 122  		c.chans = append(c.chans, ch)
 123  		g.mu.Unlock()
 124  		return ch
 125  	}
 126  	c := &call{chans: []chan<- Result{ch}}
 127  	c.wg.Add(1)
 128  	g.m[key] = c
 129  	g.mu.Unlock()
 130  
 131  	go g.doCall(c, key, fn)
 132  
 133  	return ch
 134  }
 135  
// doCall handles the single call for a key: it runs fn, records the outcome
// in c, releases the Do callers blocked on c.wg, removes the key from the
// group (unless Forget already did), and notifies the DoChan callers
// registered in c.chans.
//
// fn can end in three ways, each handled differently:
//   - normal return: c.val and c.err hold fn's results and every channel in
//     c.chans receives one Result.
//   - panic: the recovered value is wrapped by newPanicError, stored in
//     c.err, and the panic is raised again after the waiters are released.
//   - runtime.Goexit: c.err is set to errGoexit and nothing is sent on
//     c.chans; the goroutine simply continues exiting.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	// normalReturn is set only if fn returned; recovered is set only if
	// execution resumed after the inner deferred recover, i.e. fn panicked.
	// If neither is set when the outer defer runs, fn called runtime.Goexit.
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		// Wake the duplicate Do callers first, then take the mutex to
		// update the group's map and read c.chans / c.dups.
		c.wg.Done()
		g.mu.Lock()
		defer g.mu.Unlock()
		// If Forget was called for this key, the entry was already removed
		// by Forget and the key may now belong to a newer call, so it is
		// left alone.
		if !c.forgotten {
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// To prevent the waiting channels from being blocked forever,
			// we need to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				// Panic on a fresh goroutine, where no caller's deferred
				// recover can intercept it.
				go panic(e)
				// This blocks forever with g.mu still held; the deferred
				// Unlock never runs.
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				// No channel waiters: re-panic on this goroutine.
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
			// Normal return
			// The channels created by DoChan have a buffer of one, so
			// these sends do not block while the mutex is held.
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn {
				// Ideally, we would wait to take a stack trace until we've determined
				// whether this is a panic or a runtime.Goexit.
				//
				// Unfortunately, the only way we can distinguish the two is to see
				// whether the recover stopped the goroutine from terminating, and by
				// the time we know that, the part of the stack trace relevant to the
				// panic has been discarded.
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true
	}()

	// Reaching this point without a normal return means the recover above
	// stopped a panic; after runtime.Goexit this line is never executed.
	if !normalReturn {
		recovered = true
	}
}
 199  
 200  // Forget tells the singleflight to forget about a key.  Future calls
 201  // to Do for this key will call the function rather than waiting for
 202  // an earlier call to complete.
 203  func (g *Group) Forget(key string) {
 204  	g.mu.Lock()
 205  	if c, ok := g.m[key]; ok {
 206  		c.forgotten = true
 207  	}
 208  	delete(g.m, key)
 209  	g.mu.Unlock()
 210  }
 211