sseReader.go raw

   1  // Copyright (c) 2016, 2018, 2025, Oracle and/or its affiliates.  All rights reserved.
   2  // This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
   3  
   4  package common
   5  
   6  import (
   7  	"bufio"
   8  	"bytes"
   9  	"context"
  10  	"io"
  11  	"net/http"
  12  )
  13  
  14  type SseReader struct {
  15  	HttpBody     io.ReadCloser
  16  	eventScanner bufio.Scanner
  17  	OnClose      func(r *SseReader)
  18  }
  19  
  20  // InvalidSSEResponseError returned in the case that a nil response body  was given
  21  // to NewSSEReader()
  22  type InvalidSSEResponseError struct {
  23  }
  24  
  25  const InvalidResponseErrorMessage = "invalid response struct given to NewSSEReader"
  26  
  27  func (e InvalidSSEResponseError) Error() string {
  28  	return InvalidResponseErrorMessage
  29  }
  30  
  31  // NewSSEReader returns an SSE Reader given an sse response
  32  func NewSSEReader(response *http.Response) (*SseReader, error) {
  33  
  34  	if response == nil || response.Body == nil {
  35  		return nil, InvalidSSEResponseError{}
  36  	}
  37  
  38  	reader := &SseReader{
  39  		HttpBody:     response.Body,
  40  		eventScanner: *bufio.NewScanner(response.Body),
  41  		OnClose:      func(r *SseReader) { r.HttpBody.Close() }, // Default on close function, ensures body is closed after use
  42  	}
  43  	return reader, nil
  44  }
  45  
  46  // Take the response in bytes and trim it if necessary
  47  func processEvent(e []byte) []byte {
  48  	e = bytes.TrimPrefix(e, []byte("data: ")) // Text/event-stream always prefixed with 'data: '
  49  	return e
  50  }
  51  
  52  // ReadNextEvent reads the next event in the stream, return it unmarshalled
  53  func (r *SseReader) ReadNextEvent() (event []byte, err error) {
  54  	if r.eventScanner.Scan() {
  55  		eventBytes := r.eventScanner.Bytes()
  56  		return processEvent(eventBytes), nil
  57  	} else {
  58  
  59  		// Close out the stream since we are finished reading from it
  60  		if r.OnClose != nil {
  61  			r.OnClose(r)
  62  		}
  63  
  64  		err := r.eventScanner.Err()
  65  		if err == context.Canceled || err == nil {
  66  			err = io.EOF
  67  		}
  68  		return nil, err
  69  	}
  70  
  71  }
  72  
  73  // ReadAllEvents reads all events from the response stream, and processes each with given event handler
  74  func (r *SseReader) ReadAllEvents(eventHandler func(e []byte)) error {
  75  	for {
  76  
  77  		event, err := r.ReadNextEvent()
  78  
  79  		if err != nil {
  80  
  81  			if err == io.EOF {
  82  				err = nil
  83  			}
  84  			return err
  85  		}
  86  
  87  		// Ignore empty events
  88  		if len(event) > 0 {
  89  			eventHandler(event)
  90  		}
  91  	}
  92  }
  93