stream.go raw
1 package dara
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "fmt"
9 "io"
10 "io/ioutil"
11 "strings"
12 )
13
// SSEEvent is one parsed Server-Sent Events message.
//
// Every field is a pointer so that a field absent from the event (nil) can be
// told apart from a field that was present with an empty or zero value.
type SSEEvent struct {
	// Id, Event and Data carry the values of the "id:", "event:" and "data:"
	// fields of the event.
	Id, Event, Data *string
	// Retry carries the integer value of the "retry:" field.
	Retry *int
}
20
21 func parseEvent(lines []string) *SSEEvent {
22 event := &SSEEvent{}
23 for _, line := range lines {
24 if strings.HasPrefix(line, "data:") {
25 var data string
26 if strings.HasPrefix(line, "data: ") {
27 data = strings.TrimPrefix(line, "data: ") + "\n"
28 } else {
29 data = strings.TrimPrefix(line, "data:") + "\n"
30 }
31 if event.Data == nil {
32 event.Data = new(string)
33 }
34 *event.Data += data
35 } else if strings.HasPrefix(line, "event:") {
36 var eventName string
37 if strings.HasPrefix(line, "event: ") {
38 eventName = strings.TrimPrefix(line, "event: ")
39 } else {
40 eventName = strings.TrimPrefix(line, "event:")
41 }
42 event.Event = &eventName
43 } else if strings.HasPrefix(line, "id:") {
44 var id string
45 if strings.HasPrefix(line, "id: ") {
46 id = strings.TrimPrefix(line, "id: ")
47 } else {
48 id = strings.TrimPrefix(line, "id:")
49 }
50 event.Id = &id
51 } else if strings.HasPrefix(line, "retry:") {
52 var retryStr string
53 if strings.HasPrefix(line, "retry: ") {
54 retryStr = strings.TrimPrefix(line, "retry: ")
55 } else {
56 retryStr = strings.TrimPrefix(line, "retry:")
57 }
58 var retry int
59 fmt.Sscanf(retryStr, "%d", &retry)
60 event.Retry = &retry
61 }
62 }
63 if event.Data != nil {
64 data := strings.TrimRight(*event.Data, "\n")
65 event.Data = &data
66 }
67 return event
68 }
69
// ReadAsBytes reads body until EOF and returns everything that was read.
//
// If body also implements io.Closer it is closed before ReadAsBytes returns,
// on the error path as well as on success, so a failed read does not leak the
// underlying resource. The error returned by Close is ignored.
//
// On a read error the returned slice is nil.
func ReadAsBytes(body io.Reader) ([]byte, error) {
	if closer, ok := body.(io.Closer); ok {
		defer closer.Close()
	}

	byt, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	return byt, nil
}
81
// ReadAsJSON reads body until EOF and decodes its content as a single JSON
// value.
//
// Numbers are decoded as json.Number (the decoder runs with UseNumber), so
// large integers are not forced through float64. An empty body yields a nil
// result and a nil error.
//
// If body also implements io.Closer it is closed before ReadAsJSON returns on
// every path, including a read error and the empty-body case. The error
// returned by Close is ignored.
func ReadAsJSON(body io.Reader) (result interface{}, err error) {
	if closer, ok := body.(io.Closer); ok {
		defer closer.Close()
	}

	raw, readErr := ioutil.ReadAll(body)
	if readErr != nil {
		return nil, readErr
	}
	if len(raw) == 0 {
		return nil, nil
	}

	d := json.NewDecoder(bytes.NewReader(raw))
	d.UseNumber()
	err = d.Decode(&result)
	return result, err
}
99
// ReadAsString reads body until EOF and returns the content as a string.
//
// If body also implements io.Closer it is closed before ReadAsString returns,
// on the error path as well as on success, so a failed read does not leak the
// underlying resource. The error returned by Close is ignored.
//
// On a read error the returned string is empty.
func ReadAsString(body io.Reader) (string, error) {
	if closer, ok := body.(io.Closer); ok {
		defer closer.Close()
	}

	byt, err := ioutil.ReadAll(body)
	if err != nil {
		return "", err
	}
	return string(byt), nil
}
111
112 func ReadAsSSE(body io.ReadCloser, eventChannel chan *SSEEvent, errorChannel chan error) {
113
114 go func() {
115 defer func() {
116 body.Close()
117 close(eventChannel)
118 }()
119
120 reader := bufio.NewReader(body)
121 var eventLines []string
122
123 for {
124 line, err := reader.ReadString('\n')
125 if err != nil {
126 if err == io.EOF {
127 // Handle the end of the stream and possibly pending event
128 if len(eventLines) > 0 {
129 event := parseEvent(eventLines)
130 eventChannel <- event
131 }
132 errorChannel <- nil
133 return
134 }
135 errorChannel <- err
136 return
137 }
138
139 line = strings.TrimRight(line, "\n")
140
141 if line == "" {
142 // End of an SSE event
143 if len(eventLines) > 0 {
144 event := parseEvent(eventLines)
145 eventChannel <- event
146 eventLines = []string{} // Reset for the next event
147 }
148 continue
149 }
150
151 eventLines = append(eventLines, line)
152 }
153 }()
154 }
155
156 func ReadAsSSEWithContext(ctx context.Context, body io.ReadCloser, eventChannel chan *SSEEvent, errorChannel chan error) {
157
158 go func() {
159 defer func() {
160 body.Close()
161 close(eventChannel)
162 }()
163
164 reader := bufio.NewReader(body)
165 var eventLines []string
166
167 for {
168 select {
169 case <-ctx.Done():
170 errorChannel <- ctx.Err()
171 return
172 default:
173 }
174
175 line, err := reader.ReadString('\n')
176 if err != nil {
177 if err == io.EOF {
178 // Handle the end of the stream and possibly pending event
179 if len(eventLines) > 0 {
180 event := parseEvent(eventLines)
181 select {
182 case eventChannel <- event:
183 case <-ctx.Done():
184 errorChannel <- ctx.Err()
185 return
186 }
187 }
188 errorChannel <- nil
189 return
190 }
191 errorChannel <- err
192 return
193 }
194
195 line = strings.TrimRight(line, "\n")
196
197 if line == "" {
198 // End of an SSE event
199 if len(eventLines) > 0 {
200 event := parseEvent(eventLines)
201 select {
202 case eventChannel <- event:
203 case <-ctx.Done():
204 errorChannel <- ctx.Err()
205 return
206 }
207 eventLines = []string{} // Reset for the next event
208 }
209 continue
210 }
211
212 eventLines = append(eventLines, line)
213 }
214 }()
215 }
216