client.go raw

   1  // Package grpc provides a gRPC client for the distributed sync service.
   2  package grpc
   3  
   4  import (
   5  	"context"
   6  	"errors"
   7  	"time"
   8  
   9  	"google.golang.org/grpc"
  10  	"google.golang.org/grpc/credentials/insecure"
  11  	"next.orly.dev/pkg/lol/log"
  12  
  13  	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
  14  	distributedv1 "next.orly.dev/pkg/proto/orlysync/distributed/v1"
  15  )
  16  
  17  // Client is a gRPC client for the distributed sync service.
  18  type Client struct {
  19  	conn   *grpc.ClientConn
  20  	client distributedv1.DistributedSyncServiceClient
  21  	ready  chan struct{}
  22  }
  23  
  24  // ClientConfig holds configuration for the gRPC client.
  25  type ClientConfig struct {
  26  	ServerAddress  string
  27  	ConnectTimeout time.Duration
  28  }
  29  
  30  // New creates a new gRPC distributed sync client.
  31  func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
  32  	timeout := cfg.ConnectTimeout
  33  	if timeout == 0 {
  34  		timeout = 10 * time.Second
  35  	}
  36  
  37  	dialCtx, cancel := context.WithTimeout(ctx, timeout)
  38  	defer cancel()
  39  
  40  	conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
  41  		grpc.WithTransportCredentials(insecure.NewCredentials()),
  42  		grpc.WithDefaultCallOptions(
  43  			grpc.MaxCallRecvMsgSize(16<<20), // 16MB
  44  			grpc.MaxCallSendMsgSize(16<<20), // 16MB
  45  		),
  46  	)
  47  	if err != nil {
  48  		return nil, err
  49  	}
  50  
  51  	c := &Client{
  52  		conn:   conn,
  53  		client: distributedv1.NewDistributedSyncServiceClient(conn),
  54  		ready:  make(chan struct{}),
  55  	}
  56  
  57  	go c.waitForReady(ctx)
  58  
  59  	return c, nil
  60  }
  61  
  62  func (c *Client) waitForReady(ctx context.Context) {
  63  	for {
  64  		select {
  65  		case <-ctx.Done():
  66  			return
  67  		default:
  68  			resp, err := c.client.Ready(ctx, &commonv1.Empty{})
  69  			if err == nil && resp.Ready {
  70  				close(c.ready)
  71  				log.I.F("gRPC distributed sync client connected and ready")
  72  				return
  73  			}
  74  			time.Sleep(100 * time.Millisecond)
  75  		}
  76  	}
  77  }
  78  
  79  // Close closes the gRPC connection.
  80  func (c *Client) Close() error {
  81  	if c.conn != nil {
  82  		return c.conn.Close()
  83  	}
  84  	return nil
  85  }
  86  
  87  // Ready returns a channel that closes when the client is ready.
  88  func (c *Client) Ready() <-chan struct{} {
  89  	return c.ready
  90  }
  91  
  92  // GetInfo returns current sync service information.
  93  func (c *Client) GetInfo(ctx context.Context) (*commonv1.SyncInfo, error) {
  94  	return c.client.GetInfo(ctx, &commonv1.Empty{})
  95  }
  96  
  97  // GetCurrentSerial returns the current serial number.
  98  func (c *Client) GetCurrentSerial(ctx context.Context) (uint64, error) {
  99  	resp, err := c.client.GetCurrentSerial(ctx, &distributedv1.CurrentRequest{})
 100  	if err != nil {
 101  		return 0, err
 102  	}
 103  	return resp.Serial, nil
 104  }
 105  
 106  // GetEventIDs returns event IDs for a serial range.
 107  func (c *Client) GetEventIDs(ctx context.Context, from, to uint64) (map[string]uint64, error) {
 108  	resp, err := c.client.GetEventIDs(ctx, &distributedv1.EventIDsRequest{
 109  		From: from,
 110  		To:   to,
 111  	})
 112  	if err != nil {
 113  		return nil, err
 114  	}
 115  	return resp.EventMap, nil
 116  }
 117  
 118  // HandleCurrentRequest proxies an HTTP current request.
 119  func (c *Client) HandleCurrentRequest(ctx context.Context, method string, body []byte, headers map[string]string) (int, []byte, map[string]string, error) {
 120  	resp, err := c.client.HandleCurrentRequest(ctx, &commonv1.HTTPRequest{
 121  		Method:  method,
 122  		Body:    body,
 123  		Headers: headers,
 124  	})
 125  	if err != nil {
 126  		return 0, nil, nil, err
 127  	}
 128  	return int(resp.StatusCode), resp.Body, resp.Headers, nil
 129  }
 130  
 131  // HandleEventIDsRequest proxies an HTTP event-ids request.
 132  func (c *Client) HandleEventIDsRequest(ctx context.Context, method string, body []byte, headers map[string]string) (int, []byte, map[string]string, error) {
 133  	resp, err := c.client.HandleEventIDsRequest(ctx, &commonv1.HTTPRequest{
 134  		Method:  method,
 135  		Body:    body,
 136  		Headers: headers,
 137  	})
 138  	if err != nil {
 139  		return 0, nil, nil, err
 140  	}
 141  	return int(resp.StatusCode), resp.Body, resp.Headers, nil
 142  }
 143  
 144  // GetPeers returns the current list of sync peers.
 145  func (c *Client) GetPeers(ctx context.Context) ([]string, error) {
 146  	resp, err := c.client.GetPeers(ctx, &commonv1.Empty{})
 147  	if err != nil {
 148  		return nil, err
 149  	}
 150  	return resp.Peers, nil
 151  }
 152  
 153  // UpdatePeers updates the peer list.
 154  func (c *Client) UpdatePeers(ctx context.Context, peers []string) error {
 155  	_, err := c.client.UpdatePeers(ctx, &distributedv1.UpdatePeersRequest{
 156  		Peers: peers,
 157  	})
 158  	return err
 159  }
 160  
 161  // IsAuthorizedPeer checks if a peer is authorized.
 162  func (c *Client) IsAuthorizedPeer(ctx context.Context, peerURL, expectedPubkey string) (bool, error) {
 163  	resp, err := c.client.IsAuthorizedPeer(ctx, &distributedv1.AuthorizedPeerRequest{
 164  		PeerUrl:        peerURL,
 165  		ExpectedPubkey: expectedPubkey,
 166  	})
 167  	if err != nil {
 168  		return false, err
 169  	}
 170  	return resp.Authorized, nil
 171  }
 172  
 173  // GetPeerPubkey fetches the pubkey for a peer relay.
 174  func (c *Client) GetPeerPubkey(ctx context.Context, peerURL string) (string, error) {
 175  	resp, err := c.client.GetPeerPubkey(ctx, &distributedv1.PeerPubkeyRequest{
 176  		PeerUrl: peerURL,
 177  	})
 178  	if err != nil {
 179  		return "", err
 180  	}
 181  	if resp.Pubkey == "" {
 182  		return "", errors.New("peer pubkey not found")
 183  	}
 184  	return resp.Pubkey, nil
 185  }
 186  
 187  // UpdateSerial updates the current serial from database.
 188  func (c *Client) UpdateSerial(ctx context.Context) error {
 189  	_, err := c.client.UpdateSerial(ctx, &commonv1.Empty{})
 190  	return err
 191  }
 192  
 193  // NotifyNewEvent notifies the service of a new event.
 194  func (c *Client) NotifyNewEvent(ctx context.Context, eventID []byte, serial uint64) error {
 195  	_, err := c.client.NotifyNewEvent(ctx, &distributedv1.NewEventNotification{
 196  		EventId: eventID,
 197  		Serial:  serial,
 198  	})
 199  	return err
 200  }
 201  
 202  // TriggerSync manually triggers a sync cycle.
 203  func (c *Client) TriggerSync(ctx context.Context) error {
 204  	_, err := c.client.TriggerSync(ctx, &commonv1.Empty{})
 205  	return err
 206  }
 207  
 208  // GetSyncStatus returns current sync status.
 209  func (c *Client) GetSyncStatus(ctx context.Context) (uint64, map[string]uint64, error) {
 210  	resp, err := c.client.GetSyncStatus(ctx, &commonv1.Empty{})
 211  	if err != nil {
 212  		return 0, nil, err
 213  	}
 214  	peerSerials := make(map[string]uint64)
 215  	for _, p := range resp.Peers {
 216  		peerSerials[p.Url] = p.LastSerial
 217  	}
 218  	return resp.CurrentSerial, peerSerials, nil
 219  }
 220