server.go raw

   1  /*
   2   *
   3   * Copyright 2014 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"context"
  23  	"errors"
  24  	"fmt"
  25  	"io"
  26  	"math"
  27  	"net"
  28  	"net/http"
  29  	"reflect"
  30  	"runtime"
  31  	"strings"
  32  	"sync"
  33  	"sync/atomic"
  34  	"time"
  35  
  36  	"google.golang.org/grpc/codes"
  37  	"google.golang.org/grpc/credentials"
  38  	"google.golang.org/grpc/encoding"
  39  	"google.golang.org/grpc/encoding/proto"
  40  	estats "google.golang.org/grpc/experimental/stats"
  41  	"google.golang.org/grpc/grpclog"
  42  	"google.golang.org/grpc/internal"
  43  	"google.golang.org/grpc/internal/binarylog"
  44  	"google.golang.org/grpc/internal/channelz"
  45  	"google.golang.org/grpc/internal/grpcsync"
  46  	"google.golang.org/grpc/internal/grpcutil"
  47  	istats "google.golang.org/grpc/internal/stats"
  48  	"google.golang.org/grpc/internal/transport"
  49  	"google.golang.org/grpc/keepalive"
  50  	"google.golang.org/grpc/mem"
  51  	"google.golang.org/grpc/metadata"
  52  	"google.golang.org/grpc/peer"
  53  	"google.golang.org/grpc/stats"
  54  	"google.golang.org/grpc/status"
  55  	"google.golang.org/grpc/tap"
  56  )
  57  
  58  const (
  59  	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
  60  	defaultServerMaxSendMessageSize    = math.MaxInt32
  61  
  62  	// Server transports are tracked in a map which is keyed on listener
  63  	// address. For regular gRPC traffic, connections are accepted in Serve()
  64  	// through a call to Accept(), and we use the actual listener address as key
  65  	// when we add it to the map. But for connections received through
  66  	// ServeHTTP(), we do not have a listener and hence use this dummy value.
  67  	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
  68  )
  69  
  70  func init() {
  71  	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
  72  		return srv.opts.creds
  73  	}
  74  	internal.IsRegisteredMethod = func(srv *Server, method string) bool {
  75  		return srv.isRegisteredMethod(method)
  76  	}
  77  	internal.ServerFromContext = serverFromContext
  78  	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
  79  		globalServerOptions = append(globalServerOptions, opt...)
  80  	}
  81  	internal.ClearGlobalServerOptions = func() {
  82  		globalServerOptions = nil
  83  	}
  84  	internal.BinaryLogger = binaryLogger
  85  	internal.JoinServerOptions = newJoinServerOption
  86  	internal.BufferPool = bufferPool
  87  	internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
  88  		return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
  89  	}
  90  }
  91  
  92  var statusOK = status.New(codes.OK, "")
  93  var logger = grpclog.Component("core")
  94  
  95  // MethodHandler is a function type that processes a unary RPC method call.
  96  type MethodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)
  97  
  98  // MethodDesc represents an RPC service's method specification.
  99  type MethodDesc struct {
 100  	MethodName string
 101  	Handler    MethodHandler
 102  }
 103  
 104  // ServiceDesc represents an RPC service's specification.
 105  type ServiceDesc struct {
 106  	ServiceName string
 107  	// The pointer to the service interface. Used to check whether the user
 108  	// provided implementation satisfies the interface requirements.
 109  	HandlerType any
 110  	Methods     []MethodDesc
 111  	Streams     []StreamDesc
 112  	Metadata    any
 113  }
 114  
 115  // serviceInfo wraps information about a service. It is very similar to
 116  // ServiceDesc and is constructed from it for internal purposes.
 117  type serviceInfo struct {
 118  	// Contains the implementation for the methods in this service.
 119  	serviceImpl any
 120  	methods     map[string]*MethodDesc
 121  	streams     map[string]*StreamDesc
 122  	mdata       any
 123  }
 124  
 125  // Server is a gRPC server to serve RPC requests.
 126  type Server struct {
 127  	opts         serverOptions
 128  	statsHandler stats.Handler
 129  
 130  	mu  sync.Mutex // guards following
 131  	lis map[net.Listener]bool
 132  	// conns contains all active server transports. It is a map keyed on a
 133  	// listener address with the value being the set of active transports
 134  	// belonging to that listener.
 135  	conns    map[string]map[transport.ServerTransport]bool
 136  	serve    bool
 137  	drain    bool
 138  	cv       *sync.Cond              // signaled when connections close for GracefulStop
 139  	services map[string]*serviceInfo // service name -> service info
 140  	events   traceEventLog
 141  
 142  	quit               *grpcsync.Event
 143  	done               *grpcsync.Event
 144  	channelzRemoveOnce sync.Once
 145  	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
 146  	handlersWG         sync.WaitGroup // counts active method handler goroutines
 147  
 148  	channelz *channelz.Server
 149  
 150  	serverWorkerChannel      chan func()
 151  	serverWorkerChannelClose func()
 152  }
 153  
 154  type serverOptions struct {
 155  	creds                 credentials.TransportCredentials
 156  	codec                 baseCodec
 157  	cp                    Compressor
 158  	dc                    Decompressor
 159  	unaryInt              UnaryServerInterceptor
 160  	streamInt             StreamServerInterceptor
 161  	chainUnaryInts        []UnaryServerInterceptor
 162  	chainStreamInts       []StreamServerInterceptor
 163  	binaryLogger          binarylog.Logger
 164  	inTapHandle           tap.ServerInHandle
 165  	statsHandlers         []stats.Handler
 166  	maxConcurrentStreams  uint32
 167  	maxReceiveMessageSize int
 168  	maxSendMessageSize    int
 169  	unknownStreamDesc     *StreamDesc
 170  	keepaliveParams       keepalive.ServerParameters
 171  	keepalivePolicy       keepalive.EnforcementPolicy
 172  	initialWindowSize     int32
 173  	initialConnWindowSize int32
 174  	writeBufferSize       int
 175  	readBufferSize        int
 176  	sharedWriteBuffer     bool
 177  	connectionTimeout     time.Duration
 178  	maxHeaderListSize     *uint32
 179  	headerTableSize       *uint32
 180  	numServerWorkers      uint32
 181  	bufferPool            mem.BufferPool
 182  	waitForHandlers       bool
 183  	staticWindowSize      bool
 184  }
 185  
 186  var defaultServerOptions = serverOptions{
 187  	maxConcurrentStreams:  math.MaxUint32,
 188  	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
 189  	maxSendMessageSize:    defaultServerMaxSendMessageSize,
 190  	connectionTimeout:     120 * time.Second,
 191  	writeBufferSize:       defaultWriteBufSize,
 192  	readBufferSize:        defaultReadBufSize,
 193  	bufferPool:            mem.DefaultBufferPool(),
 194  }
 195  var globalServerOptions []ServerOption
 196  
 197  // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
 198  type ServerOption interface {
 199  	apply(*serverOptions)
 200  }
 201  
 202  // EmptyServerOption does not alter the server configuration. It can be embedded
 203  // in another structure to build custom server options.
 204  //
 205  // # Experimental
 206  //
 207  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
 208  // later release.
 209  type EmptyServerOption struct{}
 210  
 211  func (EmptyServerOption) apply(*serverOptions) {}
 212  
 213  // funcServerOption wraps a function that modifies serverOptions into an
 214  // implementation of the ServerOption interface.
 215  type funcServerOption struct {
 216  	f func(*serverOptions)
 217  }
 218  
 219  func (fdo *funcServerOption) apply(do *serverOptions) {
 220  	fdo.f(do)
 221  }
 222  
 223  func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
 224  	return &funcServerOption{
 225  		f: f,
 226  	}
 227  }
 228  
 229  // joinServerOption provides a way to combine arbitrary number of server
 230  // options into one.
 231  type joinServerOption struct {
 232  	opts []ServerOption
 233  }
 234  
 235  func (mdo *joinServerOption) apply(do *serverOptions) {
 236  	for _, opt := range mdo.opts {
 237  		opt.apply(do)
 238  	}
 239  }
 240  
 241  func newJoinServerOption(opts ...ServerOption) ServerOption {
 242  	return &joinServerOption{opts: opts}
 243  }
 244  
 245  // SharedWriteBuffer allows reusing per-connection transport write buffer.
 246  // If this option is set to true every connection will release the buffer after
 247  // flushing the data on the wire.
 248  //
 249  // # Experimental
 250  //
 251  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 252  // later release.
 253  func SharedWriteBuffer(val bool) ServerOption {
 254  	return newFuncServerOption(func(o *serverOptions) {
 255  		o.sharedWriteBuffer = val
 256  	})
 257  }
 258  
 259  // WriteBufferSize determines how much data can be batched before doing a write
 260  // on the wire. The default value for this buffer is 32KB. Zero or negative
 261  // values will disable the write buffer such that each write will be on underlying
 262  // connection. Note: A Send call may not directly translate to a write.
 263  func WriteBufferSize(s int) ServerOption {
 264  	return newFuncServerOption(func(o *serverOptions) {
 265  		o.writeBufferSize = s
 266  	})
 267  }
 268  
 269  // ReadBufferSize lets you set the size of read buffer, this determines how much
 270  // data can be read at most for one read syscall. The default value for this
 271  // buffer is 32KB. Zero or negative values will disable read buffer for a
 272  // connection so data framer can access the underlying conn directly.
 273  func ReadBufferSize(s int) ServerOption {
 274  	return newFuncServerOption(func(o *serverOptions) {
 275  		o.readBufferSize = s
 276  	})
 277  }
 278  
 279  // InitialWindowSize returns a ServerOption that sets window size for stream.
 280  // The lower bound for window size is 64K and any value smaller than that will be ignored.
 281  func InitialWindowSize(s int32) ServerOption {
 282  	return newFuncServerOption(func(o *serverOptions) {
 283  		o.initialWindowSize = s
 284  		o.staticWindowSize = true
 285  	})
 286  }
 287  
 288  // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
 289  // The lower bound for window size is 64K and any value smaller than that will be ignored.
 290  func InitialConnWindowSize(s int32) ServerOption {
 291  	return newFuncServerOption(func(o *serverOptions) {
 292  		o.initialConnWindowSize = s
 293  		o.staticWindowSize = true
 294  	})
 295  }
 296  
 297  // StaticStreamWindowSize returns a ServerOption to set the initial stream
 298  // window size to the value provided and disables dynamic flow control.
 299  // The lower bound for window size is 64K and any value smaller than that
 300  // will be ignored.
 301  func StaticStreamWindowSize(s int32) ServerOption {
 302  	return newFuncServerOption(func(o *serverOptions) {
 303  		o.initialWindowSize = s
 304  		o.staticWindowSize = true
 305  	})
 306  }
 307  
 308  // StaticConnWindowSize returns a ServerOption to set the initial connection
 309  // window size to the value provided and disables dynamic flow control.
 310  // The lower bound for window size is 64K and any value smaller than that
 311  // will be ignored.
 312  func StaticConnWindowSize(s int32) ServerOption {
 313  	return newFuncServerOption(func(o *serverOptions) {
 314  		o.initialConnWindowSize = s
 315  		o.staticWindowSize = true
 316  	})
 317  }
 318  
 319  // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
 320  func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
 321  	if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
 322  		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
 323  		kp.Time = internal.KeepaliveMinServerPingTime
 324  	}
 325  
 326  	return newFuncServerOption(func(o *serverOptions) {
 327  		o.keepaliveParams = kp
 328  	})
 329  }
 330  
 331  // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
 332  func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
 333  	return newFuncServerOption(func(o *serverOptions) {
 334  		o.keepalivePolicy = kep
 335  	})
 336  }
 337  
 338  // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
 339  //
 340  // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
 341  //
 342  // Deprecated: register codecs using encoding.RegisterCodec. The server will
 343  // automatically use registered codecs based on the incoming requests' headers.
 344  // See also
 345  // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
 346  // Will be supported throughout 1.x.
 347  func CustomCodec(codec Codec) ServerOption {
 348  	return newFuncServerOption(func(o *serverOptions) {
 349  		o.codec = newCodecV0Bridge(codec)
 350  	})
 351  }
 352  
 353  // ForceServerCodec returns a ServerOption that sets a codec for message
 354  // marshaling and unmarshaling.
 355  //
 356  // This will override any lookups by content-subtype for Codecs registered
 357  // with RegisterCodec.
 358  //
 359  // See Content-Type on
 360  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
 361  // more details. Also see the documentation on RegisterCodec and
 362  // CallContentSubtype for more details on the interaction between encoding.Codec
 363  // and content-subtype.
 364  //
 365  // This function is provided for advanced users; prefer to register codecs
 366  // using encoding.RegisterCodec.
 367  // The server will automatically use registered codecs based on the incoming
 368  // requests' headers. See also
 369  // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
 370  // Will be supported throughout 1.x.
 371  //
 372  // # Experimental
 373  //
 374  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 375  // later release.
 376  func ForceServerCodec(codec encoding.Codec) ServerOption {
 377  	return newFuncServerOption(func(o *serverOptions) {
 378  		o.codec = newCodecV1Bridge(codec)
 379  	})
 380  }
 381  
 382  // ForceServerCodecV2 is the equivalent of ForceServerCodec, but for the new
 383  // CodecV2 interface.
 384  //
 385  // Will be supported throughout 1.x.
 386  //
 387  // # Experimental
 388  //
 389  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 390  // later release.
 391  func ForceServerCodecV2(codecV2 encoding.CodecV2) ServerOption {
 392  	return newFuncServerOption(func(o *serverOptions) {
 393  		o.codec = codecV2
 394  	})
 395  }
 396  
 397  // RPCCompressor returns a ServerOption that sets a compressor for outbound
 398  // messages.  For backward compatibility, all outbound messages will be sent
 399  // using this compressor, regardless of incoming message compression.  By
 400  // default, server messages will be sent using the same compressor with which
 401  // request messages were sent.
 402  //
 403  // Deprecated: use encoding.RegisterCompressor instead. Will be supported
 404  // throughout 1.x.
 405  func RPCCompressor(cp Compressor) ServerOption {
 406  	return newFuncServerOption(func(o *serverOptions) {
 407  		o.cp = cp
 408  	})
 409  }
 410  
 411  // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
 412  // messages.  It has higher priority than decompressors registered via
 413  // encoding.RegisterCompressor.
 414  //
 415  // Deprecated: use encoding.RegisterCompressor instead. Will be supported
 416  // throughout 1.x.
 417  func RPCDecompressor(dc Decompressor) ServerOption {
 418  	return newFuncServerOption(func(o *serverOptions) {
 419  		o.dc = dc
 420  	})
 421  }
 422  
 423  // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
 424  // If this is not set, gRPC uses the default limit.
 425  //
 426  // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
 427  func MaxMsgSize(m int) ServerOption {
 428  	return MaxRecvMsgSize(m)
 429  }
 430  
 431  // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
 432  // If this is not set, gRPC uses the default 4MB.
 433  func MaxRecvMsgSize(m int) ServerOption {
 434  	return newFuncServerOption(func(o *serverOptions) {
 435  		o.maxReceiveMessageSize = m
 436  	})
 437  }
 438  
 439  // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
 440  // If this is not set, gRPC uses the default `math.MaxInt32`.
 441  func MaxSendMsgSize(m int) ServerOption {
 442  	return newFuncServerOption(func(o *serverOptions) {
 443  		o.maxSendMessageSize = m
 444  	})
 445  }
 446  
 447  // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 448  // of concurrent streams to each ServerTransport.
 449  func MaxConcurrentStreams(n uint32) ServerOption {
 450  	if n == 0 {
 451  		n = math.MaxUint32
 452  	}
 453  	return newFuncServerOption(func(o *serverOptions) {
 454  		o.maxConcurrentStreams = n
 455  	})
 456  }
 457  
 458  // Creds returns a ServerOption that sets credentials for server connections.
 459  func Creds(c credentials.TransportCredentials) ServerOption {
 460  	return newFuncServerOption(func(o *serverOptions) {
 461  		o.creds = c
 462  	})
 463  }
 464  
 465  // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
 466  // server. Only one unary interceptor can be installed. The construction of multiple
 467  // interceptors (e.g., chaining) can be implemented at the caller.
 468  func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
 469  	return newFuncServerOption(func(o *serverOptions) {
 470  		if o.unaryInt != nil {
 471  			panic("The unary server interceptor was already set and may not be reset.")
 472  		}
 473  		o.unaryInt = i
 474  	})
 475  }
 476  
 477  // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
 478  // for unary RPCs. The first interceptor will be the outer most,
 479  // while the last interceptor will be the inner most wrapper around the real call.
 480  // All unary interceptors added by this method will be chained.
 481  func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
 482  	return newFuncServerOption(func(o *serverOptions) {
 483  		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
 484  	})
 485  }
 486  
 487  // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
 488  // server. Only one stream interceptor can be installed.
 489  func StreamInterceptor(i StreamServerInterceptor) ServerOption {
 490  	return newFuncServerOption(func(o *serverOptions) {
 491  		if o.streamInt != nil {
 492  			panic("The stream server interceptor was already set and may not be reset.")
 493  		}
 494  		o.streamInt = i
 495  	})
 496  }
 497  
 498  // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
 499  // for streaming RPCs. The first interceptor will be the outer most,
 500  // while the last interceptor will be the inner most wrapper around the real call.
 501  // All stream interceptors added by this method will be chained.
 502  func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
 503  	return newFuncServerOption(func(o *serverOptions) {
 504  		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
 505  	})
 506  }
 507  
 508  // InTapHandle returns a ServerOption that sets the tap handle for all the server
 509  // transport to be created. Only one can be installed.
 510  //
 511  // # Experimental
 512  //
 513  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 514  // later release.
 515  func InTapHandle(h tap.ServerInHandle) ServerOption {
 516  	return newFuncServerOption(func(o *serverOptions) {
 517  		if o.inTapHandle != nil {
 518  			panic("The tap handle was already set and may not be reset.")
 519  		}
 520  		o.inTapHandle = h
 521  	})
 522  }
 523  
 524  // StatsHandler returns a ServerOption that sets the stats handler for the server.
 525  func StatsHandler(h stats.Handler) ServerOption {
 526  	return newFuncServerOption(func(o *serverOptions) {
 527  		if h == nil {
 528  			logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
 529  			// Do not allow a nil stats handler, which would otherwise cause
 530  			// panics.
 531  			return
 532  		}
 533  		o.statsHandlers = append(o.statsHandlers, h)
 534  	})
 535  }
 536  
 537  // binaryLogger returns a ServerOption that can set the binary logger for the
 538  // server.
 539  func binaryLogger(bl binarylog.Logger) ServerOption {
 540  	return newFuncServerOption(func(o *serverOptions) {
 541  		o.binaryLogger = bl
 542  	})
 543  }
 544  
 545  // UnknownServiceHandler returns a ServerOption that allows for adding a custom
 546  // unknown service handler. The provided method is a bidi-streaming RPC service
 547  // handler that will be invoked instead of returning the "unimplemented" gRPC
 548  // error whenever a request is received for an unregistered service or method.
 549  // The handling function and stream interceptor (if set) have full access to
 550  // the ServerStream, including its Context.
 551  func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
 552  	return newFuncServerOption(func(o *serverOptions) {
 553  		o.unknownStreamDesc = &StreamDesc{
 554  			StreamName: "unknown_service_handler",
 555  			Handler:    streamHandler,
 556  			// We need to assume that the users of the streamHandler will want to use both.
 557  			ClientStreams: true,
 558  			ServerStreams: true,
 559  		}
 560  	})
 561  }
 562  
 563  // ConnectionTimeout returns a ServerOption that sets the timeout for
 564  // connection establishment (up to and including HTTP/2 handshaking) for all
 565  // new connections.  If this is not set, the default is 120 seconds.  A zero or
 566  // negative value will result in an immediate timeout.
 567  //
 568  // # Experimental
 569  //
 570  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 571  // later release.
 572  func ConnectionTimeout(d time.Duration) ServerOption {
 573  	return newFuncServerOption(func(o *serverOptions) {
 574  		o.connectionTimeout = d
 575  	})
 576  }
 577  
 578  // MaxHeaderListSizeServerOption is a ServerOption that sets the max
 579  // (uncompressed) size of header list that the server is prepared to accept.
 580  type MaxHeaderListSizeServerOption struct {
 581  	MaxHeaderListSize uint32
 582  }
 583  
 584  func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) {
 585  	so.maxHeaderListSize = &o.MaxHeaderListSize
 586  }
 587  
 588  // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
 589  // of header list that the server is prepared to accept.
 590  func MaxHeaderListSize(s uint32) ServerOption {
 591  	return MaxHeaderListSizeServerOption{
 592  		MaxHeaderListSize: s,
 593  	}
 594  }
 595  
 596  // HeaderTableSize returns a ServerOption that sets the size of dynamic
 597  // header table for stream.
 598  //
 599  // # Experimental
 600  //
 601  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 602  // later release.
 603  func HeaderTableSize(s uint32) ServerOption {
 604  	return newFuncServerOption(func(o *serverOptions) {
 605  		o.headerTableSize = &s
 606  	})
 607  }
 608  
 609  // NumStreamWorkers returns a ServerOption that sets the number of worker
 610  // goroutines that should be used to process incoming streams. Setting this to
 611  // zero (default) will disable workers and spawn a new goroutine for each
 612  // stream.
 613  //
 614  // # Experimental
 615  //
 616  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 617  // later release.
 618  func NumStreamWorkers(numServerWorkers uint32) ServerOption {
 619  	// TODO: If/when this API gets stabilized (i.e. stream workers become the
 620  	// only way streams are processed), change the behavior of the zero value to
 621  	// a sane default. Preliminary experiments suggest that a value equal to the
 622  	// number of CPUs available is most performant; requires thorough testing.
 623  	return newFuncServerOption(func(o *serverOptions) {
 624  		o.numServerWorkers = numServerWorkers
 625  	})
 626  }
 627  
 628  // WaitForHandlers cause Stop to wait until all outstanding method handlers have
 629  // exited before returning.  If false, Stop will return as soon as all
 630  // connections have closed, but method handlers may still be running. By
 631  // default, Stop does not wait for method handlers to return.
 632  //
 633  // # Experimental
 634  //
 635  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 636  // later release.
 637  func WaitForHandlers(w bool) ServerOption {
 638  	return newFuncServerOption(func(o *serverOptions) {
 639  		o.waitForHandlers = w
 640  	})
 641  }
 642  
 643  func bufferPool(bufferPool mem.BufferPool) ServerOption {
 644  	return newFuncServerOption(func(o *serverOptions) {
 645  		o.bufferPool = bufferPool
 646  	})
 647  }
 648  
 649  // serverWorkerResetThreshold defines how often the stack must be reset. Every
 650  // N requests, by spawning a new goroutine in its place, a worker can reset its
 651  // stack so that large stacks don't live in memory forever. 2^16 should allow
 652  // each goroutine stack to live for at least a few seconds in a typical
 653  // workload (assuming a QPS of a few thousand requests/sec).
 654  const serverWorkerResetThreshold = 1 << 16
 655  
 656  // serverWorker blocks on a *transport.ServerStream channel forever and waits
 657  // for data to be fed by serveStreams. This allows multiple requests to be
 658  // processed by the same goroutine, removing the need for expensive stack
 659  // re-allocations (see the runtime.morestack problem [1]).
 660  //
 661  // [1] https://github.com/golang/go/issues/18138
 662  func (s *Server) serverWorker() {
 663  	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
 664  		f, ok := <-s.serverWorkerChannel
 665  		if !ok {
 666  			return
 667  		}
 668  		f()
 669  	}
 670  	go s.serverWorker()
 671  }
 672  
 673  // initServerWorkers creates worker goroutines and a channel to process incoming
 674  // connections to reduce the time spent overall on runtime.morestack.
 675  func (s *Server) initServerWorkers() {
 676  	s.serverWorkerChannel = make(chan func())
 677  	s.serverWorkerChannelClose = sync.OnceFunc(func() {
 678  		close(s.serverWorkerChannel)
 679  	})
 680  	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
 681  		go s.serverWorker()
 682  	}
 683  }
 684  
 685  // NewServer creates a gRPC server which has no service registered and has not
 686  // started to accept requests yet.
 687  func NewServer(opt ...ServerOption) *Server {
 688  	opts := defaultServerOptions
 689  	for _, o := range globalServerOptions {
 690  		o.apply(&opts)
 691  	}
 692  	for _, o := range opt {
 693  		o.apply(&opts)
 694  	}
 695  	s := &Server{
 696  		lis:          make(map[net.Listener]bool),
 697  		opts:         opts,
 698  		statsHandler: istats.NewCombinedHandler(opts.statsHandlers...),
 699  		conns:        make(map[string]map[transport.ServerTransport]bool),
 700  		services:     make(map[string]*serviceInfo),
 701  		quit:         grpcsync.NewEvent(),
 702  		done:         grpcsync.NewEvent(),
 703  		channelz:     channelz.RegisterServer(""),
 704  	}
 705  	chainUnaryServerInterceptors(s)
 706  	chainStreamServerInterceptors(s)
 707  	s.cv = sync.NewCond(&s.mu)
 708  	if EnableTracing {
 709  		_, file, line, _ := runtime.Caller(1)
 710  		s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
 711  	}
 712  
 713  	if s.opts.numServerWorkers > 0 {
 714  		s.initServerWorkers()
 715  	}
 716  
 717  	channelz.Info(logger, s.channelz, "Server created")
 718  	return s
 719  }
 720  
 721  // printf records an event in s's event log, unless s has been stopped.
 722  // REQUIRES s.mu is held.
 723  func (s *Server) printf(format string, a ...any) {
 724  	if s.events != nil {
 725  		s.events.Printf(format, a...)
 726  	}
 727  }
 728  
 729  // errorf records an error in s's event log, unless s has been stopped.
 730  // REQUIRES s.mu is held.
 731  func (s *Server) errorf(format string, a ...any) {
 732  	if s.events != nil {
 733  		s.events.Errorf(format, a...)
 734  	}
 735  }
 736  
 737  // ServiceRegistrar wraps a single method that supports service registration. It
 738  // enables users to pass concrete types other than grpc.Server to the service
 739  // registration methods exported by the IDL generated code.
 740  type ServiceRegistrar interface {
 741  	// RegisterService registers a service and its implementation to the
 742  	// concrete type implementing this interface.  It may not be called
 743  	// once the server has started serving.
 744  	// desc describes the service and its methods and handlers. impl is the
 745  	// service implementation which is passed to the method handlers.
 746  	RegisterService(desc *ServiceDesc, impl any)
 747  }
 748  
 749  // RegisterService registers a service and its implementation to the gRPC
 750  // server. It is called from the IDL generated code. This must be called before
 751  // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
 752  // ensure it implements sd.HandlerType.
 753  func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
 754  	if ss != nil {
 755  		ht := reflect.TypeOf(sd.HandlerType).Elem()
 756  		st := reflect.TypeOf(ss)
 757  		if !st.Implements(ht) {
 758  			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
 759  		}
 760  	}
 761  	s.register(sd, ss)
 762  }
 763  
 764  func (s *Server) register(sd *ServiceDesc, ss any) {
 765  	s.mu.Lock()
 766  	defer s.mu.Unlock()
 767  	s.printf("RegisterService(%q)", sd.ServiceName)
 768  	if s.serve {
 769  		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
 770  	}
 771  	if _, ok := s.services[sd.ServiceName]; ok {
 772  		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
 773  	}
 774  	info := &serviceInfo{
 775  		serviceImpl: ss,
 776  		methods:     make(map[string]*MethodDesc),
 777  		streams:     make(map[string]*StreamDesc),
 778  		mdata:       sd.Metadata,
 779  	}
 780  	for i := range sd.Methods {
 781  		d := &sd.Methods[i]
 782  		info.methods[d.MethodName] = d
 783  	}
 784  	for i := range sd.Streams {
 785  		d := &sd.Streams[i]
 786  		info.streams[d.StreamName] = d
 787  	}
 788  	s.services[sd.ServiceName] = info
 789  }
 790  
 791  // MethodInfo contains the information of an RPC including its method name and type.
 792  type MethodInfo struct {
 793  	// Name is the method name only, without the service name or package name.
 794  	Name string
 795  	// IsClientStream indicates whether the RPC is a client streaming RPC.
 796  	IsClientStream bool
 797  	// IsServerStream indicates whether the RPC is a server streaming RPC.
 798  	IsServerStream bool
 799  }
 800  
 801  // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
 802  type ServiceInfo struct {
 803  	Methods []MethodInfo
 804  	// Metadata is the metadata specified in ServiceDesc when registering service.
 805  	Metadata any
 806  }
 807  
 808  // GetServiceInfo returns a map from service names to ServiceInfo.
 809  // Service names include the package names, in the form of <package>.<service>.
 810  func (s *Server) GetServiceInfo() map[string]ServiceInfo {
 811  	ret := make(map[string]ServiceInfo)
 812  	for n, srv := range s.services {
 813  		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
 814  		for m := range srv.methods {
 815  			methods = append(methods, MethodInfo{
 816  				Name:           m,
 817  				IsClientStream: false,
 818  				IsServerStream: false,
 819  			})
 820  		}
 821  		for m, d := range srv.streams {
 822  			methods = append(methods, MethodInfo{
 823  				Name:           m,
 824  				IsClientStream: d.ClientStreams,
 825  				IsServerStream: d.ServerStreams,
 826  			})
 827  		}
 828  
 829  		ret[n] = ServiceInfo{
 830  			Methods:  methods,
 831  			Metadata: srv.mdata,
 832  		}
 833  	}
 834  	return ret
 835  }
 836  
 837  // ErrServerStopped indicates that the operation is now illegal because of
 838  // the server being stopped.
 839  var ErrServerStopped = errors.New("grpc: the server has been stopped")
 840  
 841  type listenSocket struct {
 842  	net.Listener
 843  	channelz *channelz.Socket
 844  }
 845  
 846  func (l *listenSocket) Close() error {
 847  	err := l.Listener.Close()
 848  	channelz.RemoveEntry(l.channelz.ID)
 849  	channelz.Info(logger, l.channelz, "ListenSocket deleted")
 850  	return err
 851  }
 852  
 853  // Serve accepts incoming connections on the listener lis, creating a new
 854  // ServerTransport and service goroutine for each. The service goroutines
 855  // read gRPC requests and then call the registered handlers to reply to them.
 856  // Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
 857  // this method returns.
 858  // Serve will return a non-nil error unless Stop or GracefulStop is called.
 859  //
 860  // Note: All supported releases of Go (as of December 2023) override the OS
 861  // defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
 862  // with OS defaults for keepalive time and interval, callers need to do the
 863  // following two things:
 864  //   - pass a net.Listener created by calling the Listen method on a
 865  //     net.ListenConfig with the `KeepAlive` field set to a negative value. This
 866  //     will result in the Go standard library not overriding OS defaults for TCP
 867  //     keepalive interval and time. But this will also result in the Go standard
 868  //     library not enabling TCP keepalives by default.
 869  //   - override the Accept method on the passed in net.Listener and set the
 870  //     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
 871  func (s *Server) Serve(lis net.Listener) error {
 872  	s.mu.Lock()
 873  	s.printf("serving")
 874  	s.serve = true
 875  	if s.lis == nil {
 876  		// Serve called after Stop or GracefulStop.
 877  		s.mu.Unlock()
 878  		lis.Close()
 879  		return ErrServerStopped
 880  	}
 881  
 882  	s.serveWG.Add(1)
 883  	defer func() {
 884  		s.serveWG.Done()
 885  		if s.quit.HasFired() {
 886  			// Stop or GracefulStop called; block until done and return nil.
 887  			<-s.done.Done()
 888  		}
 889  	}()
 890  
 891  	ls := &listenSocket{
 892  		Listener: lis,
 893  		channelz: channelz.RegisterSocket(&channelz.Socket{
 894  			SocketType:    channelz.SocketTypeListen,
 895  			Parent:        s.channelz,
 896  			RefName:       lis.Addr().String(),
 897  			LocalAddr:     lis.Addr(),
 898  			SocketOptions: channelz.GetSocketOption(lis)},
 899  		),
 900  	}
 901  	s.lis[ls] = true
 902  
 903  	defer func() {
 904  		s.mu.Lock()
 905  		if s.lis != nil && s.lis[ls] {
 906  			ls.Close()
 907  			delete(s.lis, ls)
 908  		}
 909  		s.mu.Unlock()
 910  	}()
 911  
 912  	s.mu.Unlock()
 913  	channelz.Info(logger, ls.channelz, "ListenSocket created")
 914  
 915  	var tempDelay time.Duration // how long to sleep on accept failure
 916  	for {
 917  		rawConn, err := lis.Accept()
 918  		if err != nil {
 919  			if ne, ok := err.(interface {
 920  				Temporary() bool
 921  			}); ok && ne.Temporary() {
 922  				if tempDelay == 0 {
 923  					tempDelay = 5 * time.Millisecond
 924  				} else {
 925  					tempDelay *= 2
 926  				}
 927  				if max := 1 * time.Second; tempDelay > max {
 928  					tempDelay = max
 929  				}
 930  				s.mu.Lock()
 931  				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
 932  				s.mu.Unlock()
 933  				timer := time.NewTimer(tempDelay)
 934  				select {
 935  				case <-timer.C:
 936  				case <-s.quit.Done():
 937  					timer.Stop()
 938  					return nil
 939  				}
 940  				continue
 941  			}
 942  			s.mu.Lock()
 943  			s.printf("done serving; Accept = %v", err)
 944  			s.mu.Unlock()
 945  
 946  			if s.quit.HasFired() {
 947  				return nil
 948  			}
 949  			return err
 950  		}
 951  		tempDelay = 0
 952  		// Start a new goroutine to deal with rawConn so we don't stall this Accept
 953  		// loop goroutine.
 954  		//
 955  		// Make sure we account for the goroutine so GracefulStop doesn't nil out
 956  		// s.conns before this conn can be added.
 957  		s.serveWG.Add(1)
 958  		go func() {
 959  			s.handleRawConn(lis.Addr().String(), rawConn)
 960  			s.serveWG.Done()
 961  		}()
 962  	}
 963  }
 964  
 965  // handleRawConn forks a goroutine to handle a just-accepted connection that
 966  // has not had any I/O performed on it yet.
 967  func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
 968  	if s.quit.HasFired() {
 969  		rawConn.Close()
 970  		return
 971  	}
 972  	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
 973  
 974  	// Finish handshaking (HTTP2)
 975  	st := s.newHTTP2Transport(rawConn)
 976  	rawConn.SetDeadline(time.Time{})
 977  	if st == nil {
 978  		return
 979  	}
 980  
 981  	if cc, ok := rawConn.(interface {
 982  		PassServerTransport(transport.ServerTransport)
 983  	}); ok {
 984  		cc.PassServerTransport(st)
 985  	}
 986  
 987  	if !s.addConn(lisAddr, st) {
 988  		return
 989  	}
 990  	go func() {
 991  		s.serveStreams(context.Background(), st, rawConn)
 992  		s.removeConn(lisAddr, st)
 993  	}()
 994  }
 995  
 996  // newHTTP2Transport sets up a http/2 transport (using the
 997  // gRPC http2 server transport in transport/http2_server.go).
 998  func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
 999  	config := &transport.ServerConfig{
1000  		MaxStreams:            s.opts.maxConcurrentStreams,
1001  		ConnectionTimeout:     s.opts.connectionTimeout,
1002  		Credentials:           s.opts.creds,
1003  		InTapHandle:           s.opts.inTapHandle,
1004  		StatsHandler:          s.statsHandler,
1005  		KeepaliveParams:       s.opts.keepaliveParams,
1006  		KeepalivePolicy:       s.opts.keepalivePolicy,
1007  		InitialWindowSize:     s.opts.initialWindowSize,
1008  		InitialConnWindowSize: s.opts.initialConnWindowSize,
1009  		WriteBufferSize:       s.opts.writeBufferSize,
1010  		ReadBufferSize:        s.opts.readBufferSize,
1011  		SharedWriteBuffer:     s.opts.sharedWriteBuffer,
1012  		ChannelzParent:        s.channelz,
1013  		MaxHeaderListSize:     s.opts.maxHeaderListSize,
1014  		HeaderTableSize:       s.opts.headerTableSize,
1015  		BufferPool:            s.opts.bufferPool,
1016  		StaticWindowSize:      s.opts.staticWindowSize,
1017  	}
1018  	st, err := transport.NewServerTransport(c, config)
1019  	if err != nil {
1020  		s.mu.Lock()
1021  		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
1022  		s.mu.Unlock()
1023  		// ErrConnDispatched means that the connection was dispatched away from
1024  		// gRPC; those connections should be left open.
1025  		if err != credentials.ErrConnDispatched {
1026  			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
1027  			if err != io.EOF {
1028  				channelz.Info(logger, s.channelz, "grpc: Server.Serve failed to create ServerTransport: ", err)
1029  			}
1030  			c.Close()
1031  		}
1032  		return nil
1033  	}
1034  
1035  	return st
1036  }
1037  
1038  func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
1039  	ctx = transport.SetConnection(ctx, rawConn)
1040  	ctx = peer.NewContext(ctx, st.Peer())
1041  	if s.statsHandler != nil {
1042  		ctx = s.statsHandler.TagConn(ctx, &stats.ConnTagInfo{
1043  			RemoteAddr: st.Peer().Addr,
1044  			LocalAddr:  st.Peer().LocalAddr,
1045  		})
1046  		s.statsHandler.HandleConn(ctx, &stats.ConnBegin{})
1047  	}
1048  
1049  	defer func() {
1050  		st.Close(errors.New("finished serving streams for the server transport"))
1051  		if s.statsHandler != nil {
1052  			s.statsHandler.HandleConn(ctx, &stats.ConnEnd{})
1053  		}
1054  	}()
1055  
1056  	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
1057  	st.HandleStreams(ctx, func(stream *transport.ServerStream) {
1058  		s.handlersWG.Add(1)
1059  		streamQuota.acquire()
1060  		f := func() {
1061  			defer streamQuota.release()
1062  			defer s.handlersWG.Done()
1063  			s.handleStream(st, stream)
1064  		}
1065  
1066  		if s.opts.numServerWorkers > 0 {
1067  			select {
1068  			case s.serverWorkerChannel <- f:
1069  				return
1070  			default:
1071  				// If all stream workers are busy, fallback to the default code path.
1072  			}
1073  		}
1074  		go f()
1075  	})
1076  }
1077  
1078  var _ http.Handler = (*Server)(nil)
1079  
1080  // ServeHTTP implements the Go standard library's http.Handler
1081  // interface by responding to the gRPC request r, by looking up
1082  // the requested gRPC method in the gRPC server s.
1083  //
1084  // The provided HTTP request must have arrived on an HTTP/2
1085  // connection. When using the Go standard library's server,
1086  // practically this means that the Request must also have arrived
1087  // over TLS.
1088  //
1089  // To share one port (such as 443 for https) between gRPC and an
1090  // existing http.Handler, use a root http.Handler such as:
1091  //
1092  //	if r.ProtoMajor == 2 && strings.HasPrefix(
1093  //		r.Header.Get("Content-Type"), "application/grpc") {
1094  //		grpcServer.ServeHTTP(w, r)
1095  //	} else {
1096  //		yourMux.ServeHTTP(w, r)
1097  //	}
1098  //
1099  // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
1100  // separate from grpc-go's HTTP/2 server. Performance and features may vary
1101  // between the two paths. ServeHTTP does not support some gRPC features
1102  // available through grpc-go's HTTP/2 server.
1103  //
1104  // # Experimental
1105  //
1106  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
1107  // later release.
1108  func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1109  	st, err := transport.NewServerHandlerTransport(w, r, s.statsHandler, s.opts.bufferPool)
1110  	if err != nil {
1111  		// Errors returned from transport.NewServerHandlerTransport have
1112  		// already been written to w.
1113  		return
1114  	}
1115  	if !s.addConn(listenerAddressForServeHTTP, st) {
1116  		return
1117  	}
1118  	defer s.removeConn(listenerAddressForServeHTTP, st)
1119  	s.serveStreams(r.Context(), st, nil)
1120  }
1121  
1122  func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
1123  	s.mu.Lock()
1124  	defer s.mu.Unlock()
1125  	if s.conns == nil {
1126  		st.Close(errors.New("Server.addConn called when server has already been stopped"))
1127  		return false
1128  	}
1129  	if s.drain {
1130  		// Transport added after we drained our existing conns: drain it
1131  		// immediately.
1132  		st.Drain("")
1133  	}
1134  
1135  	if s.conns[addr] == nil {
1136  		// Create a map entry if this is the first connection on this listener.
1137  		s.conns[addr] = make(map[transport.ServerTransport]bool)
1138  	}
1139  	s.conns[addr][st] = true
1140  	return true
1141  }
1142  
1143  func (s *Server) removeConn(addr string, st transport.ServerTransport) {
1144  	s.mu.Lock()
1145  	defer s.mu.Unlock()
1146  
1147  	conns := s.conns[addr]
1148  	if conns != nil {
1149  		delete(conns, st)
1150  		if len(conns) == 0 {
1151  			// If the last connection for this address is being removed, also
1152  			// remove the map entry corresponding to the address. This is used
1153  			// in GracefulStop() when waiting for all connections to be closed.
1154  			delete(s.conns, addr)
1155  		}
1156  		s.cv.Broadcast()
1157  	}
1158  }
1159  
1160  func (s *Server) incrCallsStarted() {
1161  	s.channelz.ServerMetrics.CallsStarted.Add(1)
1162  	s.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
1163  }
1164  
1165  func (s *Server) incrCallsSucceeded() {
1166  	s.channelz.ServerMetrics.CallsSucceeded.Add(1)
1167  }
1168  
1169  func (s *Server) incrCallsFailed() {
1170  	s.channelz.ServerMetrics.CallsFailed.Add(1)
1171  }
1172  
1173  func (s *Server) sendResponse(ctx context.Context, stream *transport.ServerStream, msg any, cp Compressor, opts *transport.WriteOptions, comp encoding.Compressor) error {
1174  	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
1175  	if err != nil {
1176  		channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err)
1177  		return err
1178  	}
1179  
1180  	compData, pf, err := compress(data, cp, comp, s.opts.bufferPool)
1181  	if err != nil {
1182  		data.Free()
1183  		channelz.Error(logger, s.channelz, "grpc: server failed to compress response: ", err)
1184  		return err
1185  	}
1186  
1187  	hdr, payload := msgHeader(data, compData, pf)
1188  
1189  	defer func() {
1190  		compData.Free()
1191  		data.Free()
1192  		// payload does not need to be freed here, it is either data or compData, both of
1193  		// which are already freed.
1194  	}()
1195  
1196  	dataLen := data.Len()
1197  	payloadLen := payload.Len()
1198  	// TODO(dfawley): should we be checking len(data) instead?
1199  	if payloadLen > s.opts.maxSendMessageSize {
1200  		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", payloadLen, s.opts.maxSendMessageSize)
1201  	}
1202  	err = stream.Write(hdr, payload, opts)
1203  	if err == nil && s.statsHandler != nil {
1204  		s.statsHandler.HandleRPC(ctx, outPayload(false, msg, dataLen, payloadLen, time.Now()))
1205  	}
1206  	return err
1207  }
1208  
1209  // chainUnaryServerInterceptors chains all unary server interceptors into one.
1210  func chainUnaryServerInterceptors(s *Server) {
1211  	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
1212  	// be executed before any other chained interceptors.
1213  	interceptors := s.opts.chainUnaryInts
1214  	if s.opts.unaryInt != nil {
1215  		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
1216  	}
1217  
1218  	var chainedInt UnaryServerInterceptor
1219  	if len(interceptors) == 0 {
1220  		chainedInt = nil
1221  	} else if len(interceptors) == 1 {
1222  		chainedInt = interceptors[0]
1223  	} else {
1224  		chainedInt = chainUnaryInterceptors(interceptors)
1225  	}
1226  
1227  	s.opts.unaryInt = chainedInt
1228  }
1229  
1230  func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
1231  	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
1232  		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
1233  	}
1234  }
1235  
1236  func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
1237  	if curr == len(interceptors)-1 {
1238  		return finalHandler
1239  	}
1240  	return func(ctx context.Context, req any) (any, error) {
1241  		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
1242  	}
1243  }
1244  
1245  func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
1246  	sh := s.statsHandler
1247  	if sh != nil || trInfo != nil || channelz.IsOn() {
1248  		if channelz.IsOn() {
1249  			s.incrCallsStarted()
1250  		}
1251  		var statsBegin *stats.Begin
1252  		if sh != nil {
1253  			statsBegin = &stats.Begin{
1254  				BeginTime:      time.Now(),
1255  				IsClientStream: false,
1256  				IsServerStream: false,
1257  			}
1258  			sh.HandleRPC(ctx, statsBegin)
1259  		}
1260  		if trInfo != nil {
1261  			trInfo.tr.LazyLog(&trInfo.firstLine, false)
1262  		}
1263  		// The deferred error handling for tracing, stats handler and channelz are
1264  		// combined into one function to reduce stack usage -- a defer takes ~56-64
1265  		// bytes on the stack, so overflowing the stack will require a stack
1266  		// re-allocation, which is expensive.
1267  		//
1268  		// To maintain behavior similar to separate deferred statements, statements
1269  		// should be executed in the reverse order. That is, tracing first, stats
1270  		// handler second, and channelz last. Note that panics *within* defers will
1271  		// lead to different behavior, but that's an acceptable compromise; that
1272  		// would be undefined behavior territory anyway.
1273  		defer func() {
1274  			if trInfo != nil {
1275  				if err != nil && err != io.EOF {
1276  					trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1277  					trInfo.tr.SetError()
1278  				}
1279  				trInfo.tr.Finish()
1280  			}
1281  
1282  			if sh != nil {
1283  				end := &stats.End{
1284  					BeginTime: statsBegin.BeginTime,
1285  					EndTime:   time.Now(),
1286  				}
1287  				if err != nil && err != io.EOF {
1288  					end.Error = toRPCErr(err)
1289  				}
1290  				sh.HandleRPC(ctx, end)
1291  			}
1292  
1293  			if channelz.IsOn() {
1294  				if err != nil && err != io.EOF {
1295  					s.incrCallsFailed()
1296  				} else {
1297  					s.incrCallsSucceeded()
1298  				}
1299  			}
1300  		}()
1301  	}
1302  	var binlogs []binarylog.MethodLogger
1303  	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1304  		binlogs = append(binlogs, ml)
1305  	}
1306  	if s.opts.binaryLogger != nil {
1307  		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1308  			binlogs = append(binlogs, ml)
1309  		}
1310  	}
1311  	if len(binlogs) != 0 {
1312  		md, _ := metadata.FromIncomingContext(ctx)
1313  		logEntry := &binarylog.ClientHeader{
1314  			Header:     md,
1315  			MethodName: stream.Method(),
1316  			PeerAddr:   nil,
1317  		}
1318  		if deadline, ok := ctx.Deadline(); ok {
1319  			logEntry.Timeout = time.Until(deadline)
1320  			if logEntry.Timeout < 0 {
1321  				logEntry.Timeout = 0
1322  			}
1323  		}
1324  		if a := md[":authority"]; len(a) > 0 {
1325  			logEntry.Authority = a[0]
1326  		}
1327  		if peer, ok := peer.FromContext(ctx); ok {
1328  			logEntry.PeerAddr = peer.Addr
1329  		}
1330  		for _, binlog := range binlogs {
1331  			binlog.Log(ctx, logEntry)
1332  		}
1333  	}
1334  
1335  	// comp and cp are used for compression.  decomp and dc are used for
1336  	// decompression.  If comp and decomp are both set, they are the same;
1337  	// however they are kept separate to ensure that at most one of the
1338  	// compressor/decompressor variable pairs are set for use later.
1339  	var comp, decomp encoding.Compressor
1340  	var cp Compressor
1341  	var dc Decompressor
1342  	var sendCompressorName string
1343  
1344  	// If dc is set and matches the stream's compression, use it.  Otherwise, try
1345  	// to find a matching registered compressor for decomp.
1346  	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1347  		dc = s.opts.dc
1348  	} else if rc != "" && rc != encoding.Identity {
1349  		decomp = encoding.GetCompressor(rc)
1350  		if decomp == nil {
1351  			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1352  			stream.WriteStatus(st)
1353  			return st.Err()
1354  		}
1355  	}
1356  
1357  	// If cp is set, use it.  Otherwise, attempt to compress the response using
1358  	// the incoming message compression method.
1359  	//
1360  	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1361  	if s.opts.cp != nil {
1362  		cp = s.opts.cp
1363  		sendCompressorName = cp.Type()
1364  	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1365  		// Legacy compressor not specified; attempt to respond with same encoding.
1366  		comp = encoding.GetCompressor(rc)
1367  		if comp != nil {
1368  			sendCompressorName = comp.Name()
1369  		}
1370  	}
1371  
1372  	if sendCompressorName != "" {
1373  		if err := stream.SetSendCompress(sendCompressorName); err != nil {
1374  			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
1375  		}
1376  	}
1377  
1378  	var payInfo *payloadInfo
1379  	if sh != nil || len(binlogs) != 0 {
1380  		payInfo = &payloadInfo{}
1381  		defer payInfo.free()
1382  	}
1383  
1384  	d, err := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, true)
1385  	if err != nil {
1386  		if e := stream.WriteStatus(status.Convert(err)); e != nil {
1387  			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1388  		}
1389  		return err
1390  	}
1391  	freed := false
1392  	dataFree := func() {
1393  		if !freed {
1394  			d.Free()
1395  			freed = true
1396  		}
1397  	}
1398  	defer dataFree()
1399  	df := func(v any) error {
1400  		defer dataFree()
1401  		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
1402  			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
1403  		}
1404  
1405  		if sh != nil {
1406  			sh.HandleRPC(ctx, &stats.InPayload{
1407  				RecvTime:         time.Now(),
1408  				Payload:          v,
1409  				Length:           d.Len(),
1410  				WireLength:       payInfo.compressedLength + headerLen,
1411  				CompressedLength: payInfo.compressedLength,
1412  			})
1413  		}
1414  		if len(binlogs) != 0 {
1415  			cm := &binarylog.ClientMessage{
1416  				Message: d.Materialize(),
1417  			}
1418  			for _, binlog := range binlogs {
1419  				binlog.Log(ctx, cm)
1420  			}
1421  		}
1422  		if trInfo != nil {
1423  			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1424  		}
1425  		return nil
1426  	}
1427  	ctx = NewContextWithServerTransportStream(ctx, stream)
1428  	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
1429  	if appErr != nil {
1430  		appStatus, ok := status.FromError(appErr)
1431  		if !ok {
1432  			// Convert non-status application error to a status error with code
1433  			// Unknown, but handle context errors specifically.
1434  			appStatus = status.FromContextError(appErr)
1435  			appErr = appStatus.Err()
1436  		}
1437  		if trInfo != nil {
1438  			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1439  			trInfo.tr.SetError()
1440  		}
1441  		if e := stream.WriteStatus(appStatus); e != nil {
1442  			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1443  		}
1444  		if len(binlogs) != 0 {
1445  			if h, _ := stream.Header(); h.Len() > 0 {
1446  				// Only log serverHeader if there was header. Otherwise it can
1447  				// be trailer only.
1448  				sh := &binarylog.ServerHeader{
1449  					Header: h,
1450  				}
1451  				for _, binlog := range binlogs {
1452  					binlog.Log(ctx, sh)
1453  				}
1454  			}
1455  			st := &binarylog.ServerTrailer{
1456  				Trailer: stream.Trailer(),
1457  				Err:     appErr,
1458  			}
1459  			for _, binlog := range binlogs {
1460  				binlog.Log(ctx, st)
1461  			}
1462  		}
1463  		return appErr
1464  	}
1465  	if trInfo != nil {
1466  		trInfo.tr.LazyLog(stringer("OK"), false)
1467  	}
1468  	opts := &transport.WriteOptions{Last: true}
1469  
1470  	// Server handler could have set new compressor by calling SetSendCompressor.
1471  	// In case it is set, we need to use it for compressing outbound message.
1472  	if stream.SendCompress() != sendCompressorName {
1473  		comp = encoding.GetCompressor(stream.SendCompress())
1474  	}
1475  	if err := s.sendResponse(ctx, stream, reply, cp, opts, comp); err != nil {
1476  		if err == io.EOF {
1477  			// The entire stream is done (for unary RPC only).
1478  			return err
1479  		}
1480  		if sts, ok := status.FromError(err); ok {
1481  			if e := stream.WriteStatus(sts); e != nil {
1482  				channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1483  			}
1484  		} else {
1485  			switch st := err.(type) {
1486  			case transport.ConnectionError:
1487  				// Nothing to do here.
1488  			default:
1489  				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1490  			}
1491  		}
1492  		if len(binlogs) != 0 {
1493  			h, _ := stream.Header()
1494  			sh := &binarylog.ServerHeader{
1495  				Header: h,
1496  			}
1497  			st := &binarylog.ServerTrailer{
1498  				Trailer: stream.Trailer(),
1499  				Err:     appErr,
1500  			}
1501  			for _, binlog := range binlogs {
1502  				binlog.Log(ctx, sh)
1503  				binlog.Log(ctx, st)
1504  			}
1505  		}
1506  		return err
1507  	}
1508  	if len(binlogs) != 0 {
1509  		h, _ := stream.Header()
1510  		sh := &binarylog.ServerHeader{
1511  			Header: h,
1512  		}
1513  		sm := &binarylog.ServerMessage{
1514  			Message: reply,
1515  		}
1516  		for _, binlog := range binlogs {
1517  			binlog.Log(ctx, sh)
1518  			binlog.Log(ctx, sm)
1519  		}
1520  	}
1521  	if trInfo != nil {
1522  		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1523  	}
1524  	// TODO: Should we be logging if writing status failed here, like above?
1525  	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
1526  	// error or allow the stats handler to see it?
1527  	if len(binlogs) != 0 {
1528  		st := &binarylog.ServerTrailer{
1529  			Trailer: stream.Trailer(),
1530  			Err:     appErr,
1531  		}
1532  		for _, binlog := range binlogs {
1533  			binlog.Log(ctx, st)
1534  		}
1535  	}
1536  	return stream.WriteStatus(statusOK)
1537  }
1538  
1539  // chainStreamServerInterceptors chains all stream server interceptors into one.
1540  func chainStreamServerInterceptors(s *Server) {
1541  	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1542  	// be executed before any other chained interceptors.
1543  	interceptors := s.opts.chainStreamInts
1544  	if s.opts.streamInt != nil {
1545  		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1546  	}
1547  
1548  	var chainedInt StreamServerInterceptor
1549  	if len(interceptors) == 0 {
1550  		chainedInt = nil
1551  	} else if len(interceptors) == 1 {
1552  		chainedInt = interceptors[0]
1553  	} else {
1554  		chainedInt = chainStreamInterceptors(interceptors)
1555  	}
1556  
1557  	s.opts.streamInt = chainedInt
1558  }
1559  
1560  func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
1561  	return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
1562  		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1563  	}
1564  }
1565  
1566  func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1567  	if curr == len(interceptors)-1 {
1568  		return finalHandler
1569  	}
1570  	return func(srv any, stream ServerStream) error {
1571  		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1572  	}
1573  }
1574  
1575  func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
1576  	if channelz.IsOn() {
1577  		s.incrCallsStarted()
1578  	}
1579  	sh := s.statsHandler
1580  	var statsBegin *stats.Begin
1581  	if sh != nil {
1582  		statsBegin = &stats.Begin{
1583  			BeginTime:      time.Now(),
1584  			IsClientStream: sd.ClientStreams,
1585  			IsServerStream: sd.ServerStreams,
1586  		}
1587  		sh.HandleRPC(ctx, statsBegin)
1588  	}
1589  	ctx = NewContextWithServerTransportStream(ctx, stream)
1590  	ss := &serverStream{
1591  		ctx:                   ctx,
1592  		s:                     stream,
1593  		p:                     parser{r: stream, bufferPool: s.opts.bufferPool},
1594  		codec:                 s.getCodec(stream.ContentSubtype()),
1595  		desc:                  sd,
1596  		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1597  		maxSendMessageSize:    s.opts.maxSendMessageSize,
1598  		trInfo:                trInfo,
1599  		statsHandler:          sh,
1600  	}
1601  
1602  	if sh != nil || trInfo != nil || channelz.IsOn() {
1603  		// See comment in processUnaryRPC on defers.
1604  		defer func() {
1605  			if trInfo != nil {
1606  				ss.mu.Lock()
1607  				if err != nil && err != io.EOF {
1608  					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1609  					ss.trInfo.tr.SetError()
1610  				}
1611  				ss.trInfo.tr.Finish()
1612  				ss.trInfo.tr = nil
1613  				ss.mu.Unlock()
1614  			}
1615  
1616  			if sh != nil {
1617  				end := &stats.End{
1618  					BeginTime: statsBegin.BeginTime,
1619  					EndTime:   time.Now(),
1620  				}
1621  				if err != nil && err != io.EOF {
1622  					end.Error = toRPCErr(err)
1623  				}
1624  				sh.HandleRPC(ctx, end)
1625  			}
1626  
1627  			if channelz.IsOn() {
1628  				if err != nil && err != io.EOF {
1629  					s.incrCallsFailed()
1630  				} else {
1631  					s.incrCallsSucceeded()
1632  				}
1633  			}
1634  		}()
1635  	}
1636  
1637  	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1638  		ss.binlogs = append(ss.binlogs, ml)
1639  	}
1640  	if s.opts.binaryLogger != nil {
1641  		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1642  			ss.binlogs = append(ss.binlogs, ml)
1643  		}
1644  	}
1645  	if len(ss.binlogs) != 0 {
1646  		md, _ := metadata.FromIncomingContext(ctx)
1647  		logEntry := &binarylog.ClientHeader{
1648  			Header:     md,
1649  			MethodName: stream.Method(),
1650  			PeerAddr:   nil,
1651  		}
1652  		if deadline, ok := ctx.Deadline(); ok {
1653  			logEntry.Timeout = time.Until(deadline)
1654  			if logEntry.Timeout < 0 {
1655  				logEntry.Timeout = 0
1656  			}
1657  		}
1658  		if a := md[":authority"]; len(a) > 0 {
1659  			logEntry.Authority = a[0]
1660  		}
1661  		if peer, ok := peer.FromContext(ss.Context()); ok {
1662  			logEntry.PeerAddr = peer.Addr
1663  		}
1664  		for _, binlog := range ss.binlogs {
1665  			binlog.Log(ctx, logEntry)
1666  		}
1667  	}
1668  
1669  	// If dc is set and matches the stream's compression, use it.  Otherwise, try
1670  	// to find a matching registered compressor for decomp.
1671  	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1672  		ss.decompressorV0 = s.opts.dc
1673  	} else if rc != "" && rc != encoding.Identity {
1674  		ss.decompressorV1 = encoding.GetCompressor(rc)
1675  		if ss.decompressorV1 == nil {
1676  			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1677  			ss.s.WriteStatus(st)
1678  			return st.Err()
1679  		}
1680  	}
1681  
1682  	// If cp is set, use it.  Otherwise, attempt to compress the response using
1683  	// the incoming message compression method.
1684  	//
1685  	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1686  	if s.opts.cp != nil {
1687  		ss.compressorV0 = s.opts.cp
1688  		ss.sendCompressorName = s.opts.cp.Type()
1689  	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1690  		// Legacy compressor not specified; attempt to respond with same encoding.
1691  		ss.compressorV1 = encoding.GetCompressor(rc)
1692  		if ss.compressorV1 != nil {
1693  			ss.sendCompressorName = rc
1694  		}
1695  	}
1696  
1697  	if ss.sendCompressorName != "" {
1698  		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
1699  			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
1700  		}
1701  	}
1702  
1703  	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.compressorV0, ss.compressorV1)
1704  
1705  	if trInfo != nil {
1706  		trInfo.tr.LazyLog(&trInfo.firstLine, false)
1707  	}
1708  	var appErr error
1709  	var server any
1710  	if info != nil {
1711  		server = info.serviceImpl
1712  	}
1713  	if s.opts.streamInt == nil {
1714  		appErr = sd.Handler(server, ss)
1715  	} else {
1716  		info := &StreamServerInfo{
1717  			FullMethod:     stream.Method(),
1718  			IsClientStream: sd.ClientStreams,
1719  			IsServerStream: sd.ServerStreams,
1720  		}
1721  		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1722  	}
1723  	if appErr != nil {
1724  		appStatus, ok := status.FromError(appErr)
1725  		if !ok {
1726  			// Convert non-status application error to a status error with code
1727  			// Unknown, but handle context errors specifically.
1728  			appStatus = status.FromContextError(appErr)
1729  			appErr = appStatus.Err()
1730  		}
1731  		if trInfo != nil {
1732  			ss.mu.Lock()
1733  			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1734  			ss.trInfo.tr.SetError()
1735  			ss.mu.Unlock()
1736  		}
1737  		if len(ss.binlogs) != 0 {
1738  			st := &binarylog.ServerTrailer{
1739  				Trailer: ss.s.Trailer(),
1740  				Err:     appErr,
1741  			}
1742  			for _, binlog := range ss.binlogs {
1743  				binlog.Log(ctx, st)
1744  			}
1745  		}
1746  		ss.s.WriteStatus(appStatus)
1747  		// TODO: Should we log an error from WriteStatus here and below?
1748  		return appErr
1749  	}
1750  	if trInfo != nil {
1751  		ss.mu.Lock()
1752  		ss.trInfo.tr.LazyLog(stringer("OK"), false)
1753  		ss.mu.Unlock()
1754  	}
1755  	if len(ss.binlogs) != 0 {
1756  		st := &binarylog.ServerTrailer{
1757  			Trailer: ss.s.Trailer(),
1758  			Err:     appErr,
1759  		}
1760  		for _, binlog := range ss.binlogs {
1761  			binlog.Log(ctx, st)
1762  		}
1763  	}
1764  	return ss.s.WriteStatus(statusOK)
1765  }
1766  
1767  func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {
1768  	ctx := stream.Context()
1769  	ctx = contextWithServer(ctx, s)
1770  	var ti *traceInfo
1771  	if EnableTracing {
1772  		tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
1773  		ctx = newTraceContext(ctx, tr)
1774  		ti = &traceInfo{
1775  			tr: tr,
1776  			firstLine: firstLine{
1777  				client:     false,
1778  				remoteAddr: t.Peer().Addr,
1779  			},
1780  		}
1781  		if dl, ok := ctx.Deadline(); ok {
1782  			ti.firstLine.deadline = time.Until(dl)
1783  		}
1784  	}
1785  
1786  	sm := stream.Method()
1787  	if sm != "" && sm[0] == '/' {
1788  		sm = sm[1:]
1789  	}
1790  	pos := strings.LastIndex(sm, "/")
1791  	if pos == -1 {
1792  		if ti != nil {
1793  			ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
1794  			ti.tr.SetError()
1795  		}
1796  		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1797  		if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
1798  			if ti != nil {
1799  				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1800  				ti.tr.SetError()
1801  			}
1802  			channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
1803  		}
1804  		if ti != nil {
1805  			ti.tr.Finish()
1806  		}
1807  		return
1808  	}
1809  	service := sm[:pos]
1810  	method := sm[pos+1:]
1811  
1812  	// FromIncomingContext is expensive: skip if there are no statsHandlers
1813  	if s.statsHandler != nil {
1814  		md, _ := metadata.FromIncomingContext(ctx)
1815  		ctx = s.statsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
1816  		s.statsHandler.HandleRPC(ctx, &stats.InHeader{
1817  			FullMethod:  stream.Method(),
1818  			RemoteAddr:  t.Peer().Addr,
1819  			LocalAddr:   t.Peer().LocalAddr,
1820  			Compression: stream.RecvCompress(),
1821  			WireLength:  stream.HeaderWireLength(),
1822  			Header:      md,
1823  		})
1824  	}
1825  	// To have calls in stream callouts work. Will delete once all stats handler
1826  	// calls come from the gRPC layer.
1827  	stream.SetContext(ctx)
1828  
1829  	srv, knownService := s.services[service]
1830  	if knownService {
1831  		if md, ok := srv.methods[method]; ok {
1832  			s.processUnaryRPC(ctx, stream, srv, md, ti)
1833  			return
1834  		}
1835  		if sd, ok := srv.streams[method]; ok {
1836  			s.processStreamingRPC(ctx, stream, srv, sd, ti)
1837  			return
1838  		}
1839  	}
1840  	// Unknown service, or known server unknown method.
1841  	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1842  		s.processStreamingRPC(ctx, stream, nil, unknownDesc, ti)
1843  		return
1844  	}
1845  	var errDesc string
1846  	if !knownService {
1847  		errDesc = fmt.Sprintf("unknown service %v", service)
1848  	} else {
1849  		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1850  	}
1851  	if ti != nil {
1852  		ti.tr.LazyPrintf("%s", errDesc)
1853  		ti.tr.SetError()
1854  	}
1855  	if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
1856  		if ti != nil {
1857  			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1858  			ti.tr.SetError()
1859  		}
1860  		channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
1861  	}
1862  	if ti != nil {
1863  		ti.tr.Finish()
1864  	}
1865  }
1866  
1867  // The key to save ServerTransportStream in the context.
1868  type streamKey struct{}
1869  
1870  // NewContextWithServerTransportStream creates a new context from ctx and
1871  // attaches stream to it.
1872  //
1873  // # Experimental
1874  //
1875  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
1876  // later release.
1877  func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1878  	return context.WithValue(ctx, streamKey{}, stream)
1879  }
1880  
1881  // ServerTransportStream is a minimal interface that a transport stream must
1882  // implement. This can be used to mock an actual transport stream for tests of
1883  // handler code that use, for example, grpc.SetHeader (which requires some
1884  // stream to be in context).
1885  //
1886  // See also NewContextWithServerTransportStream.
1887  //
1888  // # Experimental
1889  //
1890  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
1891  // later release.
1892  type ServerTransportStream interface {
1893  	Method() string
1894  	SetHeader(md metadata.MD) error
1895  	SendHeader(md metadata.MD) error
1896  	SetTrailer(md metadata.MD) error
1897  }
1898  
1899  // ServerTransportStreamFromContext returns the ServerTransportStream saved in
1900  // ctx. Returns nil if the given context has no stream associated with it
1901  // (which implies it is not an RPC invocation context).
1902  //
1903  // # Experimental
1904  //
1905  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
1906  // later release.
1907  func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1908  	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1909  	return s
1910  }
1911  
1912  // Stop stops the gRPC server. It immediately closes all open
1913  // connections and listeners.
1914  // It cancels all active RPCs on the server side and the corresponding
1915  // pending RPCs on the client side will get notified by connection
1916  // errors.
1917  func (s *Server) Stop() {
1918  	s.stop(false)
1919  }
1920  
1921  // GracefulStop stops the gRPC server gracefully. It stops the server from
1922  // accepting new connections and RPCs and blocks until all the pending RPCs are
1923  // finished.
1924  func (s *Server) GracefulStop() {
1925  	s.stop(true)
1926  }
1927  
1928  func (s *Server) stop(graceful bool) {
1929  	s.quit.Fire()
1930  	defer s.done.Fire()
1931  
1932  	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
1933  	s.mu.Lock()
1934  	s.closeListenersLocked()
1935  	// Wait for serving threads to be ready to exit.  Only then can we be sure no
1936  	// new conns will be created.
1937  	s.mu.Unlock()
1938  	s.serveWG.Wait()
1939  
1940  	s.mu.Lock()
1941  	defer s.mu.Unlock()
1942  
1943  	if graceful {
1944  		s.drainAllServerTransportsLocked()
1945  	} else {
1946  		s.closeServerTransportsLocked()
1947  	}
1948  
1949  	for len(s.conns) != 0 {
1950  		s.cv.Wait()
1951  	}
1952  	s.conns = nil
1953  
1954  	if s.opts.numServerWorkers > 0 {
1955  		// Closing the channel (only once, via sync.OnceFunc) after all the
1956  		// connections have been closed above ensures that there are no
1957  		// goroutines executing the callback passed to st.HandleStreams (where
1958  		// the channel is written to).
1959  		s.serverWorkerChannelClose()
1960  	}
1961  
1962  	if graceful || s.opts.waitForHandlers {
1963  		s.handlersWG.Wait()
1964  	}
1965  
1966  	if s.events != nil {
1967  		s.events.Finish()
1968  		s.events = nil
1969  	}
1970  }
1971  
1972  // s.mu must be held by the caller.
1973  func (s *Server) closeServerTransportsLocked() {
1974  	for _, conns := range s.conns {
1975  		for st := range conns {
1976  			st.Close(errors.New("Server.Stop called"))
1977  		}
1978  	}
1979  }
1980  
1981  // s.mu must be held by the caller.
1982  func (s *Server) drainAllServerTransportsLocked() {
1983  	if !s.drain {
1984  		for _, conns := range s.conns {
1985  			for st := range conns {
1986  				st.Drain("graceful_stop")
1987  			}
1988  		}
1989  		s.drain = true
1990  	}
1991  }
1992  
1993  // s.mu must be held by the caller.
1994  func (s *Server) closeListenersLocked() {
1995  	for lis := range s.lis {
1996  		lis.Close()
1997  	}
1998  	s.lis = nil
1999  }
2000  
2001  // contentSubtype must be lowercase
2002  // cannot return nil
2003  func (s *Server) getCodec(contentSubtype string) baseCodec {
2004  	if s.opts.codec != nil {
2005  		return s.opts.codec
2006  	}
2007  	if contentSubtype == "" {
2008  		return getCodec(proto.Name)
2009  	}
2010  	codec := getCodec(contentSubtype)
2011  	if codec == nil {
2012  		logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
2013  		return getCodec(proto.Name)
2014  	}
2015  	return codec
2016  }
2017  
2018  type serverKey struct{}
2019  
2020  // serverFromContext gets the Server from the context.
2021  func serverFromContext(ctx context.Context) *Server {
2022  	s, _ := ctx.Value(serverKey{}).(*Server)
2023  	return s
2024  }
2025  
2026  // contextWithServer sets the Server in the context.
2027  func contextWithServer(ctx context.Context, server *Server) context.Context {
2028  	return context.WithValue(ctx, serverKey{}, server)
2029  }
2030  
2031  // isRegisteredMethod returns whether the passed in method is registered as a
2032  // method on the server. /service/method and service/method will match if the
2033  // service and method are registered on the server.
2034  func (s *Server) isRegisteredMethod(serviceMethod string) bool {
2035  	if serviceMethod != "" && serviceMethod[0] == '/' {
2036  		serviceMethod = serviceMethod[1:]
2037  	}
2038  	pos := strings.LastIndex(serviceMethod, "/")
2039  	if pos == -1 { // Invalid method name syntax.
2040  		return false
2041  	}
2042  	service := serviceMethod[:pos]
2043  	method := serviceMethod[pos+1:]
2044  	srv, knownService := s.services[service]
2045  	if knownService {
2046  		if _, ok := srv.methods[method]; ok {
2047  			return true
2048  		}
2049  		if _, ok := srv.streams[method]; ok {
2050  			return true
2051  		}
2052  	}
2053  	return false
2054  }
2055  
2056  // SetHeader sets the header metadata to be sent from the server to the client.
2057  // The context provided must be the context passed to the server's handler.
2058  //
2059  // Streaming RPCs should prefer the SetHeader method of the ServerStream.
2060  //
2061  // When called multiple times, all the provided metadata will be merged.  All
2062  // the metadata will be sent out when one of the following happens:
2063  //
2064  //   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
2065  //   - The first response message is sent.  For unary handlers, this occurs when
2066  //     the handler returns; for streaming handlers, this can happen when stream's
2067  //     SendMsg method is called.
2068  //   - An RPC status is sent out (error or success).  This occurs when the handler
2069  //     returns.
2070  //
2071  // SetHeader will fail if called after any of the events above.
2072  //
2073  // The error returned is compatible with the status package.  However, the
2074  // status code will often not match the RPC status as seen by the client
2075  // application, and therefore, should not be relied upon for this purpose.
2076  func SetHeader(ctx context.Context, md metadata.MD) error {
2077  	if md.Len() == 0 {
2078  		return nil
2079  	}
2080  	stream := ServerTransportStreamFromContext(ctx)
2081  	if stream == nil {
2082  		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2083  	}
2084  	return stream.SetHeader(md)
2085  }
2086  
2087  // SendHeader sends header metadata. It may be called at most once, and may not
2088  // be called after any event that causes headers to be sent (see SetHeader for
2089  // a complete list).  The provided md and headers set by SetHeader() will be
2090  // sent.
2091  //
2092  // The error returned is compatible with the status package.  However, the
2093  // status code will often not match the RPC status as seen by the client
2094  // application, and therefore, should not be relied upon for this purpose.
2095  func SendHeader(ctx context.Context, md metadata.MD) error {
2096  	stream := ServerTransportStreamFromContext(ctx)
2097  	if stream == nil {
2098  		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2099  	}
2100  	if err := stream.SendHeader(md); err != nil {
2101  		return toRPCErr(err)
2102  	}
2103  	return nil
2104  }
2105  
2106  // SetSendCompressor sets a compressor for outbound messages from the server.
2107  // It must not be called after any event that causes headers to be sent
2108  // (see ServerStream.SetHeader for the complete list). Provided compressor is
2109  // used when below conditions are met:
2110  //
2111  //   - compressor is registered via encoding.RegisterCompressor
2112  //   - compressor name must exist in the client advertised compressor names
2113  //     sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
2114  //     get client supported compressor names.
2115  //
2116  // The context provided must be the context passed to the server's handler.
2117  // It must be noted that compressor name encoding.Identity disables the
2118  // outbound compression.
2119  // By default, server messages will be sent using the same compressor with
2120  // which request messages were sent.
2121  //
2122  // It is not safe to call SetSendCompressor concurrently with SendHeader and
2123  // SendMsg.
2124  //
2125  // # Experimental
2126  //
2127  // Notice: This function is EXPERIMENTAL and may be changed or removed in a
2128  // later release.
2129  func SetSendCompressor(ctx context.Context, name string) error {
2130  	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
2131  	if !ok || stream == nil {
2132  		return fmt.Errorf("failed to fetch the stream from the given context")
2133  	}
2134  
2135  	if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
2136  		return fmt.Errorf("unable to set send compressor: %w", err)
2137  	}
2138  
2139  	return stream.SetSendCompress(name)
2140  }
2141  
2142  // ClientSupportedCompressors returns compressor names advertised by the client
2143  // via grpc-accept-encoding header.
2144  //
2145  // The context provided must be the context passed to the server's handler.
2146  //
2147  // # Experimental
2148  //
2149  // Notice: This function is EXPERIMENTAL and may be changed or removed in a
2150  // later release.
2151  func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
2152  	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
2153  	if !ok || stream == nil {
2154  		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
2155  	}
2156  
2157  	return stream.ClientAdvertisedCompressors(), nil
2158  }
2159  
2160  // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
2161  // When called more than once, all the provided metadata will be merged.
2162  //
2163  // The error returned is compatible with the status package.  However, the
2164  // status code will often not match the RPC status as seen by the client
2165  // application, and therefore, should not be relied upon for this purpose.
2166  func SetTrailer(ctx context.Context, md metadata.MD) error {
2167  	if md.Len() == 0 {
2168  		return nil
2169  	}
2170  	stream := ServerTransportStreamFromContext(ctx)
2171  	if stream == nil {
2172  		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2173  	}
2174  	return stream.SetTrailer(md)
2175  }
2176  
2177  // Method returns the method string for the server context.  The returned
2178  // string is in the format of "/service/method".
2179  func Method(ctx context.Context) (string, bool) {
2180  	s := ServerTransportStreamFromContext(ctx)
2181  	if s == nil {
2182  		return "", false
2183  	}
2184  	return s.Method(), true
2185  }
2186  
2187  // validateSendCompressor returns an error when given compressor name cannot be
2188  // handled by the server or the client based on the advertised compressors.
2189  func validateSendCompressor(name string, clientCompressors []string) error {
2190  	if name == encoding.Identity {
2191  		return nil
2192  	}
2193  
2194  	if !grpcutil.IsCompressorNameRegistered(name) {
2195  		return fmt.Errorf("compressor not registered %q", name)
2196  	}
2197  
2198  	for _, c := range clientCompressors {
2199  		if c == name {
2200  			return nil // found match
2201  		}
2202  	}
2203  	return fmt.Errorf("client does not support compressor %q", name)
2204  }
2205  
2206  // atomicSemaphore implements a blocking, counting semaphore. acquire should be
2207  // called synchronously; release may be called asynchronously.
2208  type atomicSemaphore struct {
2209  	n    atomic.Int64
2210  	wait chan struct{}
2211  }
2212  
2213  func (q *atomicSemaphore) acquire() {
2214  	if q.n.Add(-1) < 0 {
2215  		// We ran out of quota.  Block until a release happens.
2216  		<-q.wait
2217  	}
2218  }
2219  
2220  func (q *atomicSemaphore) release() {
2221  	// N.B. the "<= 0" check below should allow for this to work with multiple
2222  	// concurrent calls to acquire, but also note that with synchronous calls to
2223  	// acquire, as our system does, n will never be less than -1.  There are
2224  	// fairness issues (queuing) to consider if this was to be generalized.
2225  	if q.n.Add(1) <= 0 {
2226  		// An acquire was waiting on us.  Unblock it.
2227  		q.wait <- struct{}{}
2228  	}
2229  }
2230  
2231  func newHandlerQuota(n uint32) *atomicSemaphore {
2232  	a := &atomicSemaphore{wait: make(chan struct{}, 1)}
2233  	a.n.Store(int64(n))
2234  	return a
2235  }
2236