service.go raw

   1  // Package server provides the gRPC server implementation for distributed sync.
   2  package server
   3  
import (
	"bytes"
	"context"
	"encoding/json"
	"net/http"
	"sort"

	"next.orly.dev/pkg/lol/log"

	"next.orly.dev/pkg/database"
	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
	distributedv1 "next.orly.dev/pkg/proto/orlysync/distributed/v1"
	"next.orly.dev/pkg/sync/distributed"
)
  17  
  18  // Service implements the DistributedSyncServiceServer interface.
  19  type Service struct {
  20  	distributedv1.UnimplementedDistributedSyncServiceServer
  21  	mgr   *distributed.Manager
  22  	db    database.Database
  23  	ready bool
  24  }
  25  
  26  // NewService creates a new distributed sync gRPC service.
  27  func NewService(db database.Database, mgr *distributed.Manager) *Service {
  28  	return &Service{
  29  		mgr:   mgr,
  30  		db:    db,
  31  		ready: true,
  32  	}
  33  }
  34  
  35  // Ready returns whether the service is ready to serve requests.
  36  func (s *Service) Ready(ctx context.Context, _ *commonv1.Empty) (*commonv1.ReadyResponse, error) {
  37  	return &commonv1.ReadyResponse{Ready: s.ready}, nil
  38  }
  39  
  40  // GetInfo returns current sync service information.
  41  func (s *Service) GetInfo(ctx context.Context, _ *commonv1.Empty) (*commonv1.SyncInfo, error) {
  42  	peers := s.mgr.GetPeers()
  43  	return &commonv1.SyncInfo{
  44  		NodeId:        s.mgr.GetNodeID(),
  45  		RelayUrl:      s.mgr.GetRelayURL(),
  46  		CurrentSerial: s.mgr.GetCurrentSerial(),
  47  		PeerCount:     int32(len(peers)),
  48  		Status:        "running",
  49  	}, nil
  50  }
  51  
  52  // GetCurrentSerial returns this relay's current serial number.
  53  func (s *Service) GetCurrentSerial(ctx context.Context, req *distributedv1.CurrentRequest) (*distributedv1.CurrentResponse, error) {
  54  	return &distributedv1.CurrentResponse{
  55  		NodeId:   s.mgr.GetNodeID(),
  56  		RelayUrl: s.mgr.GetRelayURL(),
  57  		Serial:   s.mgr.GetCurrentSerial(),
  58  	}, nil
  59  }
  60  
  61  // GetEventIDs returns event IDs for a serial range.
  62  func (s *Service) GetEventIDs(ctx context.Context, req *distributedv1.EventIDsRequest) (*distributedv1.EventIDsResponse, error) {
  63  	eventMap, err := s.mgr.GetEventsWithIDs(req.From, req.To)
  64  	if err != nil {
  65  		return nil, err
  66  	}
  67  
  68  	return &distributedv1.EventIDsResponse{
  69  		EventMap: eventMap,
  70  	}, nil
  71  }
  72  
  73  // HandleCurrentRequest proxies /api/sync/current HTTP requests.
  74  func (s *Service) HandleCurrentRequest(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
  75  	if req.Method != http.MethodPost {
  76  		return &commonv1.HTTPResponse{
  77  			StatusCode: http.StatusMethodNotAllowed,
  78  			Body:       []byte("Method not allowed"),
  79  		}, nil
  80  	}
  81  
  82  	var currentReq distributed.CurrentRequest
  83  	if err := json.Unmarshal(req.Body, &currentReq); err != nil {
  84  		return &commonv1.HTTPResponse{
  85  			StatusCode: http.StatusBadRequest,
  86  			Body:       []byte("Invalid JSON"),
  87  		}, nil
  88  	}
  89  
  90  	// Check if request is from ourselves
  91  	if s.mgr.IsSelfNodeID(currentReq.NodeID) {
  92  		if currentReq.RelayURL != "" {
  93  			s.mgr.MarkSelfURL(currentReq.RelayURL)
  94  		}
  95  		return &commonv1.HTTPResponse{
  96  			StatusCode: http.StatusBadRequest,
  97  			Body:       []byte("Cannot sync with self"),
  98  		}, nil
  99  	}
 100  
 101  	resp := distributed.CurrentResponse{
 102  		NodeID:   s.mgr.GetNodeID(),
 103  		RelayURL: s.mgr.GetRelayURL(),
 104  		Serial:   s.mgr.GetCurrentSerial(),
 105  	}
 106  
 107  	respBody, err := json.Marshal(resp)
 108  	if err != nil {
 109  		return &commonv1.HTTPResponse{
 110  			StatusCode: http.StatusInternalServerError,
 111  			Body:       []byte("Failed to marshal response"),
 112  		}, nil
 113  	}
 114  
 115  	return &commonv1.HTTPResponse{
 116  		StatusCode: http.StatusOK,
 117  		Headers:    map[string]string{"Content-Type": "application/json"},
 118  		Body:       respBody,
 119  	}, nil
 120  }
 121  
 122  // HandleEventIDsRequest proxies /api/sync/event-ids HTTP requests.
 123  func (s *Service) HandleEventIDsRequest(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
 124  	if req.Method != http.MethodPost {
 125  		return &commonv1.HTTPResponse{
 126  			StatusCode: http.StatusMethodNotAllowed,
 127  			Body:       []byte("Method not allowed"),
 128  		}, nil
 129  	}
 130  
 131  	var eventIDsReq distributed.EventIDsRequest
 132  	if err := json.Unmarshal(req.Body, &eventIDsReq); err != nil {
 133  		return &commonv1.HTTPResponse{
 134  			StatusCode: http.StatusBadRequest,
 135  			Body:       []byte("Invalid JSON"),
 136  		}, nil
 137  	}
 138  
 139  	// Check if request is from ourselves
 140  	if s.mgr.IsSelfNodeID(eventIDsReq.NodeID) {
 141  		if eventIDsReq.RelayURL != "" {
 142  			s.mgr.MarkSelfURL(eventIDsReq.RelayURL)
 143  		}
 144  		return &commonv1.HTTPResponse{
 145  			StatusCode: http.StatusBadRequest,
 146  			Body:       []byte("Cannot sync with self"),
 147  		}, nil
 148  	}
 149  
 150  	eventMap, err := s.mgr.GetEventsWithIDs(eventIDsReq.From, eventIDsReq.To)
 151  	if err != nil {
 152  		return &commonv1.HTTPResponse{
 153  			StatusCode: http.StatusInternalServerError,
 154  			Body:       []byte("Failed to get event IDs: " + err.Error()),
 155  		}, nil
 156  	}
 157  
 158  	resp := distributed.EventIDsResponse{
 159  		EventMap: eventMap,
 160  	}
 161  
 162  	respBody, err := json.Marshal(resp)
 163  	if err != nil {
 164  		return &commonv1.HTTPResponse{
 165  			StatusCode: http.StatusInternalServerError,
 166  			Body:       []byte("Failed to marshal response"),
 167  		}, nil
 168  	}
 169  
 170  	return &commonv1.HTTPResponse{
 171  		StatusCode: http.StatusOK,
 172  		Headers:    map[string]string{"Content-Type": "application/json"},
 173  		Body:       respBody,
 174  	}, nil
 175  }
 176  
 177  // GetPeers returns the current list of sync peers.
 178  func (s *Service) GetPeers(ctx context.Context, _ *commonv1.Empty) (*distributedv1.PeersResponse, error) {
 179  	peers := s.mgr.GetPeers()
 180  	return &distributedv1.PeersResponse{
 181  		Peers: peers,
 182  	}, nil
 183  }
 184  
 185  // UpdatePeers updates the peer list.
 186  func (s *Service) UpdatePeers(ctx context.Context, req *distributedv1.UpdatePeersRequest) (*commonv1.Empty, error) {
 187  	s.mgr.UpdatePeers(req.Peers)
 188  	return &commonv1.Empty{}, nil
 189  }
 190  
 191  // IsAuthorizedPeer checks if a peer is authorized by validating its NIP-11 pubkey.
 192  func (s *Service) IsAuthorizedPeer(ctx context.Context, req *distributedv1.AuthorizedPeerRequest) (*distributedv1.AuthorizedPeerResponse, error) {
 193  	authorized := s.mgr.IsAuthorizedPeer(req.PeerUrl, req.ExpectedPubkey)
 194  	return &distributedv1.AuthorizedPeerResponse{
 195  		Authorized: authorized,
 196  	}, nil
 197  }
 198  
 199  // GetPeerPubkey fetches the pubkey for a peer relay via NIP-11.
 200  func (s *Service) GetPeerPubkey(ctx context.Context, req *distributedv1.PeerPubkeyRequest) (*distributedv1.PeerPubkeyResponse, error) {
 201  	pubkey, err := s.mgr.GetPeerPubkey(req.PeerUrl)
 202  	if err != nil {
 203  		// Return empty pubkey on error since the proto doesn't have an error field
 204  		return &distributedv1.PeerPubkeyResponse{
 205  			Pubkey: "",
 206  		}, nil
 207  	}
 208  	return &distributedv1.PeerPubkeyResponse{
 209  		Pubkey: pubkey,
 210  	}, nil
 211  }
 212  
 213  // UpdateSerial updates the current serial from database.
 214  func (s *Service) UpdateSerial(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
 215  	s.mgr.UpdateSerial()
 216  	return &commonv1.Empty{}, nil
 217  }
 218  
 219  // NotifyNewEvent notifies the service of a new event being stored.
 220  func (s *Service) NotifyNewEvent(ctx context.Context, req *distributedv1.NewEventNotification) (*commonv1.Empty, error) {
 221  	s.mgr.NotifyNewEvent(req.EventId, req.Serial)
 222  	return &commonv1.Empty{}, nil
 223  }
 224  
 225  // TriggerSync manually triggers a sync cycle with all peers.
 226  func (s *Service) TriggerSync(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
 227  	log.D.F("manual sync trigger requested")
 228  	return &commonv1.Empty{}, nil
 229  }
 230  
 231  // GetSyncStatus returns current sync status for all peers.
 232  func (s *Service) GetSyncStatus(ctx context.Context, _ *commonv1.Empty) (*distributedv1.SyncStatusResponse, error) {
 233  	peerStatus := s.mgr.GetPeerStatus()
 234  	peerInfos := make([]*commonv1.PeerInfo, 0, len(peerStatus))
 235  	for url, serial := range peerStatus {
 236  		peerInfos = append(peerInfos, &commonv1.PeerInfo{
 237  			Url:        url,
 238  			LastSerial: serial,
 239  			Status:     "active",
 240  		})
 241  	}
 242  	return &distributedv1.SyncStatusResponse{
 243  		CurrentSerial: s.mgr.GetCurrentSerial(),
 244  		Peers:         peerInfos,
 245  	}, nil
 246  }
 247  
 248  // httpResponseWriter is a simple buffer for capturing HTTP responses.
 249  type httpResponseWriter struct {
 250  	statusCode int
 251  	headers    http.Header
 252  	body       bytes.Buffer
 253  }
 254  
 255  func (w *httpResponseWriter) Header() http.Header {
 256  	if w.headers == nil {
 257  		w.headers = make(http.Header)
 258  	}
 259  	return w.headers
 260  }
 261  
 262  func (w *httpResponseWriter) Write(b []byte) (int, error) {
 263  	return w.body.Write(b)
 264  }
 265  
 266  func (w *httpResponseWriter) WriteHeader(statusCode int) {
 267  	w.statusCode = statusCode
 268  }
 269