1 /*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 // Package resolver provides internal resolver-related functionality.
20 package resolver
21 22 import (
23 "context"
24 "sync"
25 26 "google.golang.org/grpc/internal/serviceconfig"
27 "google.golang.org/grpc/metadata"
28 "google.golang.org/grpc/resolver"
29 )
30 31 // ConfigSelector controls what configuration to use for every RPC.
32 type ConfigSelector interface {
33 // Selects the configuration for the RPC, or terminates it using the error.
34 // This error will be converted by the gRPC library to a status error with
35 // code UNKNOWN if it is not returned as a status error.
36 SelectConfig(RPCInfo) (*RPCConfig, error)
37 }
38 39 // RPCInfo contains RPC information needed by a ConfigSelector.
40 type RPCInfo struct {
41 // Context is the user's context for the RPC and contains headers and
42 // application timeout. It is passed for interception purposes and for
43 // efficiency reasons. SelectConfig should not be blocking.
44 Context context.Context
45 Method string // i.e. "/Service/Method"
46 }
47 48 // RPCConfig describes the configuration to use for each RPC.
49 type RPCConfig struct {
50 // The context to use for the remainder of the RPC; can pass info to LB
51 // policy or affect timeout or metadata.
52 Context context.Context
53 MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
54 OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
55 Interceptor ClientInterceptor
56 }
57 58 // ClientStream is the same as grpc.ClientStream, but defined here for circular
59 // dependency reasons.
60 type ClientStream interface {
61 // Header returns the header metadata received from the server if there
62 // is any. It blocks if the metadata is not ready to read.
63 Header() (metadata.MD, error)
64 // Trailer returns the trailer metadata from the server, if there is any.
65 // It must only be called after stream.CloseAndRecv has returned, or
66 // stream.Recv has returned a non-nil error (including io.EOF).
67 Trailer() metadata.MD
68 // CloseSend closes the send direction of the stream. It closes the stream
69 // when non-nil error is met. It is also not safe to call CloseSend
70 // concurrently with SendMsg.
71 CloseSend() error
72 // Context returns the context for this stream.
73 //
74 // It should not be called until after Header or RecvMsg has returned. Once
75 // called, subsequent client-side retries are disabled.
76 Context() context.Context
77 // SendMsg is generally called by generated code. On error, SendMsg aborts
78 // the stream. If the error was generated by the client, the status is
79 // returned directly; otherwise, io.EOF is returned and the status of
80 // the stream may be discovered using RecvMsg.
81 //
82 // SendMsg blocks until:
83 // - There is sufficient flow control to schedule m with the transport, or
84 // - The stream is done, or
85 // - The stream breaks.
86 //
87 // SendMsg does not wait until the message is received by the server. An
88 // untimely stream closure may result in lost messages. To ensure delivery,
89 // users should ensure the RPC completed successfully using RecvMsg.
90 //
91 // It is safe to have a goroutine calling SendMsg and another goroutine
92 // calling RecvMsg on the same stream at the same time, but it is not safe
93 // to call SendMsg on the same stream in different goroutines. It is also
94 // not safe to call CloseSend concurrently with SendMsg.
95 SendMsg(m any) error
96 // RecvMsg blocks until it receives a message into m or the stream is
97 // done. It returns io.EOF when the stream completes successfully. On
98 // any other error, the stream is aborted and the error contains the RPC
99 // status.
100 //
101 // It is safe to have a goroutine calling SendMsg and another goroutine
102 // calling RecvMsg on the same stream at the same time, but it is not
103 // safe to call RecvMsg on the same stream in different goroutines.
104 RecvMsg(m any) error
105 }
106 107 // ClientInterceptor is an interceptor for gRPC client streams.
108 type ClientInterceptor interface {
109 // NewStream produces a ClientStream for an RPC which may optionally use
110 // the provided function to produce a stream for delegation. Note:
111 // RPCInfo.Context should not be used (will be nil).
112 //
113 // done is invoked when the RPC is finished using its connection, or could
114 // not be assigned a connection. RPC operations may still occur on
115 // ClientStream after done is called, since the interceptor is invoked by
116 // application-layer operations. done must never be nil when called.
117 NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
118 }
119 120 // ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
121 type ServerInterceptor interface {
122 // AllowRPC checks if an incoming RPC is allowed to proceed based on
123 // information about connection RPC was received on, and HTTP Headers. This
124 // information will be piped into context.
125 AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
126 }
127 128 type csKeyType string
129 130 const csKey = csKeyType("grpc.internal.resolver.configSelector")
131 132 // SetConfigSelector sets the config selector in state and returns the new
133 // state.
134 func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
135 state.Attributes = state.Attributes.WithValue(csKey, cs)
136 return state
137 }
138 139 // GetConfigSelector retrieves the config selector from state, if present, and
140 // returns it or nil if absent.
141 func GetConfigSelector(state resolver.State) ConfigSelector {
142 cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
143 return cs
144 }
145 146 // SafeConfigSelector allows for safe switching of ConfigSelector
147 // implementations such that previous values are guaranteed to not be in use
148 // when UpdateConfigSelector returns.
149 type SafeConfigSelector struct {
150 mu sync.RWMutex
151 cs ConfigSelector
152 }
153 154 // UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
155 // all uses of the previous ConfigSelector have completed.
156 func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
157 scs.mu.Lock()
158 defer scs.mu.Unlock()
159 scs.cs = cs
160 }
161 162 // SelectConfig defers to the current ConfigSelector in scs.
163 func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
164 scs.mu.RLock()
165 defer scs.mu.RUnlock()
166 return scs.cs.SelectConfig(r)
167 }
168