client.go raw
1 // Package grpc provides a gRPC client for the negentropy sync service.
2 package grpc
3
4 import (
5 "context"
6 "io"
7 "time"
8
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/credentials/insecure"
11 "next.orly.dev/pkg/lol/log"
12
13 negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
14 commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
15 negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1"
16 )
17
// Compile-time assertion that *Client satisfies negentropyiface.Handler.
var _ negentropyiface.Handler = (*Client)(nil)
20
21 // Client is a gRPC client for the negentropy sync service.
22 type Client struct {
23 conn *grpc.ClientConn
24 client negentropyv1.NegentropyServiceClient
25 ready chan struct{}
26 }
27
// ClientConfig carries the settings New needs to reach the sync service.
type ClientConfig struct {
	// ServerAddress is the dial target handed to gRPC.
	ServerAddress string
	// ConnectTimeout bounds the dial context; zero selects a 10 second default.
	ConnectTimeout time.Duration
}
33
34 // New creates a new gRPC negentropy sync client.
35 func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
36 timeout := cfg.ConnectTimeout
37 if timeout == 0 {
38 timeout = 10 * time.Second
39 }
40
41 dialCtx, cancel := context.WithTimeout(ctx, timeout)
42 defer cancel()
43
44 conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
45 grpc.WithTransportCredentials(insecure.NewCredentials()),
46 grpc.WithDefaultCallOptions(
47 grpc.MaxCallRecvMsgSize(16<<20), // 16MB
48 grpc.MaxCallSendMsgSize(16<<20), // 16MB
49 ),
50 )
51 if err != nil {
52 return nil, err
53 }
54
55 c := &Client{
56 conn: conn,
57 client: negentropyv1.NewNegentropyServiceClient(conn),
58 ready: make(chan struct{}),
59 }
60
61 go c.waitForReady(ctx)
62
63 return c, nil
64 }
65
66 func (c *Client) waitForReady(ctx context.Context) {
67 for {
68 select {
69 case <-ctx.Done():
70 return
71 default:
72 resp, err := c.client.Ready(ctx, &commonv1.Empty{})
73 if err == nil && resp.Ready {
74 close(c.ready)
75 log.I.F("gRPC negentropy sync client connected and ready")
76 return
77 }
78 time.Sleep(100 * time.Millisecond)
79 }
80 }
81 }
82
83 // Close closes the gRPC connection.
84 func (c *Client) Close() error {
85 if c.conn != nil {
86 return c.conn.Close()
87 }
88 return nil
89 }
90
91 // Ready returns a channel that closes when the client is ready.
92 func (c *Client) Ready() <-chan struct{} {
93 return c.ready
94 }
95
96 // Start starts the background relay-to-relay sync.
97 func (c *Client) Start(ctx context.Context) error {
98 _, err := c.client.Start(ctx, &commonv1.Empty{})
99 return err
100 }
101
102 // Stop stops the background sync.
103 func (c *Client) Stop(ctx context.Context) error {
104 _, err := c.client.Stop(ctx, &commonv1.Empty{})
105 return err
106 }
107
108 // HandleNegOpen processes a NEG-OPEN message from a client.
109 // Returns: message, haveIDs, needIDs, complete, errorStr, error
110 func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
111 resp, err := c.client.HandleNegOpen(ctx, &negentropyv1.NegOpenRequest{
112 ConnectionId: connectionID,
113 SubscriptionId: subscriptionID,
114 Filter: filter,
115 InitialMessage: initialMessage,
116 })
117 if err != nil {
118 return nil, nil, nil, false, "", err
119 }
120 return resp.Message, resp.HaveIds, resp.NeedIds, resp.Complete, resp.Error, nil
121 }
122
123 // HandleNegMsg processes a NEG-MSG message from a client.
124 func (c *Client) HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
125 resp, err := c.client.HandleNegMsg(ctx, &negentropyv1.NegMsgRequest{
126 ConnectionId: connectionID,
127 SubscriptionId: subscriptionID,
128 Message: message,
129 })
130 if err != nil {
131 return nil, nil, nil, false, "", err
132 }
133 return resp.Message, resp.HaveIds, resp.NeedIds, resp.Complete, resp.Error, nil
134 }
135
136 // HandleNegClose processes a NEG-CLOSE message from a client.
137 func (c *Client) HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error {
138 _, err := c.client.HandleNegClose(ctx, &negentropyv1.NegCloseRequest{
139 ConnectionId: connectionID,
140 SubscriptionId: subscriptionID,
141 })
142 return err
143 }
144
// SyncProgress is a single progress report produced while syncing with a peer.
type SyncProgress struct {
	// PeerURL identifies the peer relay the report refers to.
	PeerURL string
	// Round is the round number reported by the service.
	Round int32
	// Event counters reported by the service for this sync.
	HaveCount, NeedCount, FetchedCount, SentCount int64
	// Complete is the completion flag reported by the service.
	Complete bool
	// Error holds an error description; it is empty when none was reported.
	Error string
}
156
157 // SyncWithPeer initiates negentropy sync with a specific peer relay.
158 // Returns a channel that receives progress updates.
159 func (c *Client) SyncWithPeer(ctx context.Context, peerURL string, filter *commonv1.Filter, since int64) (<-chan *SyncProgress, error) {
160 stream, err := c.client.SyncWithPeer(ctx, &negentropyv1.SyncPeerRequest{
161 PeerUrl: peerURL,
162 Filter: filter,
163 Since: since,
164 })
165 if err != nil {
166 return nil, err
167 }
168
169 ch := make(chan *SyncProgress, 10)
170 go func() {
171 defer close(ch)
172 for {
173 progress, err := stream.Recv()
174 if err == io.EOF {
175 return
176 }
177 if err != nil {
178 ch <- &SyncProgress{
179 PeerURL: peerURL,
180 Error: err.Error(),
181 }
182 return
183 }
184 ch <- &SyncProgress{
185 PeerURL: progress.PeerUrl,
186 Round: progress.Round,
187 HaveCount: progress.HaveCount,
188 NeedCount: progress.NeedCount,
189 FetchedCount: progress.FetchedCount,
190 SentCount: progress.SentCount,
191 Complete: progress.Complete,
192 Error: progress.Error,
193 }
194 }
195 }()
196
197 return ch, nil
198 }
199
200 // SyncStatus represents the sync status response.
201 type SyncStatus struct {
202 Active bool
203 LastSync int64
204 PeerCount int32
205 PeerStates []*PeerSyncState
206 }
207
// PeerSyncState is the sync state the service reports for a single peer.
type PeerSyncState struct {
	// PeerURL identifies the peer relay.
	PeerURL string
	// LastSync and EventsSynced are the last-sync value and synced-event
	// count reported for the peer.
	LastSync, EventsSynced int64
	// Status and LastError are the status and most recent error strings
	// reported for the peer.
	Status, LastError string
	// ConsecutiveFailures is the failure streak reported for the peer.
	ConsecutiveFailures int32
}
217
218 // GetSyncStatus returns the current sync status.
219 func (c *Client) GetSyncStatus(ctx context.Context) (*SyncStatus, error) {
220 resp, err := c.client.GetSyncStatus(ctx, &commonv1.Empty{})
221 if err != nil {
222 return nil, err
223 }
224
225 states := make([]*PeerSyncState, 0, len(resp.PeerStates))
226 for _, ps := range resp.PeerStates {
227 states = append(states, &PeerSyncState{
228 PeerURL: ps.PeerUrl,
229 LastSync: ps.LastSync,
230 EventsSynced: ps.EventsSynced,
231 Status: ps.Status,
232 LastError: ps.LastError,
233 ConsecutiveFailures: ps.ConsecutiveFailures,
234 })
235 }
236
237 return &SyncStatus{
238 Active: resp.Active,
239 LastSync: resp.LastSync,
240 PeerCount: resp.PeerCount,
241 PeerStates: states,
242 }, nil
243 }
244
245 // GetPeers returns the list of negentropy sync peers.
246 func (c *Client) GetPeers(ctx context.Context) ([]string, error) {
247 resp, err := c.client.GetPeers(ctx, &commonv1.Empty{})
248 if err != nil {
249 return nil, err
250 }
251 return resp.Peers, nil
252 }
253
254 // AddPeer adds a peer for negentropy sync.
255 func (c *Client) AddPeer(ctx context.Context, peerURL string) error {
256 _, err := c.client.AddPeer(ctx, &negentropyv1.AddPeerRequest{
257 PeerUrl: peerURL,
258 })
259 return err
260 }
261
262 // RemovePeer removes a peer from negentropy sync.
263 func (c *Client) RemovePeer(ctx context.Context, peerURL string) error {
264 _, err := c.client.RemovePeer(ctx, &negentropyv1.RemovePeerRequest{
265 PeerUrl: peerURL,
266 })
267 return err
268 }
269
270 // TriggerSync manually triggers sync with a specific peer or all peers.
271 func (c *Client) TriggerSync(ctx context.Context, peerURL string, filter *commonv1.Filter) error {
272 _, err := c.client.TriggerSync(ctx, &negentropyv1.TriggerSyncRequest{
273 PeerUrl: peerURL,
274 Filter: filter,
275 })
276 return err
277 }
278
279 // GetPeerSyncState returns sync state for a specific peer.
280 func (c *Client) GetPeerSyncState(ctx context.Context, peerURL string) (*PeerSyncState, bool, error) {
281 resp, err := c.client.GetPeerSyncState(ctx, &negentropyv1.PeerSyncStateRequest{
282 PeerUrl: peerURL,
283 })
284 if err != nil {
285 return nil, false, err
286 }
287 if !resp.Found {
288 return nil, false, nil
289 }
290 return &PeerSyncState{
291 PeerURL: resp.State.PeerUrl,
292 LastSync: resp.State.LastSync,
293 EventsSynced: resp.State.EventsSynced,
294 Status: resp.State.Status,
295 LastError: resp.State.LastError,
296 ConsecutiveFailures: resp.State.ConsecutiveFailures,
297 }, true, nil
298 }
299
300 // ListSessions returns active client negentropy sessions.
301 func (c *Client) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) {
302 resp, err := c.client.ListSessions(ctx, &commonv1.Empty{})
303 if err != nil {
304 return nil, err
305 }
306
307 sessions := make([]*negentropyiface.ClientSession, 0, len(resp.Sessions))
308 for _, s := range resp.Sessions {
309 sessions = append(sessions, &negentropyiface.ClientSession{
310 SubscriptionID: s.SubscriptionId,
311 ConnectionID: s.ConnectionId,
312 CreatedAt: s.CreatedAt,
313 LastActivity: s.LastActivity,
314 RoundCount: s.RoundCount,
315 })
316 }
317 return sessions, nil
318 }
319
320 // CloseSession forcefully closes a client session.
321 func (c *Client) CloseSession(ctx context.Context, connectionID, subscriptionID string) error {
322 _, err := c.client.CloseSession(ctx, &negentropyv1.CloseSessionRequest{
323 ConnectionId: connectionID,
324 SubscriptionId: subscriptionID,
325 })
326 return err
327 }
328