event_stream.go raw
1 package main
2
3 import (
4 "bufio"
5 "encoding/json"
6 "fmt"
7 "math"
8 "math/rand"
9 "os"
10 "path/filepath"
11 "time"
12
13 "next.orly.dev/pkg/nostr/encoders/event"
14 "next.orly.dev/pkg/nostr/encoders/tag"
15 "next.orly.dev/pkg/nostr/encoders/timestamp"
16 "next.orly.dev/pkg/nostr/interfaces/signer/p8k"
17 )
18
19 // EventStream manages disk-based event generation to avoid memory bloat
20 type EventStream struct {
21 baseDir string
22 count int
23 chunkSize int
24 rng *rand.Rand
25 }
26
27 // NewEventStream creates a new event stream that stores events on disk
28 func NewEventStream(baseDir string, count int) (*EventStream, error) {
29 // Create events directory
30 eventsDir := filepath.Join(baseDir, "events")
31 if err := os.MkdirAll(eventsDir, 0755); err != nil {
32 return nil, fmt.Errorf("failed to create events directory: %w", err)
33 }
34
35 return &EventStream{
36 baseDir: eventsDir,
37 count: count,
38 chunkSize: 1000, // Store 1000 events per file to balance I/O
39 rng: rand.New(rand.NewSource(time.Now().UnixNano())),
40 }, nil
41 }
42
// Generate creates es.count signed events and writes them as JSON lines into
// files named chunk_NNNN.jsonl under es.baseDir, with at most es.chunkSize
// events per file. os.Create truncates any existing file of the same name.
// Progress is printed to stdout after every 10th chunk and after the final one.
// On error the chunk currently being written may be left partially filled.
func (es *EventStream) Generate() error {
	total := (es.count + es.chunkSize - 1) / es.chunkSize // ceil(count / chunkSize)

	for c := 0; c < total; c++ {
		path := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", c))
		file, err := os.Create(path)
		if err != nil {
			return fmt.Errorf("failed to create chunk file %s: %w", path, err)
		}

		first := c * es.chunkSize
		last := min(first+es.chunkSize, es.count)

		out := bufio.NewWriter(file)
		if err := es.fillChunk(out, first, last); err != nil {
			file.Close()
			return err
		}
		if err := out.Flush(); err != nil {
			file.Close()
			return fmt.Errorf("failed to flush chunk file %s: %w", path, err)
		}
		// Close errors matter on the success path: they can signal lost writes.
		if err := file.Close(); err != nil {
			return fmt.Errorf("failed to close chunk file %s: %w", path, err)
		}

		done := c + 1
		if done%10 == 0 || done == total {
			fmt.Printf(" Generated %d/%d events (%.1f%%)\n",
				last, es.count, float64(last)/float64(es.count)*100)
		}
	}

	return nil
}

// fillChunk generates the events with indices [from, to) and writes each one
// to w as a single JSON line. It does not flush w.
func (es *EventStream) fillChunk(w *bufio.Writer, from, to int) error {
	for i := from; i < to; i++ {
		ev, err := es.generateEvent(i)
		if err != nil {
			return fmt.Errorf("failed to generate event %d: %w", i, err)
		}
		line, err := json.Marshal(ev)
		if err != nil {
			return fmt.Errorf("failed to marshal event %d: %w", i, err)
		}
		if _, err := w.Write(line); err != nil {
			return fmt.Errorf("failed to write event %d: %w", i, err)
		}
		if _, err := w.WriteString("\n"); err != nil {
			return fmt.Errorf("failed to write newline after event %d: %w", i, err)
		}
	}
	return nil
}
100
// generateEvent builds one signed kind-1 (text note) event stamped with the
// current time. A fresh p8k signer is created and Generate is called on it for
// every event, so (presumably, if Generate yields random keys) each event has
// its own author key. The event gets 0-4 "t" tags of the form tagN with N in
// [0, 100), plus random content whose length comes from
// generateLogDistributedSize. The index argument is currently unused.
func (es *EventStream) generateEvent(index int) (*event.E, error) {
	signer, err := p8k.New()
	if err != nil {
		return nil, fmt.Errorf("failed to create signer: %w", err)
	}
	if err = signer.Generate(); err != nil {
		return nil, fmt.Errorf("failed to generate keys: %w", err)
	}

	ev := event.New()
	ev.Kind = 1 // text note
	ev.CreatedAt = timestamp.Now().I64()

	// RNG draw order matters for reproducibility: tag count, each tag value,
	// content size, then content bytes.
	tagCount := es.rng.Intn(5)
	topics := make([]*tag.T, tagCount)
	for i := range topics {
		topics[i] = tag.NewFromBytesSlice(
			[]byte("t"),
			[]byte(fmt.Sprintf("tag%d", es.rng.Intn(100))),
		)
	}
	ev.Tags = tag.NewS(topics...)

	ev.Content = []byte(es.generateRandomContent(es.generateLogDistributedSize()))

	if err = ev.Sign(signer); err != nil {
		return nil, fmt.Errorf("failed to sign event: %w", err)
	}
	return ev, nil
}
138
// generateLogDistributedSize draws a content size in bytes from a power-law
// distribution: size = floor(U^4 * 100 KiB) for U uniform on [0, 1), clamped
// to a minimum of 10 bytes. Despite the name, this is not log-uniform.
//
// Because P(size < s) = (s / 100 KiB)^(1/4), the skew toward small values is
// milder than the exponent suggests. Roughly:
//   - 10% of draws are clamped to exactly 10 bytes,
//   - 32% are small (< 1 KiB),
//   - 25% are medium (1-10 KiB),
//   - 44% are large (10-100 KiB).
//
// The median is about 6.25 KiB and the mean about 20 KiB. The result always
// lies in [10, 102399]. Consumes exactly one es.rng.Float64() per call.
func (es *EventStream) generateLogDistributedSize() int {
	const (
		powerExponent = 4.0        // larger exponent => more mass near zero
		maxSize       = 100 * 1024 // exclusive upper bound before flooring
		minSize       = 10         // floor applied after scaling
	)
	skewed := math.Pow(es.rng.Float64(), powerExponent)
	return max(int(skewed*maxSize), minSize)
}
161
162 // generateRandomContent creates random text content of specified size
163 func (es *EventStream) generateRandomContent(size int) string {
164 const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \n"
165 content := make([]byte, size)
166 for i := range content {
167 content[i] = charset[es.rng.Intn(len(charset))]
168 }
169 return string(content)
170 }
171
172 // GetEventChannel returns a channel that streams events from disk
173 // bufferSize controls memory usage - larger buffers improve throughput but use more memory
174 func (es *EventStream) GetEventChannel(bufferSize int) (<-chan *event.E, <-chan error) {
175 eventChan := make(chan *event.E, bufferSize)
176 errChan := make(chan error, 1)
177
178 go func() {
179 defer close(eventChan)
180 defer close(errChan)
181
182 numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
183
184 for chunk := 0; chunk < numChunks; chunk++ {
185 chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
186 f, err := os.Open(chunkFile)
187 if err != nil {
188 errChan <- fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err)
189 return
190 }
191
192 scanner := bufio.NewScanner(f)
193 // Increase buffer size for large events
194 buf := make([]byte, 0, 64*1024)
195 scanner.Buffer(buf, 1024*1024) // Max 1MB per line
196
197 for scanner.Scan() {
198 var ev event.E
199 if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
200 f.Close()
201 errChan <- fmt.Errorf("failed to unmarshal event: %w", err)
202 return
203 }
204 eventChan <- &ev
205 }
206
207 if err := scanner.Err(); err != nil {
208 f.Close()
209 errChan <- fmt.Errorf("error reading chunk file %s: %w", chunkFile, err)
210 return
211 }
212
213 f.Close()
214 }
215 }()
216
217 return eventChan, errChan
218 }
219
220 // ForEach iterates over all events without loading them all into memory
221 func (es *EventStream) ForEach(fn func(*event.E) error) error {
222 numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
223
224 for chunk := 0; chunk < numChunks; chunk++ {
225 chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
226 f, err := os.Open(chunkFile)
227 if err != nil {
228 return fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err)
229 }
230
231 scanner := bufio.NewScanner(f)
232 buf := make([]byte, 0, 64*1024)
233 scanner.Buffer(buf, 1024*1024)
234
235 for scanner.Scan() {
236 var ev event.E
237 if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
238 f.Close()
239 return fmt.Errorf("failed to unmarshal event: %w", err)
240 }
241
242 if err := fn(&ev); err != nil {
243 f.Close()
244 return err
245 }
246 }
247
248 if err := scanner.Err(); err != nil {
249 f.Close()
250 return fmt.Errorf("error reading chunk file %s: %w", chunkFile, err)
251 }
252
253 f.Close()
254 }
255
256 return nil
257 }
258