service.go raw

   1  package main
   2  
   3  import (
   4  	"context"
   5  	"io"
   6  
   7  	"google.golang.org/grpc/codes"
   8  	"google.golang.org/grpc/status"
   9  	"next.orly.dev/pkg/lol/chk"
  10  	"next.orly.dev/pkg/lol/log"
  11  
  12  	"next.orly.dev/pkg/database"
  13  	orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
  14  )
  15  
  16  // DatabaseService implements the orlydbv1.DatabaseServiceServer interface.
  17  type DatabaseService struct {
  18  	orlydbv1.UnimplementedDatabaseServiceServer
  19  	db  database.Database
  20  	cfg *Config
  21  }
  22  
  23  // NewDatabaseService creates a new database service.
  24  func NewDatabaseService(db database.Database, cfg *Config) *DatabaseService {
  25  	return &DatabaseService{
  26  		db:  db,
  27  		cfg: cfg,
  28  	}
  29  }
  30  
  31  // === Lifecycle Methods ===
  32  
  33  func (s *DatabaseService) GetPath(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PathResponse, error) {
  34  	return &orlydbv1.PathResponse{Path: s.db.Path()}, nil
  35  }
  36  
  37  func (s *DatabaseService) Sync(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
  38  	if err := s.db.Sync(); err != nil {
  39  		return nil, status.Errorf(codes.Internal, "sync failed: %v", err)
  40  	}
  41  	return &orlydbv1.Empty{}, nil
  42  }
  43  
  44  func (s *DatabaseService) Ready(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ReadyResponse, error) {
  45  	// Check if ready channel is closed
  46  	select {
  47  	case <-s.db.Ready():
  48  		return &orlydbv1.ReadyResponse{Ready: true}, nil
  49  	default:
  50  		return &orlydbv1.ReadyResponse{Ready: false}, nil
  51  	}
  52  }
  53  
  54  func (s *DatabaseService) SetLogLevel(ctx context.Context, req *orlydbv1.SetLogLevelRequest) (*orlydbv1.Empty, error) {
  55  	s.db.SetLogLevel(req.Level)
  56  	return &orlydbv1.Empty{}, nil
  57  }
  58  
  59  // === Event Storage ===
  60  
  61  func (s *DatabaseService) SaveEvent(ctx context.Context, req *orlydbv1.SaveEventRequest) (*orlydbv1.SaveEventResponse, error) {
  62  	ev := orlydbv1.ProtoToEvent(req.Event)
  63  	exists, err := s.db.SaveEvent(ctx, ev)
  64  	if err != nil {
  65  		return nil, status.Errorf(codes.Internal, "save event failed: %v", err)
  66  	}
  67  	return &orlydbv1.SaveEventResponse{Exists: exists}, nil
  68  }
  69  
  70  func (s *DatabaseService) GetSerialsFromFilter(ctx context.Context, req *orlydbv1.GetSerialsFromFilterRequest) (*orlydbv1.SerialList, error) {
  71  	f := orlydbv1.ProtoToFilter(req.Filter)
  72  	serials, err := s.db.GetSerialsFromFilter(f)
  73  	if err != nil {
  74  		return nil, status.Errorf(codes.Internal, "get serials failed: %v", err)
  75  	}
  76  	return orlydbv1.Uint40sToProto(serials), nil
  77  }
  78  
  79  func (s *DatabaseService) WouldReplaceEvent(ctx context.Context, req *orlydbv1.WouldReplaceEventRequest) (*orlydbv1.WouldReplaceEventResponse, error) {
  80  	ev := orlydbv1.ProtoToEvent(req.Event)
  81  	wouldReplace, replacedSerials, err := s.db.WouldReplaceEvent(ev)
  82  	if err != nil {
  83  		return nil, status.Errorf(codes.Internal, "would replace check failed: %v", err)
  84  	}
  85  	resp := &orlydbv1.WouldReplaceEventResponse{
  86  		WouldReplace: wouldReplace,
  87  	}
  88  	for _, ser := range replacedSerials {
  89  		resp.ReplacedSerials = append(resp.ReplacedSerials, ser.Get())
  90  	}
  91  	return resp, nil
  92  }
  93  
  94  // === Event Queries (Streaming) ===
  95  
  96  func (s *DatabaseService) QueryEvents(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryEventsServer) error {
  97  	f := orlydbv1.ProtoToFilter(req.Filter)
  98  	events, err := s.db.QueryEvents(stream.Context(), f)
  99  	if err != nil {
 100  		return status.Errorf(codes.Internal, "query events failed: %v", err)
 101  	}
 102  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 103  }
 104  
 105  func (s *DatabaseService) QueryAllVersions(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryAllVersionsServer) error {
 106  	f := orlydbv1.ProtoToFilter(req.Filter)
 107  	events, err := s.db.QueryAllVersions(stream.Context(), f)
 108  	if err != nil {
 109  		return status.Errorf(codes.Internal, "query all versions failed: %v", err)
 110  	}
 111  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 112  }
 113  
 114  func (s *DatabaseService) QueryEventsWithOptions(req *orlydbv1.QueryEventsWithOptionsRequest, stream orlydbv1.DatabaseService_QueryEventsWithOptionsServer) error {
 115  	f := orlydbv1.ProtoToFilter(req.Filter)
 116  	events, err := s.db.QueryEventsWithOptions(stream.Context(), f, req.IncludeDeleteEvents, req.ShowAllVersions)
 117  	if err != nil {
 118  		return status.Errorf(codes.Internal, "query events with options failed: %v", err)
 119  	}
 120  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 121  }
 122  
 123  func (s *DatabaseService) QueryDeleteEventsByTargetId(req *orlydbv1.QueryDeleteEventsByTargetIdRequest, stream orlydbv1.DatabaseService_QueryDeleteEventsByTargetIdServer) error {
 124  	events, err := s.db.QueryDeleteEventsByTargetId(stream.Context(), req.TargetEventId)
 125  	if err != nil {
 126  		return status.Errorf(codes.Internal, "query delete events failed: %v", err)
 127  	}
 128  	return s.streamEvents(orlydbv1.EventsToProto(events), stream)
 129  }
 130  
 131  func (s *DatabaseService) QueryForSerials(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.SerialList, error) {
 132  	f := orlydbv1.ProtoToFilter(req.Filter)
 133  	serials, err := s.db.QueryForSerials(ctx, f)
 134  	if err != nil {
 135  		return nil, status.Errorf(codes.Internal, "query for serials failed: %v", err)
 136  	}
 137  	return orlydbv1.Uint40sToProto(serials), nil
 138  }
 139  
 140  func (s *DatabaseService) QueryForIds(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.IdPkTsList, error) {
 141  	f := orlydbv1.ProtoToFilter(req.Filter)
 142  	idPkTs, err := s.db.QueryForIds(ctx, f)
 143  	if err != nil {
 144  		return nil, status.Errorf(codes.Internal, "query for ids failed: %v", err)
 145  	}
 146  	return orlydbv1.IdPkTsListToProto(idPkTs), nil
 147  }
 148  
 149  func (s *DatabaseService) CountEvents(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.CountEventsResponse, error) {
 150  	f := orlydbv1.ProtoToFilter(req.Filter)
 151  	count, approximate, err := s.db.CountEvents(ctx, f)
 152  	if err != nil {
 153  		return nil, status.Errorf(codes.Internal, "count events failed: %v", err)
 154  	}
 155  	return &orlydbv1.CountEventsResponse{
 156  		Count:       int32(count),
 157  		Approximate: approximate,
 158  	}, nil
 159  }
 160  
 161  // === Event Retrieval by Serial ===
 162  
 163  func (s *DatabaseService) FetchEventBySerial(ctx context.Context, req *orlydbv1.FetchEventBySerialRequest) (*orlydbv1.FetchEventBySerialResponse, error) {
 164  	ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
 165  	ev, err := s.db.FetchEventBySerial(ser)
 166  	if err != nil {
 167  		return nil, status.Errorf(codes.Internal, "fetch event by serial failed: %v", err)
 168  	}
 169  	return &orlydbv1.FetchEventBySerialResponse{
 170  		Event: orlydbv1.EventToProto(ev),
 171  		Found: ev != nil,
 172  	}, nil
 173  }
 174  
 175  func (s *DatabaseService) FetchEventsBySerials(ctx context.Context, req *orlydbv1.FetchEventsBySerialRequest) (*orlydbv1.EventMap, error) {
 176  	serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
 177  	events, err := s.db.FetchEventsBySerials(serials)
 178  	if err != nil {
 179  		return nil, status.Errorf(codes.Internal, "fetch events by serials failed: %v", err)
 180  	}
 181  	return orlydbv1.EventMapToProto(events), nil
 182  }
 183  
 184  func (s *DatabaseService) GetSerialById(ctx context.Context, req *orlydbv1.GetSerialByIdRequest) (*orlydbv1.GetSerialByIdResponse, error) {
 185  	ser, err := s.db.GetSerialById(req.Id)
 186  	if err != nil {
 187  		return nil, status.Errorf(codes.Internal, "get serial by id failed: %v", err)
 188  	}
 189  	if ser == nil {
 190  		return &orlydbv1.GetSerialByIdResponse{Found: false}, nil
 191  	}
 192  	return &orlydbv1.GetSerialByIdResponse{
 193  		Serial: ser.Get(),
 194  		Found:  true,
 195  	}, nil
 196  }
 197  
 198  func (s *DatabaseService) GetSerialsByIds(ctx context.Context, req *orlydbv1.GetSerialsByIdsRequest) (*orlydbv1.SerialMap, error) {
 199  	// Convert request IDs to tag format
 200  	ids := orlydbv1.BytesToTag(req.Ids)
 201  	serials, err := s.db.GetSerialsByIds(ids)
 202  	if err != nil {
 203  		return nil, status.Errorf(codes.Internal, "get serials by ids failed: %v", err)
 204  	}
 205  	result := &orlydbv1.SerialMap{
 206  		Serials: make(map[string]uint64),
 207  	}
 208  	for k, v := range serials {
 209  		if v != nil {
 210  			result.Serials[k] = v.Get()
 211  		}
 212  	}
 213  	return result, nil
 214  }
 215  
 216  func (s *DatabaseService) GetSerialsByRange(ctx context.Context, req *orlydbv1.GetSerialsByRangeRequest) (*orlydbv1.SerialList, error) {
 217  	r := orlydbv1.ProtoToRange(req.Range)
 218  	serials, err := s.db.GetSerialsByRange(r)
 219  	if err != nil {
 220  		return nil, status.Errorf(codes.Internal, "get serials by range failed: %v", err)
 221  	}
 222  	return orlydbv1.Uint40sToProto(serials), nil
 223  }
 224  
 225  func (s *DatabaseService) GetFullIdPubkeyBySerial(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialRequest) (*orlydbv1.IdPkTs, error) {
 226  	ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
 227  	idPkTs, err := s.db.GetFullIdPubkeyBySerial(ser)
 228  	if err != nil {
 229  		return nil, status.Errorf(codes.Internal, "get full id pubkey by serial failed: %v", err)
 230  	}
 231  	return orlydbv1.IdPkTsToProto(idPkTs), nil
 232  }
 233  
 234  func (s *DatabaseService) GetFullIdPubkeyBySerials(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialsRequest) (*orlydbv1.IdPkTsList, error) {
 235  	serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
 236  	idPkTs, err := s.db.GetFullIdPubkeyBySerials(serials)
 237  	if err != nil {
 238  		return nil, status.Errorf(codes.Internal, "get full id pubkey by serials failed: %v", err)
 239  	}
 240  	return orlydbv1.IdPkTsListToProto(idPkTs), nil
 241  }
 242  
 243  // === Event Deletion ===
 244  
 245  func (s *DatabaseService) DeleteEvent(ctx context.Context, req *orlydbv1.DeleteEventRequest) (*orlydbv1.Empty, error) {
 246  	if err := s.db.DeleteEvent(ctx, req.EventId); err != nil {
 247  		return nil, status.Errorf(codes.Internal, "delete event failed: %v", err)
 248  	}
 249  	return &orlydbv1.Empty{}, nil
 250  }
 251  
 252  func (s *DatabaseService) DeleteEventBySerial(ctx context.Context, req *orlydbv1.DeleteEventBySerialRequest) (*orlydbv1.Empty, error) {
 253  	ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
 254  	ev := orlydbv1.ProtoToEvent(req.Event)
 255  	if err := s.db.DeleteEventBySerial(ctx, ser, ev); err != nil {
 256  		return nil, status.Errorf(codes.Internal, "delete event by serial failed: %v", err)
 257  	}
 258  	return &orlydbv1.Empty{}, nil
 259  }
 260  
 261  func (s *DatabaseService) DeleteExpired(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
 262  	s.db.DeleteExpired()
 263  	return &orlydbv1.Empty{}, nil
 264  }
 265  
 266  func (s *DatabaseService) ProcessDelete(ctx context.Context, req *orlydbv1.ProcessDeleteRequest) (*orlydbv1.Empty, error) {
 267  	ev := orlydbv1.ProtoToEvent(req.Event)
 268  	if err := s.db.ProcessDelete(ev, req.Admins); err != nil {
 269  		return nil, status.Errorf(codes.Internal, "process delete failed: %v", err)
 270  	}
 271  	return &orlydbv1.Empty{}, nil
 272  }
 273  
 274  func (s *DatabaseService) CheckForDeleted(ctx context.Context, req *orlydbv1.CheckForDeletedRequest) (*orlydbv1.Empty, error) {
 275  	ev := orlydbv1.ProtoToEvent(req.Event)
 276  	if err := s.db.CheckForDeleted(ev, req.Admins); err != nil {
 277  		return nil, status.Errorf(codes.Internal, "check for deleted failed: %v", err)
 278  	}
 279  	return &orlydbv1.Empty{}, nil
 280  }
 281  
 282  // === Import/Export ===
 283  
 284  func (s *DatabaseService) Import(stream orlydbv1.DatabaseService_ImportServer) error {
 285  	pr, pw := io.Pipe()
 286  
 287  	// Goroutine to read from gRPC stream and write to pipe
 288  	go func() {
 289  		defer pw.Close()
 290  		for {
 291  			chunk, err := stream.Recv()
 292  			if err == io.EOF {
 293  				return
 294  			}
 295  			if err != nil {
 296  				log.E.F("import stream error: %v", err)
 297  				pw.CloseWithError(err)
 298  				return
 299  			}
 300  			if _, err := pw.Write(chunk.Data); chk.E(err) {
 301  				return
 302  			}
 303  		}
 304  	}()
 305  
 306  	// Import from pipe
 307  	s.db.Import(pr)
 308  
 309  	return stream.SendAndClose(&orlydbv1.ImportResponse{
 310  		EventsImported: 0, // TODO: Track count
 311  		EventsSkipped:  0,
 312  	})
 313  }
 314  
 315  func (s *DatabaseService) Export(req *orlydbv1.ExportRequest, stream orlydbv1.DatabaseService_ExportServer) error {
 316  	pr, pw := io.Pipe()
 317  
 318  	// Goroutine to export to pipe
 319  	go func() {
 320  		defer pw.Close()
 321  		s.db.Export(stream.Context(), pw, req.Pubkeys...)
 322  	}()
 323  
 324  	// Read from pipe and send to stream
 325  	buf := make([]byte, 64*1024) // 64KB chunks
 326  	for {
 327  		n, err := pr.Read(buf)
 328  		if err == io.EOF {
 329  			return nil
 330  		}
 331  		if err != nil {
 332  			return status.Errorf(codes.Internal, "export failed: %v", err)
 333  		}
 334  		if err := stream.Send(&orlydbv1.ExportChunk{Data: buf[:n]}); err != nil {
 335  			return err
 336  		}
 337  	}
 338  }
 339  
 340  func (s *DatabaseService) ImportEventsFromStrings(ctx context.Context, req *orlydbv1.ImportEventsFromStringsRequest) (*orlydbv1.ImportResponse, error) {
 341  	// Note: We can't pass policy manager over gRPC, so we pass nil
 342  	if err := s.db.ImportEventsFromStrings(ctx, req.EventJsons, nil); err != nil {
 343  		return nil, status.Errorf(codes.Internal, "import events from strings failed: %v", err)
 344  	}
 345  	return &orlydbv1.ImportResponse{
 346  		EventsImported: int64(len(req.EventJsons)),
 347  	}, nil
 348  }
 349  
 350  // === Relay Identity ===
 351  
 352  func (s *DatabaseService) GetRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
 353  	secret, err := s.db.GetRelayIdentitySecret()
 354  	if err != nil {
 355  		return nil, status.Errorf(codes.Internal, "get relay identity secret failed: %v", err)
 356  	}
 357  	return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
 358  }
 359  
 360  func (s *DatabaseService) SetRelayIdentitySecret(ctx context.Context, req *orlydbv1.SetRelayIdentitySecretRequest) (*orlydbv1.Empty, error) {
 361  	if err := s.db.SetRelayIdentitySecret(req.SecretKey); err != nil {
 362  		return nil, status.Errorf(codes.Internal, "set relay identity secret failed: %v", err)
 363  	}
 364  	return &orlydbv1.Empty{}, nil
 365  }
 366  
 367  func (s *DatabaseService) GetOrCreateRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
 368  	secret, err := s.db.GetOrCreateRelayIdentitySecret()
 369  	if err != nil {
 370  		return nil, status.Errorf(codes.Internal, "get or create relay identity secret failed: %v", err)
 371  	}
 372  	return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
 373  }
 374  
 375  // === Markers ===
 376  
 377  func (s *DatabaseService) SetMarker(ctx context.Context, req *orlydbv1.SetMarkerRequest) (*orlydbv1.Empty, error) {
 378  	if err := s.db.SetMarker(req.Key, req.Value); err != nil {
 379  		return nil, status.Errorf(codes.Internal, "set marker failed: %v", err)
 380  	}
 381  	return &orlydbv1.Empty{}, nil
 382  }
 383  
 384  func (s *DatabaseService) GetMarker(ctx context.Context, req *orlydbv1.GetMarkerRequest) (*orlydbv1.GetMarkerResponse, error) {
 385  	value, err := s.db.GetMarker(req.Key)
 386  	if err != nil {
 387  		return nil, status.Errorf(codes.Internal, "get marker failed: %v", err)
 388  	}
 389  	return &orlydbv1.GetMarkerResponse{
 390  		Value: value,
 391  		Found: value != nil,
 392  	}, nil
 393  }
 394  
 395  func (s *DatabaseService) HasMarker(ctx context.Context, req *orlydbv1.HasMarkerRequest) (*orlydbv1.HasMarkerResponse, error) {
 396  	exists := s.db.HasMarker(req.Key)
 397  	return &orlydbv1.HasMarkerResponse{Exists: exists}, nil
 398  }
 399  
 400  func (s *DatabaseService) DeleteMarker(ctx context.Context, req *orlydbv1.DeleteMarkerRequest) (*orlydbv1.Empty, error) {
 401  	if err := s.db.DeleteMarker(req.Key); err != nil {
 402  		return nil, status.Errorf(codes.Internal, "delete marker failed: %v", err)
 403  	}
 404  	return &orlydbv1.Empty{}, nil
 405  }
 406  
 407  // === Subscriptions ===
 408  
 409  func (s *DatabaseService) GetSubscription(ctx context.Context, req *orlydbv1.GetSubscriptionRequest) (*orlydbv1.Subscription, error) {
 410  	sub, err := s.db.GetSubscription(req.Pubkey)
 411  	if err != nil {
 412  		return nil, status.Errorf(codes.Internal, "get subscription failed: %v", err)
 413  	}
 414  	return orlydbv1.SubscriptionToProto(sub, req.Pubkey), nil
 415  }
 416  
 417  func (s *DatabaseService) IsSubscriptionActive(ctx context.Context, req *orlydbv1.IsSubscriptionActiveRequest) (*orlydbv1.IsSubscriptionActiveResponse, error) {
 418  	active, err := s.db.IsSubscriptionActive(req.Pubkey)
 419  	if err != nil {
 420  		return nil, status.Errorf(codes.Internal, "is subscription active failed: %v", err)
 421  	}
 422  	return &orlydbv1.IsSubscriptionActiveResponse{Active: active}, nil
 423  }
 424  
 425  func (s *DatabaseService) ExtendSubscription(ctx context.Context, req *orlydbv1.ExtendSubscriptionRequest) (*orlydbv1.Empty, error) {
 426  	if err := s.db.ExtendSubscription(req.Pubkey, int(req.Days)); err != nil {
 427  		return nil, status.Errorf(codes.Internal, "extend subscription failed: %v", err)
 428  	}
 429  	return &orlydbv1.Empty{}, nil
 430  }
 431  
 432  func (s *DatabaseService) RecordPayment(ctx context.Context, req *orlydbv1.RecordPaymentRequest) (*orlydbv1.Empty, error) {
 433  	if err := s.db.RecordPayment(req.Pubkey, req.Amount, req.Invoice, req.Preimage); err != nil {
 434  		return nil, status.Errorf(codes.Internal, "record payment failed: %v", err)
 435  	}
 436  	return &orlydbv1.Empty{}, nil
 437  }
 438  
 439  func (s *DatabaseService) GetPaymentHistory(ctx context.Context, req *orlydbv1.GetPaymentHistoryRequest) (*orlydbv1.PaymentList, error) {
 440  	payments, err := s.db.GetPaymentHistory(req.Pubkey)
 441  	if err != nil {
 442  		return nil, status.Errorf(codes.Internal, "get payment history failed: %v", err)
 443  	}
 444  	return orlydbv1.PaymentListToProto(payments), nil
 445  }
 446  
 447  func (s *DatabaseService) ExtendBlossomSubscription(ctx context.Context, req *orlydbv1.ExtendBlossomSubscriptionRequest) (*orlydbv1.Empty, error) {
 448  	if err := s.db.ExtendBlossomSubscription(req.Pubkey, req.Tier, req.StorageMb, int(req.DaysExtended)); err != nil {
 449  		return nil, status.Errorf(codes.Internal, "extend blossom subscription failed: %v", err)
 450  	}
 451  	return &orlydbv1.Empty{}, nil
 452  }
 453  
 454  func (s *DatabaseService) GetBlossomStorageQuota(ctx context.Context, req *orlydbv1.GetBlossomStorageQuotaRequest) (*orlydbv1.GetBlossomStorageQuotaResponse, error) {
 455  	quota, err := s.db.GetBlossomStorageQuota(req.Pubkey)
 456  	if err != nil {
 457  		return nil, status.Errorf(codes.Internal, "get blossom storage quota failed: %v", err)
 458  	}
 459  	return &orlydbv1.GetBlossomStorageQuotaResponse{QuotaMb: quota}, nil
 460  }
 461  
 462  func (s *DatabaseService) IsFirstTimeUser(ctx context.Context, req *orlydbv1.IsFirstTimeUserRequest) (*orlydbv1.IsFirstTimeUserResponse, error) {
 463  	firstTime, err := s.db.IsFirstTimeUser(req.Pubkey)
 464  	if err != nil {
 465  		return nil, status.Errorf(codes.Internal, "is first time user failed: %v", err)
 466  	}
 467  	return &orlydbv1.IsFirstTimeUserResponse{FirstTime: firstTime}, nil
 468  }
 469  
 470  // === NIP-43 ===
 471  
 472  func (s *DatabaseService) AddNIP43Member(ctx context.Context, req *orlydbv1.AddNIP43MemberRequest) (*orlydbv1.Empty, error) {
 473  	if err := s.db.AddNIP43Member(req.Pubkey, req.InviteCode); err != nil {
 474  		return nil, status.Errorf(codes.Internal, "add NIP-43 member failed: %v", err)
 475  	}
 476  	return &orlydbv1.Empty{}, nil
 477  }
 478  
 479  func (s *DatabaseService) RemoveNIP43Member(ctx context.Context, req *orlydbv1.RemoveNIP43MemberRequest) (*orlydbv1.Empty, error) {
 480  	if err := s.db.RemoveNIP43Member(req.Pubkey); err != nil {
 481  		return nil, status.Errorf(codes.Internal, "remove NIP-43 member failed: %v", err)
 482  	}
 483  	return &orlydbv1.Empty{}, nil
 484  }
 485  
 486  func (s *DatabaseService) IsNIP43Member(ctx context.Context, req *orlydbv1.IsNIP43MemberRequest) (*orlydbv1.IsNIP43MemberResponse, error) {
 487  	isMember, err := s.db.IsNIP43Member(req.Pubkey)
 488  	if err != nil {
 489  		return nil, status.Errorf(codes.Internal, "is NIP-43 member failed: %v", err)
 490  	}
 491  	return &orlydbv1.IsNIP43MemberResponse{IsMember: isMember}, nil
 492  }
 493  
 494  func (s *DatabaseService) GetNIP43Membership(ctx context.Context, req *orlydbv1.GetNIP43MembershipRequest) (*orlydbv1.NIP43Membership, error) {
 495  	membership, err := s.db.GetNIP43Membership(req.Pubkey)
 496  	if err != nil {
 497  		return nil, status.Errorf(codes.Internal, "get NIP-43 membership failed: %v", err)
 498  	}
 499  	return orlydbv1.NIP43MembershipToProto(membership), nil
 500  }
 501  
 502  func (s *DatabaseService) GetAllNIP43Members(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PubkeyList, error) {
 503  	members, err := s.db.GetAllNIP43Members()
 504  	if err != nil {
 505  		return nil, status.Errorf(codes.Internal, "get all NIP-43 members failed: %v", err)
 506  	}
 507  	return &orlydbv1.PubkeyList{Pubkeys: members}, nil
 508  }
 509  
 510  func (s *DatabaseService) StoreInviteCode(ctx context.Context, req *orlydbv1.StoreInviteCodeRequest) (*orlydbv1.Empty, error) {
 511  	expiresAt := orlydbv1.TimeFromUnix(req.ExpiresAt)
 512  	if err := s.db.StoreInviteCode(req.Code, expiresAt); err != nil {
 513  		return nil, status.Errorf(codes.Internal, "store invite code failed: %v", err)
 514  	}
 515  	return &orlydbv1.Empty{}, nil
 516  }
 517  
 518  func (s *DatabaseService) ValidateInviteCode(ctx context.Context, req *orlydbv1.ValidateInviteCodeRequest) (*orlydbv1.ValidateInviteCodeResponse, error) {
 519  	valid, err := s.db.ValidateInviteCode(req.Code)
 520  	if err != nil {
 521  		return nil, status.Errorf(codes.Internal, "validate invite code failed: %v", err)
 522  	}
 523  	return &orlydbv1.ValidateInviteCodeResponse{Valid: valid}, nil
 524  }
 525  
 526  func (s *DatabaseService) DeleteInviteCode(ctx context.Context, req *orlydbv1.DeleteInviteCodeRequest) (*orlydbv1.Empty, error) {
 527  	if err := s.db.DeleteInviteCode(req.Code); err != nil {
 528  		return nil, status.Errorf(codes.Internal, "delete invite code failed: %v", err)
 529  	}
 530  	return &orlydbv1.Empty{}, nil
 531  }
 532  
 533  func (s *DatabaseService) PublishNIP43MembershipEvent(ctx context.Context, req *orlydbv1.PublishNIP43MembershipEventRequest) (*orlydbv1.Empty, error) {
 534  	if err := s.db.PublishNIP43MembershipEvent(int(req.Kind), req.Pubkey); err != nil {
 535  		return nil, status.Errorf(codes.Internal, "publish NIP-43 membership event failed: %v", err)
 536  	}
 537  	return &orlydbv1.Empty{}, nil
 538  }
 539  
 540  // === Query Cache ===
 541  
 542  func (s *DatabaseService) GetCachedJSON(ctx context.Context, req *orlydbv1.GetCachedJSONRequest) (*orlydbv1.GetCachedJSONResponse, error) {
 543  	f := orlydbv1.ProtoToFilter(req.Filter)
 544  	jsonItems, found := s.db.GetCachedJSON(f)
 545  	return &orlydbv1.GetCachedJSONResponse{
 546  		JsonItems: jsonItems,
 547  		Found:     found,
 548  	}, nil
 549  }
 550  
 551  func (s *DatabaseService) CacheMarshaledJSON(ctx context.Context, req *orlydbv1.CacheMarshaledJSONRequest) (*orlydbv1.Empty, error) {
 552  	f := orlydbv1.ProtoToFilter(req.Filter)
 553  	s.db.CacheMarshaledJSON(f, req.JsonItems)
 554  	return &orlydbv1.Empty{}, nil
 555  }
 556  
 557  func (s *DatabaseService) GetCachedEvents(ctx context.Context, req *orlydbv1.GetCachedEventsRequest) (*orlydbv1.GetCachedEventsResponse, error) {
 558  	f := orlydbv1.ProtoToFilter(req.Filter)
 559  	events, found := s.db.GetCachedEvents(f)
 560  	return &orlydbv1.GetCachedEventsResponse{
 561  		Events: orlydbv1.EventsToProto(events),
 562  		Found:  found,
 563  	}, nil
 564  }
 565  
 566  func (s *DatabaseService) CacheEvents(ctx context.Context, req *orlydbv1.CacheEventsRequest) (*orlydbv1.Empty, error) {
 567  	f := orlydbv1.ProtoToFilter(req.Filter)
 568  	events := orlydbv1.ProtoToEvents(req.Events)
 569  	s.db.CacheEvents(f, events)
 570  	return &orlydbv1.Empty{}, nil
 571  }
 572  
 573  func (s *DatabaseService) InvalidateQueryCache(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
 574  	s.db.InvalidateQueryCache()
 575  	return &orlydbv1.Empty{}, nil
 576  }
 577  
 578  // === Access Tracking ===
 579  
 580  func (s *DatabaseService) RecordEventAccess(ctx context.Context, req *orlydbv1.RecordEventAccessRequest) (*orlydbv1.Empty, error) {
 581  	if err := s.db.RecordEventAccess(req.Serial, req.ConnectionId); err != nil {
 582  		return nil, status.Errorf(codes.Internal, "record event access failed: %v", err)
 583  	}
 584  	return &orlydbv1.Empty{}, nil
 585  }
 586  
 587  func (s *DatabaseService) GetEventAccessInfo(ctx context.Context, req *orlydbv1.GetEventAccessInfoRequest) (*orlydbv1.GetEventAccessInfoResponse, error) {
 588  	lastAccess, accessCount, err := s.db.GetEventAccessInfo(req.Serial)
 589  	if err != nil {
 590  		return nil, status.Errorf(codes.Internal, "get event access info failed: %v", err)
 591  	}
 592  	return &orlydbv1.GetEventAccessInfoResponse{
 593  		LastAccess:  lastAccess,
 594  		AccessCount: accessCount,
 595  	}, nil
 596  }
 597  
 598  func (s *DatabaseService) GetLeastAccessedEvents(ctx context.Context, req *orlydbv1.GetLeastAccessedEventsRequest) (*orlydbv1.SerialList, error) {
 599  	serials, err := s.db.GetLeastAccessedEvents(int(req.Limit), req.MinAgeSec)
 600  	if err != nil {
 601  		return nil, status.Errorf(codes.Internal, "get least accessed events failed: %v", err)
 602  	}
 603  	return &orlydbv1.SerialList{Serials: serials}, nil
 604  }
 605  
 606  // === Utility ===
 607  
 608  func (s *DatabaseService) EventIdsBySerial(ctx context.Context, req *orlydbv1.EventIdsBySerialRequest) (*orlydbv1.EventIdsBySerialResponse, error) {
 609  	eventIds, err := s.db.EventIdsBySerial(req.Start, int(req.Count))
 610  	if err != nil {
 611  		return nil, status.Errorf(codes.Internal, "event ids by serial failed: %v", err)
 612  	}
 613  	return &orlydbv1.EventIdsBySerialResponse{EventIds: eventIds}, nil
 614  }
 615  
 616  func (s *DatabaseService) RunMigrations(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
 617  	s.db.RunMigrations()
 618  	return &orlydbv1.Empty{}, nil
 619  }
 620  
 621  // === Blob Storage (Blossom) ===
 622  
 623  func (s *DatabaseService) SaveBlob(ctx context.Context, req *orlydbv1.SaveBlobRequest) (*orlydbv1.Empty, error) {
 624  	if err := s.db.SaveBlob(req.Sha256Hash, req.Data, req.Pubkey, req.MimeType, req.Extension); err != nil {
 625  		return nil, status.Errorf(codes.Internal, "save blob failed: %v", err)
 626  	}
 627  	return &orlydbv1.Empty{}, nil
 628  }
 629  
 630  func (s *DatabaseService) SaveBlobMetadata(ctx context.Context, req *orlydbv1.SaveBlobMetadataRequest) (*orlydbv1.Empty, error) {
 631  	if err := s.db.SaveBlobMetadata(req.Sha256Hash, req.Size, req.Pubkey, req.MimeType, req.Extension); err != nil {
 632  		return nil, status.Errorf(codes.Internal, "save blob metadata failed: %v", err)
 633  	}
 634  	return &orlydbv1.Empty{}, nil
 635  }
 636  
 637  func (s *DatabaseService) GetBlob(ctx context.Context, req *orlydbv1.GetBlobRequest) (*orlydbv1.GetBlobResponse, error) {
 638  	data, metadata, err := s.db.GetBlob(req.Sha256Hash)
 639  	if err != nil {
 640  		// Return not found as a response, not an error
 641  		return &orlydbv1.GetBlobResponse{Found: false}, nil
 642  	}
 643  	return &orlydbv1.GetBlobResponse{
 644  		Found:    true,
 645  		Data:     data,
 646  		Metadata: orlydbv1.BlobMetadataToProto(metadata),
 647  	}, nil
 648  }
 649  
 650  func (s *DatabaseService) HasBlob(ctx context.Context, req *orlydbv1.HasBlobRequest) (*orlydbv1.HasBlobResponse, error) {
 651  	exists, err := s.db.HasBlob(req.Sha256Hash)
 652  	if err != nil {
 653  		return nil, status.Errorf(codes.Internal, "has blob failed: %v", err)
 654  	}
 655  	return &orlydbv1.HasBlobResponse{Exists: exists}, nil
 656  }
 657  
 658  func (s *DatabaseService) DeleteBlob(ctx context.Context, req *orlydbv1.DeleteBlobRequest) (*orlydbv1.Empty, error) {
 659  	if err := s.db.DeleteBlob(req.Sha256Hash, req.Pubkey); err != nil {
 660  		return nil, status.Errorf(codes.Internal, "delete blob failed: %v", err)
 661  	}
 662  	return &orlydbv1.Empty{}, nil
 663  }
 664  
 665  func (s *DatabaseService) ListBlobs(ctx context.Context, req *orlydbv1.ListBlobsRequest) (*orlydbv1.ListBlobsResponse, error) {
 666  	descriptors, err := s.db.ListBlobs(req.Pubkey, req.Since, req.Until)
 667  	if err != nil {
 668  		return nil, status.Errorf(codes.Internal, "list blobs failed: %v", err)
 669  	}
 670  	return &orlydbv1.ListBlobsResponse{
 671  		Descriptors: orlydbv1.BlobDescriptorListToProto(descriptors),
 672  	}, nil
 673  }
 674  
 675  func (s *DatabaseService) GetBlobMetadata(ctx context.Context, req *orlydbv1.GetBlobMetadataRequest) (*orlydbv1.BlobMetadata, error) {
 676  	metadata, err := s.db.GetBlobMetadata(req.Sha256Hash)
 677  	if err != nil {
 678  		return nil, status.Errorf(codes.NotFound, "blob metadata not found: %v", err)
 679  	}
 680  	return orlydbv1.BlobMetadataToProto(metadata), nil
 681  }
 682  
 683  func (s *DatabaseService) GetTotalBlobStorageUsed(ctx context.Context, req *orlydbv1.GetTotalBlobStorageUsedRequest) (*orlydbv1.GetTotalBlobStorageUsedResponse, error) {
 684  	totalMB, err := s.db.GetTotalBlobStorageUsed(req.Pubkey)
 685  	if err != nil {
 686  		return nil, status.Errorf(codes.Internal, "get total blob storage used failed: %v", err)
 687  	}
 688  	return &orlydbv1.GetTotalBlobStorageUsedResponse{TotalMb: totalMB}, nil
 689  }
 690  
 691  func (s *DatabaseService) SaveBlobReport(ctx context.Context, req *orlydbv1.SaveBlobReportRequest) (*orlydbv1.Empty, error) {
 692  	if err := s.db.SaveBlobReport(req.Sha256Hash, req.ReportData); err != nil {
 693  		return nil, status.Errorf(codes.Internal, "save blob report failed: %v", err)
 694  	}
 695  	return &orlydbv1.Empty{}, nil
 696  }
 697  
 698  func (s *DatabaseService) ListAllBlobUserStats(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ListAllBlobUserStatsResponse, error) {
 699  	stats, err := s.db.ListAllBlobUserStats()
 700  	if err != nil {
 701  		return nil, status.Errorf(codes.Internal, "list all blob user stats failed: %v", err)
 702  	}
 703  	return &orlydbv1.ListAllBlobUserStatsResponse{
 704  		Stats: orlydbv1.UserBlobStatsListToProto(stats),
 705  	}, nil
 706  }
 707  
 708  // === Helper Methods ===
 709  
 710  // streamEvents is a helper to stream events in batches.
 711  type eventStreamer interface {
 712  	Send(*orlydbv1.EventBatch) error
 713  	Context() context.Context
 714  }
 715  
 716  func (s *DatabaseService) streamEvents(events []*orlydbv1.Event, stream eventStreamer) error {
 717  	batchSize := s.cfg.StreamBatchSize
 718  	if batchSize == 0 {
 719  		batchSize = 100
 720  	}
 721  
 722  	for i := 0; i < len(events); i += batchSize {
 723  		end := i + batchSize
 724  		if end > len(events) {
 725  			end = len(events)
 726  		}
 727  
 728  		batch := &orlydbv1.EventBatch{
 729  			Events: events[i:end],
 730  		}
 731  
 732  		if err := stream.Send(batch); err != nil {
 733  			return err
 734  		}
 735  	}
 736  
 737  	return nil
 738  }
 739