export.go raw

   1  //go:build !(js && wasm)
   2  
   3  package database
   4  
   5  import (
   6  	"bytes"
   7  	"context"
   8  	"io"
   9  	"time"
  10  
  11  	"github.com/dgraph-io/badger/v4"
  12  	"next.orly.dev/pkg/lol/chk"
  13  	"next.orly.dev/pkg/lol/log"
  14  	"next.orly.dev/pkg/database/indexes"
  15  	"next.orly.dev/pkg/database/indexes/types"
  16  	"next.orly.dev/pkg/nostr/encoders/event"
  17  	"next.orly.dev/pkg/nostr/utils/units"
  18  )
  19  
  20  // Flusher interface for HTTP streaming
  21  type flusher interface {
  22  	Flush()
  23  }
  24  
  25  // Export the complete database of stored events to an io.Writer in line structured minified
  26  // JSON. Supports both legacy and compact event formats.
  27  func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
  28  	var err error
  29  	evB := make([]byte, 0, units.Mb)
  30  	evBuf := bytes.NewBuffer(evB)
  31  
  32  	// Get flusher for HTTP streaming if available
  33  	var f flusher
  34  	if fl, ok := w.(flusher); ok {
  35  		f = fl
  36  	}
  37  
  38  	// Performance tracking
  39  	startTime := time.Now()
  40  	var eventCount, bytesWritten int64
  41  	lastLogTime := startTime
  42  	const logInterval = 5 * time.Second
  43  	const flushInterval = 100 // Flush every N events
  44  
  45  	log.I.F("export: starting export operation")
  46  
  47  	// Create resolver for compact event decoding
  48  	resolver := NewDatabaseSerialResolver(d, d.serialCache)
  49  
  50  	// Helper function to unmarshal event data (handles both legacy and compact formats)
  51  	unmarshalEventData := func(val []byte, ser *types.Uint40) (*event.E, error) {
  52  		// Check if this is compact format (starts with version byte 1)
  53  		// Note: Legacy events whose ID starts with 0x01 will also match this check,
  54  		// so we fall back to legacy format if the SerialEventId mapping doesn't exist.
  55  		if len(val) > 0 && val[0] == CompactFormatVersion {
  56  			// Try to get event ID mapping - if it exists, this is truly compact format
  57  			eventId, idErr := d.GetEventIdBySerial(ser)
  58  			if idErr == nil {
  59  				// SerialEventId mapping exists - this is compact format
  60  				return UnmarshalCompactEvent(val, eventId, resolver)
  61  			}
  62  			// No SerialEventId mapping - this is likely a legacy event whose ID starts with 0x01
  63  			// Fall through to legacy unmarshal
  64  		}
  65  
  66  		// Legacy binary format
  67  		ev := event.New()
  68  		evBuf.Reset()
  69  		evBuf.Write(val)
  70  		if err := ev.UnmarshalBinary(evBuf); err != nil {
  71  			return nil, err
  72  		}
  73  		return ev, nil
  74  	}
  75  
  76  	if len(pubkeys) == 0 {
  77  		// Export all events - prefer cmp table, fall back to evt
  78  		if err = d.View(
  79  			func(txn *badger.Txn) (err error) {
  80  				// First try cmp (compact format) table
  81  				cmpBuf := new(bytes.Buffer)
  82  				if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpBuf); chk.E(err) {
  83  					return
  84  				}
  85  
  86  				it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpBuf.Bytes()})
  87  				defer it.Close()
  88  
  89  				seenSerials := make(map[uint64]bool)
  90  
  91  				for it.Rewind(); it.Valid(); it.Next() {
  92  					item := it.Item()
  93  					key := item.Key()
  94  
  95  					// Extract serial from key
  96  					ser := new(types.Uint40)
  97  					if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
  98  						continue
  99  					}
 100  
 101  					seenSerials[ser.Get()] = true
 102  
 103  					var val []byte
 104  					if val, err = item.ValueCopy(nil); chk.E(err) {
 105  						continue
 106  					}
 107  
 108  					ev, unmarshalErr := unmarshalEventData(val, ser)
 109  					if unmarshalErr != nil {
 110  						continue
 111  					}
 112  
 113  					// Serialize the event to JSON and write it to the output
 114  					data := ev.Serialize()
 115  					if _, err = w.Write(data); chk.E(err) {
 116  						ev.Free()
 117  						return
 118  					}
 119  					if _, err = w.Write([]byte{'\n'}); chk.E(err) {
 120  						ev.Free()
 121  						return
 122  					}
 123  					bytesWritten += int64(len(data) + 1)
 124  					eventCount++
 125  					ev.Free()
 126  
 127  					// Flush periodically for HTTP streaming
 128  					if f != nil && eventCount%flushInterval == 0 {
 129  						f.Flush()
 130  					}
 131  
 132  					// Progress logging every logInterval
 133  					if time.Since(lastLogTime) >= logInterval {
 134  						elapsed := time.Since(startTime)
 135  						eventsPerSec := float64(eventCount) / elapsed.Seconds()
 136  						mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
 137  						log.I.F("export: progress %d events, %.2f MB written, %.0f events/sec, %.2f MB/sec",
 138  							eventCount, float64(bytesWritten)/1024/1024, eventsPerSec, mbPerSec)
 139  						lastLogTime = time.Now()
 140  					}
 141  				}
 142  				it.Close()
 143  
 144  				// Then fall back to evt (legacy) table for any events not in cmp
 145  				evtBuf := new(bytes.Buffer)
 146  				if err = indexes.EventEnc(nil).MarshalWrite(evtBuf); chk.E(err) {
 147  					return
 148  				}
 149  
 150  				it2 := txn.NewIterator(badger.IteratorOptions{Prefix: evtBuf.Bytes()})
 151  				defer it2.Close()
 152  
 153  				for it2.Rewind(); it2.Valid(); it2.Next() {
 154  					item := it2.Item()
 155  					key := item.Key()
 156  
 157  					// Extract serial from key
 158  					ser := new(types.Uint40)
 159  					if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
 160  						continue
 161  					}
 162  
 163  					// Skip if already exported from cmp table
 164  					if seenSerials[ser.Get()] {
 165  						continue
 166  					}
 167  
 168  					var val []byte
 169  					if val, err = item.ValueCopy(nil); chk.E(err) {
 170  						continue
 171  					}
 172  
 173  					ev, unmarshalErr := unmarshalEventData(val, ser)
 174  					if unmarshalErr != nil {
 175  						continue
 176  					}
 177  
 178  					// Serialize the event to JSON and write it to the output
 179  					data := ev.Serialize()
 180  					if _, err = w.Write(data); chk.E(err) {
 181  						ev.Free()
 182  						return
 183  					}
 184  					if _, err = w.Write([]byte{'\n'}); chk.E(err) {
 185  						ev.Free()
 186  						return
 187  					}
 188  					bytesWritten += int64(len(data) + 1)
 189  					eventCount++
 190  					ev.Free()
 191  
 192  					// Flush periodically for HTTP streaming
 193  					if f != nil && eventCount%flushInterval == 0 {
 194  						f.Flush()
 195  					}
 196  
 197  					// Progress logging every logInterval
 198  					if time.Since(lastLogTime) >= logInterval {
 199  						elapsed := time.Since(startTime)
 200  						eventsPerSec := float64(eventCount) / elapsed.Seconds()
 201  						mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
 202  						log.I.F("export: progress %d events, %.2f MB written, %.0f events/sec, %.2f MB/sec",
 203  							eventCount, float64(bytesWritten)/1024/1024, eventsPerSec, mbPerSec)
 204  						lastLogTime = time.Now()
 205  					}
 206  				}
 207  
 208  				return
 209  			},
 210  		); err != nil {
 211  			return
 212  		}
 213  
 214  		// Final flush
 215  		if f != nil {
 216  			f.Flush()
 217  		}
 218  
 219  		// Final export summary
 220  		elapsed := time.Since(startTime)
 221  		eventsPerSec := float64(eventCount) / elapsed.Seconds()
 222  		mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
 223  		log.I.F("export: completed - %d events, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
 224  			eventCount, float64(bytesWritten)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
 225  	} else {
 226  		// Export events for specific pubkeys
 227  		log.I.F("export: exporting events for %d pubkeys", len(pubkeys))
 228  		for _, pubkey := range pubkeys {
 229  			if err = d.View(
 230  				func(txn *badger.Txn) (err error) {
 231  					pkBuf := new(bytes.Buffer)
 232  					ph := &types.PubHash{}
 233  					if err = ph.FromPubkey(pubkey); chk.E(err) {
 234  						return
 235  					}
 236  					if err = indexes.PubkeyEnc(
 237  						ph, nil, nil,
 238  					).MarshalWrite(pkBuf); chk.E(err) {
 239  						return
 240  					}
 241  					it := txn.NewIterator(badger.IteratorOptions{Prefix: pkBuf.Bytes()})
 242  					defer it.Close()
 243  					for it.Rewind(); it.Valid(); it.Next() {
 244  						item := it.Item()
 245  						key := item.Key()
 246  
 247  						// Extract serial from pubkey index key
 248  						// Key format: pc-|pubkey_hash|created_at|serial
 249  						if len(key) < 3+8+8+5 {
 250  							continue
 251  						}
 252  						ser := new(types.Uint40)
 253  						if err = ser.UnmarshalRead(bytes.NewReader(key[len(key)-5:])); chk.E(err) {
 254  							continue
 255  						}
 256  
 257  						// Fetch the event using FetchEventBySerial which handles all formats
 258  						ev, fetchErr := d.FetchEventBySerial(ser)
 259  						if fetchErr != nil || ev == nil {
 260  							continue
 261  						}
 262  
 263  						// Serialize the event to JSON and write it to the output
 264  						data := ev.Serialize()
 265  						if _, err = w.Write(data); chk.E(err) {
 266  							ev.Free()
 267  							continue
 268  						}
 269  						if _, err = w.Write([]byte{'\n'}); chk.E(err) {
 270  							ev.Free()
 271  							continue
 272  						}
 273  						bytesWritten += int64(len(data) + 1)
 274  						eventCount++
 275  						ev.Free()
 276  
 277  						// Flush periodically for HTTP streaming
 278  						if f != nil && eventCount%flushInterval == 0 {
 279  							f.Flush()
 280  						}
 281  
 282  						// Progress logging every logInterval
 283  						if time.Since(lastLogTime) >= logInterval {
 284  							elapsed := time.Since(startTime)
 285  							eventsPerSec := float64(eventCount) / elapsed.Seconds()
 286  							mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
 287  							log.I.F("export: progress %d events, %.2f MB written, %.0f events/sec, %.2f MB/sec",
 288  								eventCount, float64(bytesWritten)/1024/1024, eventsPerSec, mbPerSec)
 289  							lastLogTime = time.Now()
 290  						}
 291  					}
 292  					return
 293  				},
 294  			); err != nil {
 295  				return
 296  			}
 297  		}
 298  
 299  		// Final flush
 300  		if f != nil {
 301  			f.Flush()
 302  		}
 303  
 304  		// Final export summary for pubkey export
 305  		elapsed := time.Since(startTime)
 306  		eventsPerSec := float64(eventCount) / elapsed.Seconds()
 307  		mbPerSec := float64(bytesWritten) / elapsed.Seconds() / 1024 / 1024
 308  		log.I.F("export: completed - %d events, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
 309  			eventCount, float64(bytesWritten)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
 310  	}
 311  }
 312