import_utils.go raw

   1  //go:build !(js && wasm)
   2  
   3  // Package database provides shared import utilities for events
   4  package database
   5  
   6  import (
   7  	"bufio"
   8  	"context"
   9  	"io"
  10  	"os"
  11  	"runtime/debug"
  12  	"strings"
  13  	"time"
  14  
  15  	"next.orly.dev/pkg/lol/chk"
  16  	"next.orly.dev/pkg/lol/log"
  17  	"next.orly.dev/pkg/nostr/encoders/event"
  18  )
  19  
  20  const maxLen = 500000000
  21  
  22  // ImportEventsFromReader imports events from an io.Reader containing JSONL data
  23  func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
  24  	startTime := time.Now()
  25  	log.I.F("import: starting import operation")
  26  
  27  	// store to disk so we can return fast
  28  	tmpPath := os.TempDir() + string(os.PathSeparator) + "orly"
  29  	os.MkdirAll(tmpPath, 0700)
  30  	tmp, err := os.CreateTemp(tmpPath, "")
  31  	if chk.E(err) {
  32  		return err
  33  	}
  34  	defer os.Remove(tmp.Name()) // Clean up temp file when done
  35  
  36  	log.I.F("import: buffering upload to %s", tmp.Name())
  37  	bufferStart := time.Now()
  38  	bytesBuffered, err := io.Copy(tmp, rr)
  39  	if chk.E(err) {
  40  		return err
  41  	}
  42  	bufferElapsed := time.Since(bufferStart)
  43  	log.I.F("import: buffered %.2f MB in %v (%.2f MB/sec)",
  44  		float64(bytesBuffered)/1024/1024, bufferElapsed.Round(time.Millisecond),
  45  		float64(bytesBuffered)/bufferElapsed.Seconds()/1024/1024)
  46  
  47  	if _, err = tmp.Seek(0, 0); chk.E(err) {
  48  		return err
  49  	}
  50  
  51  	processErr := d.processJSONLEvents(ctx, tmp)
  52  
  53  	totalElapsed := time.Since(startTime)
  54  	log.I.F("import: total operation time: %v", totalElapsed.Round(time.Millisecond))
  55  
  56  	return processErr
  57  }
  58  
  59  // ImportEventsFromStrings imports events from a slice of JSON strings with policy filtering
  60  func (d *D) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }) error {
  61  	// Create a reader from the string slice
  62  	reader := strings.NewReader(strings.Join(eventJSONs, "\n"))
  63  	return d.processJSONLEventsWithPolicy(ctx, reader, policyManager)
  64  }
  65  
  66  // processJSONLEvents processes JSONL events from a reader
  67  func (d *D) processJSONLEvents(ctx context.Context, rr io.Reader) error {
  68  	return d.processJSONLEventsWithPolicy(ctx, rr, nil)
  69  }
  70  
  71  // processJSONLEventsWithPolicy processes JSONL events from a reader with optional policy filtering
  72  func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }) error {
  73  	// Create a scanner to read the buffer line by line
  74  	scan := bufio.NewScanner(rr)
  75  	scanBuf := make([]byte, maxLen)
  76  	scan.Buffer(scanBuf, maxLen)
  77  
  78  	// Performance tracking
  79  	startTime := time.Now()
  80  	lastLogTime := startTime
  81  	const logInterval = 5 * time.Second
  82  
  83  	var count, total, skipped, policyRejected, unmarshalErrors, saveErrors int
  84  	for scan.Scan() {
  85  		select {
  86  		case <-ctx.Done():
  87  			log.I.F("import: context closed after %d events", count)
  88  			return ctx.Err()
  89  		default:
  90  		}
  91  
  92  		b := scan.Bytes()
  93  		total += len(b) + 1
  94  		if len(b) < 1 {
  95  			skipped++
  96  			continue
  97  		}
  98  
  99  		ev := event.New()
 100  		if _, err := ev.Unmarshal(b); err != nil {
 101  			// return the pooled buffer on error
 102  			ev.Free()
 103  			unmarshalErrors++
 104  			log.W.F("failed to unmarshal event: %v", err)
 105  			continue
 106  		}
 107  
 108  		// Apply policy checking if policy manager is provided
 109  		if policyManager != nil {
 110  			// For sync imports, we treat events as coming from system/trusted source
 111  			// Use nil pubkey and empty remote to indicate system-level import
 112  			allowed, policyErr := policyManager.CheckPolicy("write", ev, nil, "")
 113  			if policyErr != nil {
 114  				log.W.F("policy check failed for event %x: %v", ev.ID, policyErr)
 115  				ev.Free()
 116  				policyRejected++
 117  				continue
 118  			}
 119  			if !allowed {
 120  				log.D.F("policy rejected event %x during sync import", ev.ID)
 121  				ev.Free()
 122  				policyRejected++
 123  				continue
 124  			}
 125  			log.D.F("policy allowed event %x during sync import", ev.ID)
 126  		}
 127  
 128  		// Apply rate limiting before write operation if limiter is configured
 129  		if d.rateLimiter != nil && d.rateLimiter.IsEnabled() {
 130  			d.rateLimiter.Wait(ctx, WriteOpType)
 131  		}
 132  
 133  		if _, err := d.SaveEvent(ctx, ev); err != nil {
 134  			// return the pooled buffer on error paths too
 135  			ev.Free()
 136  			saveErrors++
 137  			log.W.F("failed to save event: %v", err)
 138  			continue
 139  		}
 140  
 141  		// return the pooled buffer after successful save
 142  		ev.Free()
 143  		b = nil
 144  		count++
 145  
 146  		// Progress logging every logInterval
 147  		if time.Since(lastLogTime) >= logInterval {
 148  			elapsed := time.Since(startTime)
 149  			eventsPerSec := float64(count) / elapsed.Seconds()
 150  			mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
 151  			log.I.F("import: progress %d events saved, %.2f MB read, %.0f events/sec, %.2f MB/sec",
 152  				count, float64(total)/1024/1024, eventsPerSec, mbPerSec)
 153  			lastLogTime = time.Now()
 154  			debug.FreeOSMemory()
 155  		}
 156  	}
 157  
 158  	// Final summary
 159  	elapsed := time.Since(startTime)
 160  	eventsPerSec := float64(count) / elapsed.Seconds()
 161  	mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
 162  	log.I.F("import: completed - %d events saved, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
 163  		count, float64(total)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
 164  	if unmarshalErrors > 0 || saveErrors > 0 || policyRejected > 0 || skipped > 0 {
 165  		log.I.F("import: stats - %d unmarshal errors, %d save errors, %d policy rejected, %d skipped empty lines",
 166  			unmarshalErrors, saveErrors, policyRejected, skipped)
 167  	}
 168  
 169  	if err := scan.Err(); err != nil {
 170  		return err
 171  	}
 172  
 173  	return nil
 174  }
 175