embedded.go raw
1 package negentropy
2
3 import (
4 "context"
5 "encoding/hex"
6 "fmt"
7
8 "next.orly.dev/pkg/lol/log"
9
10 "next.orly.dev/pkg/nostr/encoders/filter"
11 "next.orly.dev/pkg/nostr/encoders/kind"
12 "next.orly.dev/pkg/nostr/encoders/tag"
13 "next.orly.dev/pkg/nostr/encoders/timestamp"
14 negentropylib "next.orly.dev/pkg/nostr/negentropy"
15 "next.orly.dev/pkg/database"
16 negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
17 commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
18 )
19
20 // EmbeddedHandler wraps the negentropy Manager to implement the Handler interface.
21 // This allows negentropy to run embedded in monolithic mode without gRPC.
22 type EmbeddedHandler struct {
23 mgr *Manager
24 db database.Database
25 ready chan struct{}
26 }
27
28 // NewEmbeddedHandler creates a new embedded negentropy handler.
29 func NewEmbeddedHandler(db database.Database, cfg *Config) *EmbeddedHandler {
30 h := &EmbeddedHandler{
31 mgr: NewManager(db, cfg),
32 db: db,
33 ready: make(chan struct{}),
34 }
35 close(h.ready) // Immediately ready in embedded mode
36 return h
37 }
38
39 // Start starts the background sync loop.
40 func (h *EmbeddedHandler) Start() {
41 h.mgr.Start()
42 }
43
44 // Stop stops the background sync.
45 func (h *EmbeddedHandler) Stop() {
46 h.mgr.Stop()
47 }
48
49 // Ready returns a channel that closes when the handler is ready.
50 func (h *EmbeddedHandler) Ready() <-chan struct{} {
51 return h.ready
52 }
53
54 // Close cleans up resources.
55 func (h *EmbeddedHandler) Close() error {
56 h.mgr.Stop()
57 return nil
58 }
59
60 // HandleNegOpen processes a NEG-OPEN message from a client.
61 func (h *EmbeddedHandler) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, protoFilter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
62 // Open a session for this client
63 session := h.mgr.OpenSession(connectionID, subscriptionID)
64
65 // Build storage from local events matching the filter
66 storage, err := h.buildStorageForFilter(ctx, protoFilter)
67 if err != nil {
68 log.E.F("NEG-OPEN: failed to build storage: %v", err)
69 return nil, nil, nil, false, fmt.Sprintf("failed to build storage: %v", err), nil
70 }
71
72 log.D.F("NEG-OPEN: built storage with %d events", storage.Size())
73
74 // Create negentropy instance for this session
75 neg := negentropylib.New(storage, negentropylib.DefaultFrameSizeLimit)
76
77 // Store in session for later use
78 session.SetNegentropy(neg, storage)
79
80 // If we have an initial message from client, process it
81 var respMsg []byte
82 var complete bool
83 if len(initialMessage) > 0 {
84 respMsg, complete, err = neg.Reconcile(initialMessage)
85 if err != nil {
86 log.E.F("NEG-OPEN: reconcile failed: %v", err)
87 return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil
88 }
89 log.D.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg))
90 // Debug: dump first bytes and initial message first bytes
91 if len(respMsg) > 0 {
92 end := 64
93 if end > len(respMsg) {
94 end = len(respMsg)
95 }
96 log.D.F("NEG-OPEN: initial msg first 64 bytes: %x", initialMessage[:min(64, len(initialMessage))])
97 log.D.F("NEG-OPEN: response first 64 bytes: %x", respMsg[:end])
98 }
99 } else {
100 // No initial message, start as server (initiator)
101 respMsg, err = neg.Start()
102 if err != nil {
103 log.E.F("NEG-OPEN: failed to start: %v", err)
104 return nil, nil, nil, false, fmt.Sprintf("failed to start: %v", err), nil
105 }
106 log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg))
107 }
108
109 // Collect IDs we have that client needs (to send as events)
110 haveIDs := neg.CollectHaves()
111 var haveIDBytes [][]byte
112 for _, id := range haveIDs {
113 if decoded, err := hex.DecodeString(id); err == nil {
114 haveIDBytes = append(haveIDBytes, decoded)
115 }
116 }
117
118 // Collect IDs we need from client
119 needIDs := neg.CollectHaveNots()
120 var needIDBytes [][]byte
121 for _, id := range needIDs {
122 if decoded, err := hex.DecodeString(id); err == nil {
123 needIDBytes = append(needIDBytes, decoded)
124 }
125 }
126
127 log.D.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d",
128 complete, len(haveIDs), len(needIDs), len(respMsg))
129
130 return respMsg, haveIDBytes, needIDBytes, complete, "", nil
131 }
132
133 // HandleNegMsg processes a NEG-MSG message from a client.
134 func (h *EmbeddedHandler) HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
135 // Update session activity
136 h.mgr.UpdateSessionActivity(connectionID, subscriptionID)
137
138 // Look up session
139 session, ok := h.mgr.GetSession(connectionID, subscriptionID)
140 if !ok {
141 return nil, nil, nil, false, "session not found", nil
142 }
143
144 neg := session.GetNegentropy()
145 if neg == nil {
146 return nil, nil, nil, false, "session has no negentropy state", nil
147 }
148
149 // Process the message
150 respMsg, complete, err := neg.Reconcile(message)
151 if err != nil {
152 log.E.F("NEG-MSG: reconcile failed: %v", err)
153 return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil
154 }
155
156 // Collect IDs we have that client needs
157 haveIDs := neg.CollectHaves()
158 var haveIDBytes [][]byte
159 for _, id := range haveIDs {
160 if decoded, err := hex.DecodeString(id); err == nil {
161 haveIDBytes = append(haveIDBytes, decoded)
162 }
163 }
164
165 // Collect IDs we need from client
166 needIDs := neg.CollectHaveNots()
167 var needIDBytes [][]byte
168 for _, id := range needIDs {
169 if decoded, err := hex.DecodeString(id); err == nil {
170 needIDBytes = append(needIDBytes, decoded)
171 }
172 }
173
174 log.D.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d",
175 complete, len(haveIDs), len(needIDs), len(respMsg))
176
177 return respMsg, haveIDBytes, needIDBytes, complete, "", nil
178 }
179
180 // HandleNegClose processes a NEG-CLOSE message from a client.
181 func (h *EmbeddedHandler) HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error {
182 h.mgr.CloseSession(connectionID, subscriptionID)
183 return nil
184 }
185
186 // ListSessions returns active client negentropy sessions.
187 func (h *EmbeddedHandler) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) {
188 sessions := h.mgr.ListSessions()
189
190 result := make([]*negentropyiface.ClientSession, 0, len(sessions))
191 for _, sess := range sessions {
192 result = append(result, &negentropyiface.ClientSession{
193 SubscriptionID: sess.SubscriptionID,
194 ConnectionID: sess.ConnectionID,
195 CreatedAt: sess.CreatedAt.Unix(),
196 LastActivity: sess.LastActivity.Unix(),
197 RoundCount: sess.RoundCount,
198 })
199 }
200 return result, nil
201 }
202
203 // CloseSession forcefully closes a client session.
204 func (h *EmbeddedHandler) CloseSession(ctx context.Context, connectionID, subscriptionID string) error {
205 if connectionID == "" {
206 // Close all sessions with this subscription ID
207 sessions := h.mgr.ListSessions()
208 for _, sess := range sessions {
209 if sess.SubscriptionID == subscriptionID {
210 h.mgr.CloseSession(sess.ConnectionID, sess.SubscriptionID)
211 }
212 }
213 } else {
214 h.mgr.CloseSession(connectionID, subscriptionID)
215 }
216 return nil
217 }
218
219 // buildStorageForFilter creates a negentropy Vector from local events matching the filter.
220 func (h *EmbeddedHandler) buildStorageForFilter(ctx context.Context, protoFilter *commonv1.Filter) (*negentropylib.Vector, error) {
221 storage := negentropylib.NewVector()
222
223 // Convert proto filter to nostr filter
224 f := protoToFilter(protoFilter)
225
226 // If no filter provided, use a reasonable limit
227 if f == nil {
228 limit := uint(1000000)
229 f = &filter.F{Limit: &limit}
230 }
231 if f.Limit == nil {
232 limit := uint(1000000)
233 f.Limit = &limit
234 }
235
236 // Query events from database
237 idPkTs, err := h.db.QueryForIds(ctx, f)
238 if err != nil {
239 return nil, fmt.Errorf("failed to query events: %w", err)
240 }
241
242 for _, item := range idPkTs {
243 storage.Insert(item.Ts, item.IDHex())
244 }
245
246 storage.Seal()
247 return storage, nil
248 }
249
250 // protoToFilter converts a proto filter to a nostr filter.
251 func protoToFilter(pf *commonv1.Filter) *filter.F {
252 if pf == nil {
253 return nil
254 }
255
256 f := &filter.F{}
257
258 // Convert IDs (binary 32-byte event IDs)
259 if len(pf.Ids) > 0 {
260 f.Ids = &tag.T{T: pf.Ids}
261 }
262
263 // Convert Kinds (uint32 → kind.K with uint16)
264 if len(pf.Kinds) > 0 {
265 ks := kind.NewWithCap(len(pf.Kinds))
266 for _, k := range pf.Kinds {
267 ks.K = append(ks.K, kind.New(k))
268 }
269 f.Kinds = ks
270 }
271
272 // Convert Authors (binary 32-byte pubkeys)
273 if len(pf.Authors) > 0 {
274 f.Authors = &tag.T{T: pf.Authors}
275 }
276
277 // Convert Since
278 if pf.Since != nil {
279 f.Since = timestamp.New(*pf.Since)
280 }
281
282 // Convert Until
283 if pf.Until != nil {
284 f.Until = timestamp.New(*pf.Until)
285 }
286
287 // Convert Limit
288 if pf.Limit != nil {
289 limit := uint(*pf.Limit)
290 f.Limit = &limit
291 }
292
293 return f
294 }
295