client_stream.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package transport
  20  
  21  import (
  22  	"sync/atomic"
  23  
  24  	"golang.org/x/net/http2"
  25  	"google.golang.org/grpc/mem"
  26  	"google.golang.org/grpc/metadata"
  27  	"google.golang.org/grpc/status"
  28  )
  29  
  30  // ClientStream implements streaming functionality for a gRPC client.
  31  type ClientStream struct {
  32  	Stream // Embed for common stream functionality.
  33  
  34  	ct       *http2Client
  35  	done     chan struct{} // closed at the end of stream to unblock writers.
  36  	doneFunc func()        // invoked at the end of stream.
  37  
  38  	headerChan chan struct{} // closed to indicate the end of header metadata.
  39  	header     metadata.MD   // the received header metadata
  40  
  41  	status *status.Status // the status error received from the server
  42  
  43  	// Non-pointer fields are at the end to optimize GC allocations.
  44  
  45  	// headerValid indicates whether a valid header was received.  Only
  46  	// meaningful after headerChan is closed (always call waitOnHeader() before
  47  	// reading its value).
  48  	headerValid      bool
  49  	noHeaders        bool        // set if the client never received headers (set only after the stream is done).
  50  	headerChanClosed uint32      // set when headerChan is closed. Used to avoid closing headerChan multiple times.
  51  	bytesReceived    atomic.Bool // indicates whether any bytes have been received on this stream
  52  	unprocessed      atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
  53  }
  54  
  55  // Read reads an n byte message from the input stream.
  56  func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
  57  	b, err := s.Stream.read(n)
  58  	if err == nil {
  59  		s.ct.incrMsgRecv()
  60  	}
  61  	return b, err
  62  }
  63  
  64  // Close closes the stream and propagates err to any readers.
  65  func (s *ClientStream) Close(err error) {
  66  	var (
  67  		rst     bool
  68  		rstCode http2.ErrCode
  69  	)
  70  	if err != nil {
  71  		rst = true
  72  		rstCode = http2.ErrCodeCancel
  73  	}
  74  	s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  75  }
  76  
  77  // Write writes the hdr and data bytes to the output stream.
  78  func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
  79  	return s.ct.write(s, hdr, data, opts)
  80  }
  81  
  82  // BytesReceived indicates whether any bytes have been received on this stream.
  83  func (s *ClientStream) BytesReceived() bool {
  84  	return s.bytesReceived.Load()
  85  }
  86  
  87  // Unprocessed indicates whether the server did not process this stream --
  88  // i.e. it sent a refused stream or GOAWAY including this stream ID.
  89  func (s *ClientStream) Unprocessed() bool {
  90  	return s.unprocessed.Load()
  91  }
  92  
  93  func (s *ClientStream) waitOnHeader() {
  94  	select {
  95  	case <-s.ctx.Done():
  96  		// Close the stream to prevent headers/trailers from changing after
  97  		// this function returns.
  98  		s.Close(ContextErr(s.ctx.Err()))
  99  		// headerChan could possibly not be closed yet if closeStream raced
 100  		// with operateHeaders; wait until it is closed explicitly here.
 101  		<-s.headerChan
 102  	case <-s.headerChan:
 103  	}
 104  }
 105  
 106  // RecvCompress returns the compression algorithm applied to the inbound
 107  // message. It is empty string if there is no compression applied.
 108  func (s *ClientStream) RecvCompress() string {
 109  	s.waitOnHeader()
 110  	return s.recvCompress
 111  }
 112  
 113  // Done returns a channel which is closed when it receives the final status
 114  // from the server.
 115  func (s *ClientStream) Done() <-chan struct{} {
 116  	return s.done
 117  }
 118  
 119  // Header returns the header metadata of the stream. Acquires the key-value
 120  // pairs of header metadata once it is available. It blocks until i) the
 121  // metadata is ready or ii) there is no header metadata or iii) the stream is
 122  // canceled/expired.
 123  func (s *ClientStream) Header() (metadata.MD, error) {
 124  	s.waitOnHeader()
 125  
 126  	if !s.headerValid || s.noHeaders {
 127  		return nil, s.status.Err()
 128  	}
 129  
 130  	return s.header.Copy(), nil
 131  }
 132  
 133  // TrailersOnly blocks until a header or trailers-only frame is received and
 134  // then returns true if the stream was trailers-only.  If the stream ends
 135  // before headers are received, returns true, nil.
 136  func (s *ClientStream) TrailersOnly() bool {
 137  	s.waitOnHeader()
 138  	return s.noHeaders
 139  }
 140  
 141  // Status returns the status received from the server.
 142  // Status can be read safely only after the stream has ended,
 143  // that is, after Done() is closed.
 144  func (s *ClientStream) Status() *status.Status {
 145  	return s.status
 146  }
 147  
 148  func (s *ClientStream) requestRead(n int) {
 149  	s.ct.adjustWindow(s, uint32(n))
 150  }
 151  
 152  func (s *ClientStream) updateWindow(n int) {
 153  	s.ct.updateWindow(s, uint32(n))
 154  }
 155