client.go raw
1 // Package grpc provides a gRPC client for the cluster sync service.
2 package grpc
3
import (
	"context"
	"errors"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"next.orly.dev/pkg/lol/log"

	clusterv1 "next.orly.dev/pkg/proto/orlysync/cluster/v1"
	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
)
15
// Client is a gRPC client for the cluster sync service.
//
// Instances are created with New, which dials the server and starts a
// background readiness probe. Every exported method is a thin wrapper around
// the corresponding RPC on the generated ClusterSyncServiceClient stub.
type Client struct {
	// conn is the underlying gRPC connection; Close shuts it down.
	conn *grpc.ClientConn
	// client is the generated service stub bound to conn.
	client clusterv1.ClusterSyncServiceClient
	// ready is closed by waitForReady once the Ready RPC reports ready.
	ready chan struct{}
}
22
// ClientConfig holds configuration for the gRPC client.
type ClientConfig struct {
	// ServerAddress is the dial target handed to grpc.DialContext by New.
	ServerAddress string
	// ConnectTimeout is the deadline New applies to the dial context.
	// A zero value makes New use its 10 second default.
	ConnectTimeout time.Duration
}
28
29 // New creates a new gRPC cluster sync client.
30 func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
31 timeout := cfg.ConnectTimeout
32 if timeout == 0 {
33 timeout = 10 * time.Second
34 }
35
36 dialCtx, cancel := context.WithTimeout(ctx, timeout)
37 defer cancel()
38
39 conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
40 grpc.WithTransportCredentials(insecure.NewCredentials()),
41 grpc.WithDefaultCallOptions(
42 grpc.MaxCallRecvMsgSize(16<<20), // 16MB
43 grpc.MaxCallSendMsgSize(16<<20), // 16MB
44 ),
45 )
46 if err != nil {
47 return nil, err
48 }
49
50 c := &Client{
51 conn: conn,
52 client: clusterv1.NewClusterSyncServiceClient(conn),
53 ready: make(chan struct{}),
54 }
55
56 go c.waitForReady(ctx)
57
58 return c, nil
59 }
60
61 func (c *Client) waitForReady(ctx context.Context) {
62 for {
63 select {
64 case <-ctx.Done():
65 return
66 default:
67 resp, err := c.client.Ready(ctx, &commonv1.Empty{})
68 if err == nil && resp.Ready {
69 close(c.ready)
70 log.I.F("gRPC cluster sync client connected and ready")
71 return
72 }
73 time.Sleep(100 * time.Millisecond)
74 }
75 }
76 }
77
78 // Close closes the gRPC connection.
79 func (c *Client) Close() error {
80 if c.conn != nil {
81 return c.conn.Close()
82 }
83 return nil
84 }
85
86 // Ready returns a channel that closes when the client is ready.
87 func (c *Client) Ready() <-chan struct{} {
88 return c.ready
89 }
90
91 // Start starts the cluster polling loop.
92 func (c *Client) Start(ctx context.Context) error {
93 _, err := c.client.Start(ctx, &commonv1.Empty{})
94 return err
95 }
96
97 // Stop stops the cluster polling loop.
98 func (c *Client) Stop(ctx context.Context) error {
99 _, err := c.client.Stop(ctx, &commonv1.Empty{})
100 return err
101 }
102
103 // HandleLatestSerial proxies an HTTP latest serial request.
104 func (c *Client) HandleLatestSerial(ctx context.Context, method string, body []byte, headers map[string]string) (int, []byte, map[string]string, error) {
105 resp, err := c.client.HandleLatestSerial(ctx, &commonv1.HTTPRequest{
106 Method: method,
107 Body: body,
108 Headers: headers,
109 })
110 if err != nil {
111 return 0, nil, nil, err
112 }
113 return int(resp.StatusCode), resp.Body, resp.Headers, nil
114 }
115
116 // HandleEventsRange proxies an HTTP events range request.
117 func (c *Client) HandleEventsRange(ctx context.Context, method string, body []byte, headers map[string]string, queryString string) (int, []byte, map[string]string, error) {
118 resp, err := c.client.HandleEventsRange(ctx, &commonv1.HTTPRequest{
119 Method: method,
120 Body: body,
121 Headers: headers,
122 QueryString: queryString,
123 })
124 if err != nil {
125 return 0, nil, nil, err
126 }
127 return int(resp.StatusCode), resp.Body, resp.Headers, nil
128 }
129
130 // GetMembers returns the current cluster members.
131 func (c *Client) GetMembers(ctx context.Context) ([]*clusterv1.ClusterMember, error) {
132 resp, err := c.client.GetMembers(ctx, &commonv1.Empty{})
133 if err != nil {
134 return nil, err
135 }
136 return resp.Members, nil
137 }
138
139 // UpdateMembership updates cluster membership.
140 func (c *Client) UpdateMembership(ctx context.Context, relayURLs []string) error {
141 _, err := c.client.UpdateMembership(ctx, &clusterv1.UpdateMembershipRequest{
142 RelayUrls: relayURLs,
143 })
144 return err
145 }
146
147 // HandleMembershipEvent processes a cluster membership event.
148 func (c *Client) HandleMembershipEvent(ctx context.Context, event *commonv1.Event) error {
149 _, err := c.client.HandleMembershipEvent(ctx, &clusterv1.MembershipEventRequest{
150 Event: event,
151 })
152 return err
153 }
154
155 // GetClusterStatus returns overall cluster status.
156 func (c *Client) GetClusterStatus(ctx context.Context) (*clusterv1.ClusterStatusResponse, error) {
157 return c.client.GetClusterStatus(ctx, &commonv1.Empty{})
158 }
159
160 // GetMemberStatus returns status for a specific member.
161 func (c *Client) GetMemberStatus(ctx context.Context, httpURL string) (*clusterv1.MemberStatusResponse, error) {
162 return c.client.GetMemberStatus(ctx, &clusterv1.MemberStatusRequest{
163 HttpUrl: httpURL,
164 })
165 }
166
167 // GetLatestSerial returns the latest serial from this relay's database.
168 func (c *Client) GetLatestSerial(ctx context.Context) (uint64, int64, error) {
169 resp, err := c.client.GetLatestSerial(ctx, &commonv1.Empty{})
170 if err != nil {
171 return 0, 0, err
172 }
173 return resp.Serial, resp.Timestamp, nil
174 }
175
176 // GetEventsInRange returns event info for a serial range.
177 func (c *Client) GetEventsInRange(ctx context.Context, from, to uint64, limit int32) ([]*clusterv1.EventInfo, bool, uint64, error) {
178 resp, err := c.client.GetEventsInRange(ctx, &clusterv1.EventsRangeRequest{
179 From: from,
180 To: to,
181 Limit: limit,
182 })
183 if err != nil {
184 return nil, false, 0, err
185 }
186 return resp.Events, resp.HasMore, resp.NextFrom, nil
187 }
188