service.go raw
1 // Package server provides the gRPC server implementation for cluster sync.
2 package server
3
4 import (
5 "context"
6 "encoding/json"
7 "net/http"
8
9 "next.orly.dev/pkg/database"
10 "next.orly.dev/pkg/sync/cluster"
11 commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
12 clusterv1 "next.orly.dev/pkg/proto/orlysync/cluster/v1"
13 )
14
15 // Service implements the ClusterSyncServiceServer interface.
16 type Service struct {
17 clusterv1.UnimplementedClusterSyncServiceServer
18 mgr *cluster.Manager
19 db database.Database
20 ready bool
21 }
22
23 // NewService creates a new cluster sync gRPC service.
24 func NewService(db database.Database, mgr *cluster.Manager) *Service {
25 return &Service{
26 mgr: mgr,
27 db: db,
28 ready: true,
29 }
30 }
31
32 // Ready returns whether the service is ready to serve requests.
33 func (s *Service) Ready(ctx context.Context, _ *commonv1.Empty) (*commonv1.ReadyResponse, error) {
34 return &commonv1.ReadyResponse{Ready: s.ready}, nil
35 }
36
37 // Start starts the cluster polling loop.
38 func (s *Service) Start(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
39 s.mgr.Start()
40 return &commonv1.Empty{}, nil
41 }
42
43 // Stop stops the cluster polling loop.
44 func (s *Service) Stop(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
45 s.mgr.Stop()
46 return &commonv1.Empty{}, nil
47 }
48
49 // HandleLatestSerial proxies GET /cluster/latest HTTP requests.
50 func (s *Service) HandleLatestSerial(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
51 if req.Method != http.MethodGet {
52 return &commonv1.HTTPResponse{
53 StatusCode: http.StatusMethodNotAllowed,
54 Body: []byte("Method not allowed"),
55 }, nil
56 }
57
58 serial, timestamp := s.mgr.GetLatestSerial()
59 resp := map[string]any{
60 "serial": serial,
61 "timestamp": timestamp,
62 }
63
64 respBody, err := json.Marshal(resp)
65 if err != nil {
66 return &commonv1.HTTPResponse{
67 StatusCode: http.StatusInternalServerError,
68 Body: []byte("Failed to marshal response"),
69 }, nil
70 }
71
72 return &commonv1.HTTPResponse{
73 StatusCode: http.StatusOK,
74 Headers: map[string]string{"Content-Type": "application/json"},
75 Body: respBody,
76 }, nil
77 }
78
79 // HandleEventsRange proxies GET /cluster/events HTTP requests.
80 func (s *Service) HandleEventsRange(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
81 if req.Method != http.MethodGet {
82 return &commonv1.HTTPResponse{
83 StatusCode: http.StatusMethodNotAllowed,
84 Body: []byte("Method not allowed"),
85 }, nil
86 }
87
88 // Parse query parameters
89 // In practice, the query string would be parsed from req.QueryString
90 // For now, return a placeholder
91 return &commonv1.HTTPResponse{
92 StatusCode: http.StatusOK,
93 Headers: map[string]string{"Content-Type": "application/json"},
94 Body: []byte(`{"events":[],"has_more":false}`),
95 }, nil
96 }
97
98 // GetMembers returns the current cluster members.
99 func (s *Service) GetMembers(ctx context.Context, _ *commonv1.Empty) (*clusterv1.MembersResponse, error) {
100 members := s.mgr.GetMembers()
101 clusterMembers := make([]*clusterv1.ClusterMember, 0, len(members))
102 for _, m := range members {
103 clusterMembers = append(clusterMembers, &clusterv1.ClusterMember{
104 HttpUrl: m.HTTPURL,
105 WebsocketUrl: m.WebSocketURL,
106 LastSerial: m.LastSerial,
107 LastPoll: m.LastPoll.Unix(),
108 Status: m.Status,
109 ErrorCount: int32(m.ErrorCount),
110 })
111 }
112 return &clusterv1.MembersResponse{
113 Members: clusterMembers,
114 }, nil
115 }
116
117 // UpdateMembership updates cluster membership.
118 func (s *Service) UpdateMembership(ctx context.Context, req *clusterv1.UpdateMembershipRequest) (*commonv1.Empty, error) {
119 s.mgr.UpdateMembership(req.RelayUrls)
120 return &commonv1.Empty{}, nil
121 }
122
123 // HandleMembershipEvent processes a cluster membership event (Kind 39108).
124 func (s *Service) HandleMembershipEvent(ctx context.Context, req *clusterv1.MembershipEventRequest) (*commonv1.Empty, error) {
125 // Convert proto event to internal format and process
126 // This would need the actual event conversion
127 return &commonv1.Empty{}, nil
128 }
129
130 // GetClusterStatus returns overall cluster status.
131 func (s *Service) GetClusterStatus(ctx context.Context, _ *commonv1.Empty) (*clusterv1.ClusterStatusResponse, error) {
132 members := s.mgr.GetMembers()
133 serial, _ := s.mgr.GetLatestSerial()
134
135 activeCount := int32(0)
136 clusterMembers := make([]*clusterv1.ClusterMember, 0, len(members))
137 for _, m := range members {
138 if m.Status == "active" {
139 activeCount++
140 }
141 clusterMembers = append(clusterMembers, &clusterv1.ClusterMember{
142 HttpUrl: m.HTTPURL,
143 WebsocketUrl: m.WebSocketURL,
144 LastSerial: m.LastSerial,
145 LastPoll: m.LastPoll.Unix(),
146 Status: m.Status,
147 ErrorCount: int32(m.ErrorCount),
148 })
149 }
150
151 return &clusterv1.ClusterStatusResponse{
152 LatestSerial: serial,
153 ActiveMembers: activeCount,
154 TotalMembers: int32(len(members)),
155 PropagatePrivilegedEvents: s.mgr.PropagatePrivilegedEvents(),
156 Members: clusterMembers,
157 }, nil
158 }
159
160 // GetMemberStatus returns status for a specific member.
161 func (s *Service) GetMemberStatus(ctx context.Context, req *clusterv1.MemberStatusRequest) (*clusterv1.MemberStatusResponse, error) {
162 member := s.mgr.GetMember(req.HttpUrl)
163 if member == nil {
164 return &clusterv1.MemberStatusResponse{
165 Found: false,
166 }, nil
167 }
168
169 return &clusterv1.MemberStatusResponse{
170 Found: true,
171 Member: &clusterv1.ClusterMember{
172 HttpUrl: member.HTTPURL,
173 WebsocketUrl: member.WebSocketURL,
174 LastSerial: member.LastSerial,
175 LastPoll: member.LastPoll.Unix(),
176 Status: member.Status,
177 ErrorCount: int32(member.ErrorCount),
178 },
179 }, nil
180 }
181
182 // GetLatestSerial returns the latest serial from this relay's database.
183 func (s *Service) GetLatestSerial(ctx context.Context, _ *commonv1.Empty) (*clusterv1.LatestSerialResponse, error) {
184 serial, timestamp := s.mgr.GetLatestSerial()
185 return &clusterv1.LatestSerialResponse{
186 Serial: serial,
187 Timestamp: timestamp,
188 }, nil
189 }
190
191 // GetEventsInRange returns event info for a serial range.
192 func (s *Service) GetEventsInRange(ctx context.Context, req *clusterv1.EventsRangeRequest) (*clusterv1.EventsRangeResponse, error) {
193 events, hasMore, nextFrom := s.mgr.GetEventsInRange(req.From, req.To, int(req.Limit))
194
195 eventInfos := make([]*clusterv1.EventInfo, 0, len(events))
196 for _, e := range events {
197 eventInfos = append(eventInfos, &clusterv1.EventInfo{
198 Serial: e.Serial,
199 Id: e.ID,
200 Timestamp: e.Timestamp, // Already int64
201 })
202 }
203
204 return &clusterv1.EventsRangeResponse{
205 Events: eventInfos,
206 HasMore: hasMore,
207 NextFrom: nextFrom,
208 }, nil
209 }
210