client.go raw

   1  // Package grpc provides a gRPC client for the cluster sync service.
   2  package grpc
   3  
import (
	"context"
	"errors"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"next.orly.dev/pkg/lol/log"

	clusterv1 "next.orly.dev/pkg/proto/orlysync/cluster/v1"
	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
)
  15  
// Client is a gRPC client for the cluster sync service.
//
// It wraps a single connection and the service client created from it, and
// exposes a readiness channel (see Ready) that is closed once the remote
// service has reported itself ready.
type Client struct {
	// conn is the underlying gRPC connection; Close shuts it down.
	conn   *grpc.ClientConn
	// client is the cluster sync service client created from conn.
	client clusterv1.ClusterSyncServiceClient
	// ready is closed by waitForReady once the service's Ready RPC succeeds
	// and reports ready.
	ready  chan struct{}
}
  22  
// ClientConfig holds configuration for the gRPC client.
type ClientConfig struct {
	// ServerAddress is the dial target handed to grpc.DialContext.
	ServerAddress  string
	// ConnectTimeout bounds the context used while dialing. A zero value
	// selects a 10-second default.
	ConnectTimeout time.Duration
}
  28  
  29  // New creates a new gRPC cluster sync client.
  30  func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
  31  	timeout := cfg.ConnectTimeout
  32  	if timeout == 0 {
  33  		timeout = 10 * time.Second
  34  	}
  35  
  36  	dialCtx, cancel := context.WithTimeout(ctx, timeout)
  37  	defer cancel()
  38  
  39  	conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
  40  		grpc.WithTransportCredentials(insecure.NewCredentials()),
  41  		grpc.WithDefaultCallOptions(
  42  			grpc.MaxCallRecvMsgSize(16<<20), // 16MB
  43  			grpc.MaxCallSendMsgSize(16<<20), // 16MB
  44  		),
  45  	)
  46  	if err != nil {
  47  		return nil, err
  48  	}
  49  
  50  	c := &Client{
  51  		conn:   conn,
  52  		client: clusterv1.NewClusterSyncServiceClient(conn),
  53  		ready:  make(chan struct{}),
  54  	}
  55  
  56  	go c.waitForReady(ctx)
  57  
  58  	return c, nil
  59  }
  60  
  61  func (c *Client) waitForReady(ctx context.Context) {
  62  	for {
  63  		select {
  64  		case <-ctx.Done():
  65  			return
  66  		default:
  67  			resp, err := c.client.Ready(ctx, &commonv1.Empty{})
  68  			if err == nil && resp.Ready {
  69  				close(c.ready)
  70  				log.I.F("gRPC cluster sync client connected and ready")
  71  				return
  72  			}
  73  			time.Sleep(100 * time.Millisecond)
  74  		}
  75  	}
  76  }
  77  
  78  // Close closes the gRPC connection.
  79  func (c *Client) Close() error {
  80  	if c.conn != nil {
  81  		return c.conn.Close()
  82  	}
  83  	return nil
  84  }
  85  
  86  // Ready returns a channel that closes when the client is ready.
  87  func (c *Client) Ready() <-chan struct{} {
  88  	return c.ready
  89  }
  90  
  91  // Start starts the cluster polling loop.
  92  func (c *Client) Start(ctx context.Context) error {
  93  	_, err := c.client.Start(ctx, &commonv1.Empty{})
  94  	return err
  95  }
  96  
  97  // Stop stops the cluster polling loop.
  98  func (c *Client) Stop(ctx context.Context) error {
  99  	_, err := c.client.Stop(ctx, &commonv1.Empty{})
 100  	return err
 101  }
 102  
 103  // HandleLatestSerial proxies an HTTP latest serial request.
 104  func (c *Client) HandleLatestSerial(ctx context.Context, method string, body []byte, headers map[string]string) (int, []byte, map[string]string, error) {
 105  	resp, err := c.client.HandleLatestSerial(ctx, &commonv1.HTTPRequest{
 106  		Method:  method,
 107  		Body:    body,
 108  		Headers: headers,
 109  	})
 110  	if err != nil {
 111  		return 0, nil, nil, err
 112  	}
 113  	return int(resp.StatusCode), resp.Body, resp.Headers, nil
 114  }
 115  
 116  // HandleEventsRange proxies an HTTP events range request.
 117  func (c *Client) HandleEventsRange(ctx context.Context, method string, body []byte, headers map[string]string, queryString string) (int, []byte, map[string]string, error) {
 118  	resp, err := c.client.HandleEventsRange(ctx, &commonv1.HTTPRequest{
 119  		Method:      method,
 120  		Body:        body,
 121  		Headers:     headers,
 122  		QueryString: queryString,
 123  	})
 124  	if err != nil {
 125  		return 0, nil, nil, err
 126  	}
 127  	return int(resp.StatusCode), resp.Body, resp.Headers, nil
 128  }
 129  
 130  // GetMembers returns the current cluster members.
 131  func (c *Client) GetMembers(ctx context.Context) ([]*clusterv1.ClusterMember, error) {
 132  	resp, err := c.client.GetMembers(ctx, &commonv1.Empty{})
 133  	if err != nil {
 134  		return nil, err
 135  	}
 136  	return resp.Members, nil
 137  }
 138  
 139  // UpdateMembership updates cluster membership.
 140  func (c *Client) UpdateMembership(ctx context.Context, relayURLs []string) error {
 141  	_, err := c.client.UpdateMembership(ctx, &clusterv1.UpdateMembershipRequest{
 142  		RelayUrls: relayURLs,
 143  	})
 144  	return err
 145  }
 146  
 147  // HandleMembershipEvent processes a cluster membership event.
 148  func (c *Client) HandleMembershipEvent(ctx context.Context, event *commonv1.Event) error {
 149  	_, err := c.client.HandleMembershipEvent(ctx, &clusterv1.MembershipEventRequest{
 150  		Event: event,
 151  	})
 152  	return err
 153  }
 154  
 155  // GetClusterStatus returns overall cluster status.
 156  func (c *Client) GetClusterStatus(ctx context.Context) (*clusterv1.ClusterStatusResponse, error) {
 157  	return c.client.GetClusterStatus(ctx, &commonv1.Empty{})
 158  }
 159  
 160  // GetMemberStatus returns status for a specific member.
 161  func (c *Client) GetMemberStatus(ctx context.Context, httpURL string) (*clusterv1.MemberStatusResponse, error) {
 162  	return c.client.GetMemberStatus(ctx, &clusterv1.MemberStatusRequest{
 163  		HttpUrl: httpURL,
 164  	})
 165  }
 166  
 167  // GetLatestSerial returns the latest serial from this relay's database.
 168  func (c *Client) GetLatestSerial(ctx context.Context) (uint64, int64, error) {
 169  	resp, err := c.client.GetLatestSerial(ctx, &commonv1.Empty{})
 170  	if err != nil {
 171  		return 0, 0, err
 172  	}
 173  	return resp.Serial, resp.Timestamp, nil
 174  }
 175  
 176  // GetEventsInRange returns event info for a serial range.
 177  func (c *Client) GetEventsInRange(ctx context.Context, from, to uint64, limit int32) ([]*clusterv1.EventInfo, bool, uint64, error) {
 178  	resp, err := c.client.GetEventsInRange(ctx, &clusterv1.EventsRangeRequest{
 179  		From:  from,
 180  		To:    to,
 181  		Limit: limit,
 182  	})
 183  	if err != nil {
 184  		return nil, false, 0, err
 185  	}
 186  	return resp.Events, resp.HasMore, resp.NextFrom, nil
 187  }
 188