sink.go raw

   1  /*
   2   *
   3   * Copyright 2018 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package binarylog
  20  
  21  import (
  22  	"bufio"
  23  	"encoding/binary"
  24  	"io"
  25  	"sync"
  26  	"time"
  27  
  28  	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  29  	"google.golang.org/protobuf/proto"
  30  )
  31  
  32  var (
  33  	// DefaultSink is the sink where the logs will be written to. It's exported
  34  	// for the binarylog package to update.
  35  	DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
  36  )
  37  
// Sink writes log entries into the binary log sink.
//
// Sink is a copy of the exported binarylog.Sink, to avoid a circular
// dependency.
type Sink interface {
	// Write will be called to write the log entry into the sink.
	//
	// It should be thread-safe so it can be called in parallel.
	Write(*binlogpb.GrpcLogEntry) error
	// Close will be called when the Sink is replaced by a new Sink.
	Close() error
}
  49  
  50  type noopSink struct{}
  51  
  52  func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
  53  func (ns *noopSink) Close() error                       { return nil }
  54  
  55  // newWriterSink creates a binary log sink with the given writer.
  56  //
  57  // Write() marshals the proto message and writes it to the given writer. Each
  58  // message is prefixed with a 4 byte big endian unsigned integer as the length.
  59  //
  60  // No buffer is done, Close() doesn't try to close the writer.
  61  func newWriterSink(w io.Writer) Sink {
  62  	return &writerSink{out: w}
  63  }
  64  
// writerSink is a Sink that writes length-prefixed, marshaled log entries
// directly to an io.Writer.
type writerSink struct {
	// out receives the 4 byte length header followed by the marshaled entry.
	out io.Writer
}
  68  
  69  func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
  70  	b, err := proto.Marshal(e)
  71  	if err != nil {
  72  		grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
  73  		return err
  74  	}
  75  	hdr := make([]byte, 4)
  76  	binary.BigEndian.PutUint32(hdr, uint32(len(b)))
  77  	if _, err := ws.out.Write(hdr); err != nil {
  78  		return err
  79  	}
  80  	if _, err := ws.out.Write(b); err != nil {
  81  		return err
  82  	}
  83  	return nil
  84  }
  85  
  86  func (ws *writerSink) Close() error { return nil }
  87  
// bufferedSink is a Sink that writes entries through a bufio.Writer and
// flushes the buffer periodically from a background goroutine.
type bufferedSink struct {
	// mu is held by Write, Close and the flush goroutine while they touch the
	// buffer.
	mu sync.Mutex
	// closer is closed by Close after the final flush.
	closer io.Closer
	out    Sink          // out is built on buf.
	buf    *bufio.Writer // buf is kept for flush.
	// flusherStarted records whether the flush goroutine has been started; it
	// is set on the first Write.
	flusherStarted bool

	// writeTicker drives the periodic flush. It is nil until the flush
	// goroutine is started.
	writeTicker *time.Ticker
	// done is closed by Close to tell the flush goroutine to exit.
	done chan struct{}
}
  98  
  99  func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
 100  	fs.mu.Lock()
 101  	defer fs.mu.Unlock()
 102  	if !fs.flusherStarted {
 103  		// Start the write loop when Write is called.
 104  		fs.startFlushGoroutine()
 105  		fs.flusherStarted = true
 106  	}
 107  	if err := fs.out.Write(e); err != nil {
 108  		return err
 109  	}
 110  	return nil
 111  }
 112  
 113  const (
 114  	bufFlushDuration = 60 * time.Second
 115  )
 116  
 117  func (fs *bufferedSink) startFlushGoroutine() {
 118  	fs.writeTicker = time.NewTicker(bufFlushDuration)
 119  	go func() {
 120  		for {
 121  			select {
 122  			case <-fs.done:
 123  				return
 124  			case <-fs.writeTicker.C:
 125  			}
 126  			fs.mu.Lock()
 127  			if err := fs.buf.Flush(); err != nil {
 128  				grpclogLogger.Warningf("failed to flush to Sink: %v", err)
 129  			}
 130  			fs.mu.Unlock()
 131  		}
 132  	}()
 133  }
 134  
 135  func (fs *bufferedSink) Close() error {
 136  	fs.mu.Lock()
 137  	defer fs.mu.Unlock()
 138  	if fs.writeTicker != nil {
 139  		fs.writeTicker.Stop()
 140  	}
 141  	close(fs.done)
 142  	if err := fs.buf.Flush(); err != nil {
 143  		grpclogLogger.Warningf("failed to flush to Sink: %v", err)
 144  	}
 145  	if err := fs.closer.Close(); err != nil {
 146  		grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
 147  	}
 148  	if err := fs.out.Close(); err != nil {
 149  		grpclogLogger.Warningf("failed to close the Sink: %v", err)
 150  	}
 151  	return nil
 152  }
 153  
 154  // NewBufferedSink creates a binary log sink with the given WriteCloser.
 155  //
 156  // Write() marshals the proto message and writes it to the given writer. Each
 157  // message is prefixed with a 4 byte big endian unsigned integer as the length.
 158  //
 159  // Content is kept in a buffer, and is flushed every 60 seconds.
 160  //
 161  // Close closes the WriteCloser.
 162  func NewBufferedSink(o io.WriteCloser) Sink {
 163  	bufW := bufio.NewWriter(o)
 164  	return &bufferedSink{
 165  		closer: o,
 166  		out:    newWriterSink(bufW),
 167  		buf:    bufW,
 168  		done:   make(chan struct{}),
 169  	}
 170  }
 171