service.go raw
1 // Package server provides the gRPC server implementation for negentropy sync.
2 package server
3
4 import (
5 "context"
6 "encoding/hex"
7 "fmt"
8
9 "google.golang.org/grpc"
10 "next.orly.dev/pkg/lol/log"
11
12 "next.orly.dev/pkg/nostr/encoders/filter"
13 negentropylib "next.orly.dev/pkg/nostr/negentropy"
14 "next.orly.dev/pkg/database"
15 "next.orly.dev/pkg/sync/negentropy"
16 commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
17 negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1"
18 )
19
20 // Service implements the NegentropyServiceServer interface.
21 type Service struct {
22 negentropyv1.UnimplementedNegentropyServiceServer
23 mgr *negentropy.Manager
24 db database.Database
25 ready bool
26 }
27
28 // NewService creates a new negentropy gRPC service.
29 func NewService(db database.Database, mgr *negentropy.Manager) *Service {
30 return &Service{
31 mgr: mgr,
32 db: db,
33 ready: true,
34 }
35 }
36
37 // Ready returns whether the service is ready to serve requests.
38 func (s *Service) Ready(ctx context.Context, _ *commonv1.Empty) (*commonv1.ReadyResponse, error) {
39 return &commonv1.ReadyResponse{Ready: s.ready}, nil
40 }
41
42 // Start starts the background relay-to-relay sync.
43 func (s *Service) Start(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
44 s.mgr.Start()
45 return &commonv1.Empty{}, nil
46 }
47
48 // Stop stops the background sync.
49 func (s *Service) Stop(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
50 s.mgr.Stop()
51 return &commonv1.Empty{}, nil
52 }
53
54 // HandleNegOpen processes a NEG-OPEN message from a client.
55 func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRequest) (*negentropyv1.NegOpenResponse, error) {
56 // Open a session for this client
57 session := s.mgr.OpenSession(req.ConnectionId, req.SubscriptionId)
58
59 // Build storage from local events matching the filter
60 storage, err := s.buildStorageForFilter(ctx, req.Filter)
61 if err != nil {
62 log.E.F("NEG-OPEN: failed to build storage: %v", err)
63 return &negentropyv1.NegOpenResponse{
64 Message: nil,
65 Error: fmt.Sprintf("failed to build storage: %v", err),
66 }, nil
67 }
68
69 log.D.F("NEG-OPEN: built storage with %d events", storage.Size())
70
71 // Create negentropy instance for this session
72 neg := negentropylib.New(storage, negentropylib.DefaultFrameSizeLimit)
73
74 // Store in session for later use
75 session.SetNegentropy(neg, storage)
76
77 // The client (initiator) MUST provide an initial message in NEG-OPEN.
78 // Per NIP-77: The relay acts as responder and calls reconcile() with
79 // the client's initial message.
80 if len(req.InitialMessage) == 0 {
81 log.E.F("NEG-OPEN: missing initial message from client")
82 return &negentropyv1.NegOpenResponse{
83 Message: nil,
84 Error: "blocked: NEG-OPEN requires initialMessage from initiator",
85 }, nil
86 }
87
88 // Process the client's initial message
89 respMsg, complete, err := neg.Reconcile(req.InitialMessage)
90 if err != nil {
91 log.E.F("NEG-OPEN: reconcile failed: %v", err)
92 return &negentropyv1.NegOpenResponse{
93 Message: nil,
94 Error: fmt.Sprintf("reconcile failed: %v", err),
95 }, nil
96 }
97 log.D.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg))
98
99 // Collect IDs we have that client needs (to send as events)
100 haveIDs := neg.CollectHaves()
101 var haveIDBytes [][]byte
102 for _, id := range haveIDs {
103 // ID is a hex string, decode to binary
104 if decoded, err := hex.DecodeString(id); err == nil {
105 haveIDBytes = append(haveIDBytes, decoded)
106 }
107 }
108
109 // Collect IDs we need from client
110 needIDs := neg.CollectHaveNots()
111 var needIDBytes [][]byte
112 for _, id := range needIDs {
113 // ID is a hex string, decode to binary
114 if decoded, err := hex.DecodeString(id); err == nil {
115 needIDBytes = append(needIDBytes, decoded)
116 }
117 }
118
119 log.D.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d",
120 complete, len(haveIDs), len(needIDs), len(respMsg))
121
122 return &negentropyv1.NegOpenResponse{
123 Message: respMsg,
124 HaveIds: haveIDBytes,
125 NeedIds: needIDBytes,
126 Complete: complete,
127 Error: "",
128 }, nil
129 }
130
131 // HandleNegMsg processes a NEG-MSG message from a client.
132 func (s *Service) HandleNegMsg(ctx context.Context, req *negentropyv1.NegMsgRequest) (*negentropyv1.NegMsgResponse, error) {
133 // Update session activity
134 s.mgr.UpdateSessionActivity(req.ConnectionId, req.SubscriptionId)
135
136 // Look up session
137 session, ok := s.mgr.GetSession(req.ConnectionId, req.SubscriptionId)
138 if !ok {
139 return &negentropyv1.NegMsgResponse{
140 Error: "session not found",
141 }, nil
142 }
143
144 neg := session.GetNegentropy()
145 if neg == nil {
146 return &negentropyv1.NegMsgResponse{
147 Error: "session has no negentropy state",
148 }, nil
149 }
150
151 // Process the message
152 respMsg, complete, err := neg.Reconcile(req.Message)
153 if err != nil {
154 log.E.F("NEG-MSG: reconcile failed: %v", err)
155 return &negentropyv1.NegMsgResponse{
156 Error: fmt.Sprintf("reconcile failed: %v", err),
157 }, nil
158 }
159
160 // Collect IDs we have that client needs (to send as events)
161 haveIDs := neg.CollectHaves()
162 var haveIDBytes [][]byte
163 for _, id := range haveIDs {
164 // ID is a hex string, decode to binary
165 if decoded, err := hex.DecodeString(id); err == nil {
166 haveIDBytes = append(haveIDBytes, decoded)
167 }
168 }
169
170 // Collect IDs we need from client
171 needIDs := neg.CollectHaveNots()
172 var needIDBytes [][]byte
173 for _, id := range needIDs {
174 // ID is a hex string, decode to binary
175 if decoded, err := hex.DecodeString(id); err == nil {
176 needIDBytes = append(needIDBytes, decoded)
177 }
178 }
179
180 log.D.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d",
181 complete, len(haveIDs), len(needIDs), len(respMsg))
182
183 return &negentropyv1.NegMsgResponse{
184 Message: respMsg,
185 HaveIds: haveIDBytes,
186 NeedIds: needIDBytes,
187 Complete: complete,
188 Error: "",
189 }, nil
190 }
191
192 // HandleNegClose processes a NEG-CLOSE message from a client.
193 func (s *Service) HandleNegClose(ctx context.Context, req *negentropyv1.NegCloseRequest) (*commonv1.Empty, error) {
194 s.mgr.CloseSession(req.ConnectionId, req.SubscriptionId)
195 return &commonv1.Empty{}, nil
196 }
197
198 // SyncWithPeer initiates negentropy sync with a specific peer relay.
199 func (s *Service) SyncWithPeer(req *negentropyv1.SyncPeerRequest, stream grpc.ServerStreamingServer[negentropyv1.SyncProgress]) error {
200 // Send initial progress
201 if err := stream.Send(&negentropyv1.SyncProgress{
202 PeerUrl: req.PeerUrl,
203 Round: 0,
204 Complete: false,
205 }); err != nil {
206 return err
207 }
208
209 // TODO: Implement actual NIP-77 sync with peer
210 // For now, just mark as complete
211
212 s.mgr.TriggerSync(stream.Context(), req.PeerUrl)
213
214 // Send completion
215 return stream.Send(&negentropyv1.SyncProgress{
216 PeerUrl: req.PeerUrl,
217 Round: 1,
218 Complete: true,
219 })
220 }
221
222 // GetSyncStatus returns the current sync status.
223 func (s *Service) GetSyncStatus(ctx context.Context, _ *commonv1.Empty) (*negentropyv1.SyncStatusResponse, error) {
224 peerStates := s.mgr.GetPeerStates()
225
226 states := make([]*negentropyv1.PeerSyncState, 0, len(peerStates))
227 for _, ps := range peerStates {
228 states = append(states, &negentropyv1.PeerSyncState{
229 PeerUrl: ps.URL,
230 LastSync: ps.LastSync.Unix(),
231 EventsSynced: ps.EventsSynced,
232 Status: ps.Status,
233 LastError: ps.LastError,
234 ConsecutiveFailures: ps.ConsecutiveFailures,
235 })
236 }
237
238 return &negentropyv1.SyncStatusResponse{
239 Active: s.mgr.IsActive(),
240 LastSync: s.mgr.LastSync().Unix(),
241 PeerCount: int32(len(peerStates)),
242 PeerStates: states,
243 }, nil
244 }
245
246 // GetPeers returns the list of negentropy sync peers.
247 func (s *Service) GetPeers(ctx context.Context, _ *commonv1.Empty) (*negentropyv1.PeersResponse, error) {
248 return &negentropyv1.PeersResponse{
249 Peers: s.mgr.GetPeers(),
250 }, nil
251 }
252
253 // AddPeer adds a peer for negentropy sync.
254 func (s *Service) AddPeer(ctx context.Context, req *negentropyv1.AddPeerRequest) (*commonv1.Empty, error) {
255 s.mgr.AddPeer(req.PeerUrl)
256 return &commonv1.Empty{}, nil
257 }
258
259 // RemovePeer removes a peer from negentropy sync.
260 func (s *Service) RemovePeer(ctx context.Context, req *negentropyv1.RemovePeerRequest) (*commonv1.Empty, error) {
261 s.mgr.RemovePeer(req.PeerUrl)
262 return &commonv1.Empty{}, nil
263 }
264
265 // TriggerSync manually triggers sync with a specific peer or all peers.
266 func (s *Service) TriggerSync(ctx context.Context, req *negentropyv1.TriggerSyncRequest) (*commonv1.Empty, error) {
267 s.mgr.TriggerSync(ctx, req.PeerUrl)
268 return &commonv1.Empty{}, nil
269 }
270
271 // GetPeerSyncState returns sync state for a specific peer.
272 func (s *Service) GetPeerSyncState(ctx context.Context, req *negentropyv1.PeerSyncStateRequest) (*negentropyv1.PeerSyncStateResponse, error) {
273 state, found := s.mgr.GetPeerState(req.PeerUrl)
274 if !found {
275 return &negentropyv1.PeerSyncStateResponse{
276 Found: false,
277 }, nil
278 }
279
280 return &negentropyv1.PeerSyncStateResponse{
281 Found: true,
282 State: &negentropyv1.PeerSyncState{
283 PeerUrl: state.URL,
284 LastSync: state.LastSync.Unix(),
285 EventsSynced: state.EventsSynced,
286 Status: state.Status,
287 LastError: state.LastError,
288 ConsecutiveFailures: state.ConsecutiveFailures,
289 },
290 }, nil
291 }
292
293 // ListSessions returns active client negentropy sessions.
294 func (s *Service) ListSessions(ctx context.Context, _ *commonv1.Empty) (*negentropyv1.ListSessionsResponse, error) {
295 sessions := s.mgr.ListSessions()
296
297 protoSessions := make([]*negentropyv1.ClientSession, 0, len(sessions))
298 for _, sess := range sessions {
299 protoSessions = append(protoSessions, &negentropyv1.ClientSession{
300 SubscriptionId: sess.SubscriptionID,
301 ConnectionId: sess.ConnectionID,
302 CreatedAt: sess.CreatedAt.Unix(),
303 LastActivity: sess.LastActivity.Unix(),
304 RoundCount: sess.RoundCount,
305 })
306 }
307
308 return &negentropyv1.ListSessionsResponse{
309 Sessions: protoSessions,
310 }, nil
311 }
312
313 // CloseSession forcefully closes a client session.
314 func (s *Service) CloseSession(ctx context.Context, req *negentropyv1.CloseSessionRequest) (*commonv1.Empty, error) {
315 if req.ConnectionId == "" {
316 // Close all sessions with this subscription ID
317 sessions := s.mgr.ListSessions()
318 for _, sess := range sessions {
319 if sess.SubscriptionID == req.SubscriptionId {
320 s.mgr.CloseSession(sess.ConnectionID, sess.SubscriptionID)
321 }
322 }
323 } else {
324 s.mgr.CloseSession(req.ConnectionId, req.SubscriptionId)
325 }
326 return &commonv1.Empty{}, nil
327 }
328
329 // buildStorageForFilter creates a negentropy Vector from local events matching the filter.
330 func (s *Service) buildStorageForFilter(ctx context.Context, protoFilter *commonv1.Filter) (*negentropylib.Vector, error) {
331 storage := negentropylib.NewVector()
332
333 // Convert proto filter to nostr filter
334 f := protoToFilter(protoFilter)
335
336 // If no filter provided, use a reasonable limit
337 if f == nil {
338 limit := uint(1000000)
339 f = &filter.F{Limit: &limit}
340 }
341 if f.Limit == nil {
342 limit := uint(1000000)
343 f.Limit = &limit
344 }
345
346 // Query events from database
347 idPkTs, err := s.db.QueryForIds(ctx, f)
348 if err != nil {
349 return nil, fmt.Errorf("failed to query events: %w", err)
350 }
351
352 for _, item := range idPkTs {
353 storage.Insert(item.Ts, item.IDHex())
354 }
355
356 storage.Seal()
357 return storage, nil
358 }
359
360 // protoToFilter converts a proto filter to a nostr filter.
361 func protoToFilter(pf *commonv1.Filter) *filter.F {
362 if pf == nil {
363 return nil
364 }
365
366 f := &filter.F{}
367
368 // Convert Kinds
369 if len(pf.Kinds) > 0 {
370 // Create kinds from proto
371 // Note: We'd need proper kinds conversion here
372 }
373
374 // Convert Since/Until
375 if pf.Since != nil {
376 // Set Since timestamp
377 }
378 if pf.Until != nil {
379 // Set Until timestamp
380 }
381
382 // Convert Limit
383 if pf.Limit != nil {
384 limit := uint(*pf.Limit)
385 f.Limit = &limit
386 }
387
388 return f
389 }
390