pipeline.mx raw

   1  // Copyright 2010 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package textproto
   6  
   7  import (
   8  	"sync"
   9  )
  10  
  11  // A Pipeline manages a pipelined in-order request/response sequence.
  12  //
  13  // To use a Pipeline p to manage multiple clients on a connection,
  14  // each client should run:
  15  //
  16  //	id := p.Next()	// take a number
  17  //
  18  //	p.StartRequest(id)	// wait for turn to send request
  19  //	«send request»
  20  //	p.EndRequest(id)	// notify Pipeline that request is sent
  21  //
  22  //	p.StartResponse(id)	// wait for turn to read response
  23  //	«read response»
  24  //	p.EndResponse(id)	// notify Pipeline that response is read
  25  //
  26  // A pipelined server can use the same calls to ensure that
  27  // responses computed in parallel are written in the correct order.
  28  type Pipeline struct {
  29  	mu       sync.Mutex
  30  	id       uint
  31  	request  sequencer
  32  	response sequencer
  33  }
  34  
  35  // Next returns the next id for a request/response pair.
  36  func (p *Pipeline) Next() uint {
  37  	p.mu.Lock()
  38  	id := p.id
  39  	p.id++
  40  	p.mu.Unlock()
  41  	return id
  42  }
  43  
  44  // StartRequest blocks until it is time to send (or, if this is a server, receive)
  45  // the request with the given id.
  46  func (p *Pipeline) StartRequest(id uint) {
  47  	p.request.Start(id)
  48  }
  49  
  50  // EndRequest notifies p that the request with the given id has been sent
  51  // (or, if this is a server, received).
  52  func (p *Pipeline) EndRequest(id uint) {
  53  	p.request.End(id)
  54  }
  55  
  56  // StartResponse blocks until it is time to receive (or, if this is a server, send)
  57  // the request with the given id.
  58  func (p *Pipeline) StartResponse(id uint) {
  59  	p.response.Start(id)
  60  }
  61  
  62  // EndResponse notifies p that the response with the given id has been received
  63  // (or, if this is a server, sent).
  64  func (p *Pipeline) EndResponse(id uint) {
  65  	p.response.End(id)
  66  }
  67  
  68  // A sequencer schedules a sequence of numbered events that must
  69  // happen in order, one after the other. The event numbering must start
  70  // at 0 and increment without skipping. The event number wraps around
  71  // safely as long as there are not 2^32 simultaneous events pending.
  72  type sequencer struct {
  73  	mu   sync.Mutex
  74  	id   uint
  75  	wait map[uint]chan struct{}
  76  }
  77  
  78  // Start waits until it is time for the event numbered id to begin.
  79  // That is, except for the first event, it waits until End(id-1) has
  80  // been called.
  81  func (s *sequencer) Start(id uint) {
  82  	s.mu.Lock()
  83  	if s.id == id {
  84  		s.mu.Unlock()
  85  		return
  86  	}
  87  	c := chan struct{}{}
  88  	if s.wait == nil {
  89  		s.wait = map[uint]chan struct{}{}
  90  	}
  91  	s.wait[id] = c
  92  	s.mu.Unlock()
  93  	<-c
  94  }
  95  
  96  // End notifies the sequencer that the event numbered id has completed,
  97  // allowing it to schedule the event numbered id+1.  It is a run-time error
  98  // to call End with an id that is not the number of the active event.
  99  func (s *sequencer) End(id uint) {
 100  	s.mu.Lock()
 101  	if s.id != id {
 102  		s.mu.Unlock()
 103  		panic("out of sync")
 104  	}
 105  	id++
 106  	s.id = id
 107  	if s.wait == nil {
 108  		s.wait = map[uint]chan struct{}{}
 109  	}
 110  	c, ok := s.wait[id]
 111  	if ok {
 112  		delete(s.wait, id)
 113  	}
 114  	s.mu.Unlock()
 115  	if ok {
 116  		close(c)
 117  	}
 118  }
 119