import-export.go raw

   1  //go:build js && wasm
   2  
   3  package wasmdb
   4  
   5  import (
   6  	"bufio"
   7  	"bytes"
   8  	"context"
   9  	"encoding/json"
  10  	"io"
  11  
  12  	"github.com/aperturerobotics/go-indexeddb/idb"
  13  	"next.orly.dev/pkg/lol/chk"
  14  
  15  	"next.orly.dev/pkg/nostr/encoders/event"
  16  	"next.orly.dev/pkg/nostr/encoders/filter"
  17  	"next.orly.dev/pkg/nostr/encoders/tag"
  18  	"next.orly.dev/pkg/database"
  19  	"next.orly.dev/pkg/database/indexes"
  20  	"next.orly.dev/pkg/database/indexes/types"
  21  )
  22  
  23  // Import reads events from a JSONL reader and imports them into the database
  24  func (w *W) Import(rr io.Reader) {
  25  	ctx := context.Background()
  26  	scanner := bufio.NewScanner(rr)
  27  	// Increase buffer size for large events
  28  	buf := make([]byte, 1024*1024) // 1MB buffer
  29  	scanner.Buffer(buf, len(buf))
  30  
  31  	imported := 0
  32  	for scanner.Scan() {
  33  		line := scanner.Bytes()
  34  		if len(line) == 0 {
  35  			continue
  36  		}
  37  
  38  		ev := event.New()
  39  		if err := json.Unmarshal(line, ev); err != nil {
  40  			w.Logger.Warnf("Import: failed to unmarshal event: %v", err)
  41  			continue
  42  		}
  43  
  44  		if _, err := w.SaveEvent(ctx, ev); err != nil {
  45  			w.Logger.Debugf("Import: failed to save event: %v", err)
  46  			continue
  47  		}
  48  		imported++
  49  	}
  50  
  51  	if err := scanner.Err(); err != nil {
  52  		w.Logger.Errorf("Import: scanner error: %v", err)
  53  	}
  54  
  55  	w.Logger.Infof("Import: imported %d events", imported)
  56  }
  57  
  58  // Export writes events to a JSONL writer, optionally filtered by pubkeys
  59  func (w *W) Export(c context.Context, wr io.Writer, pubkeys ...[]byte) {
  60  	var evs event.S
  61  	var err error
  62  
  63  	// Query events
  64  	if len(pubkeys) > 0 {
  65  		// Export only events from specified pubkeys
  66  		for _, pk := range pubkeys {
  67  			// Get all serials for this pubkey
  68  			serials, err := w.GetSerialsByPubkey(pk)
  69  			if err != nil {
  70  				w.Logger.Warnf("Export: failed to get serials for pubkey: %v", err)
  71  				continue
  72  			}
  73  
  74  			for _, ser := range serials {
  75  				ev, err := w.FetchEventBySerial(ser)
  76  				if err != nil || ev == nil {
  77  					continue
  78  				}
  79  				evs = append(evs, ev)
  80  			}
  81  		}
  82  	} else {
  83  		// Export all events
  84  		evs, err = w.getAllEvents(c)
  85  		if err != nil {
  86  			w.Logger.Errorf("Export: failed to get all events: %v", err)
  87  			return
  88  		}
  89  	}
  90  
  91  	// Write events as JSONL
  92  	exported := 0
  93  	for _, ev := range evs {
  94  		data, err := json.Marshal(ev)
  95  		if err != nil {
  96  			w.Logger.Warnf("Export: failed to marshal event: %v", err)
  97  			continue
  98  		}
  99  		wr.Write(data)
 100  		wr.Write([]byte("\n"))
 101  		exported++
 102  	}
 103  
 104  	w.Logger.Infof("Export: exported %d events", exported)
 105  }
 106  
 107  // ImportEventsFromReader imports events from a JSONL reader with context support
 108  func (w *W) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
 109  	scanner := bufio.NewScanner(rr)
 110  	buf := make([]byte, 1024*1024)
 111  	scanner.Buffer(buf, len(buf))
 112  
 113  	imported := 0
 114  	for scanner.Scan() {
 115  		select {
 116  		case <-ctx.Done():
 117  			w.Logger.Infof("ImportEventsFromReader: cancelled after %d events", imported)
 118  			return ctx.Err()
 119  		default:
 120  		}
 121  
 122  		line := scanner.Bytes()
 123  		if len(line) == 0 {
 124  			continue
 125  		}
 126  
 127  		ev := event.New()
 128  		if err := json.Unmarshal(line, ev); err != nil {
 129  			w.Logger.Warnf("ImportEventsFromReader: failed to unmarshal: %v", err)
 130  			continue
 131  		}
 132  
 133  		if _, err := w.SaveEvent(ctx, ev); err != nil {
 134  			w.Logger.Debugf("ImportEventsFromReader: failed to save: %v", err)
 135  			continue
 136  		}
 137  		imported++
 138  	}
 139  
 140  	if err := scanner.Err(); err != nil {
 141  		return err
 142  	}
 143  
 144  	w.Logger.Infof("ImportEventsFromReader: imported %d events", imported)
 145  	return nil
 146  }
 147  
 148  // ImportEventsFromStrings imports events from JSON strings with policy checking
 149  func (w *W) ImportEventsFromStrings(
 150  	ctx context.Context,
 151  	eventJSONs []string,
 152  	policyManager interface {
 153  		CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
 154  	},
 155  ) error {
 156  	imported := 0
 157  
 158  	for _, jsonStr := range eventJSONs {
 159  		select {
 160  		case <-ctx.Done():
 161  			w.Logger.Infof("ImportEventsFromStrings: cancelled after %d events", imported)
 162  			return ctx.Err()
 163  		default:
 164  		}
 165  
 166  		ev := event.New()
 167  		if err := json.Unmarshal([]byte(jsonStr), ev); err != nil {
 168  			w.Logger.Warnf("ImportEventsFromStrings: failed to unmarshal: %v", err)
 169  			continue
 170  		}
 171  
 172  		// Check policy if manager is provided
 173  		if policyManager != nil {
 174  			allowed, err := policyManager.CheckPolicy("write", ev, ev.Pubkey, "import")
 175  			if err != nil || !allowed {
 176  				w.Logger.Debugf("ImportEventsFromStrings: policy rejected event")
 177  				continue
 178  			}
 179  		}
 180  
 181  		if _, err := w.SaveEvent(ctx, ev); err != nil {
 182  			w.Logger.Debugf("ImportEventsFromStrings: failed to save: %v", err)
 183  			continue
 184  		}
 185  		imported++
 186  	}
 187  
 188  	w.Logger.Infof("ImportEventsFromStrings: imported %d events", imported)
 189  	return nil
 190  }
 191  
 192  // GetSerialsByPubkey returns all event serials for a given pubkey
 193  func (w *W) GetSerialsByPubkey(pubkey []byte) ([]*types.Uint40, error) {
 194  	// Build range for pubkey index
 195  	idx, err := database.GetIndexesFromFilter(&filter.F{
 196  		Authors: tag.NewFromBytesSlice(pubkey),
 197  	})
 198  	if chk.E(err) {
 199  		return nil, err
 200  	}
 201  
 202  	var serials []*types.Uint40
 203  	for _, r := range idx {
 204  		sers, err := w.GetSerialsByRange(r)
 205  		if err != nil {
 206  			continue
 207  		}
 208  		serials = append(serials, sers...)
 209  	}
 210  
 211  	return serials, nil
 212  }
 213  
 214  // getAllEvents retrieves all events from the database
 215  func (w *W) getAllEvents(c context.Context) (event.S, error) {
 216  	// Scan through the small event store and large event store
 217  	var events event.S
 218  
 219  	// Get events from small event store
 220  	sevEvents, err := w.scanEventStore(string(indexes.SmallEventPrefix), true)
 221  	if err == nil {
 222  		events = append(events, sevEvents...)
 223  	}
 224  
 225  	// Get events from large event store
 226  	evtEvents, err := w.scanEventStore(string(indexes.EventPrefix), false)
 227  	if err == nil {
 228  		events = append(events, evtEvents...)
 229  	}
 230  
 231  	return events, nil
 232  }
 233  
 234  // scanEventStore scans an event store and returns all events
 235  func (w *W) scanEventStore(storeName string, isSmallEvent bool) (event.S, error) {
 236  	tx, err := w.db.Transaction(idb.TransactionReadOnly, storeName)
 237  	if err != nil {
 238  		return nil, err
 239  	}
 240  
 241  	store, err := tx.ObjectStore(storeName)
 242  	if err != nil {
 243  		return nil, err
 244  	}
 245  
 246  	var events event.S
 247  
 248  	cursorReq, err := store.OpenCursor(idb.CursorNext)
 249  	if err != nil {
 250  		return nil, err
 251  	}
 252  
 253  	err = cursorReq.Iter(w.ctx, func(cursor *idb.CursorWithValue) error {
 254  		var eventData []byte
 255  
 256  		if isSmallEvent {
 257  			// Small events: data is embedded in the key
 258  			keyVal, keyErr := cursor.Key()
 259  			if keyErr != nil {
 260  				return keyErr
 261  			}
 262  			keyBytes := safeValueToBytes(keyVal)
 263  			// Format: sev|serial|size_uint16|event_data
 264  			if len(keyBytes) > 10 { // 3 + 5 + 2 minimum
 265  				sizeOffset := 8 // 3 prefix + 5 serial
 266  				if len(keyBytes) > sizeOffset+2 {
 267  					size := int(keyBytes[sizeOffset])<<8 | int(keyBytes[sizeOffset+1])
 268  					if len(keyBytes) >= sizeOffset+2+size {
 269  						eventData = keyBytes[sizeOffset+2 : sizeOffset+2+size]
 270  					}
 271  				}
 272  			}
 273  		} else {
 274  			// Large events: data is in the value
 275  			val, valErr := cursor.Value()
 276  			if valErr != nil {
 277  				return valErr
 278  			}
 279  			eventData = safeValueToBytes(val)
 280  		}
 281  
 282  		if len(eventData) > 0 {
 283  			ev := event.New()
 284  			if err := ev.UnmarshalBinary(bytes.NewReader(eventData)); err == nil {
 285  				events = append(events, ev)
 286  			}
 287  		}
 288  
 289  		return cursor.Continue()
 290  	})
 291  
 292  	return events, err
 293  }
 294