sseReader.go raw
1 // Copyright (c) 2016, 2018, 2025, Oracle and/or its affiliates. All rights reserved.
2 // This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
3
4 package common
5
import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"io"
	"net/http"
)
13
// SseReader reads a server-sent events (SSE) stream line by line from an
// HTTP response body.
type SseReader struct {
	// HttpBody is the response body the stream is read from.
	HttpBody io.ReadCloser

	// eventScanner splits the stream into lines; each scanned line is
	// treated as one event.
	eventScanner bufio.Scanner

	// OnClose, when non-nil, is called with the reader itself once no
	// further event can be read from the stream.
	OnClose func(r *SseReader)
}
19
// InvalidResponseErrorMessage is the message reported by
// InvalidSSEResponseError.
const InvalidResponseErrorMessage = "invalid response struct given to NewSSEReader"

// InvalidSSEResponseError is returned by NewSSEReader when it is given a nil
// response or a response whose body is nil.
type InvalidSSEResponseError struct{}

// Error implements the error interface and always returns
// InvalidResponseErrorMessage.
func (InvalidSSEResponseError) Error() string {
	return InvalidResponseErrorMessage
}
30
31 // NewSSEReader returns an SSE Reader given an sse response
32 func NewSSEReader(response *http.Response) (*SseReader, error) {
33
34 if response == nil || response.Body == nil {
35 return nil, InvalidSSEResponseError{}
36 }
37
38 reader := &SseReader{
39 HttpBody: response.Body,
40 eventScanner: *bufio.NewScanner(response.Body),
41 OnClose: func(r *SseReader) { r.HttpBody.Close() }, // Default on close function, ensures body is closed after use
42 }
43 return reader, nil
44 }
45
// processEvent strips the SSE "data" field name from a single stream line and
// returns the remaining payload.
//
// In the event-stream format the colon after the field name may be followed
// by one optional space, so both "data: x" and "data:x" yield "x". Lines that
// do not start with "data:" are returned unchanged. The result shares its
// backing array with e; no copy is made.
func processEvent(e []byte) []byte {
	payload := bytes.TrimPrefix(e, []byte("data:"))
	if len(payload) == len(e) {
		// Not a data field: hand the line back untouched.
		return e
	}

	// Only a single leading space belongs to the framing; any further
	// whitespace is part of the payload.
	return bytes.TrimPrefix(payload, []byte(" "))
}
51
52 // ReadNextEvent reads the next event in the stream, return it unmarshalled
53 func (r *SseReader) ReadNextEvent() (event []byte, err error) {
54 if r.eventScanner.Scan() {
55 eventBytes := r.eventScanner.Bytes()
56 return processEvent(eventBytes), nil
57 } else {
58
59 // Close out the stream since we are finished reading from it
60 if r.OnClose != nil {
61 r.OnClose(r)
62 }
63
64 err := r.eventScanner.Err()
65 if err == context.Canceled || err == nil {
66 err = io.EOF
67 }
68 return nil, err
69 }
70
71 }
72
73 // ReadAllEvents reads all events from the response stream, and processes each with given event handler
74 func (r *SseReader) ReadAllEvents(eventHandler func(e []byte)) error {
75 for {
76
77 event, err := r.ReadNextEvent()
78
79 if err != nil {
80
81 if err == io.EOF {
82 err = nil
83 }
84 return err
85 }
86
87 // Ignore empty events
88 if len(event) > 0 {
89 eventHandler(event)
90 }
91 }
92 }
93