proto_json_stream.go raw

   1  // Copyright 2022, Google Inc.
   2  // All rights reserved.
   3  //
   4  // Redistribution and use in source and binary forms, with or without
   5  // modification, are permitted provided that the following conditions are
   6  // met:
   7  //
   8  //     * Redistributions of source code must retain the above copyright
   9  // notice, this list of conditions and the following disclaimer.
  10  //     * Redistributions in binary form must reproduce the above
  11  // copyright notice, this list of conditions and the following disclaimer
  12  // in the documentation and/or other materials provided with the
  13  // distribution.
  14  //     * Neither the name of Google Inc. nor the names of its
  15  // contributors may be used to endorse or promote products derived from
  16  // this software without specific prior written permission.
  17  //
  18  // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21  // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22  // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23  // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24  // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25  // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26  // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27  // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28  // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29  
  30  package gax
  31  
  32  import (
  33  	"encoding/json"
  34  	"errors"
  35  	"io"
  36  
  37  	"google.golang.org/protobuf/encoding/protojson"
  38  	"google.golang.org/protobuf/proto"
  39  	"google.golang.org/protobuf/reflect/protoreflect"
  40  )
  41  
  42  var (
  43  	arrayOpen     = json.Delim('[')
  44  	arrayClose    = json.Delim(']')
  45  	errBadOpening = errors.New("unexpected opening token, expected '['")
  46  )
  47  
  48  // ProtoJSONStream represents a wrapper for consuming a stream of protobuf
  49  // messages encoded using protobuf-JSON format. More information on this format
  50  // can be found at https://developers.google.com/protocol-buffers/docs/proto3#json.
  51  // The stream must appear as a comma-delimited, JSON array of obbjects with
  52  // opening and closing square braces.
  53  //
  54  // This is for internal use only.
  55  type ProtoJSONStream struct {
  56  	first, closed bool
  57  	reader        io.ReadCloser
  58  	stream        *json.Decoder
  59  	typ           protoreflect.MessageType
  60  }
  61  
  62  // NewProtoJSONStreamReader accepts a stream of bytes via an io.ReadCloser that are
  63  // protobuf-JSON encoded protobuf messages of the given type. The ProtoJSONStream
  64  // must be closed when done.
  65  //
  66  // This is for internal use only.
  67  func NewProtoJSONStreamReader(rc io.ReadCloser, typ protoreflect.MessageType) *ProtoJSONStream {
  68  	return &ProtoJSONStream{
  69  		first:  true,
  70  		reader: rc,
  71  		stream: json.NewDecoder(rc),
  72  		typ:    typ,
  73  	}
  74  }
  75  
  76  // Recv decodes the next protobuf message in the stream or returns io.EOF if
  77  // the stream is done. It is not safe to call Recv on the same stream from
  78  // different goroutines, just like it is not safe to do so with a single gRPC
  79  // stream. Type-cast the protobuf message returned to the type provided at
  80  // ProtoJSONStream creation.
  81  // Calls to Recv after calling Close will produce io.EOF.
  82  func (s *ProtoJSONStream) Recv() (proto.Message, error) {
  83  	if s.closed {
  84  		return nil, io.EOF
  85  	}
  86  	if s.first {
  87  		s.first = false
  88  
  89  		// Consume the opening '[' so Decode gets one object at a time.
  90  		if t, err := s.stream.Token(); err != nil {
  91  			return nil, err
  92  		} else if t != arrayOpen {
  93  			return nil, errBadOpening
  94  		}
  95  	}
  96  
  97  	// Capture the next block of data for the item (a JSON object) in the stream.
  98  	var raw json.RawMessage
  99  	if err := s.stream.Decode(&raw); err != nil {
 100  		e := err
 101  		// To avoid checking the first token of each stream, just attempt to
 102  		// Decode the next blob and if that fails, double check if it is just
 103  		// the closing token ']'. If it is the closing, return io.EOF. If it
 104  		// isn't, return the original error.
 105  		if t, _ := s.stream.Token(); t == arrayClose {
 106  			e = io.EOF
 107  		}
 108  		return nil, e
 109  	}
 110  
 111  	// Initialize a new instance of the protobuf message to unmarshal the
 112  	// raw data into.
 113  	m := s.typ.New().Interface()
 114  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
 115  	err := unm.Unmarshal(raw, m)
 116  
 117  	return m, err
 118  }
 119  
 120  // Close closes the stream so that resources are cleaned up.
 121  func (s *ProtoJSONStream) Close() error {
 122  	// Dereference the *json.Decoder so that the memory is gc'd.
 123  	s.stream = nil
 124  	s.closed = true
 125  
 126  	return s.reader.Close()
 127  }
 128