config_selector.go raw

   1  /*
   2   *
   3   * Copyright 2020 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  // Package resolver provides internal resolver-related functionality.
  20  package resolver
  21  
  22  import (
  23  	"context"
  24  	"sync"
  25  
  26  	"google.golang.org/grpc/internal/serviceconfig"
  27  	"google.golang.org/grpc/metadata"
  28  	"google.golang.org/grpc/resolver"
  29  )
  30  
  31  // ConfigSelector controls what configuration to use for every RPC.
  32  type ConfigSelector interface {
  33  	// Selects the configuration for the RPC, or terminates it using the error.
  34  	// This error will be converted by the gRPC library to a status error with
  35  	// code UNKNOWN if it is not returned as a status error.
  36  	SelectConfig(RPCInfo) (*RPCConfig, error)
  37  }
  38  
  39  // RPCInfo contains RPC information needed by a ConfigSelector.
  40  type RPCInfo struct {
  41  	// Context is the user's context for the RPC and contains headers and
  42  	// application timeout.  It is passed for interception purposes and for
  43  	// efficiency reasons.  SelectConfig should not be blocking.
  44  	Context context.Context
  45  	Method  string // i.e. "/Service/Method"
  46  }
  47  
  48  // RPCConfig describes the configuration to use for each RPC.
  49  type RPCConfig struct {
  50  	// The context to use for the remainder of the RPC; can pass info to LB
  51  	// policy or affect timeout or metadata.
  52  	Context      context.Context
  53  	MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
  54  	OnCommitted  func()                     // Called when the RPC has been committed (retries no longer possible)
  55  	Interceptor  ClientInterceptor
  56  }
  57  
  58  // ClientStream is the same as grpc.ClientStream, but defined here for circular
  59  // dependency reasons.
  60  type ClientStream interface {
  61  	// Header returns the header metadata received from the server if there
  62  	// is any. It blocks if the metadata is not ready to read.
  63  	Header() (metadata.MD, error)
  64  	// Trailer returns the trailer metadata from the server, if there is any.
  65  	// It must only be called after stream.CloseAndRecv has returned, or
  66  	// stream.Recv has returned a non-nil error (including io.EOF).
  67  	Trailer() metadata.MD
  68  	// CloseSend closes the send direction of the stream. It closes the stream
  69  	// when non-nil error is met. It is also not safe to call CloseSend
  70  	// concurrently with SendMsg.
  71  	CloseSend() error
  72  	// Context returns the context for this stream.
  73  	//
  74  	// It should not be called until after Header or RecvMsg has returned. Once
  75  	// called, subsequent client-side retries are disabled.
  76  	Context() context.Context
  77  	// SendMsg is generally called by generated code. On error, SendMsg aborts
  78  	// the stream. If the error was generated by the client, the status is
  79  	// returned directly; otherwise, io.EOF is returned and the status of
  80  	// the stream may be discovered using RecvMsg.
  81  	//
  82  	// SendMsg blocks until:
  83  	//   - There is sufficient flow control to schedule m with the transport, or
  84  	//   - The stream is done, or
  85  	//   - The stream breaks.
  86  	//
  87  	// SendMsg does not wait until the message is received by the server. An
  88  	// untimely stream closure may result in lost messages. To ensure delivery,
  89  	// users should ensure the RPC completed successfully using RecvMsg.
  90  	//
  91  	// It is safe to have a goroutine calling SendMsg and another goroutine
  92  	// calling RecvMsg on the same stream at the same time, but it is not safe
  93  	// to call SendMsg on the same stream in different goroutines. It is also
  94  	// not safe to call CloseSend concurrently with SendMsg.
  95  	SendMsg(m any) error
  96  	// RecvMsg blocks until it receives a message into m or the stream is
  97  	// done. It returns io.EOF when the stream completes successfully. On
  98  	// any other error, the stream is aborted and the error contains the RPC
  99  	// status.
 100  	//
 101  	// It is safe to have a goroutine calling SendMsg and another goroutine
 102  	// calling RecvMsg on the same stream at the same time, but it is not
 103  	// safe to call RecvMsg on the same stream in different goroutines.
 104  	RecvMsg(m any) error
 105  }
 106  
 107  // ClientInterceptor is an interceptor for gRPC client streams.
 108  type ClientInterceptor interface {
 109  	// NewStream produces a ClientStream for an RPC which may optionally use
 110  	// the provided function to produce a stream for delegation.  Note:
 111  	// RPCInfo.Context should not be used (will be nil).
 112  	//
 113  	// done is invoked when the RPC is finished using its connection, or could
 114  	// not be assigned a connection.  RPC operations may still occur on
 115  	// ClientStream after done is called, since the interceptor is invoked by
 116  	// application-layer operations.  done must never be nil when called.
 117  	NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
 118  }
 119  
 120  // ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
 121  type ServerInterceptor interface {
 122  	// AllowRPC checks if an incoming RPC is allowed to proceed based on
 123  	// information about connection RPC was received on, and HTTP Headers. This
 124  	// information will be piped into context.
 125  	AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
 126  }
 127  
 128  type csKeyType string
 129  
 130  const csKey = csKeyType("grpc.internal.resolver.configSelector")
 131  
 132  // SetConfigSelector sets the config selector in state and returns the new
 133  // state.
 134  func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
 135  	state.Attributes = state.Attributes.WithValue(csKey, cs)
 136  	return state
 137  }
 138  
 139  // GetConfigSelector retrieves the config selector from state, if present, and
 140  // returns it or nil if absent.
 141  func GetConfigSelector(state resolver.State) ConfigSelector {
 142  	cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
 143  	return cs
 144  }
 145  
 146  // SafeConfigSelector allows for safe switching of ConfigSelector
 147  // implementations such that previous values are guaranteed to not be in use
 148  // when UpdateConfigSelector returns.
 149  type SafeConfigSelector struct {
 150  	mu sync.RWMutex
 151  	cs ConfigSelector
 152  }
 153  
 154  // UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
 155  // all uses of the previous ConfigSelector have completed.
 156  func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
 157  	scs.mu.Lock()
 158  	defer scs.mu.Unlock()
 159  	scs.cs = cs
 160  }
 161  
 162  // SelectConfig defers to the current ConfigSelector in scs.
 163  func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
 164  	scs.mu.RLock()
 165  	defer scs.mu.RUnlock()
 166  	return scs.cs.SelectConfig(r)
 167  }
 168