event_stream.go raw

   1  package main
   2  
   3  import (
   4  	"bufio"
   5  	"encoding/json"
   6  	"fmt"
   7  	"math"
   8  	"math/rand"
   9  	"os"
  10  	"path/filepath"
  11  	"time"
  12  
  13  	"next.orly.dev/pkg/nostr/encoders/event"
  14  	"next.orly.dev/pkg/nostr/encoders/tag"
  15  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  16  	"next.orly.dev/pkg/nostr/interfaces/signer/p8k"
  17  )
  18  
  19  // EventStream manages disk-based event generation to avoid memory bloat
  20  type EventStream struct {
  21  	baseDir   string
  22  	count     int
  23  	chunkSize int
  24  	rng       *rand.Rand
  25  }
  26  
  27  // NewEventStream creates a new event stream that stores events on disk
  28  func NewEventStream(baseDir string, count int) (*EventStream, error) {
  29  	// Create events directory
  30  	eventsDir := filepath.Join(baseDir, "events")
  31  	if err := os.MkdirAll(eventsDir, 0755); err != nil {
  32  		return nil, fmt.Errorf("failed to create events directory: %w", err)
  33  	}
  34  
  35  	return &EventStream{
  36  		baseDir:   eventsDir,
  37  		count:     count,
  38  		chunkSize: 1000, // Store 1000 events per file to balance I/O
  39  		rng:       rand.New(rand.NewSource(time.Now().UnixNano())),
  40  	}, nil
  41  }
  42  
  43  // Generate creates all events and stores them in chunk files
  44  func (es *EventStream) Generate() error {
  45  	numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
  46  
  47  	for chunk := 0; chunk < numChunks; chunk++ {
  48  		chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
  49  		f, err := os.Create(chunkFile)
  50  		if err != nil {
  51  			return fmt.Errorf("failed to create chunk file %s: %w", chunkFile, err)
  52  		}
  53  
  54  		writer := bufio.NewWriter(f)
  55  		startIdx := chunk * es.chunkSize
  56  		endIdx := min(startIdx+es.chunkSize, es.count)
  57  
  58  		for i := startIdx; i < endIdx; i++ {
  59  			ev, err := es.generateEvent(i)
  60  			if err != nil {
  61  				f.Close()
  62  				return fmt.Errorf("failed to generate event %d: %w", i, err)
  63  			}
  64  
  65  			// Marshal event to JSON
  66  			eventJSON, err := json.Marshal(ev)
  67  			if err != nil {
  68  				f.Close()
  69  				return fmt.Errorf("failed to marshal event %d: %w", i, err)
  70  			}
  71  
  72  			// Write JSON line
  73  			if _, err := writer.Write(eventJSON); err != nil {
  74  				f.Close()
  75  				return fmt.Errorf("failed to write event %d: %w", i, err)
  76  			}
  77  			if _, err := writer.WriteString("\n"); err != nil {
  78  				f.Close()
  79  				return fmt.Errorf("failed to write newline after event %d: %w", i, err)
  80  			}
  81  		}
  82  
  83  		if err := writer.Flush(); err != nil {
  84  			f.Close()
  85  			return fmt.Errorf("failed to flush chunk file %s: %w", chunkFile, err)
  86  		}
  87  
  88  		if err := f.Close(); err != nil {
  89  			return fmt.Errorf("failed to close chunk file %s: %w", chunkFile, err)
  90  		}
  91  
  92  		if (chunk+1)%10 == 0 || chunk == numChunks-1 {
  93  			fmt.Printf("  Generated %d/%d events (%.1f%%)\n",
  94  				endIdx, es.count, float64(endIdx)/float64(es.count)*100)
  95  		}
  96  	}
  97  
  98  	return nil
  99  }
 100  
 101  // generateEvent creates a single event with realistic size distribution
 102  func (es *EventStream) generateEvent(index int) (*event.E, error) {
 103  	// Create signer for this event
 104  	keys, err := p8k.New()
 105  	if err != nil {
 106  		return nil, fmt.Errorf("failed to create signer: %w", err)
 107  	}
 108  	if err := keys.Generate(); err != nil {
 109  		return nil, fmt.Errorf("failed to generate keys: %w", err)
 110  	}
 111  
 112  	ev := event.New()
 113  	ev.Kind = 1 // Text note
 114  	ev.CreatedAt = timestamp.Now().I64()
 115  
 116  	// Add some tags for realism
 117  	numTags := es.rng.Intn(5)
 118  	tags := make([]*tag.T, 0, numTags)
 119  	for i := 0; i < numTags; i++ {
 120  		tags = append(tags, tag.NewFromBytesSlice(
 121  			[]byte("t"),
 122  			[]byte(fmt.Sprintf("tag%d", es.rng.Intn(100))),
 123  		))
 124  	}
 125  	ev.Tags = tag.NewS(tags...)
 126  
 127  	// Generate content with log-distributed size
 128  	contentSize := es.generateLogDistributedSize()
 129  	ev.Content = []byte(es.generateRandomContent(contentSize))
 130  
 131  	// Sign the event
 132  	if err := ev.Sign(keys); err != nil {
 133  		return nil, fmt.Errorf("failed to sign event: %w", err)
 134  	}
 135  
 136  	return ev, nil
 137  }
 138  
 139  // generateLogDistributedSize generates sizes following a power law distribution
 140  // This creates realistic size distribution:
 141  // - Most events are small (< 1KB)
 142  // - Some events are medium (1-10KB)
 143  // - Few events are large (10-100KB)
 144  func (es *EventStream) generateLogDistributedSize() int {
 145  	// Use power law with exponent 4.0 for strong skew toward small sizes
 146  	const powerExponent = 4.0
 147  	uniform := es.rng.Float64()
 148  	skewed := math.Pow(uniform, powerExponent)
 149  
 150  	// Scale to max size of 100KB
 151  	const maxSize = 100 * 1024
 152  	size := int(skewed * maxSize)
 153  
 154  	// Ensure minimum size of 10 bytes
 155  	if size < 10 {
 156  		size = 10
 157  	}
 158  
 159  	return size
 160  }
 161  
 162  // generateRandomContent creates random text content of specified size
 163  func (es *EventStream) generateRandomContent(size int) string {
 164  	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \n"
 165  	content := make([]byte, size)
 166  	for i := range content {
 167  		content[i] = charset[es.rng.Intn(len(charset))]
 168  	}
 169  	return string(content)
 170  }
 171  
 172  // GetEventChannel returns a channel that streams events from disk
 173  // bufferSize controls memory usage - larger buffers improve throughput but use more memory
 174  func (es *EventStream) GetEventChannel(bufferSize int) (<-chan *event.E, <-chan error) {
 175  	eventChan := make(chan *event.E, bufferSize)
 176  	errChan := make(chan error, 1)
 177  
 178  	go func() {
 179  		defer close(eventChan)
 180  		defer close(errChan)
 181  
 182  		numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
 183  
 184  		for chunk := 0; chunk < numChunks; chunk++ {
 185  			chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
 186  			f, err := os.Open(chunkFile)
 187  			if err != nil {
 188  				errChan <- fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err)
 189  				return
 190  			}
 191  
 192  			scanner := bufio.NewScanner(f)
 193  			// Increase buffer size for large events
 194  			buf := make([]byte, 0, 64*1024)
 195  			scanner.Buffer(buf, 1024*1024) // Max 1MB per line
 196  
 197  			for scanner.Scan() {
 198  				var ev event.E
 199  				if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
 200  					f.Close()
 201  					errChan <- fmt.Errorf("failed to unmarshal event: %w", err)
 202  					return
 203  				}
 204  				eventChan <- &ev
 205  			}
 206  
 207  			if err := scanner.Err(); err != nil {
 208  				f.Close()
 209  				errChan <- fmt.Errorf("error reading chunk file %s: %w", chunkFile, err)
 210  				return
 211  			}
 212  
 213  			f.Close()
 214  		}
 215  	}()
 216  
 217  	return eventChan, errChan
 218  }
 219  
 220  // ForEach iterates over all events without loading them all into memory
 221  func (es *EventStream) ForEach(fn func(*event.E) error) error {
 222  	numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
 223  
 224  	for chunk := 0; chunk < numChunks; chunk++ {
 225  		chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
 226  		f, err := os.Open(chunkFile)
 227  		if err != nil {
 228  			return fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err)
 229  		}
 230  
 231  		scanner := bufio.NewScanner(f)
 232  		buf := make([]byte, 0, 64*1024)
 233  		scanner.Buffer(buf, 1024*1024)
 234  
 235  		for scanner.Scan() {
 236  			var ev event.E
 237  			if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
 238  				f.Close()
 239  				return fmt.Errorf("failed to unmarshal event: %w", err)
 240  			}
 241  
 242  			if err := fn(&ev); err != nil {
 243  				f.Close()
 244  				return err
 245  			}
 246  		}
 247  
 248  		if err := scanner.Err(); err != nil {
 249  			f.Close()
 250  			return fmt.Errorf("error reading chunk file %s: %w", chunkFile, err)
 251  		}
 252  
 253  		f.Close()
 254  	}
 255  
 256  	return nil
 257  }
 258