server_stream.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package transport
  20  
  21  import (
  22  	"context"
  23  	"errors"
  24  	"strings"
  25  	"sync"
  26  	"sync/atomic"
  27  
  28  	"google.golang.org/grpc/mem"
  29  	"google.golang.org/grpc/metadata"
  30  	"google.golang.org/grpc/status"
  31  )
  32  
  33  // ServerStream implements streaming functionality for a gRPC server.
  34  type ServerStream struct {
  35  	Stream // Embed for common stream functionality.
  36  
  37  	st      internalServerTransport
  38  	ctxDone <-chan struct{} // closed at the end of stream.  Cache of ctx.Done() (for performance)
  39  	// cancel is invoked at the end of stream to cancel ctx. It also stops the
  40  	// timer for monitoring the rpc deadline if configured.
  41  	cancel func()
  42  
  43  	// Holds compressor names passed in grpc-accept-encoding metadata from the
  44  	// client.
  45  	clientAdvertisedCompressors string
  46  
  47  	// hdrMu protects outgoing header and trailer metadata.
  48  	hdrMu      sync.Mutex
  49  	header     metadata.MD // the outgoing header metadata.  Updated by WriteHeader.
  50  	headerSent atomic.Bool // atomically set when the headers are sent out.
  51  
  52  	headerWireLength int
  53  }
  54  
  55  // Read reads an n byte message from the input stream.
  56  func (s *ServerStream) Read(n int) (mem.BufferSlice, error) {
  57  	b, err := s.Stream.read(n)
  58  	if err == nil {
  59  		s.st.incrMsgRecv()
  60  	}
  61  	return b, err
  62  }
  63  
  64  // SendHeader sends the header metadata for the given stream.
  65  func (s *ServerStream) SendHeader(md metadata.MD) error {
  66  	return s.st.writeHeader(s, md)
  67  }
  68  
  69  // Write writes the hdr and data bytes to the output stream.
  70  func (s *ServerStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
  71  	return s.st.write(s, hdr, data, opts)
  72  }
  73  
  74  // WriteStatus sends the status of a stream to the client.  WriteStatus is
  75  // the final call made on a stream and always occurs.
  76  func (s *ServerStream) WriteStatus(st *status.Status) error {
  77  	return s.st.writeStatus(s, st)
  78  }
  79  
  80  // isHeaderSent indicates whether headers have been sent.
  81  func (s *ServerStream) isHeaderSent() bool {
  82  	return s.headerSent.Load()
  83  }
  84  
  85  // updateHeaderSent updates headerSent and returns true
  86  // if it was already set.
  87  func (s *ServerStream) updateHeaderSent() bool {
  88  	return s.headerSent.Swap(true)
  89  }
  90  
  91  // RecvCompress returns the compression algorithm applied to the inbound
  92  // message. It is empty string if there is no compression applied.
  93  func (s *ServerStream) RecvCompress() string {
  94  	return s.recvCompress
  95  }
  96  
  97  // SendCompress returns the send compressor name.
  98  func (s *ServerStream) SendCompress() string {
  99  	return s.sendCompress
 100  }
 101  
 102  // ContentSubtype returns the content-subtype for a request. For example, a
 103  // content-subtype of "proto" will result in a content-type of
 104  // "application/grpc+proto". This will always be lowercase.  See
 105  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
 106  // more details.
 107  func (s *ServerStream) ContentSubtype() string {
 108  	return s.contentSubtype
 109  }
 110  
 111  // SetSendCompress sets the compression algorithm to the stream.
 112  func (s *ServerStream) SetSendCompress(name string) error {
 113  	if s.isHeaderSent() || s.getState() == streamDone {
 114  		return errors.New("transport: set send compressor called after headers sent or stream done")
 115  	}
 116  
 117  	s.sendCompress = name
 118  	return nil
 119  }
 120  
 121  // SetContext sets the context of the stream. This will be deleted once the
 122  // stats handler callouts all move to gRPC layer.
 123  func (s *ServerStream) SetContext(ctx context.Context) {
 124  	s.ctx = ctx
 125  }
 126  
 127  // ClientAdvertisedCompressors returns the compressor names advertised by the
 128  // client via grpc-accept-encoding header.
 129  func (s *ServerStream) ClientAdvertisedCompressors() []string {
 130  	values := strings.Split(s.clientAdvertisedCompressors, ",")
 131  	for i, v := range values {
 132  		values[i] = strings.TrimSpace(v)
 133  	}
 134  	return values
 135  }
 136  
 137  // Header returns the header metadata of the stream.  It returns the out header
 138  // after t.WriteHeader is called.  It does not block and must not be called
 139  // until after WriteHeader.
 140  func (s *ServerStream) Header() (metadata.MD, error) {
 141  	// Return the header in stream. It will be the out
 142  	// header after t.WriteHeader is called.
 143  	return s.header.Copy(), nil
 144  }
 145  
 146  // HeaderWireLength returns the size of the headers of the stream as received
 147  // from the wire.
 148  func (s *ServerStream) HeaderWireLength() int {
 149  	return s.headerWireLength
 150  }
 151  
 152  // SetHeader sets the header metadata. This can be called multiple times.
 153  // This should not be called in parallel to other data writes.
 154  func (s *ServerStream) SetHeader(md metadata.MD) error {
 155  	if md.Len() == 0 {
 156  		return nil
 157  	}
 158  	if s.isHeaderSent() || s.getState() == streamDone {
 159  		return ErrIllegalHeaderWrite
 160  	}
 161  	s.hdrMu.Lock()
 162  	s.header = metadata.Join(s.header, md)
 163  	s.hdrMu.Unlock()
 164  	return nil
 165  }
 166  
 167  // SetTrailer sets the trailer metadata which will be sent with the RPC status
 168  // by the server. This can be called multiple times.
 169  // This should not be called parallel to other data writes.
 170  func (s *ServerStream) SetTrailer(md metadata.MD) error {
 171  	if md.Len() == 0 {
 172  		return nil
 173  	}
 174  	if s.getState() == streamDone {
 175  		return ErrIllegalHeaderWrite
 176  	}
 177  	s.hdrMu.Lock()
 178  	s.trailer = metadata.Join(s.trailer, md)
 179  	s.hdrMu.Unlock()
 180  	return nil
 181  }
 182  
 183  func (s *ServerStream) requestRead(n int) {
 184  	s.st.adjustWindow(s, uint32(n))
 185  }
 186  
 187  func (s *ServerStream) updateWindow(n int) {
 188  	s.st.updateWindow(s, uint32(n))
 189  }
 190