client.go raw
1 // Package grpc provides a gRPC client for the distributed sync service.
2 package grpc
3
4 import (
5 "context"
6 "errors"
7 "time"
8
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/credentials/insecure"
11 "next.orly.dev/pkg/lol/log"
12
13 commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
14 distributedv1 "next.orly.dev/pkg/proto/orlysync/distributed/v1"
15 )
16
17 // Client is a gRPC client for the distributed sync service.
18 type Client struct {
19 conn *grpc.ClientConn
20 client distributedv1.DistributedSyncServiceClient
21 ready chan struct{}
22 }
23
// ClientConfig carries the settings New needs to build a Client.
type ClientConfig struct {
	// ServerAddress is the dial target of the distributed sync service.
	ServerAddress string
	// ConnectTimeout bounds the dial context created by New. A zero value
	// makes New fall back to its built-in default.
	ConnectTimeout time.Duration
}
29
30 // New creates a new gRPC distributed sync client.
31 func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
32 timeout := cfg.ConnectTimeout
33 if timeout == 0 {
34 timeout = 10 * time.Second
35 }
36
37 dialCtx, cancel := context.WithTimeout(ctx, timeout)
38 defer cancel()
39
40 conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
41 grpc.WithTransportCredentials(insecure.NewCredentials()),
42 grpc.WithDefaultCallOptions(
43 grpc.MaxCallRecvMsgSize(16<<20), // 16MB
44 grpc.MaxCallSendMsgSize(16<<20), // 16MB
45 ),
46 )
47 if err != nil {
48 return nil, err
49 }
50
51 c := &Client{
52 conn: conn,
53 client: distributedv1.NewDistributedSyncServiceClient(conn),
54 ready: make(chan struct{}),
55 }
56
57 go c.waitForReady(ctx)
58
59 return c, nil
60 }
61
62 func (c *Client) waitForReady(ctx context.Context) {
63 for {
64 select {
65 case <-ctx.Done():
66 return
67 default:
68 resp, err := c.client.Ready(ctx, &commonv1.Empty{})
69 if err == nil && resp.Ready {
70 close(c.ready)
71 log.I.F("gRPC distributed sync client connected and ready")
72 return
73 }
74 time.Sleep(100 * time.Millisecond)
75 }
76 }
77 }
78
79 // Close closes the gRPC connection.
80 func (c *Client) Close() error {
81 if c.conn != nil {
82 return c.conn.Close()
83 }
84 return nil
85 }
86
87 // Ready returns a channel that closes when the client is ready.
88 func (c *Client) Ready() <-chan struct{} {
89 return c.ready
90 }
91
92 // GetInfo returns current sync service information.
93 func (c *Client) GetInfo(ctx context.Context) (*commonv1.SyncInfo, error) {
94 return c.client.GetInfo(ctx, &commonv1.Empty{})
95 }
96
97 // GetCurrentSerial returns the current serial number.
98 func (c *Client) GetCurrentSerial(ctx context.Context) (uint64, error) {
99 resp, err := c.client.GetCurrentSerial(ctx, &distributedv1.CurrentRequest{})
100 if err != nil {
101 return 0, err
102 }
103 return resp.Serial, nil
104 }
105
106 // GetEventIDs returns event IDs for a serial range.
107 func (c *Client) GetEventIDs(ctx context.Context, from, to uint64) (map[string]uint64, error) {
108 resp, err := c.client.GetEventIDs(ctx, &distributedv1.EventIDsRequest{
109 From: from,
110 To: to,
111 })
112 if err != nil {
113 return nil, err
114 }
115 return resp.EventMap, nil
116 }
117
118 // HandleCurrentRequest proxies an HTTP current request.
119 func (c *Client) HandleCurrentRequest(ctx context.Context, method string, body []byte, headers map[string]string) (int, []byte, map[string]string, error) {
120 resp, err := c.client.HandleCurrentRequest(ctx, &commonv1.HTTPRequest{
121 Method: method,
122 Body: body,
123 Headers: headers,
124 })
125 if err != nil {
126 return 0, nil, nil, err
127 }
128 return int(resp.StatusCode), resp.Body, resp.Headers, nil
129 }
130
131 // HandleEventIDsRequest proxies an HTTP event-ids request.
132 func (c *Client) HandleEventIDsRequest(ctx context.Context, method string, body []byte, headers map[string]string) (int, []byte, map[string]string, error) {
133 resp, err := c.client.HandleEventIDsRequest(ctx, &commonv1.HTTPRequest{
134 Method: method,
135 Body: body,
136 Headers: headers,
137 })
138 if err != nil {
139 return 0, nil, nil, err
140 }
141 return int(resp.StatusCode), resp.Body, resp.Headers, nil
142 }
143
144 // GetPeers returns the current list of sync peers.
145 func (c *Client) GetPeers(ctx context.Context) ([]string, error) {
146 resp, err := c.client.GetPeers(ctx, &commonv1.Empty{})
147 if err != nil {
148 return nil, err
149 }
150 return resp.Peers, nil
151 }
152
153 // UpdatePeers updates the peer list.
154 func (c *Client) UpdatePeers(ctx context.Context, peers []string) error {
155 _, err := c.client.UpdatePeers(ctx, &distributedv1.UpdatePeersRequest{
156 Peers: peers,
157 })
158 return err
159 }
160
161 // IsAuthorizedPeer checks if a peer is authorized.
162 func (c *Client) IsAuthorizedPeer(ctx context.Context, peerURL, expectedPubkey string) (bool, error) {
163 resp, err := c.client.IsAuthorizedPeer(ctx, &distributedv1.AuthorizedPeerRequest{
164 PeerUrl: peerURL,
165 ExpectedPubkey: expectedPubkey,
166 })
167 if err != nil {
168 return false, err
169 }
170 return resp.Authorized, nil
171 }
172
173 // GetPeerPubkey fetches the pubkey for a peer relay.
174 func (c *Client) GetPeerPubkey(ctx context.Context, peerURL string) (string, error) {
175 resp, err := c.client.GetPeerPubkey(ctx, &distributedv1.PeerPubkeyRequest{
176 PeerUrl: peerURL,
177 })
178 if err != nil {
179 return "", err
180 }
181 if resp.Pubkey == "" {
182 return "", errors.New("peer pubkey not found")
183 }
184 return resp.Pubkey, nil
185 }
186
187 // UpdateSerial updates the current serial from database.
188 func (c *Client) UpdateSerial(ctx context.Context) error {
189 _, err := c.client.UpdateSerial(ctx, &commonv1.Empty{})
190 return err
191 }
192
193 // NotifyNewEvent notifies the service of a new event.
194 func (c *Client) NotifyNewEvent(ctx context.Context, eventID []byte, serial uint64) error {
195 _, err := c.client.NotifyNewEvent(ctx, &distributedv1.NewEventNotification{
196 EventId: eventID,
197 Serial: serial,
198 })
199 return err
200 }
201
202 // TriggerSync manually triggers a sync cycle.
203 func (c *Client) TriggerSync(ctx context.Context) error {
204 _, err := c.client.TriggerSync(ctx, &commonv1.Empty{})
205 return err
206 }
207
208 // GetSyncStatus returns current sync status.
209 func (c *Client) GetSyncStatus(ctx context.Context) (uint64, map[string]uint64, error) {
210 resp, err := c.client.GetSyncStatus(ctx, &commonv1.Empty{})
211 if err != nil {
212 return 0, nil, err
213 }
214 peerSerials := make(map[string]uint64)
215 for _, p := range resp.Peers {
216 peerSerials[p.Url] = p.LastSerial
217 }
218 return resp.CurrentSerial, peerSerials, nil
219 }
220