worker.mx raw
1 // Package worker provides the store — owns all storage and event processing.
2 // Direct function calls, no channels. Correct for moxie's sequential model.
3 package worker
4
5 import (
6 "fmt"
7 "os"
8
9 "smesh.lol/pkg/acl"
10 "smesh.lol/pkg/nostr/envelope"
11 "smesh.lol/pkg/nostr/event"
12 "smesh.lol/pkg/nostr/filter"
13 "smesh.lol/pkg/relay/pipeline"
14 "smesh.lol/pkg/store"
15 )
16
17 // Store owns the database and event pipeline.
18 type Store struct {
19 eng *store.Engine
20 pipe *pipeline.Pipeline
21 }
22
23 // Open creates a Store rooted at the given data directory.
24 func Open(dataDir string) (*Store, error) {
25 eng, err := store.Open(dataDir | "/db")
26 if err != nil {
27 return nil, fmt.Errorf("store: %w", err)
28 }
29 a := acl.Open{}
30 pipe := pipeline.New(eng, a, nil, pipeline.DefaultConfig())
31 return &Store{eng: eng, pipe: pipe}, nil
32 }
33
34 // Close flushes and closes the database.
35 func (s *Store) Close() {
36 s.eng.Flush()
37 s.eng.Close()
38 }
39
40 // IngestResult holds the outcome of an event ingestion.
41 type IngestResult struct {
42 OK bool
43 EventID []byte
44 Reason []byte
45 Event *event.E // non-nil if accepted (for broadcast)
46 }
47
48 // Ingest validates and stores an event.
49 func (s *Store) Ingest(msg []byte) IngestResult {
50 _, rem, err := envelope.Identify(msg)
51 if err != nil {
52 return IngestResult{Reason: []byte("invalid: parse error")}
53 }
54 var es envelope.EventSubmission
55 if _, err := es.Unmarshal(rem); err != nil {
56 return IngestResult{Reason: []byte("invalid: parse error")}
57 }
58
59 result := s.pipe.Ingest(es.E)
60 ir := IngestResult{
61 OK: result.OK,
62 EventID: es.E.ID,
63 Reason: result.Reason,
64 }
65 if result.OK {
66 ir.Event = es.E
67 }
68 return ir
69 }
70
71 // Query runs filters and returns matching events plus EOSE marker.
72 func (s *Store) Query(msg []byte) [][]byte {
73 _, rem, err := envelope.Identify(msg)
74 if err != nil {
75 return nil
76 }
77 var req envelope.Req
78 if _, err := req.Unmarshal(rem); err != nil {
79 return nil
80 }
81
82 filters := filter.S(*req.Filters)
83 seen := map[string]bool{}
84 var out [][]byte
85 for _, f := range filters {
86 events, err := s.eng.QueryEvents(f)
87 if err != nil {
88 continue
89 }
90 for _, ev := range events {
91 k := string(ev.ID)
92 if seen[k] {
93 continue
94 }
95 seen[k] = true
96 er := &envelope.EventResult{
97 Subscription: req.Subscription,
98 Event: ev,
99 }
100 out = append(out, er.Marshal(nil))
101 }
102 }
103 out = append(out, (&envelope.EOSE{Subscription: req.Subscription}).Marshal(nil))
104 return out
105 }
106
107 // Count returns the count of events matching the filters.
108 func (s *Store) Count(msg []byte) []byte {
109 _, rem, err := envelope.Identify(msg)
110 if err != nil {
111 return nil
112 }
113 var cr envelope.CountRequest
114 if _, err := cr.Unmarshal(rem); err != nil {
115 return nil
116 }
117 var total int
118 for _, f := range cr.Filters {
119 evs, err := s.eng.QueryEvents(f)
120 if err != nil {
121 continue
122 }
123 total += len(evs)
124 }
125 return (&envelope.CountResponse{
126 Subscription: cr.Subscription,
127 Count: total,
128 }).Marshal(nil)
129 }
130
131 // Engine returns the underlying storage engine for direct queries.
132 func (s *Store) Engine() *store.Engine { return s.eng }
133
134 // Flush forces a WAL flush.
135 func (s *Store) Flush() {
136 s.eng.Flush()
137 }
138
139 func init() {
140 // Suppress unused import warnings.
141 _ = fmt.Sprintf
142 _ = os.Stderr
143 }
144