client.go raw

   1  // Package grpc provides a gRPC client for the negentropy sync service.
   2  package grpc
   3  
   4  import (
   5  	"context"
   6  	"io"
   7  	"time"
   8  
   9  	"google.golang.org/grpc"
  10  	"google.golang.org/grpc/credentials/insecure"
  11  	"next.orly.dev/pkg/lol/log"
  12  
  13  	negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
  14  	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
  15  	negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1"
  16  )
  17  
  18  // Verify Client implements the Handler interface
  19  var _ negentropyiface.Handler = (*Client)(nil)
  20  
  21  // Client is a gRPC client for the negentropy sync service.
  22  type Client struct {
  23  	conn   *grpc.ClientConn
  24  	client negentropyv1.NegentropyServiceClient
  25  	ready  chan struct{}
  26  }
  27  
  28  // ClientConfig holds configuration for the gRPC client.
  29  type ClientConfig struct {
  30  	ServerAddress  string
  31  	ConnectTimeout time.Duration
  32  }
  33  
  34  // New creates a new gRPC negentropy sync client.
  35  func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
  36  	timeout := cfg.ConnectTimeout
  37  	if timeout == 0 {
  38  		timeout = 10 * time.Second
  39  	}
  40  
  41  	dialCtx, cancel := context.WithTimeout(ctx, timeout)
  42  	defer cancel()
  43  
  44  	conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
  45  		grpc.WithTransportCredentials(insecure.NewCredentials()),
  46  		grpc.WithDefaultCallOptions(
  47  			grpc.MaxCallRecvMsgSize(16<<20), // 16MB
  48  			grpc.MaxCallSendMsgSize(16<<20), // 16MB
  49  		),
  50  	)
  51  	if err != nil {
  52  		return nil, err
  53  	}
  54  
  55  	c := &Client{
  56  		conn:   conn,
  57  		client: negentropyv1.NewNegentropyServiceClient(conn),
  58  		ready:  make(chan struct{}),
  59  	}
  60  
  61  	go c.waitForReady(ctx)
  62  
  63  	return c, nil
  64  }
  65  
  66  func (c *Client) waitForReady(ctx context.Context) {
  67  	for {
  68  		select {
  69  		case <-ctx.Done():
  70  			return
  71  		default:
  72  			resp, err := c.client.Ready(ctx, &commonv1.Empty{})
  73  			if err == nil && resp.Ready {
  74  				close(c.ready)
  75  				log.I.F("gRPC negentropy sync client connected and ready")
  76  				return
  77  			}
  78  			time.Sleep(100 * time.Millisecond)
  79  		}
  80  	}
  81  }
  82  
  83  // Close closes the gRPC connection.
  84  func (c *Client) Close() error {
  85  	if c.conn != nil {
  86  		return c.conn.Close()
  87  	}
  88  	return nil
  89  }
  90  
  91  // Ready returns a channel that closes when the client is ready.
  92  func (c *Client) Ready() <-chan struct{} {
  93  	return c.ready
  94  }
  95  
  96  // Start starts the background relay-to-relay sync.
  97  func (c *Client) Start(ctx context.Context) error {
  98  	_, err := c.client.Start(ctx, &commonv1.Empty{})
  99  	return err
 100  }
 101  
 102  // Stop stops the background sync.
 103  func (c *Client) Stop(ctx context.Context) error {
 104  	_, err := c.client.Stop(ctx, &commonv1.Empty{})
 105  	return err
 106  }
 107  
 108  // HandleNegOpen processes a NEG-OPEN message from a client.
 109  // Returns: message, haveIDs, needIDs, complete, errorStr, error
 110  func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
 111  	resp, err := c.client.HandleNegOpen(ctx, &negentropyv1.NegOpenRequest{
 112  		ConnectionId:   connectionID,
 113  		SubscriptionId: subscriptionID,
 114  		Filter:         filter,
 115  		InitialMessage: initialMessage,
 116  	})
 117  	if err != nil {
 118  		return nil, nil, nil, false, "", err
 119  	}
 120  	return resp.Message, resp.HaveIds, resp.NeedIds, resp.Complete, resp.Error, nil
 121  }
 122  
 123  // HandleNegMsg processes a NEG-MSG message from a client.
 124  func (c *Client) HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
 125  	resp, err := c.client.HandleNegMsg(ctx, &negentropyv1.NegMsgRequest{
 126  		ConnectionId:   connectionID,
 127  		SubscriptionId: subscriptionID,
 128  		Message:        message,
 129  	})
 130  	if err != nil {
 131  		return nil, nil, nil, false, "", err
 132  	}
 133  	return resp.Message, resp.HaveIds, resp.NeedIds, resp.Complete, resp.Error, nil
 134  }
 135  
 136  // HandleNegClose processes a NEG-CLOSE message from a client.
 137  func (c *Client) HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error {
 138  	_, err := c.client.HandleNegClose(ctx, &negentropyv1.NegCloseRequest{
 139  		ConnectionId:   connectionID,
 140  		SubscriptionId: subscriptionID,
 141  	})
 142  	return err
 143  }
 144  
 145  // SyncProgress represents progress during a peer sync operation.
 146  type SyncProgress struct {
 147  	PeerURL      string
 148  	Round        int32
 149  	HaveCount    int64
 150  	NeedCount    int64
 151  	FetchedCount int64
 152  	SentCount    int64
 153  	Complete     bool
 154  	Error        string
 155  }
 156  
 157  // SyncWithPeer initiates negentropy sync with a specific peer relay.
 158  // Returns a channel that receives progress updates.
 159  func (c *Client) SyncWithPeer(ctx context.Context, peerURL string, filter *commonv1.Filter, since int64) (<-chan *SyncProgress, error) {
 160  	stream, err := c.client.SyncWithPeer(ctx, &negentropyv1.SyncPeerRequest{
 161  		PeerUrl: peerURL,
 162  		Filter:  filter,
 163  		Since:   since,
 164  	})
 165  	if err != nil {
 166  		return nil, err
 167  	}
 168  
 169  	ch := make(chan *SyncProgress, 10)
 170  	go func() {
 171  		defer close(ch)
 172  		for {
 173  			progress, err := stream.Recv()
 174  			if err == io.EOF {
 175  				return
 176  			}
 177  			if err != nil {
 178  				ch <- &SyncProgress{
 179  					PeerURL: peerURL,
 180  					Error:   err.Error(),
 181  				}
 182  				return
 183  			}
 184  			ch <- &SyncProgress{
 185  				PeerURL:      progress.PeerUrl,
 186  				Round:        progress.Round,
 187  				HaveCount:    progress.HaveCount,
 188  				NeedCount:    progress.NeedCount,
 189  				FetchedCount: progress.FetchedCount,
 190  				SentCount:    progress.SentCount,
 191  				Complete:     progress.Complete,
 192  				Error:        progress.Error,
 193  			}
 194  		}
 195  	}()
 196  
 197  	return ch, nil
 198  }
 199  
 200  // SyncStatus represents the sync status response.
 201  type SyncStatus struct {
 202  	Active     bool
 203  	LastSync   int64
 204  	PeerCount  int32
 205  	PeerStates []*PeerSyncState
 206  }
 207  
 208  // PeerSyncState represents sync state for a peer.
 209  type PeerSyncState struct {
 210  	PeerURL             string
 211  	LastSync            int64
 212  	EventsSynced        int64
 213  	Status              string
 214  	LastError           string
 215  	ConsecutiveFailures int32
 216  }
 217  
 218  // GetSyncStatus returns the current sync status.
 219  func (c *Client) GetSyncStatus(ctx context.Context) (*SyncStatus, error) {
 220  	resp, err := c.client.GetSyncStatus(ctx, &commonv1.Empty{})
 221  	if err != nil {
 222  		return nil, err
 223  	}
 224  
 225  	states := make([]*PeerSyncState, 0, len(resp.PeerStates))
 226  	for _, ps := range resp.PeerStates {
 227  		states = append(states, &PeerSyncState{
 228  			PeerURL:             ps.PeerUrl,
 229  			LastSync:            ps.LastSync,
 230  			EventsSynced:        ps.EventsSynced,
 231  			Status:              ps.Status,
 232  			LastError:           ps.LastError,
 233  			ConsecutiveFailures: ps.ConsecutiveFailures,
 234  		})
 235  	}
 236  
 237  	return &SyncStatus{
 238  		Active:     resp.Active,
 239  		LastSync:   resp.LastSync,
 240  		PeerCount:  resp.PeerCount,
 241  		PeerStates: states,
 242  	}, nil
 243  }
 244  
 245  // GetPeers returns the list of negentropy sync peers.
 246  func (c *Client) GetPeers(ctx context.Context) ([]string, error) {
 247  	resp, err := c.client.GetPeers(ctx, &commonv1.Empty{})
 248  	if err != nil {
 249  		return nil, err
 250  	}
 251  	return resp.Peers, nil
 252  }
 253  
 254  // AddPeer adds a peer for negentropy sync.
 255  func (c *Client) AddPeer(ctx context.Context, peerURL string) error {
 256  	_, err := c.client.AddPeer(ctx, &negentropyv1.AddPeerRequest{
 257  		PeerUrl: peerURL,
 258  	})
 259  	return err
 260  }
 261  
 262  // RemovePeer removes a peer from negentropy sync.
 263  func (c *Client) RemovePeer(ctx context.Context, peerURL string) error {
 264  	_, err := c.client.RemovePeer(ctx, &negentropyv1.RemovePeerRequest{
 265  		PeerUrl: peerURL,
 266  	})
 267  	return err
 268  }
 269  
 270  // TriggerSync manually triggers sync with a specific peer or all peers.
 271  func (c *Client) TriggerSync(ctx context.Context, peerURL string, filter *commonv1.Filter) error {
 272  	_, err := c.client.TriggerSync(ctx, &negentropyv1.TriggerSyncRequest{
 273  		PeerUrl: peerURL,
 274  		Filter:  filter,
 275  	})
 276  	return err
 277  }
 278  
 279  // GetPeerSyncState returns sync state for a specific peer.
 280  func (c *Client) GetPeerSyncState(ctx context.Context, peerURL string) (*PeerSyncState, bool, error) {
 281  	resp, err := c.client.GetPeerSyncState(ctx, &negentropyv1.PeerSyncStateRequest{
 282  		PeerUrl: peerURL,
 283  	})
 284  	if err != nil {
 285  		return nil, false, err
 286  	}
 287  	if !resp.Found {
 288  		return nil, false, nil
 289  	}
 290  	return &PeerSyncState{
 291  		PeerURL:             resp.State.PeerUrl,
 292  		LastSync:            resp.State.LastSync,
 293  		EventsSynced:        resp.State.EventsSynced,
 294  		Status:              resp.State.Status,
 295  		LastError:           resp.State.LastError,
 296  		ConsecutiveFailures: resp.State.ConsecutiveFailures,
 297  	}, true, nil
 298  }
 299  
 300  // ListSessions returns active client negentropy sessions.
 301  func (c *Client) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) {
 302  	resp, err := c.client.ListSessions(ctx, &commonv1.Empty{})
 303  	if err != nil {
 304  		return nil, err
 305  	}
 306  
 307  	sessions := make([]*negentropyiface.ClientSession, 0, len(resp.Sessions))
 308  	for _, s := range resp.Sessions {
 309  		sessions = append(sessions, &negentropyiface.ClientSession{
 310  			SubscriptionID: s.SubscriptionId,
 311  			ConnectionID:   s.ConnectionId,
 312  			CreatedAt:      s.CreatedAt,
 313  			LastActivity:   s.LastActivity,
 314  			RoundCount:     s.RoundCount,
 315  		})
 316  	}
 317  	return sessions, nil
 318  }
 319  
 320  // CloseSession forcefully closes a client session.
 321  func (c *Client) CloseSession(ctx context.Context, connectionID, subscriptionID string) error {
 322  	_, err := c.client.CloseSession(ctx, &negentropyv1.CloseSessionRequest{
 323  		ConnectionId:   connectionID,
 324  		SubscriptionId: subscriptionID,
 325  	})
 326  	return err
 327  }
 328