service.go raw
1 // Package server provides the gRPC server implementation for distributed sync.
2 package server
3
4 import (
5 "bytes"
6 "context"
7 "encoding/json"
8 "net/http"
9
10 "next.orly.dev/pkg/lol/log"
11
12 "next.orly.dev/pkg/database"
13 "next.orly.dev/pkg/sync/distributed"
14 commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
15 distributedv1 "next.orly.dev/pkg/proto/orlysync/distributed/v1"
16 )
17
18 // Service implements the DistributedSyncServiceServer interface.
19 type Service struct {
20 distributedv1.UnimplementedDistributedSyncServiceServer
21 mgr *distributed.Manager
22 db database.Database
23 ready bool
24 }
25
26 // NewService creates a new distributed sync gRPC service.
27 func NewService(db database.Database, mgr *distributed.Manager) *Service {
28 return &Service{
29 mgr: mgr,
30 db: db,
31 ready: true,
32 }
33 }
34
35 // Ready returns whether the service is ready to serve requests.
36 func (s *Service) Ready(ctx context.Context, _ *commonv1.Empty) (*commonv1.ReadyResponse, error) {
37 return &commonv1.ReadyResponse{Ready: s.ready}, nil
38 }
39
40 // GetInfo returns current sync service information.
41 func (s *Service) GetInfo(ctx context.Context, _ *commonv1.Empty) (*commonv1.SyncInfo, error) {
42 peers := s.mgr.GetPeers()
43 return &commonv1.SyncInfo{
44 NodeId: s.mgr.GetNodeID(),
45 RelayUrl: s.mgr.GetRelayURL(),
46 CurrentSerial: s.mgr.GetCurrentSerial(),
47 PeerCount: int32(len(peers)),
48 Status: "running",
49 }, nil
50 }
51
52 // GetCurrentSerial returns this relay's current serial number.
53 func (s *Service) GetCurrentSerial(ctx context.Context, req *distributedv1.CurrentRequest) (*distributedv1.CurrentResponse, error) {
54 return &distributedv1.CurrentResponse{
55 NodeId: s.mgr.GetNodeID(),
56 RelayUrl: s.mgr.GetRelayURL(),
57 Serial: s.mgr.GetCurrentSerial(),
58 }, nil
59 }
60
61 // GetEventIDs returns event IDs for a serial range.
62 func (s *Service) GetEventIDs(ctx context.Context, req *distributedv1.EventIDsRequest) (*distributedv1.EventIDsResponse, error) {
63 eventMap, err := s.mgr.GetEventsWithIDs(req.From, req.To)
64 if err != nil {
65 return nil, err
66 }
67
68 return &distributedv1.EventIDsResponse{
69 EventMap: eventMap,
70 }, nil
71 }
72
73 // HandleCurrentRequest proxies /api/sync/current HTTP requests.
74 func (s *Service) HandleCurrentRequest(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
75 if req.Method != http.MethodPost {
76 return &commonv1.HTTPResponse{
77 StatusCode: http.StatusMethodNotAllowed,
78 Body: []byte("Method not allowed"),
79 }, nil
80 }
81
82 var currentReq distributed.CurrentRequest
83 if err := json.Unmarshal(req.Body, ¤tReq); err != nil {
84 return &commonv1.HTTPResponse{
85 StatusCode: http.StatusBadRequest,
86 Body: []byte("Invalid JSON"),
87 }, nil
88 }
89
90 // Check if request is from ourselves
91 if s.mgr.IsSelfNodeID(currentReq.NodeID) {
92 if currentReq.RelayURL != "" {
93 s.mgr.MarkSelfURL(currentReq.RelayURL)
94 }
95 return &commonv1.HTTPResponse{
96 StatusCode: http.StatusBadRequest,
97 Body: []byte("Cannot sync with self"),
98 }, nil
99 }
100
101 resp := distributed.CurrentResponse{
102 NodeID: s.mgr.GetNodeID(),
103 RelayURL: s.mgr.GetRelayURL(),
104 Serial: s.mgr.GetCurrentSerial(),
105 }
106
107 respBody, err := json.Marshal(resp)
108 if err != nil {
109 return &commonv1.HTTPResponse{
110 StatusCode: http.StatusInternalServerError,
111 Body: []byte("Failed to marshal response"),
112 }, nil
113 }
114
115 return &commonv1.HTTPResponse{
116 StatusCode: http.StatusOK,
117 Headers: map[string]string{"Content-Type": "application/json"},
118 Body: respBody,
119 }, nil
120 }
121
122 // HandleEventIDsRequest proxies /api/sync/event-ids HTTP requests.
123 func (s *Service) HandleEventIDsRequest(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
124 if req.Method != http.MethodPost {
125 return &commonv1.HTTPResponse{
126 StatusCode: http.StatusMethodNotAllowed,
127 Body: []byte("Method not allowed"),
128 }, nil
129 }
130
131 var eventIDsReq distributed.EventIDsRequest
132 if err := json.Unmarshal(req.Body, &eventIDsReq); err != nil {
133 return &commonv1.HTTPResponse{
134 StatusCode: http.StatusBadRequest,
135 Body: []byte("Invalid JSON"),
136 }, nil
137 }
138
139 // Check if request is from ourselves
140 if s.mgr.IsSelfNodeID(eventIDsReq.NodeID) {
141 if eventIDsReq.RelayURL != "" {
142 s.mgr.MarkSelfURL(eventIDsReq.RelayURL)
143 }
144 return &commonv1.HTTPResponse{
145 StatusCode: http.StatusBadRequest,
146 Body: []byte("Cannot sync with self"),
147 }, nil
148 }
149
150 eventMap, err := s.mgr.GetEventsWithIDs(eventIDsReq.From, eventIDsReq.To)
151 if err != nil {
152 return &commonv1.HTTPResponse{
153 StatusCode: http.StatusInternalServerError,
154 Body: []byte("Failed to get event IDs: " + err.Error()),
155 }, nil
156 }
157
158 resp := distributed.EventIDsResponse{
159 EventMap: eventMap,
160 }
161
162 respBody, err := json.Marshal(resp)
163 if err != nil {
164 return &commonv1.HTTPResponse{
165 StatusCode: http.StatusInternalServerError,
166 Body: []byte("Failed to marshal response"),
167 }, nil
168 }
169
170 return &commonv1.HTTPResponse{
171 StatusCode: http.StatusOK,
172 Headers: map[string]string{"Content-Type": "application/json"},
173 Body: respBody,
174 }, nil
175 }
176
177 // GetPeers returns the current list of sync peers.
178 func (s *Service) GetPeers(ctx context.Context, _ *commonv1.Empty) (*distributedv1.PeersResponse, error) {
179 peers := s.mgr.GetPeers()
180 return &distributedv1.PeersResponse{
181 Peers: peers,
182 }, nil
183 }
184
185 // UpdatePeers updates the peer list.
186 func (s *Service) UpdatePeers(ctx context.Context, req *distributedv1.UpdatePeersRequest) (*commonv1.Empty, error) {
187 s.mgr.UpdatePeers(req.Peers)
188 return &commonv1.Empty{}, nil
189 }
190
191 // IsAuthorizedPeer checks if a peer is authorized by validating its NIP-11 pubkey.
192 func (s *Service) IsAuthorizedPeer(ctx context.Context, req *distributedv1.AuthorizedPeerRequest) (*distributedv1.AuthorizedPeerResponse, error) {
193 authorized := s.mgr.IsAuthorizedPeer(req.PeerUrl, req.ExpectedPubkey)
194 return &distributedv1.AuthorizedPeerResponse{
195 Authorized: authorized,
196 }, nil
197 }
198
199 // GetPeerPubkey fetches the pubkey for a peer relay via NIP-11.
200 func (s *Service) GetPeerPubkey(ctx context.Context, req *distributedv1.PeerPubkeyRequest) (*distributedv1.PeerPubkeyResponse, error) {
201 pubkey, err := s.mgr.GetPeerPubkey(req.PeerUrl)
202 if err != nil {
203 // Return empty pubkey on error since the proto doesn't have an error field
204 return &distributedv1.PeerPubkeyResponse{
205 Pubkey: "",
206 }, nil
207 }
208 return &distributedv1.PeerPubkeyResponse{
209 Pubkey: pubkey,
210 }, nil
211 }
212
213 // UpdateSerial updates the current serial from database.
214 func (s *Service) UpdateSerial(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
215 s.mgr.UpdateSerial()
216 return &commonv1.Empty{}, nil
217 }
218
219 // NotifyNewEvent notifies the service of a new event being stored.
220 func (s *Service) NotifyNewEvent(ctx context.Context, req *distributedv1.NewEventNotification) (*commonv1.Empty, error) {
221 s.mgr.NotifyNewEvent(req.EventId, req.Serial)
222 return &commonv1.Empty{}, nil
223 }
224
225 // TriggerSync manually triggers a sync cycle with all peers.
226 func (s *Service) TriggerSync(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
227 log.D.F("manual sync trigger requested")
228 return &commonv1.Empty{}, nil
229 }
230
231 // GetSyncStatus returns current sync status for all peers.
232 func (s *Service) GetSyncStatus(ctx context.Context, _ *commonv1.Empty) (*distributedv1.SyncStatusResponse, error) {
233 peerStatus := s.mgr.GetPeerStatus()
234 peerInfos := make([]*commonv1.PeerInfo, 0, len(peerStatus))
235 for url, serial := range peerStatus {
236 peerInfos = append(peerInfos, &commonv1.PeerInfo{
237 Url: url,
238 LastSerial: serial,
239 Status: "active",
240 })
241 }
242 return &distributedv1.SyncStatusResponse{
243 CurrentSerial: s.mgr.GetCurrentSerial(),
244 Peers: peerInfos,
245 }, nil
246 }
247
// httpResponseWriter is an in-memory http.ResponseWriter that captures the
// status code, headers, and body a handler writes.
//
// The zero value is ready to use. statusCode stays 0 until WriteHeader or
// Write is called for the first time.
type httpResponseWriter struct {
	// statusCode is the first status written; 0 means none has been written.
	statusCode int
	// headers is allocated lazily by Header.
	headers http.Header
	// body accumulates everything passed to Write.
	body bytes.Buffer
}

// Compile-time check that httpResponseWriter satisfies http.ResponseWriter.
var _ http.ResponseWriter = (*httpResponseWriter)(nil)

// Header returns the captured header map, allocating it on first use so the
// caller can always set values on the result.
func (w *httpResponseWriter) Header() http.Header {
	if w.headers == nil {
		w.headers = make(http.Header)
	}
	return w.headers
}

// Write appends b to the captured body and returns the buffer's result.
//
// Following the http.ResponseWriter contract, a Write that happens before any
// WriteHeader call implicitly records http.StatusOK.
func (w *httpResponseWriter) Write(b []byte) (int, error) {
	if w.statusCode == 0 {
		w.statusCode = http.StatusOK
	}
	return w.body.Write(b)
}

// WriteHeader records statusCode as the response status. Only the first
// status takes effect; later calls (including one after an implicit 200 from
// Write) are ignored, as a real HTTP response can only send one status line.
func (w *httpResponseWriter) WriteHeader(statusCode int) {
	if w.statusCode != 0 {
		return
	}
	w.statusCode = statusCode
}
269