1 /*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package transport
20 21 import (
22 "context"
23 "errors"
24 "strings"
25 "sync"
26 "sync/atomic"
27 28 "google.golang.org/grpc/mem"
29 "google.golang.org/grpc/metadata"
30 "google.golang.org/grpc/status"
31 )
32 33 // ServerStream implements streaming functionality for a gRPC server.
34 type ServerStream struct {
35 Stream // Embed for common stream functionality.
36 37 st internalServerTransport
38 ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
39 // cancel is invoked at the end of stream to cancel ctx. It also stops the
40 // timer for monitoring the rpc deadline if configured.
41 cancel func()
42 43 // Holds compressor names passed in grpc-accept-encoding metadata from the
44 // client.
45 clientAdvertisedCompressors string
46 47 // hdrMu protects outgoing header and trailer metadata.
48 hdrMu sync.Mutex
49 header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
50 headerSent atomic.Bool // atomically set when the headers are sent out.
51 52 headerWireLength int
53 }
54 55 // Read reads an n byte message from the input stream.
56 func (s *ServerStream) Read(n int) (mem.BufferSlice, error) {
57 b, err := s.Stream.read(n)
58 if err == nil {
59 s.st.incrMsgRecv()
60 }
61 return b, err
62 }
63 64 // SendHeader sends the header metadata for the given stream.
65 func (s *ServerStream) SendHeader(md metadata.MD) error {
66 return s.st.writeHeader(s, md)
67 }
68 69 // Write writes the hdr and data bytes to the output stream.
70 func (s *ServerStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
71 return s.st.write(s, hdr, data, opts)
72 }
73 74 // WriteStatus sends the status of a stream to the client. WriteStatus is
75 // the final call made on a stream and always occurs.
76 func (s *ServerStream) WriteStatus(st *status.Status) error {
77 return s.st.writeStatus(s, st)
78 }
79 80 // isHeaderSent indicates whether headers have been sent.
81 func (s *ServerStream) isHeaderSent() bool {
82 return s.headerSent.Load()
83 }
84 85 // updateHeaderSent updates headerSent and returns true
86 // if it was already set.
87 func (s *ServerStream) updateHeaderSent() bool {
88 return s.headerSent.Swap(true)
89 }
90 91 // RecvCompress returns the compression algorithm applied to the inbound
92 // message. It is empty string if there is no compression applied.
93 func (s *ServerStream) RecvCompress() string {
94 return s.recvCompress
95 }
96 97 // SendCompress returns the send compressor name.
98 func (s *ServerStream) SendCompress() string {
99 return s.sendCompress
100 }
101 102 // ContentSubtype returns the content-subtype for a request. For example, a
103 // content-subtype of "proto" will result in a content-type of
104 // "application/grpc+proto". This will always be lowercase. See
105 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
106 // more details.
107 func (s *ServerStream) ContentSubtype() string {
108 return s.contentSubtype
109 }
110 111 // SetSendCompress sets the compression algorithm to the stream.
112 func (s *ServerStream) SetSendCompress(name string) error {
113 if s.isHeaderSent() || s.getState() == streamDone {
114 return errors.New("transport: set send compressor called after headers sent or stream done")
115 }
116 117 s.sendCompress = name
118 return nil
119 }
120 121 // SetContext sets the context of the stream. This will be deleted once the
122 // stats handler callouts all move to gRPC layer.
123 func (s *ServerStream) SetContext(ctx context.Context) {
124 s.ctx = ctx
125 }
126 127 // ClientAdvertisedCompressors returns the compressor names advertised by the
128 // client via grpc-accept-encoding header.
129 func (s *ServerStream) ClientAdvertisedCompressors() []string {
130 values := strings.Split(s.clientAdvertisedCompressors, ",")
131 for i, v := range values {
132 values[i] = strings.TrimSpace(v)
133 }
134 return values
135 }
136 137 // Header returns the header metadata of the stream. It returns the out header
138 // after t.WriteHeader is called. It does not block and must not be called
139 // until after WriteHeader.
140 func (s *ServerStream) Header() (metadata.MD, error) {
141 // Return the header in stream. It will be the out
142 // header after t.WriteHeader is called.
143 return s.header.Copy(), nil
144 }
145 146 // HeaderWireLength returns the size of the headers of the stream as received
147 // from the wire.
148 func (s *ServerStream) HeaderWireLength() int {
149 return s.headerWireLength
150 }
151 152 // SetHeader sets the header metadata. This can be called multiple times.
153 // This should not be called in parallel to other data writes.
154 func (s *ServerStream) SetHeader(md metadata.MD) error {
155 if md.Len() == 0 {
156 return nil
157 }
158 if s.isHeaderSent() || s.getState() == streamDone {
159 return ErrIllegalHeaderWrite
160 }
161 s.hdrMu.Lock()
162 s.header = metadata.Join(s.header, md)
163 s.hdrMu.Unlock()
164 return nil
165 }
166 167 // SetTrailer sets the trailer metadata which will be sent with the RPC status
168 // by the server. This can be called multiple times.
169 // This should not be called parallel to other data writes.
170 func (s *ServerStream) SetTrailer(md metadata.MD) error {
171 if md.Len() == 0 {
172 return nil
173 }
174 if s.getState() == streamDone {
175 return ErrIllegalHeaderWrite
176 }
177 s.hdrMu.Lock()
178 s.trailer = metadata.Join(s.trailer, md)
179 s.hdrMu.Unlock()
180 return nil
181 }
182 183 func (s *ServerStream) requestRead(n int) {
184 s.st.adjustWindow(s, uint32(n))
185 }
186 187 func (s *ServerStream) updateWindow(n int) {
188 s.st.updateWindow(s, uint32(n))
189 }
190