service.go raw

   1  package server
   2  
   3  import (
   4  	"context"
   5  	"encoding/json"
   6  	"io"
   7  	"time"
   8  
   9  	"google.golang.org/grpc/codes"
  10  	"google.golang.org/grpc/status"
  11  	"next.orly.dev/pkg/lol/chk"
  12  	"next.orly.dev/pkg/lol/log"
  13  
  14  	"next.orly.dev/pkg/database"
  15  	"next.orly.dev/pkg/interfaces/store"
  16  	orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
  17  )
  18  
  19  // DatabaseService implements the orlydbv1.DatabaseServiceServer interface.
  20  type DatabaseService struct {
  21  	orlydbv1.UnimplementedDatabaseServiceServer
  22  	db  database.Database
  23  	cfg *Config
  24  }
  25  
  26  // NewDatabaseService creates a new database service.
  27  func NewDatabaseService(db database.Database, cfg *Config) *DatabaseService {
  28  	return &DatabaseService{
  29  		db:  db,
  30  		cfg: cfg,
  31  	}
  32  }
  33  
  34  // === Lifecycle Methods ===
  35  
  36  func (s *DatabaseService) GetPath(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PathResponse, error) {
  37  	return &orlydbv1.PathResponse{Path: s.db.Path()}, nil
  38  }
  39  
  40  func (s *DatabaseService) Sync(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
  41  	if err := s.db.Sync(); err != nil {
  42  		return nil, status.Errorf(codes.Internal, "sync failed: %v", err)
  43  	}
  44  	return &orlydbv1.Empty{}, nil
  45  }
  46  
  47  func (s *DatabaseService) Ready(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ReadyResponse, error) {
  48  	// Check if ready channel is closed
  49  	select {
  50  	case <-s.db.Ready():
  51  		return &orlydbv1.ReadyResponse{Ready: true}, nil
  52  	default:
  53  		return &orlydbv1.ReadyResponse{Ready: false}, nil
  54  	}
  55  }
  56  
  57  func (s *DatabaseService) SetLogLevel(ctx context.Context, req *orlydbv1.SetLogLevelRequest) (*orlydbv1.Empty, error) {
  58  	s.db.SetLogLevel(req.Level)
  59  	return &orlydbv1.Empty{}, nil
  60  }
  61  
  62  // === Event Storage ===
  63  
  64  func (s *DatabaseService) SaveEvent(ctx context.Context, req *orlydbv1.SaveEventRequest) (*orlydbv1.SaveEventResponse, error) {
  65  	ev := orlydbv1.ProtoToEvent(req.Event)
  66  	exists, err := s.db.SaveEvent(ctx, ev)
  67  	if err != nil {
  68  		return nil, status.Errorf(codes.Internal, "save event failed: %v", err)
  69  	}
  70  	return &orlydbv1.SaveEventResponse{Exists: exists}, nil
  71  }
  72  
  73  func (s *DatabaseService) GetSerialsFromFilter(ctx context.Context, req *orlydbv1.GetSerialsFromFilterRequest) (*orlydbv1.SerialList, error) {
  74  	f := orlydbv1.ProtoToFilter(req.Filter)
  75  	serials, err := s.db.GetSerialsFromFilter(f)
  76  	if err != nil {
  77  		return nil, status.Errorf(codes.Internal, "get serials failed: %v", err)
  78  	}
  79  	return orlydbv1.Uint40sToProto(serials), nil
  80  }
  81  
  82  func (s *DatabaseService) WouldReplaceEvent(ctx context.Context, req *orlydbv1.WouldReplaceEventRequest) (*orlydbv1.WouldReplaceEventResponse, error) {
  83  	ev := orlydbv1.ProtoToEvent(req.Event)
  84  	wouldReplace, replacedSerials, err := s.db.WouldReplaceEvent(ev)
  85  	if err != nil {
  86  		return nil, status.Errorf(codes.Internal, "would replace check failed: %v", err)
  87  	}
  88  	resp := &orlydbv1.WouldReplaceEventResponse{
  89  		WouldReplace: wouldReplace,
  90  	}
  91  	for _, ser := range replacedSerials {
  92  		resp.ReplacedSerials = append(resp.ReplacedSerials, ser.Get())
  93  	}
  94  	return resp, nil
  95  }
  96  
  97  // === Event Queries (Streaming) ===
  98  
  99  func (s *DatabaseService) QueryEvents(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryEventsServer) error {
 100  	f := orlydbv1.ProtoToFilter(req.Filter)
 101  	events, err := s.db.QueryEvents(stream.Context(), f)
 102  	if err != nil {
 103  		return status.Errorf(codes.Internal, "query events failed: %v", err)
 104  	}
 105  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 106  }
 107  
 108  func (s *DatabaseService) QueryAllVersions(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryAllVersionsServer) error {
 109  	f := orlydbv1.ProtoToFilter(req.Filter)
 110  	events, err := s.db.QueryAllVersions(stream.Context(), f)
 111  	if err != nil {
 112  		return status.Errorf(codes.Internal, "query all versions failed: %v", err)
 113  	}
 114  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 115  }
 116  
 117  func (s *DatabaseService) QueryEventsWithOptions(req *orlydbv1.QueryEventsWithOptionsRequest, stream orlydbv1.DatabaseService_QueryEventsWithOptionsServer) error {
 118  	f := orlydbv1.ProtoToFilter(req.Filter)
 119  	events, err := s.db.QueryEventsWithOptions(stream.Context(), f, req.IncludeDeleteEvents, req.ShowAllVersions)
 120  	if err != nil {
 121  		return status.Errorf(codes.Internal, "query events with options failed: %v", err)
 122  	}
 123  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 124  }
 125  
 126  func (s *DatabaseService) QueryDeleteEventsByTargetId(req *orlydbv1.QueryDeleteEventsByTargetIdRequest, stream orlydbv1.DatabaseService_QueryDeleteEventsByTargetIdServer) error {
 127  	events, err := s.db.QueryDeleteEventsByTargetId(stream.Context(), req.TargetEventId)
 128  	if err != nil {
 129  		return status.Errorf(codes.Internal, "query delete events failed: %v", err)
 130  	}
 131  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 132  }
 133  
 134  func (s *DatabaseService) QueryForSerials(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.SerialList, error) {
 135  	f := orlydbv1.ProtoToFilter(req.Filter)
 136  	serials, err := s.db.QueryForSerials(ctx, f)
 137  	if err != nil {
 138  		return nil, status.Errorf(codes.Internal, "query for serials failed: %v", err)
 139  	}
 140  	return orlydbv1.Uint40sToProto(serials), nil
 141  }
 142  
 143  func (s *DatabaseService) QueryForIds(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.IdPkTsList, error) {
 144  	f := orlydbv1.ProtoToFilter(req.Filter)
 145  	idPkTs, err := s.db.QueryForIds(ctx, f)
 146  	if err != nil {
 147  		return nil, status.Errorf(codes.Internal, "query for ids failed: %v", err)
 148  	}
 149  	return orlydbv1.IdPkTsListToProto(idPkTs), nil
 150  }
 151  
 152  func (s *DatabaseService) CountEvents(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.CountEventsResponse, error) {
 153  	f := orlydbv1.ProtoToFilter(req.Filter)
 154  	count, approximate, err := s.db.CountEvents(ctx, f)
 155  	if err != nil {
 156  		return nil, status.Errorf(codes.Internal, "count events failed: %v", err)
 157  	}
 158  	return &orlydbv1.CountEventsResponse{
 159  		Count:       int32(count),
 160  		Approximate: approximate,
 161  	}, nil
 162  }
 163  
 164  // === Event Retrieval by Serial ===
 165  
 166  func (s *DatabaseService) FetchEventBySerial(ctx context.Context, req *orlydbv1.FetchEventBySerialRequest) (*orlydbv1.FetchEventBySerialResponse, error) {
 167  	ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
 168  	ev, err := s.db.FetchEventBySerial(ser)
 169  	if err != nil {
 170  		return nil, status.Errorf(codes.Internal, "fetch event by serial failed: %v", err)
 171  	}
 172  	return &orlydbv1.FetchEventBySerialResponse{
 173  		Event: orlydbv1.EventToProto(ev),
 174  		Found: ev != nil,
 175  	}, nil
 176  }
 177  
 178  func (s *DatabaseService) FetchEventsBySerials(ctx context.Context, req *orlydbv1.FetchEventsBySerialRequest) (*orlydbv1.EventMap, error) {
 179  	serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
 180  	events, err := s.db.FetchEventsBySerials(serials)
 181  	if err != nil {
 182  		return nil, status.Errorf(codes.Internal, "fetch events by serials failed: %v", err)
 183  	}
 184  	return orlydbv1.EventMapToProto(events), nil
 185  }
 186  
 187  func (s *DatabaseService) GetSerialById(ctx context.Context, req *orlydbv1.GetSerialByIdRequest) (*orlydbv1.GetSerialByIdResponse, error) {
 188  	ser, err := s.db.GetSerialById(req.Id)
 189  	if err != nil {
 190  		return nil, status.Errorf(codes.Internal, "get serial by id failed: %v", err)
 191  	}
 192  	if ser == nil {
 193  		return &orlydbv1.GetSerialByIdResponse{Found: false}, nil
 194  	}
 195  	return &orlydbv1.GetSerialByIdResponse{
 196  		Serial: ser.Get(),
 197  		Found:  true,
 198  	}, nil
 199  }
 200  
 201  func (s *DatabaseService) GetSerialsByIds(ctx context.Context, req *orlydbv1.GetSerialsByIdsRequest) (*orlydbv1.SerialMap, error) {
 202  	// Convert request IDs to tag format
 203  	ids := orlydbv1.BytesToTag(req.Ids)
 204  	serials, err := s.db.GetSerialsByIds(ids)
 205  	if err != nil {
 206  		return nil, status.Errorf(codes.Internal, "get serials by ids failed: %v", err)
 207  	}
 208  	result := &orlydbv1.SerialMap{
 209  		Serials: make(map[string]uint64),
 210  	}
 211  	for k, v := range serials {
 212  		if v != nil {
 213  			result.Serials[k] = v.Get()
 214  		}
 215  	}
 216  	return result, nil
 217  }
 218  
 219  func (s *DatabaseService) GetSerialsByRange(ctx context.Context, req *orlydbv1.GetSerialsByRangeRequest) (*orlydbv1.SerialList, error) {
 220  	r := orlydbv1.ProtoToRange(req.Range)
 221  	serials, err := s.db.GetSerialsByRange(r)
 222  	if err != nil {
 223  		return nil, status.Errorf(codes.Internal, "get serials by range failed: %v", err)
 224  	}
 225  	return orlydbv1.Uint40sToProto(serials), nil
 226  }
 227  
 228  func (s *DatabaseService) GetFullIdPubkeyBySerial(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialRequest) (*orlydbv1.IdPkTs, error) {
 229  	ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
 230  	idPkTs, err := s.db.GetFullIdPubkeyBySerial(ser)
 231  	if err != nil {
 232  		return nil, status.Errorf(codes.Internal, "get full id pubkey by serial failed: %v", err)
 233  	}
 234  	return orlydbv1.IdPkTsToProto(idPkTs), nil
 235  }
 236  
 237  func (s *DatabaseService) GetFullIdPubkeyBySerials(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialsRequest) (*orlydbv1.IdPkTsList, error) {
 238  	serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
 239  	idPkTs, err := s.db.GetFullIdPubkeyBySerials(serials)
 240  	if err != nil {
 241  		return nil, status.Errorf(codes.Internal, "get full id pubkey by serials failed: %v", err)
 242  	}
 243  	return orlydbv1.IdPkTsListToProto(idPkTs), nil
 244  }
 245  
 246  // === Event Deletion ===
 247  
 248  func (s *DatabaseService) DeleteEvent(ctx context.Context, req *orlydbv1.DeleteEventRequest) (*orlydbv1.Empty, error) {
 249  	if err := s.db.DeleteEvent(ctx, req.EventId); err != nil {
 250  		return nil, status.Errorf(codes.Internal, "delete event failed: %v", err)
 251  	}
 252  	return &orlydbv1.Empty{}, nil
 253  }
 254  
 255  func (s *DatabaseService) DeleteEventBySerial(ctx context.Context, req *orlydbv1.DeleteEventBySerialRequest) (*orlydbv1.Empty, error) {
 256  	ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
 257  	ev := orlydbv1.ProtoToEvent(req.Event)
 258  	if err := s.db.DeleteEventBySerial(ctx, ser, ev); err != nil {
 259  		return nil, status.Errorf(codes.Internal, "delete event by serial failed: %v", err)
 260  	}
 261  	return &orlydbv1.Empty{}, nil
 262  }
 263  
 264  func (s *DatabaseService) DeleteExpired(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
 265  	s.db.DeleteExpired()
 266  	return &orlydbv1.Empty{}, nil
 267  }
 268  
 269  func (s *DatabaseService) ProcessDelete(ctx context.Context, req *orlydbv1.ProcessDeleteRequest) (*orlydbv1.Empty, error) {
 270  	ev := orlydbv1.ProtoToEvent(req.Event)
 271  	if err := s.db.ProcessDelete(ev, req.Admins); err != nil {
 272  		return nil, status.Errorf(codes.Internal, "process delete failed: %v", err)
 273  	}
 274  	return &orlydbv1.Empty{}, nil
 275  }
 276  
 277  func (s *DatabaseService) CheckForDeleted(ctx context.Context, req *orlydbv1.CheckForDeletedRequest) (*orlydbv1.Empty, error) {
 278  	ev := orlydbv1.ProtoToEvent(req.Event)
 279  	if err := s.db.CheckForDeleted(ev, req.Admins); err != nil {
 280  		return nil, status.Errorf(codes.Internal, "check for deleted failed: %v", err)
 281  	}
 282  	return &orlydbv1.Empty{}, nil
 283  }
 284  
 285  // === Import/Export ===
 286  
 287  func (s *DatabaseService) Import(stream orlydbv1.DatabaseService_ImportServer) error {
 288  	pr, pw := io.Pipe()
 289  
 290  	// Goroutine to read from gRPC stream and write to pipe
 291  	go func() {
 292  		defer pw.Close()
 293  		for {
 294  			chunk, err := stream.Recv()
 295  			if err == io.EOF {
 296  				return
 297  			}
 298  			if err != nil {
 299  				log.E.F("import stream error: %v", err)
 300  				pw.CloseWithError(err)
 301  				return
 302  			}
 303  			if _, err := pw.Write(chunk.Data); chk.E(err) {
 304  				return
 305  			}
 306  		}
 307  	}()
 308  
 309  	// Import from pipe
 310  	s.db.Import(pr)
 311  
 312  	return stream.SendAndClose(&orlydbv1.ImportResponse{
 313  		EventsImported: 0, // TODO: Track count
 314  		EventsSkipped:  0,
 315  	})
 316  }
 317  
 318  func (s *DatabaseService) Export(req *orlydbv1.ExportRequest, stream orlydbv1.DatabaseService_ExportServer) error {
 319  	pr, pw := io.Pipe()
 320  
 321  	// Goroutine to export to pipe
 322  	go func() {
 323  		defer pw.Close()
 324  		s.db.Export(stream.Context(), pw, req.Pubkeys...)
 325  	}()
 326  
 327  	// Read from pipe and send to stream
 328  	buf := make([]byte, 64*1024) // 64KB chunks
 329  	for {
 330  		n, err := pr.Read(buf)
 331  		if err == io.EOF {
 332  			return nil
 333  		}
 334  		if err != nil {
 335  			return status.Errorf(codes.Internal, "export failed: %v", err)
 336  		}
 337  		if err := stream.Send(&orlydbv1.ExportChunk{Data: buf[:n]}); err != nil {
 338  			return err
 339  		}
 340  	}
 341  }
 342  
 343  func (s *DatabaseService) ImportEventsFromStrings(ctx context.Context, req *orlydbv1.ImportEventsFromStringsRequest) (*orlydbv1.ImportResponse, error) {
 344  	// Note: We can't pass policy manager over gRPC, so we pass nil
 345  	if err := s.db.ImportEventsFromStrings(ctx, req.EventJsons, nil); err != nil {
 346  		return nil, status.Errorf(codes.Internal, "import events from strings failed: %v", err)
 347  	}
 348  	return &orlydbv1.ImportResponse{
 349  		EventsImported: int64(len(req.EventJsons)),
 350  	}, nil
 351  }
 352  
 353  // === Relay Identity ===
 354  
 355  func (s *DatabaseService) GetRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
 356  	secret, err := s.db.GetRelayIdentitySecret()
 357  	if err != nil {
 358  		return nil, status.Errorf(codes.Internal, "get relay identity secret failed: %v", err)
 359  	}
 360  	return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
 361  }
 362  
 363  func (s *DatabaseService) SetRelayIdentitySecret(ctx context.Context, req *orlydbv1.SetRelayIdentitySecretRequest) (*orlydbv1.Empty, error) {
 364  	if err := s.db.SetRelayIdentitySecret(req.SecretKey); err != nil {
 365  		return nil, status.Errorf(codes.Internal, "set relay identity secret failed: %v", err)
 366  	}
 367  	return &orlydbv1.Empty{}, nil
 368  }
 369  
 370  func (s *DatabaseService) GetOrCreateRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
 371  	secret, err := s.db.GetOrCreateRelayIdentitySecret()
 372  	if err != nil {
 373  		return nil, status.Errorf(codes.Internal, "get or create relay identity secret failed: %v", err)
 374  	}
 375  	return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
 376  }
 377  
 378  // === Markers ===
 379  
 380  func (s *DatabaseService) SetMarker(ctx context.Context, req *orlydbv1.SetMarkerRequest) (*orlydbv1.Empty, error) {
 381  	if err := s.db.SetMarker(req.Key, req.Value); err != nil {
 382  		return nil, status.Errorf(codes.Internal, "set marker failed: %v", err)
 383  	}
 384  	return &orlydbv1.Empty{}, nil
 385  }
 386  
 387  func (s *DatabaseService) GetMarker(ctx context.Context, req *orlydbv1.GetMarkerRequest) (*orlydbv1.GetMarkerResponse, error) {
 388  	value, err := s.db.GetMarker(req.Key)
 389  	if err != nil {
 390  		return nil, status.Errorf(codes.Internal, "get marker failed: %v", err)
 391  	}
 392  	return &orlydbv1.GetMarkerResponse{
 393  		Value: value,
 394  		Found: value != nil,
 395  	}, nil
 396  }
 397  
 398  func (s *DatabaseService) HasMarker(ctx context.Context, req *orlydbv1.HasMarkerRequest) (*orlydbv1.HasMarkerResponse, error) {
 399  	exists := s.db.HasMarker(req.Key)
 400  	return &orlydbv1.HasMarkerResponse{Exists: exists}, nil
 401  }
 402  
 403  func (s *DatabaseService) DeleteMarker(ctx context.Context, req *orlydbv1.DeleteMarkerRequest) (*orlydbv1.Empty, error) {
 404  	if err := s.db.DeleteMarker(req.Key); err != nil {
 405  		return nil, status.Errorf(codes.Internal, "delete marker failed: %v", err)
 406  	}
 407  	return &orlydbv1.Empty{}, nil
 408  }
 409  
 410  // === Subscriptions ===
 411  
 412  func (s *DatabaseService) GetSubscription(ctx context.Context, req *orlydbv1.GetSubscriptionRequest) (*orlydbv1.Subscription, error) {
 413  	sub, err := s.db.GetSubscription(req.Pubkey)
 414  	if err != nil {
 415  		return nil, status.Errorf(codes.Internal, "get subscription failed: %v", err)
 416  	}
 417  	return orlydbv1.SubscriptionToProto(sub, req.Pubkey), nil
 418  }
 419  
 420  func (s *DatabaseService) IsSubscriptionActive(ctx context.Context, req *orlydbv1.IsSubscriptionActiveRequest) (*orlydbv1.IsSubscriptionActiveResponse, error) {
 421  	active, err := s.db.IsSubscriptionActive(req.Pubkey)
 422  	if err != nil {
 423  		return nil, status.Errorf(codes.Internal, "is subscription active failed: %v", err)
 424  	}
 425  	return &orlydbv1.IsSubscriptionActiveResponse{Active: active}, nil
 426  }
 427  
 428  func (s *DatabaseService) ExtendSubscription(ctx context.Context, req *orlydbv1.ExtendSubscriptionRequest) (*orlydbv1.Empty, error) {
 429  	if err := s.db.ExtendSubscription(req.Pubkey, int(req.Days)); err != nil {
 430  		return nil, status.Errorf(codes.Internal, "extend subscription failed: %v", err)
 431  	}
 432  	return &orlydbv1.Empty{}, nil
 433  }
 434  
 435  func (s *DatabaseService) RecordPayment(ctx context.Context, req *orlydbv1.RecordPaymentRequest) (*orlydbv1.Empty, error) {
 436  	if err := s.db.RecordPayment(req.Pubkey, req.Amount, req.Invoice, req.Preimage); err != nil {
 437  		return nil, status.Errorf(codes.Internal, "record payment failed: %v", err)
 438  	}
 439  	return &orlydbv1.Empty{}, nil
 440  }
 441  
 442  func (s *DatabaseService) GetPaymentHistory(ctx context.Context, req *orlydbv1.GetPaymentHistoryRequest) (*orlydbv1.PaymentList, error) {
 443  	payments, err := s.db.GetPaymentHistory(req.Pubkey)
 444  	if err != nil {
 445  		return nil, status.Errorf(codes.Internal, "get payment history failed: %v", err)
 446  	}
 447  	return orlydbv1.PaymentListToProto(payments), nil
 448  }
 449  
 450  func (s *DatabaseService) ExtendBlossomSubscription(ctx context.Context, req *orlydbv1.ExtendBlossomSubscriptionRequest) (*orlydbv1.Empty, error) {
 451  	if err := s.db.ExtendBlossomSubscription(req.Pubkey, req.Tier, req.StorageMb, int(req.DaysExtended)); err != nil {
 452  		return nil, status.Errorf(codes.Internal, "extend blossom subscription failed: %v", err)
 453  	}
 454  	return &orlydbv1.Empty{}, nil
 455  }
 456  
 457  func (s *DatabaseService) GetBlossomStorageQuota(ctx context.Context, req *orlydbv1.GetBlossomStorageQuotaRequest) (*orlydbv1.GetBlossomStorageQuotaResponse, error) {
 458  	quota, err := s.db.GetBlossomStorageQuota(req.Pubkey)
 459  	if err != nil {
 460  		return nil, status.Errorf(codes.Internal, "get blossom storage quota failed: %v", err)
 461  	}
 462  	return &orlydbv1.GetBlossomStorageQuotaResponse{QuotaMb: quota}, nil
 463  }
 464  
 465  func (s *DatabaseService) IsFirstTimeUser(ctx context.Context, req *orlydbv1.IsFirstTimeUserRequest) (*orlydbv1.IsFirstTimeUserResponse, error) {
 466  	firstTime, err := s.db.IsFirstTimeUser(req.Pubkey)
 467  	if err != nil {
 468  		return nil, status.Errorf(codes.Internal, "is first time user failed: %v", err)
 469  	}
 470  	return &orlydbv1.IsFirstTimeUserResponse{FirstTime: firstTime}, nil
 471  }
 472  
 473  // === Paid ACL ===
 474  
 475  func (s *DatabaseService) SavePaidSubscription(ctx context.Context, req *orlydbv1.PaidSubscriptionMsg) (*orlydbv1.Empty, error) {
 476  	sub := orlydbv1.ProtoToPaidSubscription(req)
 477  	if err := s.db.SavePaidSubscription(sub); err != nil {
 478  		return nil, status.Errorf(codes.Internal, "save paid subscription failed: %v", err)
 479  	}
 480  	return &orlydbv1.Empty{}, nil
 481  }
 482  
 483  func (s *DatabaseService) GetPaidSubscription(ctx context.Context, req *orlydbv1.GetPaidSubscriptionRequest) (*orlydbv1.PaidSubscriptionMsg, error) {
 484  	sub, err := s.db.GetPaidSubscription(req.PubkeyHex)
 485  	if err != nil {
 486  		return nil, status.Errorf(codes.Internal, "get paid subscription failed: %v", err)
 487  	}
 488  	if sub == nil {
 489  		return nil, status.Errorf(codes.NotFound, "paid subscription not found")
 490  	}
 491  	return orlydbv1.PaidSubscriptionToProto(sub), nil
 492  }
 493  
 494  func (s *DatabaseService) DeletePaidSubscription(ctx context.Context, req *orlydbv1.DeletePaidSubscriptionRequest) (*orlydbv1.Empty, error) {
 495  	if err := s.db.DeletePaidSubscription(req.PubkeyHex); err != nil {
 496  		return nil, status.Errorf(codes.Internal, "delete paid subscription failed: %v", err)
 497  	}
 498  	return &orlydbv1.Empty{}, nil
 499  }
 500  
 501  func (s *DatabaseService) ListPaidSubscriptions(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PaidSubscriptionList, error) {
 502  	subs, err := s.db.ListPaidSubscriptions()
 503  	if err != nil {
 504  		return nil, status.Errorf(codes.Internal, "list paid subscriptions failed: %v", err)
 505  	}
 506  	return orlydbv1.PaidSubscriptionListToProto(subs), nil
 507  }
 508  
 509  func (s *DatabaseService) ClaimAlias(ctx context.Context, req *orlydbv1.ClaimAliasRequest) (*orlydbv1.Empty, error) {
 510  	if err := s.db.ClaimAlias(req.Alias, req.PubkeyHex); err != nil {
 511  		return nil, status.Errorf(codes.Internal, "claim alias failed: %v", err)
 512  	}
 513  	return &orlydbv1.Empty{}, nil
 514  }
 515  
 516  func (s *DatabaseService) GetAliasByPubkey(ctx context.Context, req *orlydbv1.GetAliasByPubkeyRequest) (*orlydbv1.AliasResponse, error) {
 517  	alias, err := s.db.GetAliasByPubkey(req.PubkeyHex)
 518  	if err != nil {
 519  		return nil, status.Errorf(codes.Internal, "get alias by pubkey failed: %v", err)
 520  	}
 521  	return &orlydbv1.AliasResponse{Alias: alias}, nil
 522  }
 523  
 524  func (s *DatabaseService) GetPubkeyByAlias(ctx context.Context, req *orlydbv1.GetPubkeyByAliasRequest) (*orlydbv1.PubkeyResponse, error) {
 525  	pubkey, err := s.db.GetPubkeyByAlias(req.Alias)
 526  	if err != nil {
 527  		return nil, status.Errorf(codes.Internal, "get pubkey by alias failed: %v", err)
 528  	}
 529  	return &orlydbv1.PubkeyResponse{PubkeyHex: pubkey}, nil
 530  }
 531  
 532  func (s *DatabaseService) IsAliasTaken(ctx context.Context, req *orlydbv1.IsAliasTakenRequest) (*orlydbv1.IsAliasTakenResponse, error) {
 533  	taken, err := s.db.IsAliasTaken(req.Alias)
 534  	if err != nil {
 535  		return nil, status.Errorf(codes.Internal, "is alias taken failed: %v", err)
 536  	}
 537  	return &orlydbv1.IsAliasTakenResponse{Taken: taken}, nil
 538  }
 539  
 540  // === NIP-43 ===
 541  
 542  func (s *DatabaseService) AddNIP43Member(ctx context.Context, req *orlydbv1.AddNIP43MemberRequest) (*orlydbv1.Empty, error) {
 543  	if err := s.db.AddNIP43Member(req.Pubkey, req.InviteCode); err != nil {
 544  		return nil, status.Errorf(codes.Internal, "add NIP-43 member failed: %v", err)
 545  	}
 546  	return &orlydbv1.Empty{}, nil
 547  }
 548  
 549  func (s *DatabaseService) RemoveNIP43Member(ctx context.Context, req *orlydbv1.RemoveNIP43MemberRequest) (*orlydbv1.Empty, error) {
 550  	if err := s.db.RemoveNIP43Member(req.Pubkey); err != nil {
 551  		return nil, status.Errorf(codes.Internal, "remove NIP-43 member failed: %v", err)
 552  	}
 553  	return &orlydbv1.Empty{}, nil
 554  }
 555  
 556  func (s *DatabaseService) IsNIP43Member(ctx context.Context, req *orlydbv1.IsNIP43MemberRequest) (*orlydbv1.IsNIP43MemberResponse, error) {
 557  	isMember, err := s.db.IsNIP43Member(req.Pubkey)
 558  	if err != nil {
 559  		return nil, status.Errorf(codes.Internal, "is NIP-43 member failed: %v", err)
 560  	}
 561  	return &orlydbv1.IsNIP43MemberResponse{IsMember: isMember}, nil
 562  }
 563  
 564  func (s *DatabaseService) GetNIP43Membership(ctx context.Context, req *orlydbv1.GetNIP43MembershipRequest) (*orlydbv1.NIP43Membership, error) {
 565  	membership, err := s.db.GetNIP43Membership(req.Pubkey)
 566  	if err != nil {
 567  		return nil, status.Errorf(codes.Internal, "get NIP-43 membership failed: %v", err)
 568  	}
 569  	return orlydbv1.NIP43MembershipToProto(membership), nil
 570  }
 571  
 572  func (s *DatabaseService) GetAllNIP43Members(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PubkeyList, error) {
 573  	members, err := s.db.GetAllNIP43Members()
 574  	if err != nil {
 575  		return nil, status.Errorf(codes.Internal, "get all NIP-43 members failed: %v", err)
 576  	}
 577  	return &orlydbv1.PubkeyList{Pubkeys: members}, nil
 578  }
 579  
 580  func (s *DatabaseService) StoreInviteCode(ctx context.Context, req *orlydbv1.StoreInviteCodeRequest) (*orlydbv1.Empty, error) {
 581  	expiresAt := orlydbv1.TimeFromUnix(req.ExpiresAt)
 582  	if err := s.db.StoreInviteCode(req.Code, expiresAt); err != nil {
 583  		return nil, status.Errorf(codes.Internal, "store invite code failed: %v", err)
 584  	}
 585  	return &orlydbv1.Empty{}, nil
 586  }
 587  
 588  func (s *DatabaseService) ValidateInviteCode(ctx context.Context, req *orlydbv1.ValidateInviteCodeRequest) (*orlydbv1.ValidateInviteCodeResponse, error) {
 589  	valid, err := s.db.ValidateInviteCode(req.Code)
 590  	if err != nil {
 591  		return nil, status.Errorf(codes.Internal, "validate invite code failed: %v", err)
 592  	}
 593  	return &orlydbv1.ValidateInviteCodeResponse{Valid: valid}, nil
 594  }
 595  
 596  func (s *DatabaseService) DeleteInviteCode(ctx context.Context, req *orlydbv1.DeleteInviteCodeRequest) (*orlydbv1.Empty, error) {
 597  	if err := s.db.DeleteInviteCode(req.Code); err != nil {
 598  		return nil, status.Errorf(codes.Internal, "delete invite code failed: %v", err)
 599  	}
 600  	return &orlydbv1.Empty{}, nil
 601  }
 602  
 603  func (s *DatabaseService) PublishNIP43MembershipEvent(ctx context.Context, req *orlydbv1.PublishNIP43MembershipEventRequest) (*orlydbv1.Empty, error) {
 604  	if err := s.db.PublishNIP43MembershipEvent(int(req.Kind), req.Pubkey); err != nil {
 605  		return nil, status.Errorf(codes.Internal, "publish NIP-43 membership event failed: %v", err)
 606  	}
 607  	return &orlydbv1.Empty{}, nil
 608  }
 609  
 610  // === Query Cache ===
 611  
 612  func (s *DatabaseService) GetCachedJSON(ctx context.Context, req *orlydbv1.GetCachedJSONRequest) (*orlydbv1.GetCachedJSONResponse, error) {
 613  	f := orlydbv1.ProtoToFilter(req.Filter)
 614  	jsonItems, found := s.db.GetCachedJSON(f)
 615  	return &orlydbv1.GetCachedJSONResponse{
 616  		JsonItems: jsonItems,
 617  		Found:     found,
 618  	}, nil
 619  }
 620  
 621  func (s *DatabaseService) CacheMarshaledJSON(ctx context.Context, req *orlydbv1.CacheMarshaledJSONRequest) (*orlydbv1.Empty, error) {
 622  	f := orlydbv1.ProtoToFilter(req.Filter)
 623  	s.db.CacheMarshaledJSON(f, req.JsonItems)
 624  	return &orlydbv1.Empty{}, nil
 625  }
 626  
 627  func (s *DatabaseService) GetCachedEvents(ctx context.Context, req *orlydbv1.GetCachedEventsRequest) (*orlydbv1.GetCachedEventsResponse, error) {
 628  	f := orlydbv1.ProtoToFilter(req.Filter)
 629  	events, found := s.db.GetCachedEvents(f)
 630  	return &orlydbv1.GetCachedEventsResponse{
 631  		Events: orlydbv1.EventsToProto(events),
 632  		Found:  found,
 633  	}, nil
 634  }
 635  
 636  func (s *DatabaseService) CacheEvents(ctx context.Context, req *orlydbv1.CacheEventsRequest) (*orlydbv1.Empty, error) {
 637  	f := orlydbv1.ProtoToFilter(req.Filter)
 638  	events := orlydbv1.ProtoToEvents(req.Events)
 639  	s.db.CacheEvents(f, events)
 640  	return &orlydbv1.Empty{}, nil
 641  }
 642  
 643  func (s *DatabaseService) InvalidateQueryCache(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
 644  	s.db.InvalidateQueryCache()
 645  	return &orlydbv1.Empty{}, nil
 646  }
 647  
 648  // === Access Tracking ===
 649  
 650  func (s *DatabaseService) RecordEventAccess(ctx context.Context, req *orlydbv1.RecordEventAccessRequest) (*orlydbv1.Empty, error) {
 651  	if err := s.db.RecordEventAccess(req.Serial, req.ConnectionId); err != nil {
 652  		return nil, status.Errorf(codes.Internal, "record event access failed: %v", err)
 653  	}
 654  	return &orlydbv1.Empty{}, nil
 655  }
 656  
 657  func (s *DatabaseService) GetEventAccessInfo(ctx context.Context, req *orlydbv1.GetEventAccessInfoRequest) (*orlydbv1.GetEventAccessInfoResponse, error) {
 658  	lastAccess, accessCount, err := s.db.GetEventAccessInfo(req.Serial)
 659  	if err != nil {
 660  		return nil, status.Errorf(codes.Internal, "get event access info failed: %v", err)
 661  	}
 662  	return &orlydbv1.GetEventAccessInfoResponse{
 663  		LastAccess:  lastAccess,
 664  		AccessCount: accessCount,
 665  	}, nil
 666  }
 667  
 668  func (s *DatabaseService) GetLeastAccessedEvents(ctx context.Context, req *orlydbv1.GetLeastAccessedEventsRequest) (*orlydbv1.SerialList, error) {
 669  	serials, err := s.db.GetLeastAccessedEvents(int(req.Limit), req.MinAgeSec)
 670  	if err != nil {
 671  		return nil, status.Errorf(codes.Internal, "get least accessed events failed: %v", err)
 672  	}
 673  	return &orlydbv1.SerialList{Serials: serials}, nil
 674  }
 675  
 676  // === Utility ===
 677  
 678  func (s *DatabaseService) EventIdsBySerial(ctx context.Context, req *orlydbv1.EventIdsBySerialRequest) (*orlydbv1.EventIdsBySerialResponse, error) {
 679  	eventIds, err := s.db.EventIdsBySerial(req.Start, int(req.Count))
 680  	if err != nil {
 681  		return nil, status.Errorf(codes.Internal, "event ids by serial failed: %v", err)
 682  	}
 683  	return &orlydbv1.EventIdsBySerialResponse{EventIds: eventIds}, nil
 684  }
 685  
 686  func (s *DatabaseService) RunMigrations(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
 687  	s.db.RunMigrations()
 688  	return &orlydbv1.Empty{}, nil
 689  }
 690  
 691  // === Blob Storage (Blossom) ===
 692  
 693  func (s *DatabaseService) SaveBlob(ctx context.Context, req *orlydbv1.SaveBlobRequest) (*orlydbv1.Empty, error) {
 694  	if err := s.db.SaveBlob(req.Sha256Hash, req.Data, req.Pubkey, req.MimeType, req.Extension); err != nil {
 695  		return nil, status.Errorf(codes.Internal, "save blob failed: %v", err)
 696  	}
 697  	return &orlydbv1.Empty{}, nil
 698  }
 699  
 700  func (s *DatabaseService) SaveBlobMetadata(ctx context.Context, req *orlydbv1.SaveBlobMetadataRequest) (*orlydbv1.Empty, error) {
 701  	if err := s.db.SaveBlobMetadata(req.Sha256Hash, req.Size, req.Pubkey, req.MimeType, req.Extension); err != nil {
 702  		return nil, status.Errorf(codes.Internal, "save blob metadata failed: %v", err)
 703  	}
 704  	return &orlydbv1.Empty{}, nil
 705  }
 706  
 707  func (s *DatabaseService) GetBlob(ctx context.Context, req *orlydbv1.GetBlobRequest) (*orlydbv1.GetBlobResponse, error) {
 708  	data, metadata, err := s.db.GetBlob(req.Sha256Hash)
 709  	if err != nil {
 710  		// Return not found as a response, not an error
 711  		return &orlydbv1.GetBlobResponse{Found: false}, nil
 712  	}
 713  	return &orlydbv1.GetBlobResponse{
 714  		Found:    true,
 715  		Data:     data,
 716  		Metadata: orlydbv1.BlobMetadataToProto(metadata),
 717  	}, nil
 718  }
 719  
 720  func (s *DatabaseService) HasBlob(ctx context.Context, req *orlydbv1.HasBlobRequest) (*orlydbv1.HasBlobResponse, error) {
 721  	exists, err := s.db.HasBlob(req.Sha256Hash)
 722  	if err != nil {
 723  		return nil, status.Errorf(codes.Internal, "has blob failed: %v", err)
 724  	}
 725  	return &orlydbv1.HasBlobResponse{Exists: exists}, nil
 726  }
 727  
 728  func (s *DatabaseService) DeleteBlob(ctx context.Context, req *orlydbv1.DeleteBlobRequest) (*orlydbv1.Empty, error) {
 729  	if err := s.db.DeleteBlob(req.Sha256Hash, req.Pubkey); err != nil {
 730  		return nil, status.Errorf(codes.Internal, "delete blob failed: %v", err)
 731  	}
 732  	return &orlydbv1.Empty{}, nil
 733  }
 734  
 735  func (s *DatabaseService) ListBlobs(ctx context.Context, req *orlydbv1.ListBlobsRequest) (*orlydbv1.ListBlobsResponse, error) {
 736  	descriptors, err := s.db.ListBlobs(req.Pubkey, req.Since, req.Until)
 737  	if err != nil {
 738  		return nil, status.Errorf(codes.Internal, "list blobs failed: %v", err)
 739  	}
 740  	return &orlydbv1.ListBlobsResponse{
 741  		Descriptors: orlydbv1.BlobDescriptorListToProto(descriptors),
 742  	}, nil
 743  }
 744  
 745  func (s *DatabaseService) GetBlobMetadata(ctx context.Context, req *orlydbv1.GetBlobMetadataRequest) (*orlydbv1.BlobMetadata, error) {
 746  	metadata, err := s.db.GetBlobMetadata(req.Sha256Hash)
 747  	if err != nil {
 748  		return nil, status.Errorf(codes.NotFound, "blob metadata not found: %v", err)
 749  	}
 750  	return orlydbv1.BlobMetadataToProto(metadata), nil
 751  }
 752  
 753  func (s *DatabaseService) GetTotalBlobStorageUsed(ctx context.Context, req *orlydbv1.GetTotalBlobStorageUsedRequest) (*orlydbv1.GetTotalBlobStorageUsedResponse, error) {
 754  	totalMB, err := s.db.GetTotalBlobStorageUsed(req.Pubkey)
 755  	if err != nil {
 756  		return nil, status.Errorf(codes.Internal, "get total blob storage used failed: %v", err)
 757  	}
 758  	return &orlydbv1.GetTotalBlobStorageUsedResponse{TotalMb: totalMB}, nil
 759  }
 760  
 761  func (s *DatabaseService) SaveBlobReport(ctx context.Context, req *orlydbv1.SaveBlobReportRequest) (*orlydbv1.Empty, error) {
 762  	if err := s.db.SaveBlobReport(req.Sha256Hash, req.ReportData); err != nil {
 763  		return nil, status.Errorf(codes.Internal, "save blob report failed: %v", err)
 764  	}
 765  	return &orlydbv1.Empty{}, nil
 766  }
 767  
 768  func (s *DatabaseService) ListAllBlobUserStats(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ListAllBlobUserStatsResponse, error) {
 769  	stats, err := s.db.ListAllBlobUserStats()
 770  	if err != nil {
 771  		return nil, status.Errorf(codes.Internal, "list all blob user stats failed: %v", err)
 772  	}
 773  	return &orlydbv1.ListAllBlobUserStatsResponse{
 774  		Stats: orlydbv1.UserBlobStatsListToProto(stats),
 775  	}, nil
 776  }
 777  
 778  func (s *DatabaseService) ReconcileBlobMetadata(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ReconcileBlobMetadataResponse, error) {
 779  	reconciled, err := s.db.ReconcileBlobMetadata()
 780  	if err != nil {
 781  		return nil, status.Errorf(codes.Internal, "reconcile blob metadata failed: %v", err)
 782  	}
 783  	return &orlydbv1.ReconcileBlobMetadataResponse{
 784  		Reconciled: int32(reconciled),
 785  	}, nil
 786  }
 787  
 788  func (s *DatabaseService) ListAllBlobs(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ListBlobsResponse, error) {
 789  	descriptors, err := s.db.ListAllBlobs()
 790  	if err != nil {
 791  		return nil, status.Errorf(codes.Internal, "list all blobs failed: %v", err)
 792  	}
 793  	return &orlydbv1.ListBlobsResponse{
 794  		Descriptors: orlydbv1.BlobDescriptorListToProto(descriptors),
 795  	}, nil
 796  }
 797  
 798  func (s *DatabaseService) GetThumbnail(ctx context.Context, req *orlydbv1.GetThumbnailRequest) (*orlydbv1.GetThumbnailResponse, error) {
 799  	data, err := s.db.GetThumbnail(req.Key)
 800  	if err != nil {
 801  		// Not found is a valid response, not an error
 802  		return &orlydbv1.GetThumbnailResponse{Found: false}, nil
 803  	}
 804  	return &orlydbv1.GetThumbnailResponse{
 805  		Found: true,
 806  		Data:  data,
 807  	}, nil
 808  }
 809  
 810  func (s *DatabaseService) SaveThumbnail(ctx context.Context, req *orlydbv1.SaveThumbnailRequest) (*orlydbv1.Empty, error) {
 811  	if err := s.db.SaveThumbnail(req.Key, req.Data); err != nil {
 812  		return nil, status.Errorf(codes.Internal, "save thumbnail failed: %v", err)
 813  	}
 814  	return &orlydbv1.Empty{}, nil
 815  }
 816  
 817  // === Cypher Query ===
 818  
 819  const (
 820  	cypherDefaultTimeout = 30 * time.Second
 821  	cypherMaxTimeout     = 120 * time.Second
 822  )
 823  
 824  func (s *DatabaseService) ExecuteCypherRead(ctx context.Context, req *orlydbv1.CypherReadRequest) (*orlydbv1.CypherReadResponse, error) {
 825  	executor, ok := s.db.(store.CypherExecutor)
 826  	if !ok {
 827  		return nil, status.Errorf(codes.Unimplemented, "database backend does not support Cypher queries")
 828  	}
 829  
 830  	// Decode JSON-encoded params
 831  	params := make(map[string]any, len(req.Params))
 832  	for k, v := range req.Params {
 833  		var decoded any
 834  		if err := json.Unmarshal(v, &decoded); err != nil {
 835  			return nil, status.Errorf(codes.InvalidArgument, "invalid JSON for param %q: %v", k, err)
 836  		}
 837  		params[k] = decoded
 838  	}
 839  
 840  	// Enforce timeout
 841  	timeout := cypherDefaultTimeout
 842  	if req.TimeoutSeconds > 0 {
 843  		timeout = time.Duration(req.TimeoutSeconds) * time.Second
 844  		if timeout > cypherMaxTimeout {
 845  			timeout = cypherMaxTimeout
 846  		}
 847  	}
 848  	ctx, cancel := context.WithTimeout(ctx, timeout)
 849  	defer cancel()
 850  
 851  	records, err := executor.ExecuteCypherRead(ctx, req.Cypher, params)
 852  	if err != nil {
 853  		return nil, status.Errorf(codes.Internal, "cypher query failed: %v", err)
 854  	}
 855  
 856  	// Encode each record as JSON bytes
 857  	encoded := make([][]byte, 0, len(records))
 858  	for _, rec := range records {
 859  		b, err := json.Marshal(rec)
 860  		if err != nil {
 861  			return nil, status.Errorf(codes.Internal, "failed to marshal record: %v", err)
 862  		}
 863  		encoded = append(encoded, b)
 864  	}
 865  
 866  	return &orlydbv1.CypherReadResponse{Records: encoded}, nil
 867  }
 868  
 869  // === Helper Methods ===
 870  
 871  // streamEvents is a helper to stream events in batches.
 872  type eventStreamer interface {
 873  	Send(*orlydbv1.EventBatch) error
 874  	Context() context.Context
 875  }
 876  
 877  func (s *DatabaseService) streamEvents(events []*orlydbv1.Event, stream eventStreamer) error {
 878  	batchSize := s.cfg.StreamBatchSize
 879  	if batchSize == 0 {
 880  		batchSize = 100
 881  	}
 882  
 883  	for i := 0; i < len(events); i += batchSize {
 884  		end := i + batchSize
 885  		if end > len(events) {
 886  			end = len(events)
 887  		}
 888  
 889  		batch := &orlydbv1.EventBatch{
 890  			Events: events[i:end],
 891  		}
 892  
 893  		if err := stream.Send(batch); err != nil {
 894  			return err
 895  		}
 896  	}
 897  
 898  	return nil
 899  }
 900