stream.go raw

   1  package dara
   2  
   3  import (
   4  	"bufio"
   5  	"bytes"
   6  	"context"
   7  	"encoding/json"
   8  	"fmt"
   9  	"io"
  10  	"io/ioutil"
  11  	"strings"
  12  )
  13  
  14  type SSEEvent struct {
  15  	Id    *string
  16  	Event *string
  17  	Data  *string
  18  	Retry *int
  19  }
  20  
  21  func parseEvent(lines []string) *SSEEvent {
  22  	event := &SSEEvent{}
  23  	for _, line := range lines {
  24  		if strings.HasPrefix(line, "data:") {
  25  			var data string
  26  			if strings.HasPrefix(line, "data: ") {
  27  				data = strings.TrimPrefix(line, "data: ") + "\n"
  28  			} else {
  29  				data = strings.TrimPrefix(line, "data:") + "\n"
  30  			}
  31  			if event.Data == nil {
  32  				event.Data = new(string)
  33  			}
  34  			*event.Data += data
  35  		} else if strings.HasPrefix(line, "event:") {
  36  			var eventName string
  37  			if strings.HasPrefix(line, "event: ") {
  38  				eventName = strings.TrimPrefix(line, "event: ")
  39  			} else {
  40  				eventName = strings.TrimPrefix(line, "event:")
  41  			}
  42  			event.Event = &eventName
  43  		} else if strings.HasPrefix(line, "id:") {
  44  			var id string
  45  			if strings.HasPrefix(line, "id: ") {
  46  				id = strings.TrimPrefix(line, "id: ")
  47  			} else {
  48  				id = strings.TrimPrefix(line, "id:")
  49  			}
  50  			event.Id = &id
  51  		} else if strings.HasPrefix(line, "retry:") {
  52  			var retryStr string
  53  			if strings.HasPrefix(line, "retry: ") {
  54  				retryStr = strings.TrimPrefix(line, "retry: ")
  55  			} else {
  56  				retryStr = strings.TrimPrefix(line, "retry:")
  57  			}
  58  			var retry int
  59  			fmt.Sscanf(retryStr, "%d", &retry)
  60  			event.Retry = &retry
  61  		}
  62  	}
  63  	if event.Data != nil {
  64  		data := strings.TrimRight(*event.Data, "\n")
  65  		event.Data = &data
  66  	}
  67  	return event
  68  }
  69  
  70  func ReadAsBytes(body io.Reader) ([]byte, error) {
  71  	byt, err := ioutil.ReadAll(body)
  72  	if err != nil {
  73  		return nil, err
  74  	}
  75  	r, ok := body.(io.ReadCloser)
  76  	if ok {
  77  		r.Close()
  78  	}
  79  	return byt, nil
  80  }
  81  
  82  func ReadAsJSON(body io.Reader) (result interface{}, err error) {
  83  	byt, err := ioutil.ReadAll(body)
  84  	if err != nil {
  85  		return
  86  	}
  87  	if string(byt) == "" {
  88  		return
  89  	}
  90  	r, ok := body.(io.ReadCloser)
  91  	if ok {
  92  		r.Close()
  93  	}
  94  	d := json.NewDecoder(bytes.NewReader(byt))
  95  	d.UseNumber()
  96  	err = d.Decode(&result)
  97  	return
  98  }
  99  
 100  func ReadAsString(body io.Reader) (string, error) {
 101  	byt, err := ioutil.ReadAll(body)
 102  	if err != nil {
 103  		return "", err
 104  	}
 105  	r, ok := body.(io.ReadCloser)
 106  	if ok {
 107  		r.Close()
 108  	}
 109  	return string(byt), nil
 110  }
 111  
 112  func ReadAsSSE(body io.ReadCloser, eventChannel chan *SSEEvent, errorChannel chan error) {
 113  
 114  	go func() {
 115  		defer func() {
 116  			body.Close()
 117  			close(eventChannel)
 118  		}()
 119  
 120  		reader := bufio.NewReader(body)
 121  		var eventLines []string
 122  
 123  		for {
 124  			line, err := reader.ReadString('\n')
 125  			if err != nil {
 126  				if err == io.EOF {
 127  					// Handle the end of the stream and possibly pending event
 128  					if len(eventLines) > 0 {
 129  						event := parseEvent(eventLines)
 130  						eventChannel <- event
 131  					}
 132  					errorChannel <- nil
 133  					return
 134  				}
 135  				errorChannel <- err
 136  				return
 137  			}
 138  
 139  			line = strings.TrimRight(line, "\n")
 140  
 141  			if line == "" {
 142  				// End of an SSE event
 143  				if len(eventLines) > 0 {
 144  					event := parseEvent(eventLines)
 145  					eventChannel <- event
 146  					eventLines = []string{} // Reset for the next event
 147  				}
 148  				continue
 149  			}
 150  
 151  			eventLines = append(eventLines, line)
 152  		}
 153  	}()
 154  }
 155  
 156  func ReadAsSSEWithContext(ctx context.Context, body io.ReadCloser, eventChannel chan *SSEEvent, errorChannel chan error) {
 157  
 158  	go func() {
 159  		defer func() {
 160  			body.Close()
 161  			close(eventChannel)
 162  		}()
 163  
 164  		reader := bufio.NewReader(body)
 165  		var eventLines []string
 166  
 167  		for {
 168  			select {
 169  			case <-ctx.Done():
 170  				errorChannel <- ctx.Err()
 171  				return
 172  			default:
 173  			}
 174  
 175  			line, err := reader.ReadString('\n')
 176  			if err != nil {
 177  				if err == io.EOF {
 178  					// Handle the end of the stream and possibly pending event
 179  					if len(eventLines) > 0 {
 180  						event := parseEvent(eventLines)
 181  						select {
 182  						case eventChannel <- event:
 183  						case <-ctx.Done():
 184  							errorChannel <- ctx.Err()
 185  							return
 186  						}
 187  					}
 188  					errorChannel <- nil
 189  					return
 190  				}
 191  				errorChannel <- err
 192  				return
 193  			}
 194  
 195  			line = strings.TrimRight(line, "\n")
 196  
 197  			if line == "" {
 198  				// End of an SSE event
 199  				if len(eventLines) > 0 {
 200  					event := parseEvent(eventLines)
 201  					select {
 202  					case eventChannel <- event:
 203  					case <-ctx.Done():
 204  						errorChannel <- ctx.Err()
 205  						return
 206  					}
 207  					eventLines = []string{} // Reset for the next event
 208  				}
 209  				continue
 210  			}
 211  
 212  			eventLines = append(eventLines, line)
 213  		}
 214  	}()
 215  }
 216