1 /*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package transport
20 21 import (
22 "sync/atomic"
23 24 "golang.org/x/net/http2"
25 "google.golang.org/grpc/mem"
26 "google.golang.org/grpc/metadata"
27 "google.golang.org/grpc/status"
28 )
// ClientStream implements streaming functionality for a gRPC client.
//
// It embeds Stream for the behavior shared between stream kinds and forwards
// transport-level operations (closing, writing, window updates) to ct.
type ClientStream struct {
	Stream // Embed for common stream functionality.

	ct       *http2Client  // the client transport that the methods of this type delegate to.
	done     chan struct{} // closed at the end of stream to unblock writers.
	doneFunc func()        // invoked at the end of stream.

	headerChan chan struct{} // closed to indicate the end of header metadata.
	header     metadata.MD   // the received header metadata

	status *status.Status // the status error received from the server

	// Non-pointer fields are at the end to optimize GC allocations.

	// headerValid indicates whether a valid header was received. Only
	// meaningful after headerChan is closed (always call waitOnHeader() before
	// reading its value).
	headerValid      bool
	noHeaders        bool        // set if the client never received headers (set only after the stream is done).
	headerChanClosed uint32      // set when headerChan is closed. Used to avoid closing headerChan multiple times.
	bytesReceived    atomic.Bool // indicates whether any bytes have been received on this stream
	unprocessed      atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
}
54 55 // Read reads an n byte message from the input stream.
56 func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
57 b, err := s.Stream.read(n)
58 if err == nil {
59 s.ct.incrMsgRecv()
60 }
61 return b, err
62 }
63 64 // Close closes the stream and propagates err to any readers.
65 func (s *ClientStream) Close(err error) {
66 var (
67 rst bool
68 rstCode http2.ErrCode
69 )
70 if err != nil {
71 rst = true
72 rstCode = http2.ErrCodeCancel
73 }
74 s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
75 }
76 77 // Write writes the hdr and data bytes to the output stream.
78 func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
79 return s.ct.write(s, hdr, data, opts)
80 }
81 82 // BytesReceived indicates whether any bytes have been received on this stream.
83 func (s *ClientStream) BytesReceived() bool {
84 return s.bytesReceived.Load()
85 }
86 87 // Unprocessed indicates whether the server did not process this stream --
88 // i.e. it sent a refused stream or GOAWAY including this stream ID.
89 func (s *ClientStream) Unprocessed() bool {
90 return s.unprocessed.Load()
91 }
// waitOnHeader blocks until headerChan is closed. If the stream's context is
// done first, it closes the stream with the error derived from the context
// (via ContextErr) and then still waits for headerChan to be closed before
// returning.
func (s *ClientStream) waitOnHeader() {
	select {
	case <-s.ctx.Done():
		// Close the stream to prevent headers/trailers from changing after
		// this function returns.
		s.Close(ContextErr(s.ctx.Err()))
		// headerChan could possibly not be closed yet if closeStream raced
		// with operateHeaders; wait until it is closed explicitly here.
		<-s.headerChan
	case <-s.headerChan:
		// headerChan is already closed; nothing more to wait for.
	}
}
105 106 // RecvCompress returns the compression algorithm applied to the inbound
107 // message. It is empty string if there is no compression applied.
108 func (s *ClientStream) RecvCompress() string {
109 s.waitOnHeader()
110 return s.recvCompress
111 }
112 113 // Done returns a channel which is closed when it receives the final status
114 // from the server.
115 func (s *ClientStream) Done() <-chan struct{} {
116 return s.done
117 }
118 119 // Header returns the header metadata of the stream. Acquires the key-value
120 // pairs of header metadata once it is available. It blocks until i) the
121 // metadata is ready or ii) there is no header metadata or iii) the stream is
122 // canceled/expired.
123 func (s *ClientStream) Header() (metadata.MD, error) {
124 s.waitOnHeader()
125 126 if !s.headerValid || s.noHeaders {
127 return nil, s.status.Err()
128 }
129 130 return s.header.Copy(), nil
131 }
132 133 // TrailersOnly blocks until a header or trailers-only frame is received and
134 // then returns true if the stream was trailers-only. If the stream ends
135 // before headers are received, returns true, nil.
136 func (s *ClientStream) TrailersOnly() bool {
137 s.waitOnHeader()
138 return s.noHeaders
139 }
140 141 // Status returns the status received from the server.
142 // Status can be read safely only after the stream has ended,
143 // that is, after Done() is closed.
144 func (s *ClientStream) Status() *status.Status {
145 return s.status
146 }
147 148 func (s *ClientStream) requestRead(n int) {
149 s.ct.adjustWindow(s, uint32(n))
150 }
151 152 func (s *ClientStream) updateWindow(n int) {
153 s.ct.updateWindow(s, uint32(n))
154 }
155