service.go raw

   1  // Package server provides the gRPC server implementation for cluster sync.
   2  package server
   3  
   4  import (
   5  	"context"
   6  	"encoding/json"
   7  	"net/http"
   8  
   9  	"next.orly.dev/pkg/database"
  10  	"next.orly.dev/pkg/sync/cluster"
  11  	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
  12  	clusterv1 "next.orly.dev/pkg/proto/orlysync/cluster/v1"
  13  )
  14  
  15  // Service implements the ClusterSyncServiceServer interface.
  16  type Service struct {
  17  	clusterv1.UnimplementedClusterSyncServiceServer
  18  	mgr   *cluster.Manager
  19  	db    database.Database
  20  	ready bool
  21  }
  22  
  23  // NewService creates a new cluster sync gRPC service.
  24  func NewService(db database.Database, mgr *cluster.Manager) *Service {
  25  	return &Service{
  26  		mgr:   mgr,
  27  		db:    db,
  28  		ready: true,
  29  	}
  30  }
  31  
  32  // Ready returns whether the service is ready to serve requests.
  33  func (s *Service) Ready(ctx context.Context, _ *commonv1.Empty) (*commonv1.ReadyResponse, error) {
  34  	return &commonv1.ReadyResponse{Ready: s.ready}, nil
  35  }
  36  
  37  // Start starts the cluster polling loop.
  38  func (s *Service) Start(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
  39  	s.mgr.Start()
  40  	return &commonv1.Empty{}, nil
  41  }
  42  
  43  // Stop stops the cluster polling loop.
  44  func (s *Service) Stop(ctx context.Context, _ *commonv1.Empty) (*commonv1.Empty, error) {
  45  	s.mgr.Stop()
  46  	return &commonv1.Empty{}, nil
  47  }
  48  
  49  // HandleLatestSerial proxies GET /cluster/latest HTTP requests.
  50  func (s *Service) HandleLatestSerial(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
  51  	if req.Method != http.MethodGet {
  52  		return &commonv1.HTTPResponse{
  53  			StatusCode: http.StatusMethodNotAllowed,
  54  			Body:       []byte("Method not allowed"),
  55  		}, nil
  56  	}
  57  
  58  	serial, timestamp := s.mgr.GetLatestSerial()
  59  	resp := map[string]any{
  60  		"serial":    serial,
  61  		"timestamp": timestamp,
  62  	}
  63  
  64  	respBody, err := json.Marshal(resp)
  65  	if err != nil {
  66  		return &commonv1.HTTPResponse{
  67  			StatusCode: http.StatusInternalServerError,
  68  			Body:       []byte("Failed to marshal response"),
  69  		}, nil
  70  	}
  71  
  72  	return &commonv1.HTTPResponse{
  73  		StatusCode: http.StatusOK,
  74  		Headers:    map[string]string{"Content-Type": "application/json"},
  75  		Body:       respBody,
  76  	}, nil
  77  }
  78  
  79  // HandleEventsRange proxies GET /cluster/events HTTP requests.
  80  func (s *Service) HandleEventsRange(ctx context.Context, req *commonv1.HTTPRequest) (*commonv1.HTTPResponse, error) {
  81  	if req.Method != http.MethodGet {
  82  		return &commonv1.HTTPResponse{
  83  			StatusCode: http.StatusMethodNotAllowed,
  84  			Body:       []byte("Method not allowed"),
  85  		}, nil
  86  	}
  87  
  88  	// Parse query parameters
  89  	// In practice, the query string would be parsed from req.QueryString
  90  	// For now, return a placeholder
  91  	return &commonv1.HTTPResponse{
  92  		StatusCode: http.StatusOK,
  93  		Headers:    map[string]string{"Content-Type": "application/json"},
  94  		Body:       []byte(`{"events":[],"has_more":false}`),
  95  	}, nil
  96  }
  97  
  98  // GetMembers returns the current cluster members.
  99  func (s *Service) GetMembers(ctx context.Context, _ *commonv1.Empty) (*clusterv1.MembersResponse, error) {
 100  	members := s.mgr.GetMembers()
 101  	clusterMembers := make([]*clusterv1.ClusterMember, 0, len(members))
 102  	for _, m := range members {
 103  		clusterMembers = append(clusterMembers, &clusterv1.ClusterMember{
 104  			HttpUrl:      m.HTTPURL,
 105  			WebsocketUrl: m.WebSocketURL,
 106  			LastSerial:   m.LastSerial,
 107  			LastPoll:     m.LastPoll.Unix(),
 108  			Status:       m.Status,
 109  			ErrorCount:   int32(m.ErrorCount),
 110  		})
 111  	}
 112  	return &clusterv1.MembersResponse{
 113  		Members: clusterMembers,
 114  	}, nil
 115  }
 116  
 117  // UpdateMembership updates cluster membership.
 118  func (s *Service) UpdateMembership(ctx context.Context, req *clusterv1.UpdateMembershipRequest) (*commonv1.Empty, error) {
 119  	s.mgr.UpdateMembership(req.RelayUrls)
 120  	return &commonv1.Empty{}, nil
 121  }
 122  
 123  // HandleMembershipEvent processes a cluster membership event (Kind 39108).
 124  func (s *Service) HandleMembershipEvent(ctx context.Context, req *clusterv1.MembershipEventRequest) (*commonv1.Empty, error) {
 125  	// Convert proto event to internal format and process
 126  	// This would need the actual event conversion
 127  	return &commonv1.Empty{}, nil
 128  }
 129  
 130  // GetClusterStatus returns overall cluster status.
 131  func (s *Service) GetClusterStatus(ctx context.Context, _ *commonv1.Empty) (*clusterv1.ClusterStatusResponse, error) {
 132  	members := s.mgr.GetMembers()
 133  	serial, _ := s.mgr.GetLatestSerial()
 134  
 135  	activeCount := int32(0)
 136  	clusterMembers := make([]*clusterv1.ClusterMember, 0, len(members))
 137  	for _, m := range members {
 138  		if m.Status == "active" {
 139  			activeCount++
 140  		}
 141  		clusterMembers = append(clusterMembers, &clusterv1.ClusterMember{
 142  			HttpUrl:      m.HTTPURL,
 143  			WebsocketUrl: m.WebSocketURL,
 144  			LastSerial:   m.LastSerial,
 145  			LastPoll:     m.LastPoll.Unix(),
 146  			Status:       m.Status,
 147  			ErrorCount:   int32(m.ErrorCount),
 148  		})
 149  	}
 150  
 151  	return &clusterv1.ClusterStatusResponse{
 152  		LatestSerial:              serial,
 153  		ActiveMembers:             activeCount,
 154  		TotalMembers:              int32(len(members)),
 155  		PropagatePrivilegedEvents: s.mgr.PropagatePrivilegedEvents(),
 156  		Members:                   clusterMembers,
 157  	}, nil
 158  }
 159  
 160  // GetMemberStatus returns status for a specific member.
 161  func (s *Service) GetMemberStatus(ctx context.Context, req *clusterv1.MemberStatusRequest) (*clusterv1.MemberStatusResponse, error) {
 162  	member := s.mgr.GetMember(req.HttpUrl)
 163  	if member == nil {
 164  		return &clusterv1.MemberStatusResponse{
 165  			Found: false,
 166  		}, nil
 167  	}
 168  
 169  	return &clusterv1.MemberStatusResponse{
 170  		Found: true,
 171  		Member: &clusterv1.ClusterMember{
 172  			HttpUrl:      member.HTTPURL,
 173  			WebsocketUrl: member.WebSocketURL,
 174  			LastSerial:   member.LastSerial,
 175  			LastPoll:     member.LastPoll.Unix(),
 176  			Status:       member.Status,
 177  			ErrorCount:   int32(member.ErrorCount),
 178  		},
 179  	}, nil
 180  }
 181  
 182  // GetLatestSerial returns the latest serial from this relay's database.
 183  func (s *Service) GetLatestSerial(ctx context.Context, _ *commonv1.Empty) (*clusterv1.LatestSerialResponse, error) {
 184  	serial, timestamp := s.mgr.GetLatestSerial()
 185  	return &clusterv1.LatestSerialResponse{
 186  		Serial:    serial,
 187  		Timestamp: timestamp,
 188  	}, nil
 189  }
 190  
 191  // GetEventsInRange returns event info for a serial range.
 192  func (s *Service) GetEventsInRange(ctx context.Context, req *clusterv1.EventsRangeRequest) (*clusterv1.EventsRangeResponse, error) {
 193  	events, hasMore, nextFrom := s.mgr.GetEventsInRange(req.From, req.To, int(req.Limit))
 194  
 195  	eventInfos := make([]*clusterv1.EventInfo, 0, len(events))
 196  	for _, e := range events {
 197  		eventInfos = append(eventInfos, &clusterv1.EventInfo{
 198  			Serial:    e.Serial,
 199  			Id:        e.ID,
 200  			Timestamp: e.Timestamp, // Already int64
 201  		})
 202  	}
 203  
 204  	return &clusterv1.EventsRangeResponse{
 205  		Events:   eventInfos,
 206  		HasMore:  hasMore,
 207  		NextFrom: nextFrom,
 208  	}, nil
 209  }
 210