transport.go raw

   1  /*
   2   *
   3   * Copyright 2014 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package transport defines and implements message oriented communication
  20  // channel to complete various transactions (e.g., an RPC).  It is meant for
  21  // grpc-internal usage and is not intended to be imported directly by users.
  22  package transport
  23  
  24  import (
  25  	"context"
  26  	"errors"
  27  	"fmt"
  28  	"io"
  29  	"net"
  30  	"sync"
  31  	"sync/atomic"
  32  	"time"
  33  
  34  	"google.golang.org/grpc/codes"
  35  	"google.golang.org/grpc/credentials"
  36  	"google.golang.org/grpc/internal/channelz"
  37  	"google.golang.org/grpc/keepalive"
  38  	"google.golang.org/grpc/mem"
  39  	"google.golang.org/grpc/metadata"
  40  	"google.golang.org/grpc/peer"
  41  	"google.golang.org/grpc/stats"
  42  	"google.golang.org/grpc/status"
  43  	"google.golang.org/grpc/tap"
  44  )
  45  
// logLevel is a package-level constant with value 2.
// NOTE(review): presumably the verbosity level used for this package's
// logging; its uses are outside this view — confirm before changing it.
const logLevel = 2
  47  
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
	// buffer holds the received payload, if any.
	buffer mem.Buffer
	// err describes what this message represents:
	//
	//   - nil: received some data (carried in buffer).
	//   - io.EOF: stream is completed.
	//   - other non-nil error: transport failure.
	//
	// NOTE(review): buffer is expected to be nil when err is non-nil, but the
	// readers in this file still nil-check and free it in that case.
	err error
}
  57  
// recvBuffer is an unbounded channel of recvMsg structs.
//
// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
// holds a channel of recvMsg structs instead of objects implementing "item"
// interface. recvBuffer is written to much more often and using strict recvMsg
// structs helps avoid allocation in "recvBuffer.put"
//
// Fields:
//   - c is the channel readers receive from (see get); init creates it with
//     capacity 1.
//   - mu is held by put and load while they touch backlog and err.
//   - backlog queues, in arrival order, the messages that could not be placed
//     on c directly; load moves the head of backlog onto c.
//   - err is the err of the most recently accepted message. Once it is
//     non-nil, put drops every further message.
type recvBuffer struct {
	c       chan recvMsg
	mu      sync.Mutex
	backlog []recvMsg
	err     error
}
  70  
// init allows a recvBuffer to be initialized in-place, which is useful
// for resetting a buffer or for avoiding a heap allocation when the buffer
// is embedded in another struct.
//
// It only (re)creates the delivery channel with capacity 1; backlog and err
// are left untouched.
func (b *recvBuffer) init() {
	b.c = make(chan recvMsg, 1)
}
  77  
  78  func (b *recvBuffer) put(r recvMsg) {
  79  	b.mu.Lock()
  80  	if b.err != nil {
  81  		// drop the buffer on the floor. Since b.err is not nil, any subsequent reads
  82  		// will always return an error, making this buffer inaccessible.
  83  		r.buffer.Free()
  84  		b.mu.Unlock()
  85  		// An error had occurred earlier, don't accept more
  86  		// data or errors.
  87  		return
  88  	}
  89  	b.err = r.err
  90  	if len(b.backlog) == 0 {
  91  		select {
  92  		case b.c <- r:
  93  			b.mu.Unlock()
  94  			return
  95  		default:
  96  		}
  97  	}
  98  	b.backlog = append(b.backlog, r)
  99  	b.mu.Unlock()
 100  }
 101  
 102  func (b *recvBuffer) load() {
 103  	b.mu.Lock()
 104  	if len(b.backlog) > 0 {
 105  		select {
 106  		case b.c <- b.backlog[0]:
 107  			b.backlog[0] = recvMsg{}
 108  			b.backlog = b.backlog[1:]
 109  		default:
 110  		}
 111  	}
 112  	b.mu.Unlock()
 113  }
 114  
// get returns the channel that receives a recvMsg in the buffer.
//
// Upon receipt of a recvMsg, the caller should call load to send another
// recvMsg onto the channel if there is any. The channel is only refilled by
// put (when nothing is backlogged) and by load.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
 122  
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
//
// When clientStream is non-nil the client read paths (readClient and
// readMessageHeaderClient) are used; otherwise the plain read and
// readMessageHeader paths are used. err caches the first error returned by a
// read on recv; once set, ReadMessageHeader and Read return it without
// touching recv again.
type recvBufferReader struct {
	_            noCopy
	clientStream *ClientStream // The client transport stream is closed with a status representing ctx.Err() and nil trailer metadata.
	ctx          context.Context
	ctxDone      <-chan struct{} // cache of ctx.Done() (for performance).
	recv         *recvBuffer
	last         mem.Buffer // Stores the remaining data in the previous calls.
	err          error
}
 134  
 135  func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
 136  	if r.err != nil {
 137  		return 0, r.err
 138  	}
 139  	if r.last != nil {
 140  		n, r.last = mem.ReadUnsafe(header, r.last)
 141  		return n, nil
 142  	}
 143  	if r.clientStream != nil {
 144  		n, r.err = r.readMessageHeaderClient(header)
 145  	} else {
 146  		n, r.err = r.readMessageHeader(header)
 147  	}
 148  	return n, r.err
 149  }
 150  
 151  // Read reads the next n bytes from last. If last is drained, it tries to read
 152  // additional data from recv. It blocks if there no additional data available in
 153  // recv. If Read returns any non-nil error, it will continue to return that
 154  // error.
 155  func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
 156  	if r.err != nil {
 157  		return nil, r.err
 158  	}
 159  	if r.last != nil {
 160  		buf = r.last
 161  		if r.last.Len() > n {
 162  			buf, r.last = mem.SplitUnsafe(buf, n)
 163  		} else {
 164  			r.last = nil
 165  		}
 166  		return buf, nil
 167  	}
 168  	if r.clientStream != nil {
 169  		buf, r.err = r.readClient(n)
 170  	} else {
 171  		buf, r.err = r.read(n)
 172  	}
 173  	return buf, r.err
 174  }
 175  
 176  func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {
 177  	select {
 178  	case <-r.ctxDone:
 179  		return 0, ContextErr(r.ctx.Err())
 180  	case m := <-r.recv.get():
 181  		return r.readMessageHeaderAdditional(m, header)
 182  	}
 183  }
 184  
 185  func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
 186  	select {
 187  	case <-r.ctxDone:
 188  		return nil, ContextErr(r.ctx.Err())
 189  	case m := <-r.recv.get():
 190  		return r.readAdditional(m, n)
 191  	}
 192  }
 193  
 194  func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {
 195  	// If the context is canceled, then closes the stream with nil metadata.
 196  	// closeStream writes its error parameter to r.recv as a recvMsg.
 197  	// r.readAdditional acts on that message and returns the necessary error.
 198  	select {
 199  	case <-r.ctxDone:
 200  		// Note that this adds the ctx error to the end of recv buffer, and
 201  		// reads from the head. This will delay the error until recv buffer is
 202  		// empty, thus will delay ctx cancellation in Recv().
 203  		//
 204  		// It's done this way to fix a race between ctx cancel and trailer. The
 205  		// race was, stream.Recv() may return ctx error if ctxDone wins the
 206  		// race, but stream.Trailer() may return a non-nil md because the stream
 207  		// was not marked as done when trailer is received. This closeStream
 208  		// call will mark stream as done, thus fix the race.
 209  		//
 210  		// TODO: delaying ctx error seems like a unnecessary side effect. What
 211  		// we really want is to mark the stream as done, and return ctx error
 212  		// faster.
 213  		r.clientStream.Close(ContextErr(r.ctx.Err()))
 214  		m := <-r.recv.get()
 215  		return r.readMessageHeaderAdditional(m, header)
 216  	case m := <-r.recv.get():
 217  		return r.readMessageHeaderAdditional(m, header)
 218  	}
 219  }
 220  
 221  func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
 222  	// If the context is canceled, then closes the stream with nil metadata.
 223  	// closeStream writes its error parameter to r.recv as a recvMsg.
 224  	// r.readAdditional acts on that message and returns the necessary error.
 225  	select {
 226  	case <-r.ctxDone:
 227  		// Note that this adds the ctx error to the end of recv buffer, and
 228  		// reads from the head. This will delay the error until recv buffer is
 229  		// empty, thus will delay ctx cancellation in Recv().
 230  		//
 231  		// It's done this way to fix a race between ctx cancel and trailer. The
 232  		// race was, stream.Recv() may return ctx error if ctxDone wins the
 233  		// race, but stream.Trailer() may return a non-nil md because the stream
 234  		// was not marked as done when trailer is received. This closeStream
 235  		// call will mark stream as done, thus fix the race.
 236  		//
 237  		// TODO: delaying ctx error seems like a unnecessary side effect. What
 238  		// we really want is to mark the stream as done, and return ctx error
 239  		// faster.
 240  		r.clientStream.Close(ContextErr(r.ctx.Err()))
 241  		m := <-r.recv.get()
 242  		return r.readAdditional(m, n)
 243  	case m := <-r.recv.get():
 244  		return r.readAdditional(m, n)
 245  	}
 246  }
 247  
 248  func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
 249  	r.recv.load()
 250  	if m.err != nil {
 251  		if m.buffer != nil {
 252  			m.buffer.Free()
 253  		}
 254  		return 0, m.err
 255  	}
 256  
 257  	n, r.last = mem.ReadUnsafe(header, m.buffer)
 258  
 259  	return n, nil
 260  }
 261  
 262  func (r *recvBufferReader) readAdditional(m recvMsg, n int) (b mem.Buffer, err error) {
 263  	r.recv.load()
 264  	if m.err != nil {
 265  		if m.buffer != nil {
 266  			m.buffer.Free()
 267  		}
 268  		return nil, m.err
 269  	}
 270  
 271  	if m.buffer.Len() > n {
 272  		m.buffer, r.last = mem.SplitUnsafe(m.buffer, n)
 273  	}
 274  
 275  	return m.buffer, nil
 276  }
 277  
// streamState is the lifecycle state of a Stream. It is stored in
// Stream.state as a uint32 and accessed atomically through swapState,
// compareAndSwapState and getState.
type streamState uint32

// The states a Stream can be in; streamActive is the zero value.
const (
	streamActive    streamState = iota
	streamWriteDone             // EndStream sent
	streamReadDone              // EndStream received
	streamDone                  // the entire stream is finished.
)
 286  
// Stream represents an RPC in the transport layer.
//
// Notes on fields without inline comments:
//   - readRequester is notified (requestRead) before ReadMessageHeader and
//     read pull data from trReader.
//   - state holds a streamState and is accessed atomically via swapState,
//     compareAndSwapState and getState.
//   - buf receives incoming messages through write.
//   - trReader is the reader ReadMessageHeader and read consume from.
//   - NOTE(review): recvCompress/sendCompress presumably name the compression
//     used for inbound/outbound messages, and fc/wq presumably hold inbound
//     flow-control and write-quota state; their uses are outside this view.
type Stream struct {
	ctx          context.Context // the associated context of the stream
	method       string          // the associated RPC method of the stream
	recvCompress string
	sendCompress string

	readRequester readRequester

	// contentSubtype is the content-subtype for requests.
	// this must be lowercase or the behavior is undefined.
	contentSubtype string

	trailer metadata.MD // the key-value map of trailer metadata.

	// Non-pointer fields are at the end to optimize GC performance.
	state    streamState
	id       uint32
	buf      recvBuffer
	trReader transportReader
	fc       inFlow
	wq       writeQuota
}
 310  
// readRequester is used to state application's intentions to read data. This
// is used to adjust flow control, if needed.
type readRequester interface {
	// requestRead is called with the number of bytes about to be read, before
	// the read itself is attempted.
	requestRead(int)
}
 316  
 317  func (s *Stream) swapState(st streamState) streamState {
 318  	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
 319  }
 320  
 321  func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
 322  	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
 323  }
 324  
 325  func (s *Stream) getState() streamState {
 326  	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
 327  }
 328  
// Trailer returns a copy of the cached trailer metadata. Note that if it is
// not called after the entire stream is done, it could return an empty MD.
// It can be safely read only after the stream has ended, that is, after
// either read or write has returned io.EOF.
func (s *Stream) Trailer() metadata.MD {
	return s.trailer.Copy()
}
 336  
// Context returns the context associated with the stream (the ctx field).
func (s *Stream) Context() context.Context {
	return s.ctx
}
 341  
// Method returns the RPC method associated with the stream (the method
// field).
func (s *Stream) Method() string {
	return s.method
}
 346  
// write queues m on the stream's receive buffer, making it available to the
// stream's readers. Messages are dropped by the buffer once it has accepted a
// message carrying an error.
func (s *Stream) write(m recvMsg) {
	s.buf.put(m)
}
 350  
 351  // ReadMessageHeader reads data into the provided header slice from the stream.
 352  // It first checks if there was an error during a previous read operation and
 353  // returns it if present. It then requests a read operation for the length of
 354  // the header. It continues to read from the stream until the entire header
 355  // slice is filled or an error occurs. If an `io.EOF` error is encountered with
 356  // partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an
 357  // unexpected end of the stream. The method returns any error encountered during
 358  // the read process or nil if the header was successfully read.
 359  func (s *Stream) ReadMessageHeader(header []byte) (err error) {
 360  	// Don't request a read if there was an error earlier
 361  	if er := s.trReader.er; er != nil {
 362  		return er
 363  	}
 364  	s.readRequester.requestRead(len(header))
 365  	for len(header) != 0 {
 366  		n, err := s.trReader.ReadMessageHeader(header)
 367  		header = header[n:]
 368  		if len(header) == 0 {
 369  			err = nil
 370  		}
 371  		if err != nil {
 372  			if n > 0 && err == io.EOF {
 373  				err = io.ErrUnexpectedEOF
 374  			}
 375  			return err
 376  		}
 377  	}
 378  	return nil
 379  }
 380  
 381  // Read reads n bytes from the wire for this stream.
 382  func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
 383  	// Don't request a read if there was an error earlier
 384  	if er := s.trReader.er; er != nil {
 385  		return nil, er
 386  	}
 387  	s.readRequester.requestRead(n)
 388  	for n != 0 {
 389  		buf, err := s.trReader.Read(n)
 390  		var bufLen int
 391  		if buf != nil {
 392  			bufLen = buf.Len()
 393  		}
 394  		n -= bufLen
 395  		if n == 0 {
 396  			err = nil
 397  		}
 398  		if err != nil {
 399  			if bufLen > 0 && err == io.EOF {
 400  				err = io.ErrUnexpectedEOF
 401  			}
 402  			data.Free()
 403  			return nil, err
 404  		}
 405  		data = append(data, buf)
 406  	}
 407  	return data, nil
 408  }
 409  
 410  // noCopy may be embedded into structs which must not be copied
 411  // after the first use.
 412  //
 413  // See https://golang.org/issues/8005#issuecomment-190753527
 414  // for details.
 415  type noCopy struct {
 416  }
 417  
 418  func (*noCopy) Lock()   {}
 419  func (*noCopy) Unlock() {}
 420  
// transportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
//
// Every successful read is reported to windowHandler with the number of bytes
// read. The first failed read stores its error in er; Stream.ReadMessageHeader
// and Stream.read consult er before starting a new read.
type transportReader struct {
	_ noCopy
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler windowHandler
	er            error
	reader        recvBufferReader
}
 433  
// windowHandler is the handler to control the window update procedure for
// both this particular stream and the associated transport.
type windowHandler interface {
	// updateWindow is called by transportReader after each successful read
	// with the number of bytes that were read.
	updateWindow(int)
}
 439  
 440  func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
 441  	n, err := t.reader.ReadMessageHeader(header)
 442  	if err != nil {
 443  		t.er = err
 444  		return 0, err
 445  	}
 446  	t.windowHandler.updateWindow(n)
 447  	return n, nil
 448  }
 449  
 450  func (t *transportReader) Read(n int) (mem.Buffer, error) {
 451  	buf, err := t.reader.Read(n)
 452  	if err != nil {
 453  		t.er = err
 454  		return buf, err
 455  	}
 456  	t.windowHandler.updateWindow(buf.Len())
 457  	return buf, nil
 458  }
 459  
 460  // GoString is implemented by Stream so context.String() won't
 461  // race when printing %#v.
 462  func (s *Stream) GoString() string {
 463  	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
 464  }
 465  
// transportState is the state of a transport.
type transportState int

// The states a transport can be in; reachable is the zero value.
const (
	reachable transportState = iota
	closing
	draining
)
 474  
// ServerConfig consists of all the configurations to establish a server transport.
//
// NOTE(review): InitialWindowSize, InitialConnWindowSize, WriteBufferSize,
// ReadBufferSize, SharedWriteBuffer, MaxHeaderListSize, BufferPool and
// StaticWindowSize share their names with documented ConnectOptions fields and
// presumably carry the same meaning for the server side — confirm against the
// code that consumes this struct, which is outside this view.
type ServerConfig struct {
	MaxStreams            uint32
	ConnectionTimeout     time.Duration
	Credentials           credentials.TransportCredentials
	InTapHandle           tap.ServerInHandle
	StatsHandler          stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	SharedWriteBuffer     bool
	ChannelzParent        *channelz.Server
	MaxHeaderListSize     *uint32
	HeaderTableSize       *uint32
	BufferPool            mem.BufferPool
	StaticWindowSize      bool
}
 495  
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a client
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandlers stores the handlers for stats.
	StatsHandlers []stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn
	// determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn determines
	// how much data can be read at most for one read syscall.
	ReadBufferSize int
	// SharedWriteBuffer indicates whether connections should reuse the write
	// buffer.
	SharedWriteBuffer bool
	// ChannelzParent sets the addrConn id which initiated the creation of this client transport.
	ChannelzParent *channelz.SubChannel
	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
	MaxHeaderListSize *uint32
	// BufferPool is the mem.BufferPool to use when reading/writing to the wire.
	BufferPool mem.BufferPool
	// StaticWindowSize controls whether dynamic window sizing is enabled.
	StaticWindowSize bool
}
 535  
// WriteOptions provides additional hints and information for message
// transmission.
type WriteOptions struct {
	// Last indicates whether this write is the last piece for this stream.
	Last bool
}
 543  
// CallHdr carries the information of a particular RPC.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// AcceptedCompressors overrides the grpc-accept-encoding header for this
	// call. When nil, the transport advertises the default set of registered
	// compressors. A non-nil pointer overrides that value (including the empty
	// string to advertise none).
	AcceptedCompressors *string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string

	// PreviousAttempts is the value of the grpc-previous-rpc-attempts header
	// to set.
	PreviousAttempts int

	// DoneFunc is called when the stream is finished.
	DoneFunc func()

	// Authority is used to explicitly override the `:authority` header. If set,
	// this value takes precedence over the Host field and will be used as the
	// value for the `:authority` header.
	Authority string
}
 582  
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close(err error)

	// GracefulClose starts to tear down the transport: the transport will stop
	// accepting new RPCs and NewStream will return error. Once all streams are
	// finished, the transport will close.
	//
	// It does not block.
	GracefulClose()

	// NewStream creates a ClientStream for the RPC described by callHdr.
	NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why the GoAway frame was received,
	// along with a human readable string with debug info.
	GetGoAwayReason() (GoAwayReason, string)

	// Peer returns information about the peer associated with the Transport.
	// The returned information includes authentication and network address details.
	Peer() *peer.Peer
}
 621  
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler, which
	// takes a *ServerStream.
	HandleStreams(context.Context, func(*ServerStream))

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close(err error)

	// Peer returns the peer of the server transport.
	Peer() *peer.Peer

	// Drain notifies the client that this ServerTransport stops accepting new
	// RPCs.
	// NOTE(review): debugData is presumably sent along with the drain signal
	// for diagnostics — confirm against the implementation.
	Drain(debugData string)
}
 642  
// internalServerTransport extends ServerTransport with unexported,
// per-stream operations used inside this package.
//
// NOTE(review): the method descriptions below are inferred from names and
// signatures only; the implementations are outside this view.
type internalServerTransport interface {
	ServerTransport
	// writeHeader presumably sends md as the header metadata of s.
	writeHeader(s *ServerStream, md metadata.MD) error
	// write presumably sends one message (hdr followed by data) on s.
	write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
	// writeStatus presumably sends the final status st for s.
	writeStatus(s *ServerStream, st *status.Status) error
	// incrMsgRecv presumably counts a received message.
	incrMsgRecv()
	// adjustWindow and updateWindow presumably perform flow-control window
	// accounting for s by n bytes.
	adjustWindow(s *ServerStream, n uint32)
	updateWindow(s *ServerStream, n uint32)
}
 652  
 653  // connectionErrorf creates an ConnectionError with the specified error description.
 654  func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
 655  	return ConnectionError{
 656  		Desc: fmt.Sprintf(format, a...),
 657  		temp: temp,
 658  		err:  e,
 659  	}
 660  }
 661  
 662  // ConnectionError is an error that results in the termination of the
 663  // entire connection and the retry of all the active streams.
 664  type ConnectionError struct {
 665  	Desc string
 666  	temp bool
 667  	err  error
 668  }
 669  
 670  func (e ConnectionError) Error() string {
 671  	return fmt.Sprintf("connection error: desc = %q", e.Desc)
 672  }
 673  
 674  // Temporary indicates if this connection error is temporary or fatal.
 675  func (e ConnectionError) Temporary() bool {
 676  	return e.temp
 677  }
 678  
 679  // Origin returns the original error of this connection error.
 680  func (e ConnectionError) Origin() error {
 681  	// Never return nil error here.
 682  	// If the original error is nil, return itself.
 683  	if e.err == nil {
 684  		return e
 685  	}
 686  	return e.err
 687  }
 688  
 689  // Unwrap returns the original error of this connection error or nil when the
 690  // origin is nil.
 691  func (e ConnectionError) Unwrap() error {
 692  	return e.err
 693  }
 694  
var (
	// ErrConnClosing indicates that the transport is closing. It is a
	// temporary ConnectionError with no underlying cause.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to tell the
	// application layer that the stream has already finished.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
 709  
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

// The possible GoAwayReason values. GoAwayInvalid is the zero value.
const (
	// GoAwayInvalid indicates that no GoAway frame is received.
	GoAwayInvalid GoAwayReason = 0
	// GoAwayNoReason is the default value when GoAway frame is received.
	GoAwayNoReason GoAwayReason = 1
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings GoAwayReason = 2
)
 723  
 724  // ContextErr converts the error from context package into a status error.
 725  func ContextErr(err error) error {
 726  	switch err {
 727  	case context.DeadlineExceeded:
 728  		return status.Error(codes.DeadlineExceeded, err.Error())
 729  	case context.Canceled:
 730  		return status.Error(codes.Canceled, err.Error())
 731  	}
 732  	return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
 733  }
 734