preloader.go raw

   1  /*
   2   *
   3   * Copyright 2019 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpc
  20  
  21  import (
  22  	"google.golang.org/grpc/codes"
  23  	"google.golang.org/grpc/mem"
  24  	"google.golang.org/grpc/status"
  25  )
  26  
  27  // PreparedMsg is responsible for creating a Marshalled and Compressed object.
  28  //
  29  // # Experimental
  30  //
  31  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
  32  // later release.
  33  type PreparedMsg struct {
  34  	// Struct for preparing msg before sending them
  35  	encodedData mem.BufferSlice
  36  	hdr         []byte
  37  	payload     mem.BufferSlice
  38  	pf          payloadFormat
  39  }
  40  
  41  // Encode marshalls and compresses the message using the codec and compressor for the stream.
  42  func (p *PreparedMsg) Encode(s Stream, msg any) error {
  43  	ctx := s.Context()
  44  	rpcInfo, ok := rpcInfoFromContext(ctx)
  45  	if !ok {
  46  		return status.Errorf(codes.Internal, "grpc: unable to get rpcInfo")
  47  	}
  48  
  49  	// check if the context has the relevant information to prepareMsg
  50  	if rpcInfo.preloaderInfo.codec == nil {
  51  		return status.Errorf(codes.Internal, "grpc: rpcInfo.preloaderInfo.codec is nil")
  52  	}
  53  
  54  	// prepare the msg
  55  	data, err := encode(rpcInfo.preloaderInfo.codec, msg)
  56  	if err != nil {
  57  		return err
  58  	}
  59  
  60  	materializedData := data.Materialize()
  61  	data.Free()
  62  	p.encodedData = mem.BufferSlice{mem.SliceBuffer(materializedData)}
  63  
  64  	// TODO: it should be possible to grab the bufferPool from the underlying
  65  	//  stream implementation with a type cast to its actual type (such as
  66  	//  addrConnStream) and accessing the buffer pool directly.
  67  	var compData mem.BufferSlice
  68  	compData, p.pf, err = compress(p.encodedData, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp, mem.DefaultBufferPool())
  69  	if err != nil {
  70  		return err
  71  	}
  72  
  73  	if p.pf.isCompressed() {
  74  		materializedCompData := compData.Materialize()
  75  		compData.Free()
  76  		compData = mem.BufferSlice{mem.SliceBuffer(materializedCompData)}
  77  	}
  78  
  79  	p.hdr, p.payload = msgHeader(p.encodedData, compData, p.pf)
  80  
  81  	return nil
  82  }
  83