1 // Copyright 2022, Google Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Google Inc. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 30 package gax
31 32 import (
33 "encoding/json"
34 "errors"
35 "io"
36 37 "google.golang.org/protobuf/encoding/protojson"
38 "google.golang.org/protobuf/proto"
39 "google.golang.org/protobuf/reflect/protoreflect"
40 )
41 42 var (
43 arrayOpen = json.Delim('[')
44 arrayClose = json.Delim(']')
45 errBadOpening = errors.New("unexpected opening token, expected '['")
46 )
47 48 // ProtoJSONStream represents a wrapper for consuming a stream of protobuf
49 // messages encoded using protobuf-JSON format. More information on this format
50 // can be found at https://developers.google.com/protocol-buffers/docs/proto3#json.
51 // The stream must appear as a comma-delimited, JSON array of obbjects with
52 // opening and closing square braces.
53 //
54 // This is for internal use only.
55 type ProtoJSONStream struct {
56 first, closed bool
57 reader io.ReadCloser
58 stream *json.Decoder
59 typ protoreflect.MessageType
60 }
61 62 // NewProtoJSONStreamReader accepts a stream of bytes via an io.ReadCloser that are
63 // protobuf-JSON encoded protobuf messages of the given type. The ProtoJSONStream
64 // must be closed when done.
65 //
66 // This is for internal use only.
67 func NewProtoJSONStreamReader(rc io.ReadCloser, typ protoreflect.MessageType) *ProtoJSONStream {
68 return &ProtoJSONStream{
69 first: true,
70 reader: rc,
71 stream: json.NewDecoder(rc),
72 typ: typ,
73 }
74 }
75 76 // Recv decodes the next protobuf message in the stream or returns io.EOF if
77 // the stream is done. It is not safe to call Recv on the same stream from
78 // different goroutines, just like it is not safe to do so with a single gRPC
79 // stream. Type-cast the protobuf message returned to the type provided at
80 // ProtoJSONStream creation.
81 // Calls to Recv after calling Close will produce io.EOF.
82 func (s *ProtoJSONStream) Recv() (proto.Message, error) {
83 if s.closed {
84 return nil, io.EOF
85 }
86 if s.first {
87 s.first = false
88 89 // Consume the opening '[' so Decode gets one object at a time.
90 if t, err := s.stream.Token(); err != nil {
91 return nil, err
92 } else if t != arrayOpen {
93 return nil, errBadOpening
94 }
95 }
96 97 // Capture the next block of data for the item (a JSON object) in the stream.
98 var raw json.RawMessage
99 if err := s.stream.Decode(&raw); err != nil {
100 e := err
101 // To avoid checking the first token of each stream, just attempt to
102 // Decode the next blob and if that fails, double check if it is just
103 // the closing token ']'. If it is the closing, return io.EOF. If it
104 // isn't, return the original error.
105 if t, _ := s.stream.Token(); t == arrayClose {
106 e = io.EOF
107 }
108 return nil, e
109 }
110 111 // Initialize a new instance of the protobuf message to unmarshal the
112 // raw data into.
113 m := s.typ.New().Interface()
114 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
115 err := unm.Unmarshal(raw, m)
116 117 return m, err
118 }
119 120 // Close closes the stream so that resources are cleaned up.
121 func (s *ProtoJSONStream) Close() error {
122 // Dereference the *json.Decoder so that the memory is gc'd.
123 s.stream = nil
124 s.closed = true
125 126 return s.reader.Close()
127 }
128