1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package binarylog
20 21 import (
22 "bufio"
23 "encoding/binary"
24 "io"
25 "sync"
26 "time"
27 28 binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
29 "google.golang.org/protobuf/proto"
30 )
31 32 var (
33 // DefaultSink is the sink where the logs will be written to. It's exported
34 // for the binarylog package to update.
35 DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
36 )
37 38 // Sink writes log entry into the binary log sink.
39 //
40 // sink is a copy of the exported binarylog.Sink, to avoid circular dependency.
41 type Sink interface {
42 // Write will be called to write the log entry into the sink.
43 //
44 // It should be thread-safe so it can be called in parallel.
45 Write(*binlogpb.GrpcLogEntry) error
46 // Close will be called when the Sink is replaced by a new Sink.
47 Close() error
48 }
49 50 type noopSink struct{}
51 52 func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
53 func (ns *noopSink) Close() error { return nil }
54 55 // newWriterSink creates a binary log sink with the given writer.
56 //
57 // Write() marshals the proto message and writes it to the given writer. Each
58 // message is prefixed with a 4 byte big endian unsigned integer as the length.
59 //
60 // No buffer is done, Close() doesn't try to close the writer.
61 func newWriterSink(w io.Writer) Sink {
62 return &writerSink{out: w}
63 }
64 65 type writerSink struct {
66 out io.Writer
67 }
68 69 func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
70 b, err := proto.Marshal(e)
71 if err != nil {
72 grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
73 return err
74 }
75 hdr := make([]byte, 4)
76 binary.BigEndian.PutUint32(hdr, uint32(len(b)))
77 if _, err := ws.out.Write(hdr); err != nil {
78 return err
79 }
80 if _, err := ws.out.Write(b); err != nil {
81 return err
82 }
83 return nil
84 }
85 86 func (ws *writerSink) Close() error { return nil }
87 88 type bufferedSink struct {
89 mu sync.Mutex
90 closer io.Closer
91 out Sink // out is built on buf.
92 buf *bufio.Writer // buf is kept for flush.
93 flusherStarted bool
94 95 writeTicker *time.Ticker
96 done chan struct{}
97 }
98 99 func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
100 fs.mu.Lock()
101 defer fs.mu.Unlock()
102 if !fs.flusherStarted {
103 // Start the write loop when Write is called.
104 fs.startFlushGoroutine()
105 fs.flusherStarted = true
106 }
107 if err := fs.out.Write(e); err != nil {
108 return err
109 }
110 return nil
111 }
112 113 const (
114 bufFlushDuration = 60 * time.Second
115 )
116 117 func (fs *bufferedSink) startFlushGoroutine() {
118 fs.writeTicker = time.NewTicker(bufFlushDuration)
119 go func() {
120 for {
121 select {
122 case <-fs.done:
123 return
124 case <-fs.writeTicker.C:
125 }
126 fs.mu.Lock()
127 if err := fs.buf.Flush(); err != nil {
128 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
129 }
130 fs.mu.Unlock()
131 }
132 }()
133 }
134 135 func (fs *bufferedSink) Close() error {
136 fs.mu.Lock()
137 defer fs.mu.Unlock()
138 if fs.writeTicker != nil {
139 fs.writeTicker.Stop()
140 }
141 close(fs.done)
142 if err := fs.buf.Flush(); err != nil {
143 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
144 }
145 if err := fs.closer.Close(); err != nil {
146 grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
147 }
148 if err := fs.out.Close(); err != nil {
149 grpclogLogger.Warningf("failed to close the Sink: %v", err)
150 }
151 return nil
152 }
153 154 // NewBufferedSink creates a binary log sink with the given WriteCloser.
155 //
156 // Write() marshals the proto message and writes it to the given writer. Each
157 // message is prefixed with a 4 byte big endian unsigned integer as the length.
158 //
159 // Content is kept in a buffer, and is flushed every 60 seconds.
160 //
161 // Close closes the WriteCloser.
162 func NewBufferedSink(o io.WriteCloser) Sink {
163 bufW := bufio.NewWriter(o)
164 return &bufferedSink{
165 closer: o,
166 out: newWriterSink(bufW),
167 buf: bufW,
168 done: make(chan struct{}),
169 }
170 }
171