service.go raw

   1  // Package server provides the gRPC server implementation for negentropy sync.
   2  package server
   3  
   4  import (
   5  	"context"
   6  	"encoding/hex"
   7  	"fmt"
   8  
   9  	"google.golang.org/grpc"
  10  	"next.orly.dev/pkg/lol/log"
  11  
  12  	"next.orly.dev/pkg/nostr/encoders/filter"
  13  	negentropylib "next.orly.dev/pkg/nostr/negentropy"
  14  	"next.orly.dev/pkg/database"
  15  	"next.orly.dev/pkg/sync/negentropy"
  16  	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
  17  	negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1"
  18  )
  19  
  20  // Service implements the NegentropyServiceServer interface.
  21  type Service struct {
  22  	negentropyv1.UnimplementedNegentropyServiceServer
  23  	mgr   *negentropy.Manager
  24  	db    database.Database
  25  	ready bool
  26  }
  27  
  28  // NewService creates a new negentropy gRPC service.
  29  func NewService(db database.Database, mgr *negentropy.Manager) *Service {
  30  	return &Service{
  31  		mgr:   mgr,
  32  		db:    db,
  33  		ready: true,
  34  	}
  35  }
  36  
  37  // Ready returns whether the service is ready to serve requests.
  38  func (s *Service) Ready(ctx context.Context, _ *commonv1.Empty) (*commonv1.ReadyResponse, error) {
  39  	return &commonv1.ReadyResponse{Ready: s.ready}, nil
  40  }
  41  
  42  // Start starts the background relay-to-relay sync.
  43  func (s *Service) Start(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
  44  	s.mgr.Start()
  45  	return &commonv1.Empty{}, nil
  46  }
  47  
  48  // Stop stops the background sync.
  49  func (s *Service) Stop(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
  50  	s.mgr.Stop()
  51  	return &commonv1.Empty{}, nil
  52  }
  53  
  54  // HandleNegOpen processes a NEG-OPEN message from a client.
  55  func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRequest) (*negentropyv1.NegOpenResponse, error) {
  56  	// Open a session for this client
  57  	session := s.mgr.OpenSession(req.ConnectionId, req.SubscriptionId)
  58  
  59  	// Build storage from local events matching the filter
  60  	storage, err := s.buildStorageForFilter(ctx, req.Filter)
  61  	if err != nil {
  62  		log.E.F("NEG-OPEN: failed to build storage: %v", err)
  63  		return &negentropyv1.NegOpenResponse{
  64  			Message: nil,
  65  			Error:   fmt.Sprintf("failed to build storage: %v", err),
  66  		}, nil
  67  	}
  68  
  69  	log.D.F("NEG-OPEN: built storage with %d events", storage.Size())
  70  
  71  	// Create negentropy instance for this session
  72  	neg := negentropylib.New(storage, negentropylib.DefaultFrameSizeLimit)
  73  
  74  	// Store in session for later use
  75  	session.SetNegentropy(neg, storage)
  76  
  77  	// The client (initiator) MUST provide an initial message in NEG-OPEN.
  78  	// Per NIP-77: The relay acts as responder and calls reconcile() with
  79  	// the client's initial message.
  80  	if len(req.InitialMessage) == 0 {
  81  		log.E.F("NEG-OPEN: missing initial message from client")
  82  		return &negentropyv1.NegOpenResponse{
  83  			Message: nil,
  84  			Error:   "blocked: NEG-OPEN requires initialMessage from initiator",
  85  		}, nil
  86  	}
  87  
  88  	// Process the client's initial message
  89  	respMsg, complete, err := neg.Reconcile(req.InitialMessage)
  90  	if err != nil {
  91  		log.E.F("NEG-OPEN: reconcile failed: %v", err)
  92  		return &negentropyv1.NegOpenResponse{
  93  			Message: nil,
  94  			Error:   fmt.Sprintf("reconcile failed: %v", err),
  95  		}, nil
  96  	}
  97  	log.D.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg))
  98  
  99  	// Collect IDs we have that client needs (to send as events)
 100  	haveIDs := neg.CollectHaves()
 101  	var haveIDBytes [][]byte
 102  	for _, id := range haveIDs {
 103  		// ID is a hex string, decode to binary
 104  		if decoded, err := hex.DecodeString(id); err == nil {
 105  			haveIDBytes = append(haveIDBytes, decoded)
 106  		}
 107  	}
 108  
 109  	// Collect IDs we need from client
 110  	needIDs := neg.CollectHaveNots()
 111  	var needIDBytes [][]byte
 112  	for _, id := range needIDs {
 113  		// ID is a hex string, decode to binary
 114  		if decoded, err := hex.DecodeString(id); err == nil {
 115  			needIDBytes = append(needIDBytes, decoded)
 116  		}
 117  	}
 118  
 119  	log.D.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d",
 120  		complete, len(haveIDs), len(needIDs), len(respMsg))
 121  
 122  	return &negentropyv1.NegOpenResponse{
 123  		Message:  respMsg,
 124  		HaveIds:  haveIDBytes,
 125  		NeedIds:  needIDBytes,
 126  		Complete: complete,
 127  		Error:    "",
 128  	}, nil
 129  }
 130  
 131  // HandleNegMsg processes a NEG-MSG message from a client.
 132  func (s *Service) HandleNegMsg(ctx context.Context, req *negentropyv1.NegMsgRequest) (*negentropyv1.NegMsgResponse, error) {
 133  	// Update session activity
 134  	s.mgr.UpdateSessionActivity(req.ConnectionId, req.SubscriptionId)
 135  
 136  	// Look up session
 137  	session, ok := s.mgr.GetSession(req.ConnectionId, req.SubscriptionId)
 138  	if !ok {
 139  		return &negentropyv1.NegMsgResponse{
 140  			Error: "session not found",
 141  		}, nil
 142  	}
 143  
 144  	neg := session.GetNegentropy()
 145  	if neg == nil {
 146  		return &negentropyv1.NegMsgResponse{
 147  			Error: "session has no negentropy state",
 148  		}, nil
 149  	}
 150  
 151  	// Process the message
 152  	respMsg, complete, err := neg.Reconcile(req.Message)
 153  	if err != nil {
 154  		log.E.F("NEG-MSG: reconcile failed: %v", err)
 155  		return &negentropyv1.NegMsgResponse{
 156  			Error: fmt.Sprintf("reconcile failed: %v", err),
 157  		}, nil
 158  	}
 159  
 160  	// Collect IDs we have that client needs (to send as events)
 161  	haveIDs := neg.CollectHaves()
 162  	var haveIDBytes [][]byte
 163  	for _, id := range haveIDs {
 164  		// ID is a hex string, decode to binary
 165  		if decoded, err := hex.DecodeString(id); err == nil {
 166  			haveIDBytes = append(haveIDBytes, decoded)
 167  		}
 168  	}
 169  
 170  	// Collect IDs we need from client
 171  	needIDs := neg.CollectHaveNots()
 172  	var needIDBytes [][]byte
 173  	for _, id := range needIDs {
 174  		// ID is a hex string, decode to binary
 175  		if decoded, err := hex.DecodeString(id); err == nil {
 176  			needIDBytes = append(needIDBytes, decoded)
 177  		}
 178  	}
 179  
 180  	log.D.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d",
 181  		complete, len(haveIDs), len(needIDs), len(respMsg))
 182  
 183  	return &negentropyv1.NegMsgResponse{
 184  		Message:  respMsg,
 185  		HaveIds:  haveIDBytes,
 186  		NeedIds:  needIDBytes,
 187  		Complete: complete,
 188  		Error:    "",
 189  	}, nil
 190  }
 191  
 192  // HandleNegClose processes a NEG-CLOSE message from a client.
 193  func (s *Service) HandleNegClose(ctx context.Context, req *negentropyv1.NegCloseRequest) (*commonv1.Empty, error) {
 194  	s.mgr.CloseSession(req.ConnectionId, req.SubscriptionId)
 195  	return &commonv1.Empty{}, nil
 196  }
 197  
 198  // SyncWithPeer initiates negentropy sync with a specific peer relay.
 199  func (s *Service) SyncWithPeer(req *negentropyv1.SyncPeerRequest, stream grpc.ServerStreamingServer[negentropyv1.SyncProgress]) error {
 200  	// Send initial progress
 201  	if err := stream.Send(&negentropyv1.SyncProgress{
 202  		PeerUrl:  req.PeerUrl,
 203  		Round:    0,
 204  		Complete: false,
 205  	}); err != nil {
 206  		return err
 207  	}
 208  
 209  	// TODO: Implement actual NIP-77 sync with peer
 210  	// For now, just mark as complete
 211  
 212  	s.mgr.TriggerSync(stream.Context(), req.PeerUrl)
 213  
 214  	// Send completion
 215  	return stream.Send(&negentropyv1.SyncProgress{
 216  		PeerUrl:  req.PeerUrl,
 217  		Round:    1,
 218  		Complete: true,
 219  	})
 220  }
 221  
 222  // GetSyncStatus returns the current sync status.
 223  func (s *Service) GetSyncStatus(ctx context.Context, _ *commonv1.Empty) (*negentropyv1.SyncStatusResponse, error) {
 224  	peerStates := s.mgr.GetPeerStates()
 225  
 226  	states := make([]*negentropyv1.PeerSyncState, 0, len(peerStates))
 227  	for _, ps := range peerStates {
 228  		states = append(states, &negentropyv1.PeerSyncState{
 229  			PeerUrl:             ps.URL,
 230  			LastSync:            ps.LastSync.Unix(),
 231  			EventsSynced:        ps.EventsSynced,
 232  			Status:              ps.Status,
 233  			LastError:           ps.LastError,
 234  			ConsecutiveFailures: ps.ConsecutiveFailures,
 235  		})
 236  	}
 237  
 238  	return &negentropyv1.SyncStatusResponse{
 239  		Active:     s.mgr.IsActive(),
 240  		LastSync:   s.mgr.LastSync().Unix(),
 241  		PeerCount:  int32(len(peerStates)),
 242  		PeerStates: states,
 243  	}, nil
 244  }
 245  
 246  // GetPeers returns the list of negentropy sync peers.
 247  func (s *Service) GetPeers(ctx context.Context, _ *commonv1.Empty) (*negentropyv1.PeersResponse, error) {
 248  	return &negentropyv1.PeersResponse{
 249  		Peers: s.mgr.GetPeers(),
 250  	}, nil
 251  }
 252  
 253  // AddPeer adds a peer for negentropy sync.
 254  func (s *Service) AddPeer(ctx context.Context, req *negentropyv1.AddPeerRequest) (*commonv1.Empty, error) {
 255  	s.mgr.AddPeer(req.PeerUrl)
 256  	return &commonv1.Empty{}, nil
 257  }
 258  
 259  // RemovePeer removes a peer from negentropy sync.
 260  func (s *Service) RemovePeer(ctx context.Context, req *negentropyv1.RemovePeerRequest) (*commonv1.Empty, error) {
 261  	s.mgr.RemovePeer(req.PeerUrl)
 262  	return &commonv1.Empty{}, nil
 263  }
 264  
 265  // TriggerSync manually triggers sync with a specific peer or all peers.
 266  func (s *Service) TriggerSync(ctx context.Context, req *negentropyv1.TriggerSyncRequest) (*commonv1.Empty, error) {
 267  	s.mgr.TriggerSync(ctx, req.PeerUrl)
 268  	return &commonv1.Empty{}, nil
 269  }
 270  
 271  // GetPeerSyncState returns sync state for a specific peer.
 272  func (s *Service) GetPeerSyncState(ctx context.Context, req *negentropyv1.PeerSyncStateRequest) (*negentropyv1.PeerSyncStateResponse, error) {
 273  	state, found := s.mgr.GetPeerState(req.PeerUrl)
 274  	if !found {
 275  		return &negentropyv1.PeerSyncStateResponse{
 276  			Found: false,
 277  		}, nil
 278  	}
 279  
 280  	return &negentropyv1.PeerSyncStateResponse{
 281  		Found: true,
 282  		State: &negentropyv1.PeerSyncState{
 283  			PeerUrl:             state.URL,
 284  			LastSync:            state.LastSync.Unix(),
 285  			EventsSynced:        state.EventsSynced,
 286  			Status:              state.Status,
 287  			LastError:           state.LastError,
 288  			ConsecutiveFailures: state.ConsecutiveFailures,
 289  		},
 290  	}, nil
 291  }
 292  
 293  // ListSessions returns active client negentropy sessions.
 294  func (s *Service) ListSessions(ctx context.Context, _ *commonv1.Empty) (*negentropyv1.ListSessionsResponse, error) {
 295  	sessions := s.mgr.ListSessions()
 296  
 297  	protoSessions := make([]*negentropyv1.ClientSession, 0, len(sessions))
 298  	for _, sess := range sessions {
 299  		protoSessions = append(protoSessions, &negentropyv1.ClientSession{
 300  			SubscriptionId: sess.SubscriptionID,
 301  			ConnectionId:   sess.ConnectionID,
 302  			CreatedAt:      sess.CreatedAt.Unix(),
 303  			LastActivity:   sess.LastActivity.Unix(),
 304  			RoundCount:     sess.RoundCount,
 305  		})
 306  	}
 307  
 308  	return &negentropyv1.ListSessionsResponse{
 309  		Sessions: protoSessions,
 310  	}, nil
 311  }
 312  
 313  // CloseSession forcefully closes a client session.
 314  func (s *Service) CloseSession(ctx context.Context, req *negentropyv1.CloseSessionRequest) (*commonv1.Empty, error) {
 315  	if req.ConnectionId == "" {
 316  		// Close all sessions with this subscription ID
 317  		sessions := s.mgr.ListSessions()
 318  		for _, sess := range sessions {
 319  			if sess.SubscriptionID == req.SubscriptionId {
 320  				s.mgr.CloseSession(sess.ConnectionID, sess.SubscriptionID)
 321  			}
 322  		}
 323  	} else {
 324  		s.mgr.CloseSession(req.ConnectionId, req.SubscriptionId)
 325  	}
 326  	return &commonv1.Empty{}, nil
 327  }
 328  
 329  // buildStorageForFilter creates a negentropy Vector from local events matching the filter.
 330  func (s *Service) buildStorageForFilter(ctx context.Context, protoFilter *commonv1.Filter) (*negentropylib.Vector, error) {
 331  	storage := negentropylib.NewVector()
 332  
 333  	// Convert proto filter to nostr filter
 334  	f := protoToFilter(protoFilter)
 335  
 336  	// If no filter provided, use a reasonable limit
 337  	if f == nil {
 338  		limit := uint(1000000)
 339  		f = &filter.F{Limit: &limit}
 340  	}
 341  	if f.Limit == nil {
 342  		limit := uint(1000000)
 343  		f.Limit = &limit
 344  	}
 345  
 346  	// Query events from database
 347  	idPkTs, err := s.db.QueryForIds(ctx, f)
 348  	if err != nil {
 349  		return nil, fmt.Errorf("failed to query events: %w", err)
 350  	}
 351  
 352  	for _, item := range idPkTs {
 353  		storage.Insert(item.Ts, item.IDHex())
 354  	}
 355  
 356  	storage.Seal()
 357  	return storage, nil
 358  }
 359  
 360  // protoToFilter converts a proto filter to a nostr filter.
 361  func protoToFilter(pf *commonv1.Filter) *filter.F {
 362  	if pf == nil {
 363  		return nil
 364  	}
 365  
 366  	f := &filter.F{}
 367  
 368  	// Convert Kinds
 369  	if len(pf.Kinds) > 0 {
 370  		// Create kinds from proto
 371  		// Note: We'd need proper kinds conversion here
 372  	}
 373  
 374  	// Convert Since/Until
 375  	if pf.Since != nil {
 376  		// Set Since timestamp
 377  	}
 378  	if pf.Until != nil {
 379  		// Set Until timestamp
 380  	}
 381  
 382  	// Convert Limit
 383  	if pf.Limit != nil {
 384  		limit := uint(*pf.Limit)
 385  		f.Limit = &limit
 386  	}
 387  
 388  	return f
 389  }
 390