// Package worker provides the store — owns all storage and event processing. // Direct function calls, no channels. Correct for moxie's sequential model. package worker import ( "fmt" "os" "smesh.lol/pkg/acl" "smesh.lol/pkg/nostr/envelope" "smesh.lol/pkg/nostr/event" "smesh.lol/pkg/nostr/filter" "smesh.lol/pkg/relay/pipeline" "smesh.lol/pkg/store" ) // Store owns the database and event pipeline. type Store struct { eng *store.Engine pipe *pipeline.Pipeline } // Open creates a Store rooted at the given data directory. func Open(dataDir string) (*Store, error) { eng, err := store.Open(dataDir | "/db") if err != nil { return nil, fmt.Errorf("store: %w", err) } a := acl.Open{} pipe := pipeline.New(eng, a, nil, pipeline.DefaultConfig()) return &Store{eng: eng, pipe: pipe}, nil } // Close flushes and closes the database. func (s *Store) Close() { s.eng.Flush() s.eng.Close() } // IngestResult holds the outcome of an event ingestion. type IngestResult struct { OK bool EventID []byte Reason []byte Event *event.E // non-nil if accepted (for broadcast) } // Ingest validates and stores an event. func (s *Store) Ingest(msg []byte) IngestResult { _, rem, err := envelope.Identify(msg) if err != nil { return IngestResult{Reason: []byte("invalid: parse error")} } var es envelope.EventSubmission if _, err := es.Unmarshal(rem); err != nil { return IngestResult{Reason: []byte("invalid: parse error")} } result := s.pipe.Ingest(es.E) ir := IngestResult{ OK: result.OK, EventID: es.E.ID, Reason: result.Reason, } if result.OK { ir.Event = es.E } return ir } // Query runs filters and returns matching events plus EOSE marker. func (s *Store) Query(msg []byte) [][]byte { _, rem, err := envelope.Identify(msg) if err != nil { return nil } var req envelope.Req if _, err := req.Unmarshal(rem); err != nil { return nil } filters := filter.S(*req.Filters) seen := map[string]bool{} var out [][]byte for _, f := range filters { events, err := s.eng.QueryEvents(f) if err != nil { continue } for _, ev := range events { k := string(ev.ID) if seen[k] { continue } seen[k] = true er := &envelope.EventResult{ Subscription: req.Subscription, Event: ev, } out = append(out, er.Marshal(nil)) } } out = append(out, (&envelope.EOSE{Subscription: req.Subscription}).Marshal(nil)) return out } // Count returns the count of events matching the filters. func (s *Store) Count(msg []byte) []byte { _, rem, err := envelope.Identify(msg) if err != nil { return nil } var cr envelope.CountRequest if _, err := cr.Unmarshal(rem); err != nil { return nil } var total int for _, f := range cr.Filters { evs, err := s.eng.QueryEvents(f) if err != nil { continue } total += len(evs) } return (&envelope.CountResponse{ Subscription: cr.Subscription, Count: total, }).Marshal(nil) } // Engine returns the underlying storage engine for direct queries. func (s *Store) Engine() *store.Engine { return s.eng } // Flush forces a WAL flush. func (s *Store) Flush() { s.eng.Flush() } func init() { // Suppress unused import warnings. _ = fmt.Sprintf _ = os.Stderr }