// export.go
1 //go:build !(js && wasm)
2
3 package database
4
5 import (
6 "bytes"
7 "context"
8 "io"
9 "time"
10
11 "github.com/dgraph-io/badger/v4"
12 "next.orly.dev/pkg/lol/chk"
13 "next.orly.dev/pkg/lol/log"
14 "next.orly.dev/pkg/database/indexes"
15 "next.orly.dev/pkg/database/indexes/types"
16 "next.orly.dev/pkg/nostr/encoders/event"
17 "next.orly.dev/pkg/nostr/utils/units"
18 )
19
// flusher is the minimal subset of http.Flusher needed for HTTP streaming:
// Export type-asserts its io.Writer against this so buffered output can be
// pushed to the client periodically during a long-running export.
type flusher interface {
	Flush()
}
24
25 // Export the complete database of stored events to an io.Writer in line structured minified
26 // JSON. Supports both legacy and compact event formats.
27 func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
28 var err error
29 evB := make([]byte, 0, units.Mb)
30 evBuf := bytes.NewBuffer(evB)
31
32 // Get flusher for HTTP streaming if available
33 var f flusher
34 if fl, ok := w.(flusher); ok {
35 f = fl
36 }
37
38 // Performance tracking
39 startTime := time.Now()
40 var eventCount, bytesWritten int64
41 lastLogTime := startTime
42 const logInterval = 5 * time.Second
43 const flushInterval = 100 // Flush every N events
44
45 log.I.F("export: starting export operation")
46
47 // Create resolver for compact event decoding
48 resolver := NewDatabaseSerialResolver(d, d.serialCache)
49
50 // Helper function to unmarshal event data (handles both legacy and compact formats)
51 unmarshalEventData := func(val []byte, ser *types.Uint40) (*event.E, error) {
52 // Check if this is compact format (starts with version byte 1)
53 // Note: Legacy events whose ID starts with 0x01 will also match this check,
54 // so we fall back to legacy format if the SerialEventId mapping doesn't exist.
55 if len(val) > 0 && val[0] == CompactFormatVersion {
56 // Try to get event ID mapping - if it exists, this is truly compact format
57 eventId, idErr := d.GetEventIdBySerial(ser)
58 if idErr == nil {
59 // SerialEventId mapping exists - this is compact format
60 return UnmarshalCompactEvent(val, eventId, resolver)
61 }
62 // No SerialEventId mapping - this is likely a legacy event whose ID starts with 0x01
63 // Fall through to legacy unmarshal
64 }
65
66 // Legacy binary format
67 ev := event.New()
68 evBuf.Reset()
69 evBuf.Write(val)
70 if err := ev.UnmarshalBinary(evBuf); err != nil {
71 return nil, err
72 }
73 return ev, nil
74 }
75
76 if len(pubkeys) == 0 {
77 // Export all events - prefer cmp table, fall back to evt
78 if err = d.View(
79 func(txn *badger.Txn) (err error) {
80 // First try cmp (compact format) table
81 cmpBuf := new(bytes.Buffer)
82 if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpBuf); chk.E(err) {
83 return
84 }
85
86 it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpBuf.Bytes()})
87 defer it.Close()
88
89 seenSerials := make(map[uint64]bool)
90
91 for it.Rewind(); it.Valid(); it.Next() {
92 item := it.Item()
93 key := item.Key()
94
95 // Extract serial from key
96 ser := new(types.Uint40)
97 if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
98 continue
99 }
100
101 seenSerials[ser.Get()] = true
102
103 var val []byte
104 if val, err = item.ValueCopy(nil); chk.E(err) {
105 continue
106 }
107
108 ev, unmarshalErr := unmarshalEventData(val, ser)
109 if unmarshalErr != nil {
110 continue
111 }
112
113 // Serialize the event to JSON and write it to the output
114 data := ev.Serialize()
115 if _, err = w.Write(data); chk.E(err) {
116 ev.Free()
117 return
118 }
119 if _, err = w.Write([]byte{'\n'}); chk.E(err) {
120 ev.Free()
121 return
122 }
123 bytesWritten += int64(len(data) + 1)
124 eventCount++
125 ev.Free()
126
127 // Flush periodically for HTTP streaming
128 if f != nil && eventCount%flushInterval == 0 {
129 f.Flush()
130 }
131
132 // Progress logging every logInterval
133 if time.Since(lastLogTime) >= logInterval {
134 elapsed := time.Since(startTime)
135 eventsPerSec := float64(eventCount) / elapsed.Seconds()
136 mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
137 log.I.F("export: progress %d events, %.2f MB written, %.0f events/sec, %.2f MB/sec",
138 eventCount, float64(bytesWritten)/1024/1024, eventsPerSec, mbPerSec)
139 lastLogTime = time.Now()
140 }
141 }
142 it.Close()
143
144 // Then fall back to evt (legacy) table for any events not in cmp
145 evtBuf := new(bytes.Buffer)
146 if err = indexes.EventEnc(nil).MarshalWrite(evtBuf); chk.E(err) {
147 return
148 }
149
150 it2 := txn.NewIterator(badger.IteratorOptions{Prefix: evtBuf.Bytes()})
151 defer it2.Close()
152
153 for it2.Rewind(); it2.Valid(); it2.Next() {
154 item := it2.Item()
155 key := item.Key()
156
157 // Extract serial from key
158 ser := new(types.Uint40)
159 if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
160 continue
161 }
162
163 // Skip if already exported from cmp table
164 if seenSerials[ser.Get()] {
165 continue
166 }
167
168 var val []byte
169 if val, err = item.ValueCopy(nil); chk.E(err) {
170 continue
171 }
172
173 ev, unmarshalErr := unmarshalEventData(val, ser)
174 if unmarshalErr != nil {
175 continue
176 }
177
178 // Serialize the event to JSON and write it to the output
179 data := ev.Serialize()
180 if _, err = w.Write(data); chk.E(err) {
181 ev.Free()
182 return
183 }
184 if _, err = w.Write([]byte{'\n'}); chk.E(err) {
185 ev.Free()
186 return
187 }
188 bytesWritten += int64(len(data) + 1)
189 eventCount++
190 ev.Free()
191
192 // Flush periodically for HTTP streaming
193 if f != nil && eventCount%flushInterval == 0 {
194 f.Flush()
195 }
196
197 // Progress logging every logInterval
198 if time.Since(lastLogTime) >= logInterval {
199 elapsed := time.Since(startTime)
200 eventsPerSec := float64(eventCount) / elapsed.Seconds()
201 mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
202 log.I.F("export: progress %d events, %.2f MB written, %.0f events/sec, %.2f MB/sec",
203 eventCount, float64(bytesWritten)/1024/1024, eventsPerSec, mbPerSec)
204 lastLogTime = time.Now()
205 }
206 }
207
208 return
209 },
210 ); err != nil {
211 return
212 }
213
214 // Final flush
215 if f != nil {
216 f.Flush()
217 }
218
219 // Final export summary
220 elapsed := time.Since(startTime)
221 eventsPerSec := float64(eventCount) / elapsed.Seconds()
222 mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
223 log.I.F("export: completed - %d events, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
224 eventCount, float64(bytesWritten)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
225 } else {
226 // Export events for specific pubkeys
227 log.I.F("export: exporting events for %d pubkeys", len(pubkeys))
228 for _, pubkey := range pubkeys {
229 if err = d.View(
230 func(txn *badger.Txn) (err error) {
231 pkBuf := new(bytes.Buffer)
232 ph := &types.PubHash{}
233 if err = ph.FromPubkey(pubkey); chk.E(err) {
234 return
235 }
236 if err = indexes.PubkeyEnc(
237 ph, nil, nil,
238 ).MarshalWrite(pkBuf); chk.E(err) {
239 return
240 }
241 it := txn.NewIterator(badger.IteratorOptions{Prefix: pkBuf.Bytes()})
242 defer it.Close()
243 for it.Rewind(); it.Valid(); it.Next() {
244 item := it.Item()
245 key := item.Key()
246
247 // Extract serial from pubkey index key
248 // Key format: pc-|pubkey_hash|created_at|serial
249 if len(key) < 3+8+8+5 {
250 continue
251 }
252 ser := new(types.Uint40)
253 if err = ser.UnmarshalRead(bytes.NewReader(key[len(key)-5:])); chk.E(err) {
254 continue
255 }
256
257 // Fetch the event using FetchEventBySerial which handles all formats
258 ev, fetchErr := d.FetchEventBySerial(ser)
259 if fetchErr != nil || ev == nil {
260 continue
261 }
262
263 // Serialize the event to JSON and write it to the output
264 data := ev.Serialize()
265 if _, err = w.Write(data); chk.E(err) {
266 ev.Free()
267 continue
268 }
269 if _, err = w.Write([]byte{'\n'}); chk.E(err) {
270 ev.Free()
271 continue
272 }
273 bytesWritten += int64(len(data) + 1)
274 eventCount++
275 ev.Free()
276
277 // Flush periodically for HTTP streaming
278 if f != nil && eventCount%flushInterval == 0 {
279 f.Flush()
280 }
281
282 // Progress logging every logInterval
283 if time.Since(lastLogTime) >= logInterval {
284 elapsed := time.Since(startTime)
285 eventsPerSec := float64(eventCount) / elapsed.Seconds()
286 mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
287 log.I.F("export: progress %d events, %.2f MB written, %.0f events/sec, %.2f MB/sec",
288 eventCount, float64(bytesWritten)/1024/1024, eventsPerSec, mbPerSec)
289 lastLogTime = time.Now()
290 }
291 }
292 return
293 },
294 ); err != nil {
295 return
296 }
297 }
298
299 // Final flush
300 if f != nil {
301 f.Flush()
302 }
303
304 // Final export summary for pubkey export
305 elapsed := time.Since(startTime)
306 eventsPerSec := float64(eventCount) / elapsed.Seconds()
307 mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
308 log.I.F("export: completed - %d events, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
309 eventCount, float64(bytesWritten)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
310 }
311 }
312