method_logger.go raw

   1  /*
   2   *
   3   * Copyright 2018 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package binarylog
  20  
  21  import (
  22  	"context"
  23  	"net"
  24  	"strings"
  25  	"sync/atomic"
  26  	"time"
  27  
  28  	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  29  	"google.golang.org/grpc/metadata"
  30  	"google.golang.org/grpc/status"
  31  	"google.golang.org/protobuf/proto"
  32  	"google.golang.org/protobuf/types/known/durationpb"
  33  	"google.golang.org/protobuf/types/known/timestamppb"
  34  )
  35  
  36  type callIDGenerator struct {
  37  	id uint64
  38  }
  39  
  40  func (g *callIDGenerator) next() uint64 {
  41  	id := atomic.AddUint64(&g.id, 1)
  42  	return id
  43  }
  44  
  45  // reset is for testing only, and doesn't need to be thread safe.
  46  func (g *callIDGenerator) reset() {
  47  	g.id = 0
  48  }
  49  
  50  var idGen callIDGenerator
  51  
// MethodLogger is the sub-logger for each method.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type MethodLogger interface {
	// Log records the event described by the given LogEntryConfig. The
	// TruncatingMethodLogger implementation in this file ignores the context
	// argument.
	Log(context.Context, LogEntryConfig)
}
  59  
  60  // TruncatingMethodLogger is a method logger that truncates headers and messages
  61  // based on configured fields.
  62  type TruncatingMethodLogger struct {
  63  	headerMaxLen, messageMaxLen uint64
  64  
  65  	callID          uint64
  66  	idWithinCallGen *callIDGenerator
  67  
  68  	sink Sink // TODO(blog): make this pluggable.
  69  }
  70  
  71  // NewTruncatingMethodLogger returns a new truncating method logger.
  72  //
  73  // This is used in the 1.0 release of gcp/observability, and thus must not be
  74  // deleted or changed.
  75  func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
  76  	return &TruncatingMethodLogger{
  77  		headerMaxLen:  h,
  78  		messageMaxLen: m,
  79  
  80  		callID:          idGen.next(),
  81  		idWithinCallGen: &callIDGenerator{},
  82  
  83  		sink: DefaultSink, // TODO(blog): make it pluggable.
  84  	}
  85  }
  86  
  87  // Build is an internal only method for building the proto message out of the
  88  // input event. It's made public to enable other library to reuse as much logic
  89  // in TruncatingMethodLogger as possible.
  90  func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
  91  	m := c.toProto()
  92  	timestamp := timestamppb.Now()
  93  	m.Timestamp = timestamp
  94  	m.CallId = ml.callID
  95  	m.SequenceIdWithinCall = ml.idWithinCallGen.next()
  96  
  97  	switch pay := m.Payload.(type) {
  98  	case *binlogpb.GrpcLogEntry_ClientHeader:
  99  		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
 100  	case *binlogpb.GrpcLogEntry_ServerHeader:
 101  		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
 102  	case *binlogpb.GrpcLogEntry_Message:
 103  		m.PayloadTruncated = ml.truncateMessage(pay.Message)
 104  	}
 105  	return m
 106  }
 107  
 108  // Log creates a proto binary log entry, and logs it to the sink.
 109  func (ml *TruncatingMethodLogger) Log(_ context.Context, c LogEntryConfig) {
 110  	ml.sink.Write(ml.Build(c))
 111  }
 112  
 113  func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
 114  	if ml.headerMaxLen == maxUInt {
 115  		return false
 116  	}
 117  	var (
 118  		bytesLimit = ml.headerMaxLen
 119  		index      int
 120  	)
 121  	// At the end of the loop, index will be the first entry where the total
 122  	// size is greater than the limit:
 123  	//
 124  	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
 125  	for ; index < len(mdPb.Entry); index++ {
 126  		entry := mdPb.Entry[index]
 127  		if entry.Key == "grpc-trace-bin" {
 128  			// "grpc-trace-bin" is a special key. It's kept in the log entry,
 129  			// but not counted towards the size limit.
 130  			continue
 131  		}
 132  		currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
 133  		if currentEntryLen > bytesLimit {
 134  			break
 135  		}
 136  		bytesLimit -= currentEntryLen
 137  	}
 138  	truncated = index < len(mdPb.Entry)
 139  	mdPb.Entry = mdPb.Entry[:index]
 140  	return truncated
 141  }
 142  
 143  func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
 144  	if ml.messageMaxLen == maxUInt {
 145  		return false
 146  	}
 147  	if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
 148  		return false
 149  	}
 150  	msgPb.Data = msgPb.Data[:ml.messageMaxLen]
 151  	return true
 152  }
 153  
// LogEntryConfig represents the configuration for binary log entry.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type LogEntryConfig interface {
	// toProto converts the config into a GrpcLogEntry. Because the method is
	// unexported, only types declared in this package can satisfy the
	// interface. TruncatingMethodLogger.Build fills in the timestamp, call ID
	// and sequence ID on the returned entry, so implementations do not need
	// to set them.
	toProto() *binlogpb.GrpcLogEntry
}
 161  
 162  // ClientHeader configs the binary log entry to be a ClientHeader entry.
 163  type ClientHeader struct {
 164  	OnClientSide bool
 165  	Header       metadata.MD
 166  	MethodName   string
 167  	Authority    string
 168  	Timeout      time.Duration
 169  	// PeerAddr is required only when it's on server side.
 170  	PeerAddr net.Addr
 171  }
 172  
 173  func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
 174  	// This function doesn't need to set all the fields (e.g. seq ID). The Log
 175  	// function will set the fields when necessary.
 176  	clientHeader := &binlogpb.ClientHeader{
 177  		Metadata:   mdToMetadataProto(c.Header),
 178  		MethodName: c.MethodName,
 179  		Authority:  c.Authority,
 180  	}
 181  	if c.Timeout > 0 {
 182  		clientHeader.Timeout = durationpb.New(c.Timeout)
 183  	}
 184  	ret := &binlogpb.GrpcLogEntry{
 185  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
 186  		Payload: &binlogpb.GrpcLogEntry_ClientHeader{
 187  			ClientHeader: clientHeader,
 188  		},
 189  	}
 190  	if c.OnClientSide {
 191  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
 192  	} else {
 193  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
 194  	}
 195  	if c.PeerAddr != nil {
 196  		ret.Peer = addrToProto(c.PeerAddr)
 197  	}
 198  	return ret
 199  }
 200  
 201  // ServerHeader configs the binary log entry to be a ServerHeader entry.
 202  type ServerHeader struct {
 203  	OnClientSide bool
 204  	Header       metadata.MD
 205  	// PeerAddr is required only when it's on client side.
 206  	PeerAddr net.Addr
 207  }
 208  
 209  func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
 210  	ret := &binlogpb.GrpcLogEntry{
 211  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
 212  		Payload: &binlogpb.GrpcLogEntry_ServerHeader{
 213  			ServerHeader: &binlogpb.ServerHeader{
 214  				Metadata: mdToMetadataProto(c.Header),
 215  			},
 216  		},
 217  	}
 218  	if c.OnClientSide {
 219  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
 220  	} else {
 221  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
 222  	}
 223  	if c.PeerAddr != nil {
 224  		ret.Peer = addrToProto(c.PeerAddr)
 225  	}
 226  	return ret
 227  }
 228  
 229  // ClientMessage configs the binary log entry to be a ClientMessage entry.
 230  type ClientMessage struct {
 231  	OnClientSide bool
 232  	// Message can be a proto.Message or []byte. Other messages formats are not
 233  	// supported.
 234  	Message any
 235  }
 236  
 237  func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
 238  	var (
 239  		data []byte
 240  		err  error
 241  	)
 242  	if m, ok := c.Message.(proto.Message); ok {
 243  		data, err = proto.Marshal(m)
 244  		if err != nil {
 245  			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
 246  		}
 247  	} else if b, ok := c.Message.([]byte); ok {
 248  		data = b
 249  	} else {
 250  		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
 251  	}
 252  	ret := &binlogpb.GrpcLogEntry{
 253  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
 254  		Payload: &binlogpb.GrpcLogEntry_Message{
 255  			Message: &binlogpb.Message{
 256  				Length: uint32(len(data)),
 257  				Data:   data,
 258  			},
 259  		},
 260  	}
 261  	if c.OnClientSide {
 262  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
 263  	} else {
 264  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
 265  	}
 266  	return ret
 267  }
 268  
 269  // ServerMessage configs the binary log entry to be a ServerMessage entry.
 270  type ServerMessage struct {
 271  	OnClientSide bool
 272  	// Message can be a proto.Message or []byte. Other messages formats are not
 273  	// supported.
 274  	Message any
 275  }
 276  
 277  func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
 278  	var (
 279  		data []byte
 280  		err  error
 281  	)
 282  	if m, ok := c.Message.(proto.Message); ok {
 283  		data, err = proto.Marshal(m)
 284  		if err != nil {
 285  			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
 286  		}
 287  	} else if b, ok := c.Message.([]byte); ok {
 288  		data = b
 289  	} else {
 290  		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
 291  	}
 292  	ret := &binlogpb.GrpcLogEntry{
 293  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
 294  		Payload: &binlogpb.GrpcLogEntry_Message{
 295  			Message: &binlogpb.Message{
 296  				Length: uint32(len(data)),
 297  				Data:   data,
 298  			},
 299  		},
 300  	}
 301  	if c.OnClientSide {
 302  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
 303  	} else {
 304  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
 305  	}
 306  	return ret
 307  }
 308  
 309  // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
 310  type ClientHalfClose struct {
 311  	OnClientSide bool
 312  }
 313  
 314  func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
 315  	ret := &binlogpb.GrpcLogEntry{
 316  		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
 317  		Payload: nil, // No payload here.
 318  	}
 319  	if c.OnClientSide {
 320  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
 321  	} else {
 322  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
 323  	}
 324  	return ret
 325  }
 326  
 327  // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
 328  type ServerTrailer struct {
 329  	OnClientSide bool
 330  	Trailer      metadata.MD
 331  	// Err is the status error.
 332  	Err error
 333  	// PeerAddr is required only when it's on client side and the RPC is trailer
 334  	// only.
 335  	PeerAddr net.Addr
 336  }
 337  
 338  func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
 339  	st, ok := status.FromError(c.Err)
 340  	if !ok {
 341  		grpclogLogger.Info("binarylogging: error in trailer is not a status error")
 342  	}
 343  	var (
 344  		detailsBytes []byte
 345  		err          error
 346  	)
 347  	stProto := st.Proto()
 348  	if stProto != nil && len(stProto.Details) != 0 {
 349  		detailsBytes, err = proto.Marshal(stProto)
 350  		if err != nil {
 351  			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
 352  		}
 353  	}
 354  	ret := &binlogpb.GrpcLogEntry{
 355  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
 356  		Payload: &binlogpb.GrpcLogEntry_Trailer{
 357  			Trailer: &binlogpb.Trailer{
 358  				Metadata:      mdToMetadataProto(c.Trailer),
 359  				StatusCode:    uint32(st.Code()),
 360  				StatusMessage: st.Message(),
 361  				StatusDetails: detailsBytes,
 362  			},
 363  		},
 364  	}
 365  	if c.OnClientSide {
 366  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
 367  	} else {
 368  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
 369  	}
 370  	if c.PeerAddr != nil {
 371  		ret.Peer = addrToProto(c.PeerAddr)
 372  	}
 373  	return ret
 374  }
 375  
 376  // Cancel configs the binary log entry to be a Cancel entry.
 377  type Cancel struct {
 378  	OnClientSide bool
 379  }
 380  
 381  func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
 382  	ret := &binlogpb.GrpcLogEntry{
 383  		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
 384  		Payload: nil,
 385  	}
 386  	if c.OnClientSide {
 387  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
 388  	} else {
 389  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
 390  	}
 391  	return ret
 392  }
 393  
 394  // metadataKeyOmit returns whether the metadata entry with this key should be
 395  // omitted.
 396  func metadataKeyOmit(key string) bool {
 397  	switch key {
 398  	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
 399  		return true
 400  	case "grpc-trace-bin": // grpc-trace-bin is special because it's visible to users.
 401  		return false
 402  	}
 403  	return strings.HasPrefix(key, "grpc-")
 404  }
 405  
 406  func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
 407  	ret := &binlogpb.Metadata{}
 408  	for k, vv := range md {
 409  		if metadataKeyOmit(k) {
 410  			continue
 411  		}
 412  		for _, v := range vv {
 413  			ret.Entry = append(ret.Entry,
 414  				&binlogpb.MetadataEntry{
 415  					Key:   k,
 416  					Value: []byte(v),
 417  				},
 418  			)
 419  		}
 420  	}
 421  	return ret
 422  }
 423  
 424  func addrToProto(addr net.Addr) *binlogpb.Address {
 425  	ret := &binlogpb.Address{}
 426  	switch a := addr.(type) {
 427  	case *net.TCPAddr:
 428  		if a.IP.To4() != nil {
 429  			ret.Type = binlogpb.Address_TYPE_IPV4
 430  		} else if a.IP.To16() != nil {
 431  			ret.Type = binlogpb.Address_TYPE_IPV6
 432  		} else {
 433  			ret.Type = binlogpb.Address_TYPE_UNKNOWN
 434  			// Do not set address and port fields.
 435  			break
 436  		}
 437  		ret.Address = a.IP.String()
 438  		ret.IpPort = uint32(a.Port)
 439  	case *net.UnixAddr:
 440  		ret.Type = binlogpb.Address_TYPE_UNIX
 441  		ret.Address = a.String()
 442  	default:
 443  		ret.Type = binlogpb.Address_TYPE_UNKNOWN
 444  	}
 445  	return ret
 446  }
 447