import_utils.go raw
1 //go:build !(js && wasm)
2
3 // Package database provides shared import utilities for events
4 package database
5
6 import (
7 "bufio"
8 "context"
9 "io"
10 "os"
11 "runtime/debug"
12 "strings"
13 "time"
14
15 "next.orly.dev/pkg/lol/chk"
16 "next.orly.dev/pkg/lol/log"
17 "next.orly.dev/pkg/nostr/encoders/event"
18 )
19
// maxLen is the upper bound, in bytes, passed to bufio.Scanner.Buffer when
// reading JSONL input; a single line must be shorter than this to be scanned.
const maxLen = 500_000_000
21
22 // ImportEventsFromReader imports events from an io.Reader containing JSONL data
23 func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
24 startTime := time.Now()
25 log.I.F("import: starting import operation")
26
27 // store to disk so we can return fast
28 tmpPath := os.TempDir() + string(os.PathSeparator) + "orly"
29 os.MkdirAll(tmpPath, 0700)
30 tmp, err := os.CreateTemp(tmpPath, "")
31 if chk.E(err) {
32 return err
33 }
34 defer os.Remove(tmp.Name()) // Clean up temp file when done
35
36 log.I.F("import: buffering upload to %s", tmp.Name())
37 bufferStart := time.Now()
38 bytesBuffered, err := io.Copy(tmp, rr)
39 if chk.E(err) {
40 return err
41 }
42 bufferElapsed := time.Since(bufferStart)
43 log.I.F("import: buffered %.2f MB in %v (%.2f MB/sec)",
44 float64(bytesBuffered)/1024/1024, bufferElapsed.Round(time.Millisecond),
45 float64(bytesBuffered)/bufferElapsed.Seconds()/1024/1024)
46
47 if _, err = tmp.Seek(0, 0); chk.E(err) {
48 return err
49 }
50
51 processErr := d.processJSONLEvents(ctx, tmp)
52
53 totalElapsed := time.Since(startTime)
54 log.I.F("import: total operation time: %v", totalElapsed.Round(time.Millisecond))
55
56 return processErr
57 }
58
59 // ImportEventsFromStrings imports events from a slice of JSON strings with policy filtering
60 func (d *D) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }) error {
61 // Create a reader from the string slice
62 reader := strings.NewReader(strings.Join(eventJSONs, "\n"))
63 return d.processJSONLEventsWithPolicy(ctx, reader, policyManager)
64 }
65
66 // processJSONLEvents processes JSONL events from a reader
67 func (d *D) processJSONLEvents(ctx context.Context, rr io.Reader) error {
68 return d.processJSONLEventsWithPolicy(ctx, rr, nil)
69 }
70
71 // processJSONLEventsWithPolicy processes JSONL events from a reader with optional policy filtering
72 func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }) error {
73 // Create a scanner to read the buffer line by line
74 scan := bufio.NewScanner(rr)
75 scanBuf := make([]byte, maxLen)
76 scan.Buffer(scanBuf, maxLen)
77
78 // Performance tracking
79 startTime := time.Now()
80 lastLogTime := startTime
81 const logInterval = 5 * time.Second
82
83 var count, total, skipped, policyRejected, unmarshalErrors, saveErrors int
84 for scan.Scan() {
85 select {
86 case <-ctx.Done():
87 log.I.F("import: context closed after %d events", count)
88 return ctx.Err()
89 default:
90 }
91
92 b := scan.Bytes()
93 total += len(b) + 1
94 if len(b) < 1 {
95 skipped++
96 continue
97 }
98
99 ev := event.New()
100 if _, err := ev.Unmarshal(b); err != nil {
101 // return the pooled buffer on error
102 ev.Free()
103 unmarshalErrors++
104 log.W.F("failed to unmarshal event: %v", err)
105 continue
106 }
107
108 // Apply policy checking if policy manager is provided
109 if policyManager != nil {
110 // For sync imports, we treat events as coming from system/trusted source
111 // Use nil pubkey and empty remote to indicate system-level import
112 allowed, policyErr := policyManager.CheckPolicy("write", ev, nil, "")
113 if policyErr != nil {
114 log.W.F("policy check failed for event %x: %v", ev.ID, policyErr)
115 ev.Free()
116 policyRejected++
117 continue
118 }
119 if !allowed {
120 log.D.F("policy rejected event %x during sync import", ev.ID)
121 ev.Free()
122 policyRejected++
123 continue
124 }
125 log.D.F("policy allowed event %x during sync import", ev.ID)
126 }
127
128 // Apply rate limiting before write operation if limiter is configured
129 if d.rateLimiter != nil && d.rateLimiter.IsEnabled() {
130 d.rateLimiter.Wait(ctx, WriteOpType)
131 }
132
133 if _, err := d.SaveEvent(ctx, ev); err != nil {
134 // return the pooled buffer on error paths too
135 ev.Free()
136 saveErrors++
137 log.W.F("failed to save event: %v", err)
138 continue
139 }
140
141 // return the pooled buffer after successful save
142 ev.Free()
143 b = nil
144 count++
145
146 // Progress logging every logInterval
147 if time.Since(lastLogTime) >= logInterval {
148 elapsed := time.Since(startTime)
149 eventsPerSec := float64(count) / elapsed.Seconds()
150 mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
151 log.I.F("import: progress %d events saved, %.2f MB read, %.0f events/sec, %.2f MB/sec",
152 count, float64(total)/1024/1024, eventsPerSec, mbPerSec)
153 lastLogTime = time.Now()
154 debug.FreeOSMemory()
155 }
156 }
157
158 // Final summary
159 elapsed := time.Since(startTime)
160 eventsPerSec := float64(count) / elapsed.Seconds()
161 mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
162 log.I.F("import: completed - %d events saved, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
163 count, float64(total)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
164 if unmarshalErrors > 0 || saveErrors > 0 || policyRejected > 0 || skipped > 0 {
165 log.I.F("import: stats - %d unmarshal errors, %d save errors, %d policy rejected, %d skipped empty lines",
166 unmarshalErrors, saveErrors, policyRejected, skipped)
167 }
168
169 if err := scan.Err(); err != nil {
170 return err
171 }
172
173 return nil
174 }
175