import-export.go raw
1 //go:build js && wasm
2
3 package wasmdb
4
5 import (
6 "bufio"
7 "bytes"
8 "context"
9 "encoding/json"
10 "io"
11
12 "github.com/aperturerobotics/go-indexeddb/idb"
13 "next.orly.dev/pkg/lol/chk"
14
15 "next.orly.dev/pkg/nostr/encoders/event"
16 "next.orly.dev/pkg/nostr/encoders/filter"
17 "next.orly.dev/pkg/nostr/encoders/tag"
18 "next.orly.dev/pkg/database"
19 "next.orly.dev/pkg/database/indexes"
20 "next.orly.dev/pkg/database/indexes/types"
21 )
22
23 // Import reads events from a JSONL reader and imports them into the database
24 func (w *W) Import(rr io.Reader) {
25 ctx := context.Background()
26 scanner := bufio.NewScanner(rr)
27 // Increase buffer size for large events
28 buf := make([]byte, 1024*1024) // 1MB buffer
29 scanner.Buffer(buf, len(buf))
30
31 imported := 0
32 for scanner.Scan() {
33 line := scanner.Bytes()
34 if len(line) == 0 {
35 continue
36 }
37
38 ev := event.New()
39 if err := json.Unmarshal(line, ev); err != nil {
40 w.Logger.Warnf("Import: failed to unmarshal event: %v", err)
41 continue
42 }
43
44 if _, err := w.SaveEvent(ctx, ev); err != nil {
45 w.Logger.Debugf("Import: failed to save event: %v", err)
46 continue
47 }
48 imported++
49 }
50
51 if err := scanner.Err(); err != nil {
52 w.Logger.Errorf("Import: scanner error: %v", err)
53 }
54
55 w.Logger.Infof("Import: imported %d events", imported)
56 }
57
58 // Export writes events to a JSONL writer, optionally filtered by pubkeys
59 func (w *W) Export(c context.Context, wr io.Writer, pubkeys ...[]byte) {
60 var evs event.S
61 var err error
62
63 // Query events
64 if len(pubkeys) > 0 {
65 // Export only events from specified pubkeys
66 for _, pk := range pubkeys {
67 // Get all serials for this pubkey
68 serials, err := w.GetSerialsByPubkey(pk)
69 if err != nil {
70 w.Logger.Warnf("Export: failed to get serials for pubkey: %v", err)
71 continue
72 }
73
74 for _, ser := range serials {
75 ev, err := w.FetchEventBySerial(ser)
76 if err != nil || ev == nil {
77 continue
78 }
79 evs = append(evs, ev)
80 }
81 }
82 } else {
83 // Export all events
84 evs, err = w.getAllEvents(c)
85 if err != nil {
86 w.Logger.Errorf("Export: failed to get all events: %v", err)
87 return
88 }
89 }
90
91 // Write events as JSONL
92 exported := 0
93 for _, ev := range evs {
94 data, err := json.Marshal(ev)
95 if err != nil {
96 w.Logger.Warnf("Export: failed to marshal event: %v", err)
97 continue
98 }
99 wr.Write(data)
100 wr.Write([]byte("\n"))
101 exported++
102 }
103
104 w.Logger.Infof("Export: exported %d events", exported)
105 }
106
107 // ImportEventsFromReader imports events from a JSONL reader with context support
108 func (w *W) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
109 scanner := bufio.NewScanner(rr)
110 buf := make([]byte, 1024*1024)
111 scanner.Buffer(buf, len(buf))
112
113 imported := 0
114 for scanner.Scan() {
115 select {
116 case <-ctx.Done():
117 w.Logger.Infof("ImportEventsFromReader: cancelled after %d events", imported)
118 return ctx.Err()
119 default:
120 }
121
122 line := scanner.Bytes()
123 if len(line) == 0 {
124 continue
125 }
126
127 ev := event.New()
128 if err := json.Unmarshal(line, ev); err != nil {
129 w.Logger.Warnf("ImportEventsFromReader: failed to unmarshal: %v", err)
130 continue
131 }
132
133 if _, err := w.SaveEvent(ctx, ev); err != nil {
134 w.Logger.Debugf("ImportEventsFromReader: failed to save: %v", err)
135 continue
136 }
137 imported++
138 }
139
140 if err := scanner.Err(); err != nil {
141 return err
142 }
143
144 w.Logger.Infof("ImportEventsFromReader: imported %d events", imported)
145 return nil
146 }
147
148 // ImportEventsFromStrings imports events from JSON strings with policy checking
149 func (w *W) ImportEventsFromStrings(
150 ctx context.Context,
151 eventJSONs []string,
152 policyManager interface {
153 CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
154 },
155 ) error {
156 imported := 0
157
158 for _, jsonStr := range eventJSONs {
159 select {
160 case <-ctx.Done():
161 w.Logger.Infof("ImportEventsFromStrings: cancelled after %d events", imported)
162 return ctx.Err()
163 default:
164 }
165
166 ev := event.New()
167 if err := json.Unmarshal([]byte(jsonStr), ev); err != nil {
168 w.Logger.Warnf("ImportEventsFromStrings: failed to unmarshal: %v", err)
169 continue
170 }
171
172 // Check policy if manager is provided
173 if policyManager != nil {
174 allowed, err := policyManager.CheckPolicy("write", ev, ev.Pubkey, "import")
175 if err != nil || !allowed {
176 w.Logger.Debugf("ImportEventsFromStrings: policy rejected event")
177 continue
178 }
179 }
180
181 if _, err := w.SaveEvent(ctx, ev); err != nil {
182 w.Logger.Debugf("ImportEventsFromStrings: failed to save: %v", err)
183 continue
184 }
185 imported++
186 }
187
188 w.Logger.Infof("ImportEventsFromStrings: imported %d events", imported)
189 return nil
190 }
191
192 // GetSerialsByPubkey returns all event serials for a given pubkey
193 func (w *W) GetSerialsByPubkey(pubkey []byte) ([]*types.Uint40, error) {
194 // Build range for pubkey index
195 idx, err := database.GetIndexesFromFilter(&filter.F{
196 Authors: tag.NewFromBytesSlice(pubkey),
197 })
198 if chk.E(err) {
199 return nil, err
200 }
201
202 var serials []*types.Uint40
203 for _, r := range idx {
204 sers, err := w.GetSerialsByRange(r)
205 if err != nil {
206 continue
207 }
208 serials = append(serials, sers...)
209 }
210
211 return serials, nil
212 }
213
214 // getAllEvents retrieves all events from the database
215 func (w *W) getAllEvents(c context.Context) (event.S, error) {
216 // Scan through the small event store and large event store
217 var events event.S
218
219 // Get events from small event store
220 sevEvents, err := w.scanEventStore(string(indexes.SmallEventPrefix), true)
221 if err == nil {
222 events = append(events, sevEvents...)
223 }
224
225 // Get events from large event store
226 evtEvents, err := w.scanEventStore(string(indexes.EventPrefix), false)
227 if err == nil {
228 events = append(events, evtEvents...)
229 }
230
231 return events, nil
232 }
233
234 // scanEventStore scans an event store and returns all events
235 func (w *W) scanEventStore(storeName string, isSmallEvent bool) (event.S, error) {
236 tx, err := w.db.Transaction(idb.TransactionReadOnly, storeName)
237 if err != nil {
238 return nil, err
239 }
240
241 store, err := tx.ObjectStore(storeName)
242 if err != nil {
243 return nil, err
244 }
245
246 var events event.S
247
248 cursorReq, err := store.OpenCursor(idb.CursorNext)
249 if err != nil {
250 return nil, err
251 }
252
253 err = cursorReq.Iter(w.ctx, func(cursor *idb.CursorWithValue) error {
254 var eventData []byte
255
256 if isSmallEvent {
257 // Small events: data is embedded in the key
258 keyVal, keyErr := cursor.Key()
259 if keyErr != nil {
260 return keyErr
261 }
262 keyBytes := safeValueToBytes(keyVal)
263 // Format: sev|serial|size_uint16|event_data
264 if len(keyBytes) > 10 { // 3 + 5 + 2 minimum
265 sizeOffset := 8 // 3 prefix + 5 serial
266 if len(keyBytes) > sizeOffset+2 {
267 size := int(keyBytes[sizeOffset])<<8 | int(keyBytes[sizeOffset+1])
268 if len(keyBytes) >= sizeOffset+2+size {
269 eventData = keyBytes[sizeOffset+2 : sizeOffset+2+size]
270 }
271 }
272 }
273 } else {
274 // Large events: data is in the value
275 val, valErr := cursor.Value()
276 if valErr != nil {
277 return valErr
278 }
279 eventData = safeValueToBytes(val)
280 }
281
282 if len(eventData) > 0 {
283 ev := event.New()
284 if err := ev.UnmarshalBinary(bytes.NewReader(eventData)); err == nil {
285 events = append(events, ev)
286 }
287 }
288
289 return cursor.Continue()
290 })
291
292 return events, err
293 }
294