unbounded.go raw

   1  /*
   2   * Copyright 2019 gRPC authors.
   3   *
   4   * Licensed under the Apache License, Version 2.0 (the "License");
   5   * you may not use this file except in compliance with the License.
   6   * You may obtain a copy of the License at
   7   *
   8   *     http://www.apache.org/licenses/LICENSE-2.0
   9   *
  10   * Unless required by applicable law or agreed to in writing, software
  11   * distributed under the License is distributed on an "AS IS" BASIS,
  12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13   * See the License for the specific language governing permissions and
  14   * limitations under the License.
  15   *
  16   */
  17  
  18  // Package buffer provides an implementation of an unbounded buffer.
  19  package buffer
  20  
  21  import (
  22  	"errors"
  23  	"sync"
  24  )
  25  
  26  // Unbounded is an implementation of an unbounded buffer which does not use
  27  // extra goroutines. This is typically used for passing updates from one entity
  28  // to another within gRPC.
  29  //
  30  // All methods on this type are thread-safe and don't block on anything except
  31  // the underlying mutex used for synchronization.
  32  //
  33  // Unbounded supports values of any type to be stored in it by using a channel
  34  // of `any`. This means that a call to Put() incurs an extra memory allocation,
  35  // and also that users need a type assertion while reading. For performance
  36  // critical code paths, using Unbounded is strongly discouraged and defining a
  37  // new type specific implementation of this buffer is preferred. See
  38  // internal/transport/transport.go for an example of this.
  39  type Unbounded struct {
  40  	c       chan any
  41  	closed  bool
  42  	closing bool
  43  	mu      sync.Mutex
  44  	backlog []any
  45  }
  46  
  47  // NewUnbounded returns a new instance of Unbounded.
  48  func NewUnbounded() *Unbounded {
  49  	return &Unbounded{c: make(chan any, 1)}
  50  }
  51  
  52  var errBufferClosed = errors.New("Put called on closed buffer.Unbounded")
  53  
  54  // Put adds t to the unbounded buffer.
  55  func (b *Unbounded) Put(t any) error {
  56  	b.mu.Lock()
  57  	defer b.mu.Unlock()
  58  	if b.closing {
  59  		return errBufferClosed
  60  	}
  61  	if len(b.backlog) == 0 {
  62  		select {
  63  		case b.c <- t:
  64  			return nil
  65  		default:
  66  		}
  67  	}
  68  	b.backlog = append(b.backlog, t)
  69  	return nil
  70  }
  71  
  72  // Load sends the earliest buffered data, if any, onto the read channel returned
  73  // by Get(). Users are expected to call this every time they successfully read a
  74  // value from the read channel.
  75  func (b *Unbounded) Load() {
  76  	b.mu.Lock()
  77  	defer b.mu.Unlock()
  78  	if len(b.backlog) > 0 {
  79  		select {
  80  		case b.c <- b.backlog[0]:
  81  			b.backlog[0] = nil
  82  			b.backlog = b.backlog[1:]
  83  		default:
  84  		}
  85  	} else if b.closing && !b.closed {
  86  		b.closed = true
  87  		close(b.c)
  88  	}
  89  }
  90  
  91  // Get returns a read channel on which values added to the buffer, via Put(),
  92  // are sent on.
  93  //
  94  // Upon reading a value from this channel, users are expected to call Load() to
  95  // send the next buffered value onto the channel if there is any.
  96  //
  97  // If the unbounded buffer is closed, the read channel returned by this method
  98  // is closed after all data is drained.
  99  func (b *Unbounded) Get() <-chan any {
 100  	return b.c
 101  }
 102  
 103  // Close closes the unbounded buffer. No subsequent data may be Put(), and the
 104  // channel returned from Get() will be closed after all the data is read and
 105  // Load() is called for the final time.
 106  func (b *Unbounded) Close() {
 107  	b.mu.Lock()
 108  	defer b.mu.Unlock()
 109  	if b.closing {
 110  		return
 111  	}
 112  	b.closing = true
 113  	if len(b.backlog) == 0 {
 114  		b.closed = true
 115  		close(b.c)
 116  	}
 117  }
 118