service.go raw
1 package server
2
3 import (
4 "context"
5 "encoding/json"
6 "io"
7 "time"
8
9 "google.golang.org/grpc/codes"
10 "google.golang.org/grpc/status"
11 "next.orly.dev/pkg/lol/chk"
12 "next.orly.dev/pkg/lol/log"
13
14 "next.orly.dev/pkg/database"
15 "next.orly.dev/pkg/interfaces/store"
16 orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
17 )
18
19 // DatabaseService implements the orlydbv1.DatabaseServiceServer interface.
20 type DatabaseService struct {
21 orlydbv1.UnimplementedDatabaseServiceServer
22 db database.Database
23 cfg *Config
24 }
25
26 // NewDatabaseService creates a new database service.
27 func NewDatabaseService(db database.Database, cfg *Config) *DatabaseService {
28 return &DatabaseService{
29 db: db,
30 cfg: cfg,
31 }
32 }
33
34 // === Lifecycle Methods ===
35
36 func (s *DatabaseService) GetPath(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PathResponse, error) {
37 return &orlydbv1.PathResponse{Path: s.db.Path()}, nil
38 }
39
40 func (s *DatabaseService) Sync(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
41 if err := s.db.Sync(); err != nil {
42 return nil, status.Errorf(codes.Internal, "sync failed: %v", err)
43 }
44 return &orlydbv1.Empty{}, nil
45 }
46
47 func (s *DatabaseService) Ready(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ReadyResponse, error) {
48 // Check if ready channel is closed
49 select {
50 case <-s.db.Ready():
51 return &orlydbv1.ReadyResponse{Ready: true}, nil
52 default:
53 return &orlydbv1.ReadyResponse{Ready: false}, nil
54 }
55 }
56
57 func (s *DatabaseService) SetLogLevel(ctx context.Context, req *orlydbv1.SetLogLevelRequest) (*orlydbv1.Empty, error) {
58 s.db.SetLogLevel(req.Level)
59 return &orlydbv1.Empty{}, nil
60 }
61
62 // === Event Storage ===
63
64 func (s *DatabaseService) SaveEvent(ctx context.Context, req *orlydbv1.SaveEventRequest) (*orlydbv1.SaveEventResponse, error) {
65 ev := orlydbv1.ProtoToEvent(req.Event)
66 exists, err := s.db.SaveEvent(ctx, ev)
67 if err != nil {
68 return nil, status.Errorf(codes.Internal, "save event failed: %v", err)
69 }
70 return &orlydbv1.SaveEventResponse{Exists: exists}, nil
71 }
72
73 func (s *DatabaseService) GetSerialsFromFilter(ctx context.Context, req *orlydbv1.GetSerialsFromFilterRequest) (*orlydbv1.SerialList, error) {
74 f := orlydbv1.ProtoToFilter(req.Filter)
75 serials, err := s.db.GetSerialsFromFilter(f)
76 if err != nil {
77 return nil, status.Errorf(codes.Internal, "get serials failed: %v", err)
78 }
79 return orlydbv1.Uint40sToProto(serials), nil
80 }
81
82 func (s *DatabaseService) WouldReplaceEvent(ctx context.Context, req *orlydbv1.WouldReplaceEventRequest) (*orlydbv1.WouldReplaceEventResponse, error) {
83 ev := orlydbv1.ProtoToEvent(req.Event)
84 wouldReplace, replacedSerials, err := s.db.WouldReplaceEvent(ev)
85 if err != nil {
86 return nil, status.Errorf(codes.Internal, "would replace check failed: %v", err)
87 }
88 resp := &orlydbv1.WouldReplaceEventResponse{
89 WouldReplace: wouldReplace,
90 }
91 for _, ser := range replacedSerials {
92 resp.ReplacedSerials = append(resp.ReplacedSerials, ser.Get())
93 }
94 return resp, nil
95 }
96
97 // === Event Queries (Streaming) ===
98
99 func (s *DatabaseService) QueryEvents(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryEventsServer) error {
100 f := orlydbv1.ProtoToFilter(req.Filter)
101 events, err := s.db.QueryEvents(stream.Context(), f)
102 if err != nil {
103 return status.Errorf(codes.Internal, "query events failed: %v", err)
104 }
105 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
106 }
107
108 func (s *DatabaseService) QueryAllVersions(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryAllVersionsServer) error {
109 f := orlydbv1.ProtoToFilter(req.Filter)
110 events, err := s.db.QueryAllVersions(stream.Context(), f)
111 if err != nil {
112 return status.Errorf(codes.Internal, "query all versions failed: %v", err)
113 }
114 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
115 }
116
117 func (s *DatabaseService) QueryEventsWithOptions(req *orlydbv1.QueryEventsWithOptionsRequest, stream orlydbv1.DatabaseService_QueryEventsWithOptionsServer) error {
118 f := orlydbv1.ProtoToFilter(req.Filter)
119 events, err := s.db.QueryEventsWithOptions(stream.Context(), f, req.IncludeDeleteEvents, req.ShowAllVersions)
120 if err != nil {
121 return status.Errorf(codes.Internal, "query events with options failed: %v", err)
122 }
123 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
124 }
125
126 func (s *DatabaseService) QueryDeleteEventsByTargetId(req *orlydbv1.QueryDeleteEventsByTargetIdRequest, stream orlydbv1.DatabaseService_QueryDeleteEventsByTargetIdServer) error {
127 events, err := s.db.QueryDeleteEventsByTargetId(stream.Context(), req.TargetEventId)
128 if err != nil {
129 return status.Errorf(codes.Internal, "query delete events failed: %v", err)
130 }
131 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
132 }
133
134 func (s *DatabaseService) QueryForSerials(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.SerialList, error) {
135 f := orlydbv1.ProtoToFilter(req.Filter)
136 serials, err := s.db.QueryForSerials(ctx, f)
137 if err != nil {
138 return nil, status.Errorf(codes.Internal, "query for serials failed: %v", err)
139 }
140 return orlydbv1.Uint40sToProto(serials), nil
141 }
142
143 func (s *DatabaseService) QueryForIds(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.IdPkTsList, error) {
144 f := orlydbv1.ProtoToFilter(req.Filter)
145 idPkTs, err := s.db.QueryForIds(ctx, f)
146 if err != nil {
147 return nil, status.Errorf(codes.Internal, "query for ids failed: %v", err)
148 }
149 return orlydbv1.IdPkTsListToProto(idPkTs), nil
150 }
151
152 func (s *DatabaseService) CountEvents(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.CountEventsResponse, error) {
153 f := orlydbv1.ProtoToFilter(req.Filter)
154 count, approximate, err := s.db.CountEvents(ctx, f)
155 if err != nil {
156 return nil, status.Errorf(codes.Internal, "count events failed: %v", err)
157 }
158 return &orlydbv1.CountEventsResponse{
159 Count: int32(count),
160 Approximate: approximate,
161 }, nil
162 }
163
164 // === Event Retrieval by Serial ===
165
166 func (s *DatabaseService) FetchEventBySerial(ctx context.Context, req *orlydbv1.FetchEventBySerialRequest) (*orlydbv1.FetchEventBySerialResponse, error) {
167 ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
168 ev, err := s.db.FetchEventBySerial(ser)
169 if err != nil {
170 return nil, status.Errorf(codes.Internal, "fetch event by serial failed: %v", err)
171 }
172 return &orlydbv1.FetchEventBySerialResponse{
173 Event: orlydbv1.EventToProto(ev),
174 Found: ev != nil,
175 }, nil
176 }
177
178 func (s *DatabaseService) FetchEventsBySerials(ctx context.Context, req *orlydbv1.FetchEventsBySerialRequest) (*orlydbv1.EventMap, error) {
179 serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
180 events, err := s.db.FetchEventsBySerials(serials)
181 if err != nil {
182 return nil, status.Errorf(codes.Internal, "fetch events by serials failed: %v", err)
183 }
184 return orlydbv1.EventMapToProto(events), nil
185 }
186
187 func (s *DatabaseService) GetSerialById(ctx context.Context, req *orlydbv1.GetSerialByIdRequest) (*orlydbv1.GetSerialByIdResponse, error) {
188 ser, err := s.db.GetSerialById(req.Id)
189 if err != nil {
190 return nil, status.Errorf(codes.Internal, "get serial by id failed: %v", err)
191 }
192 if ser == nil {
193 return &orlydbv1.GetSerialByIdResponse{Found: false}, nil
194 }
195 return &orlydbv1.GetSerialByIdResponse{
196 Serial: ser.Get(),
197 Found: true,
198 }, nil
199 }
200
201 func (s *DatabaseService) GetSerialsByIds(ctx context.Context, req *orlydbv1.GetSerialsByIdsRequest) (*orlydbv1.SerialMap, error) {
202 // Convert request IDs to tag format
203 ids := orlydbv1.BytesToTag(req.Ids)
204 serials, err := s.db.GetSerialsByIds(ids)
205 if err != nil {
206 return nil, status.Errorf(codes.Internal, "get serials by ids failed: %v", err)
207 }
208 result := &orlydbv1.SerialMap{
209 Serials: make(map[string]uint64),
210 }
211 for k, v := range serials {
212 if v != nil {
213 result.Serials[k] = v.Get()
214 }
215 }
216 return result, nil
217 }
218
219 func (s *DatabaseService) GetSerialsByRange(ctx context.Context, req *orlydbv1.GetSerialsByRangeRequest) (*orlydbv1.SerialList, error) {
220 r := orlydbv1.ProtoToRange(req.Range)
221 serials, err := s.db.GetSerialsByRange(r)
222 if err != nil {
223 return nil, status.Errorf(codes.Internal, "get serials by range failed: %v", err)
224 }
225 return orlydbv1.Uint40sToProto(serials), nil
226 }
227
228 func (s *DatabaseService) GetFullIdPubkeyBySerial(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialRequest) (*orlydbv1.IdPkTs, error) {
229 ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
230 idPkTs, err := s.db.GetFullIdPubkeyBySerial(ser)
231 if err != nil {
232 return nil, status.Errorf(codes.Internal, "get full id pubkey by serial failed: %v", err)
233 }
234 return orlydbv1.IdPkTsToProto(idPkTs), nil
235 }
236
237 func (s *DatabaseService) GetFullIdPubkeyBySerials(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialsRequest) (*orlydbv1.IdPkTsList, error) {
238 serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
239 idPkTs, err := s.db.GetFullIdPubkeyBySerials(serials)
240 if err != nil {
241 return nil, status.Errorf(codes.Internal, "get full id pubkey by serials failed: %v", err)
242 }
243 return orlydbv1.IdPkTsListToProto(idPkTs), nil
244 }
245
246 // === Event Deletion ===
247
248 func (s *DatabaseService) DeleteEvent(ctx context.Context, req *orlydbv1.DeleteEventRequest) (*orlydbv1.Empty, error) {
249 if err := s.db.DeleteEvent(ctx, req.EventId); err != nil {
250 return nil, status.Errorf(codes.Internal, "delete event failed: %v", err)
251 }
252 return &orlydbv1.Empty{}, nil
253 }
254
255 func (s *DatabaseService) DeleteEventBySerial(ctx context.Context, req *orlydbv1.DeleteEventBySerialRequest) (*orlydbv1.Empty, error) {
256 ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
257 ev := orlydbv1.ProtoToEvent(req.Event)
258 if err := s.db.DeleteEventBySerial(ctx, ser, ev); err != nil {
259 return nil, status.Errorf(codes.Internal, "delete event by serial failed: %v", err)
260 }
261 return &orlydbv1.Empty{}, nil
262 }
263
264 func (s *DatabaseService) DeleteExpired(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
265 s.db.DeleteExpired()
266 return &orlydbv1.Empty{}, nil
267 }
268
269 func (s *DatabaseService) ProcessDelete(ctx context.Context, req *orlydbv1.ProcessDeleteRequest) (*orlydbv1.Empty, error) {
270 ev := orlydbv1.ProtoToEvent(req.Event)
271 if err := s.db.ProcessDelete(ev, req.Admins); err != nil {
272 return nil, status.Errorf(codes.Internal, "process delete failed: %v", err)
273 }
274 return &orlydbv1.Empty{}, nil
275 }
276
277 func (s *DatabaseService) CheckForDeleted(ctx context.Context, req *orlydbv1.CheckForDeletedRequest) (*orlydbv1.Empty, error) {
278 ev := orlydbv1.ProtoToEvent(req.Event)
279 if err := s.db.CheckForDeleted(ev, req.Admins); err != nil {
280 return nil, status.Errorf(codes.Internal, "check for deleted failed: %v", err)
281 }
282 return &orlydbv1.Empty{}, nil
283 }
284
285 // === Import/Export ===
286
287 func (s *DatabaseService) Import(stream orlydbv1.DatabaseService_ImportServer) error {
288 pr, pw := io.Pipe()
289
290 // Goroutine to read from gRPC stream and write to pipe
291 go func() {
292 defer pw.Close()
293 for {
294 chunk, err := stream.Recv()
295 if err == io.EOF {
296 return
297 }
298 if err != nil {
299 log.E.F("import stream error: %v", err)
300 pw.CloseWithError(err)
301 return
302 }
303 if _, err := pw.Write(chunk.Data); chk.E(err) {
304 return
305 }
306 }
307 }()
308
309 // Import from pipe
310 s.db.Import(pr)
311
312 return stream.SendAndClose(&orlydbv1.ImportResponse{
313 EventsImported: 0, // TODO: Track count
314 EventsSkipped: 0,
315 })
316 }
317
318 func (s *DatabaseService) Export(req *orlydbv1.ExportRequest, stream orlydbv1.DatabaseService_ExportServer) error {
319 pr, pw := io.Pipe()
320
321 // Goroutine to export to pipe
322 go func() {
323 defer pw.Close()
324 s.db.Export(stream.Context(), pw, req.Pubkeys...)
325 }()
326
327 // Read from pipe and send to stream
328 buf := make([]byte, 64*1024) // 64KB chunks
329 for {
330 n, err := pr.Read(buf)
331 if err == io.EOF {
332 return nil
333 }
334 if err != nil {
335 return status.Errorf(codes.Internal, "export failed: %v", err)
336 }
337 if err := stream.Send(&orlydbv1.ExportChunk{Data: buf[:n]}); err != nil {
338 return err
339 }
340 }
341 }
342
343 func (s *DatabaseService) ImportEventsFromStrings(ctx context.Context, req *orlydbv1.ImportEventsFromStringsRequest) (*orlydbv1.ImportResponse, error) {
344 // Note: We can't pass policy manager over gRPC, so we pass nil
345 if err := s.db.ImportEventsFromStrings(ctx, req.EventJsons, nil); err != nil {
346 return nil, status.Errorf(codes.Internal, "import events from strings failed: %v", err)
347 }
348 return &orlydbv1.ImportResponse{
349 EventsImported: int64(len(req.EventJsons)),
350 }, nil
351 }
352
353 // === Relay Identity ===
354
355 func (s *DatabaseService) GetRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
356 secret, err := s.db.GetRelayIdentitySecret()
357 if err != nil {
358 return nil, status.Errorf(codes.Internal, "get relay identity secret failed: %v", err)
359 }
360 return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
361 }
362
363 func (s *DatabaseService) SetRelayIdentitySecret(ctx context.Context, req *orlydbv1.SetRelayIdentitySecretRequest) (*orlydbv1.Empty, error) {
364 if err := s.db.SetRelayIdentitySecret(req.SecretKey); err != nil {
365 return nil, status.Errorf(codes.Internal, "set relay identity secret failed: %v", err)
366 }
367 return &orlydbv1.Empty{}, nil
368 }
369
370 func (s *DatabaseService) GetOrCreateRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
371 secret, err := s.db.GetOrCreateRelayIdentitySecret()
372 if err != nil {
373 return nil, status.Errorf(codes.Internal, "get or create relay identity secret failed: %v", err)
374 }
375 return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
376 }
377
378 // === Markers ===
379
380 func (s *DatabaseService) SetMarker(ctx context.Context, req *orlydbv1.SetMarkerRequest) (*orlydbv1.Empty, error) {
381 if err := s.db.SetMarker(req.Key, req.Value); err != nil {
382 return nil, status.Errorf(codes.Internal, "set marker failed: %v", err)
383 }
384 return &orlydbv1.Empty{}, nil
385 }
386
387 func (s *DatabaseService) GetMarker(ctx context.Context, req *orlydbv1.GetMarkerRequest) (*orlydbv1.GetMarkerResponse, error) {
388 value, err := s.db.GetMarker(req.Key)
389 if err != nil {
390 return nil, status.Errorf(codes.Internal, "get marker failed: %v", err)
391 }
392 return &orlydbv1.GetMarkerResponse{
393 Value: value,
394 Found: value != nil,
395 }, nil
396 }
397
398 func (s *DatabaseService) HasMarker(ctx context.Context, req *orlydbv1.HasMarkerRequest) (*orlydbv1.HasMarkerResponse, error) {
399 exists := s.db.HasMarker(req.Key)
400 return &orlydbv1.HasMarkerResponse{Exists: exists}, nil
401 }
402
403 func (s *DatabaseService) DeleteMarker(ctx context.Context, req *orlydbv1.DeleteMarkerRequest) (*orlydbv1.Empty, error) {
404 if err := s.db.DeleteMarker(req.Key); err != nil {
405 return nil, status.Errorf(codes.Internal, "delete marker failed: %v", err)
406 }
407 return &orlydbv1.Empty{}, nil
408 }
409
410 // === Subscriptions ===
411
412 func (s *DatabaseService) GetSubscription(ctx context.Context, req *orlydbv1.GetSubscriptionRequest) (*orlydbv1.Subscription, error) {
413 sub, err := s.db.GetSubscription(req.Pubkey)
414 if err != nil {
415 return nil, status.Errorf(codes.Internal, "get subscription failed: %v", err)
416 }
417 return orlydbv1.SubscriptionToProto(sub, req.Pubkey), nil
418 }
419
420 func (s *DatabaseService) IsSubscriptionActive(ctx context.Context, req *orlydbv1.IsSubscriptionActiveRequest) (*orlydbv1.IsSubscriptionActiveResponse, error) {
421 active, err := s.db.IsSubscriptionActive(req.Pubkey)
422 if err != nil {
423 return nil, status.Errorf(codes.Internal, "is subscription active failed: %v", err)
424 }
425 return &orlydbv1.IsSubscriptionActiveResponse{Active: active}, nil
426 }
427
428 func (s *DatabaseService) ExtendSubscription(ctx context.Context, req *orlydbv1.ExtendSubscriptionRequest) (*orlydbv1.Empty, error) {
429 if err := s.db.ExtendSubscription(req.Pubkey, int(req.Days)); err != nil {
430 return nil, status.Errorf(codes.Internal, "extend subscription failed: %v", err)
431 }
432 return &orlydbv1.Empty{}, nil
433 }
434
435 func (s *DatabaseService) RecordPayment(ctx context.Context, req *orlydbv1.RecordPaymentRequest) (*orlydbv1.Empty, error) {
436 if err := s.db.RecordPayment(req.Pubkey, req.Amount, req.Invoice, req.Preimage); err != nil {
437 return nil, status.Errorf(codes.Internal, "record payment failed: %v", err)
438 }
439 return &orlydbv1.Empty{}, nil
440 }
441
442 func (s *DatabaseService) GetPaymentHistory(ctx context.Context, req *orlydbv1.GetPaymentHistoryRequest) (*orlydbv1.PaymentList, error) {
443 payments, err := s.db.GetPaymentHistory(req.Pubkey)
444 if err != nil {
445 return nil, status.Errorf(codes.Internal, "get payment history failed: %v", err)
446 }
447 return orlydbv1.PaymentListToProto(payments), nil
448 }
449
450 func (s *DatabaseService) ExtendBlossomSubscription(ctx context.Context, req *orlydbv1.ExtendBlossomSubscriptionRequest) (*orlydbv1.Empty, error) {
451 if err := s.db.ExtendBlossomSubscription(req.Pubkey, req.Tier, req.StorageMb, int(req.DaysExtended)); err != nil {
452 return nil, status.Errorf(codes.Internal, "extend blossom subscription failed: %v", err)
453 }
454 return &orlydbv1.Empty{}, nil
455 }
456
457 func (s *DatabaseService) GetBlossomStorageQuota(ctx context.Context, req *orlydbv1.GetBlossomStorageQuotaRequest) (*orlydbv1.GetBlossomStorageQuotaResponse, error) {
458 quota, err := s.db.GetBlossomStorageQuota(req.Pubkey)
459 if err != nil {
460 return nil, status.Errorf(codes.Internal, "get blossom storage quota failed: %v", err)
461 }
462 return &orlydbv1.GetBlossomStorageQuotaResponse{QuotaMb: quota}, nil
463 }
464
465 func (s *DatabaseService) IsFirstTimeUser(ctx context.Context, req *orlydbv1.IsFirstTimeUserRequest) (*orlydbv1.IsFirstTimeUserResponse, error) {
466 firstTime, err := s.db.IsFirstTimeUser(req.Pubkey)
467 if err != nil {
468 return nil, status.Errorf(codes.Internal, "is first time user failed: %v", err)
469 }
470 return &orlydbv1.IsFirstTimeUserResponse{FirstTime: firstTime}, nil
471 }
472
473 // === Paid ACL ===
474
475 func (s *DatabaseService) SavePaidSubscription(ctx context.Context, req *orlydbv1.PaidSubscriptionMsg) (*orlydbv1.Empty, error) {
476 sub := orlydbv1.ProtoToPaidSubscription(req)
477 if err := s.db.SavePaidSubscription(sub); err != nil {
478 return nil, status.Errorf(codes.Internal, "save paid subscription failed: %v", err)
479 }
480 return &orlydbv1.Empty{}, nil
481 }
482
483 func (s *DatabaseService) GetPaidSubscription(ctx context.Context, req *orlydbv1.GetPaidSubscriptionRequest) (*orlydbv1.PaidSubscriptionMsg, error) {
484 sub, err := s.db.GetPaidSubscription(req.PubkeyHex)
485 if err != nil {
486 return nil, status.Errorf(codes.Internal, "get paid subscription failed: %v", err)
487 }
488 if sub == nil {
489 return nil, status.Errorf(codes.NotFound, "paid subscription not found")
490 }
491 return orlydbv1.PaidSubscriptionToProto(sub), nil
492 }
493
494 func (s *DatabaseService) DeletePaidSubscription(ctx context.Context, req *orlydbv1.DeletePaidSubscriptionRequest) (*orlydbv1.Empty, error) {
495 if err := s.db.DeletePaidSubscription(req.PubkeyHex); err != nil {
496 return nil, status.Errorf(codes.Internal, "delete paid subscription failed: %v", err)
497 }
498 return &orlydbv1.Empty{}, nil
499 }
500
501 func (s *DatabaseService) ListPaidSubscriptions(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PaidSubscriptionList, error) {
502 subs, err := s.db.ListPaidSubscriptions()
503 if err != nil {
504 return nil, status.Errorf(codes.Internal, "list paid subscriptions failed: %v", err)
505 }
506 return orlydbv1.PaidSubscriptionListToProto(subs), nil
507 }
508
509 func (s *DatabaseService) ClaimAlias(ctx context.Context, req *orlydbv1.ClaimAliasRequest) (*orlydbv1.Empty, error) {
510 if err := s.db.ClaimAlias(req.Alias, req.PubkeyHex); err != nil {
511 return nil, status.Errorf(codes.Internal, "claim alias failed: %v", err)
512 }
513 return &orlydbv1.Empty{}, nil
514 }
515
516 func (s *DatabaseService) GetAliasByPubkey(ctx context.Context, req *orlydbv1.GetAliasByPubkeyRequest) (*orlydbv1.AliasResponse, error) {
517 alias, err := s.db.GetAliasByPubkey(req.PubkeyHex)
518 if err != nil {
519 return nil, status.Errorf(codes.Internal, "get alias by pubkey failed: %v", err)
520 }
521 return &orlydbv1.AliasResponse{Alias: alias}, nil
522 }
523
524 func (s *DatabaseService) GetPubkeyByAlias(ctx context.Context, req *orlydbv1.GetPubkeyByAliasRequest) (*orlydbv1.PubkeyResponse, error) {
525 pubkey, err := s.db.GetPubkeyByAlias(req.Alias)
526 if err != nil {
527 return nil, status.Errorf(codes.Internal, "get pubkey by alias failed: %v", err)
528 }
529 return &orlydbv1.PubkeyResponse{PubkeyHex: pubkey}, nil
530 }
531
532 func (s *DatabaseService) IsAliasTaken(ctx context.Context, req *orlydbv1.IsAliasTakenRequest) (*orlydbv1.IsAliasTakenResponse, error) {
533 taken, err := s.db.IsAliasTaken(req.Alias)
534 if err != nil {
535 return nil, status.Errorf(codes.Internal, "is alias taken failed: %v", err)
536 }
537 return &orlydbv1.IsAliasTakenResponse{Taken: taken}, nil
538 }
539
540 // === NIP-43 ===
541
542 func (s *DatabaseService) AddNIP43Member(ctx context.Context, req *orlydbv1.AddNIP43MemberRequest) (*orlydbv1.Empty, error) {
543 if err := s.db.AddNIP43Member(req.Pubkey, req.InviteCode); err != nil {
544 return nil, status.Errorf(codes.Internal, "add NIP-43 member failed: %v", err)
545 }
546 return &orlydbv1.Empty{}, nil
547 }
548
549 func (s *DatabaseService) RemoveNIP43Member(ctx context.Context, req *orlydbv1.RemoveNIP43MemberRequest) (*orlydbv1.Empty, error) {
550 if err := s.db.RemoveNIP43Member(req.Pubkey); err != nil {
551 return nil, status.Errorf(codes.Internal, "remove NIP-43 member failed: %v", err)
552 }
553 return &orlydbv1.Empty{}, nil
554 }
555
556 func (s *DatabaseService) IsNIP43Member(ctx context.Context, req *orlydbv1.IsNIP43MemberRequest) (*orlydbv1.IsNIP43MemberResponse, error) {
557 isMember, err := s.db.IsNIP43Member(req.Pubkey)
558 if err != nil {
559 return nil, status.Errorf(codes.Internal, "is NIP-43 member failed: %v", err)
560 }
561 return &orlydbv1.IsNIP43MemberResponse{IsMember: isMember}, nil
562 }
563
564 func (s *DatabaseService) GetNIP43Membership(ctx context.Context, req *orlydbv1.GetNIP43MembershipRequest) (*orlydbv1.NIP43Membership, error) {
565 membership, err := s.db.GetNIP43Membership(req.Pubkey)
566 if err != nil {
567 return nil, status.Errorf(codes.Internal, "get NIP-43 membership failed: %v", err)
568 }
569 return orlydbv1.NIP43MembershipToProto(membership), nil
570 }
571
572 func (s *DatabaseService) GetAllNIP43Members(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PubkeyList, error) {
573 members, err := s.db.GetAllNIP43Members()
574 if err != nil {
575 return nil, status.Errorf(codes.Internal, "get all NIP-43 members failed: %v", err)
576 }
577 return &orlydbv1.PubkeyList{Pubkeys: members}, nil
578 }
579
580 func (s *DatabaseService) StoreInviteCode(ctx context.Context, req *orlydbv1.StoreInviteCodeRequest) (*orlydbv1.Empty, error) {
581 expiresAt := orlydbv1.TimeFromUnix(req.ExpiresAt)
582 if err := s.db.StoreInviteCode(req.Code, expiresAt); err != nil {
583 return nil, status.Errorf(codes.Internal, "store invite code failed: %v", err)
584 }
585 return &orlydbv1.Empty{}, nil
586 }
587
588 func (s *DatabaseService) ValidateInviteCode(ctx context.Context, req *orlydbv1.ValidateInviteCodeRequest) (*orlydbv1.ValidateInviteCodeResponse, error) {
589 valid, err := s.db.ValidateInviteCode(req.Code)
590 if err != nil {
591 return nil, status.Errorf(codes.Internal, "validate invite code failed: %v", err)
592 }
593 return &orlydbv1.ValidateInviteCodeResponse{Valid: valid}, nil
594 }
595
596 func (s *DatabaseService) DeleteInviteCode(ctx context.Context, req *orlydbv1.DeleteInviteCodeRequest) (*orlydbv1.Empty, error) {
597 if err := s.db.DeleteInviteCode(req.Code); err != nil {
598 return nil, status.Errorf(codes.Internal, "delete invite code failed: %v", err)
599 }
600 return &orlydbv1.Empty{}, nil
601 }
602
603 func (s *DatabaseService) PublishNIP43MembershipEvent(ctx context.Context, req *orlydbv1.PublishNIP43MembershipEventRequest) (*orlydbv1.Empty, error) {
604 if err := s.db.PublishNIP43MembershipEvent(int(req.Kind), req.Pubkey); err != nil {
605 return nil, status.Errorf(codes.Internal, "publish NIP-43 membership event failed: %v", err)
606 }
607 return &orlydbv1.Empty{}, nil
608 }
609
610 // === Query Cache ===
611
612 func (s *DatabaseService) GetCachedJSON(ctx context.Context, req *orlydbv1.GetCachedJSONRequest) (*orlydbv1.GetCachedJSONResponse, error) {
613 f := orlydbv1.ProtoToFilter(req.Filter)
614 jsonItems, found := s.db.GetCachedJSON(f)
615 return &orlydbv1.GetCachedJSONResponse{
616 JsonItems: jsonItems,
617 Found: found,
618 }, nil
619 }
620
621 func (s *DatabaseService) CacheMarshaledJSON(ctx context.Context, req *orlydbv1.CacheMarshaledJSONRequest) (*orlydbv1.Empty, error) {
622 f := orlydbv1.ProtoToFilter(req.Filter)
623 s.db.CacheMarshaledJSON(f, req.JsonItems)
624 return &orlydbv1.Empty{}, nil
625 }
626
627 func (s *DatabaseService) GetCachedEvents(ctx context.Context, req *orlydbv1.GetCachedEventsRequest) (*orlydbv1.GetCachedEventsResponse, error) {
628 f := orlydbv1.ProtoToFilter(req.Filter)
629 events, found := s.db.GetCachedEvents(f)
630 return &orlydbv1.GetCachedEventsResponse{
631 Events: orlydbv1.EventsToProto(events),
632 Found: found,
633 }, nil
634 }
635
636 func (s *DatabaseService) CacheEvents(ctx context.Context, req *orlydbv1.CacheEventsRequest) (*orlydbv1.Empty, error) {
637 f := orlydbv1.ProtoToFilter(req.Filter)
638 events := orlydbv1.ProtoToEvents(req.Events)
639 s.db.CacheEvents(f, events)
640 return &orlydbv1.Empty{}, nil
641 }
642
643 func (s *DatabaseService) InvalidateQueryCache(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
644 s.db.InvalidateQueryCache()
645 return &orlydbv1.Empty{}, nil
646 }
647
648 // === Access Tracking ===
649
650 func (s *DatabaseService) RecordEventAccess(ctx context.Context, req *orlydbv1.RecordEventAccessRequest) (*orlydbv1.Empty, error) {
651 if err := s.db.RecordEventAccess(req.Serial, req.ConnectionId); err != nil {
652 return nil, status.Errorf(codes.Internal, "record event access failed: %v", err)
653 }
654 return &orlydbv1.Empty{}, nil
655 }
656
657 func (s *DatabaseService) GetEventAccessInfo(ctx context.Context, req *orlydbv1.GetEventAccessInfoRequest) (*orlydbv1.GetEventAccessInfoResponse, error) {
658 lastAccess, accessCount, err := s.db.GetEventAccessInfo(req.Serial)
659 if err != nil {
660 return nil, status.Errorf(codes.Internal, "get event access info failed: %v", err)
661 }
662 return &orlydbv1.GetEventAccessInfoResponse{
663 LastAccess: lastAccess,
664 AccessCount: accessCount,
665 }, nil
666 }
667
668 func (s *DatabaseService) GetLeastAccessedEvents(ctx context.Context, req *orlydbv1.GetLeastAccessedEventsRequest) (*orlydbv1.SerialList, error) {
669 serials, err := s.db.GetLeastAccessedEvents(int(req.Limit), req.MinAgeSec)
670 if err != nil {
671 return nil, status.Errorf(codes.Internal, "get least accessed events failed: %v", err)
672 }
673 return &orlydbv1.SerialList{Serials: serials}, nil
674 }
675
676 // === Utility ===
677
678 func (s *DatabaseService) EventIdsBySerial(ctx context.Context, req *orlydbv1.EventIdsBySerialRequest) (*orlydbv1.EventIdsBySerialResponse, error) {
679 eventIds, err := s.db.EventIdsBySerial(req.Start, int(req.Count))
680 if err != nil {
681 return nil, status.Errorf(codes.Internal, "event ids by serial failed: %v", err)
682 }
683 return &orlydbv1.EventIdsBySerialResponse{EventIds: eventIds}, nil
684 }
685
686 func (s *DatabaseService) RunMigrations(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
687 s.db.RunMigrations()
688 return &orlydbv1.Empty{}, nil
689 }
690
691 // === Blob Storage (Blossom) ===
692
693 func (s *DatabaseService) SaveBlob(ctx context.Context, req *orlydbv1.SaveBlobRequest) (*orlydbv1.Empty, error) {
694 if err := s.db.SaveBlob(req.Sha256Hash, req.Data, req.Pubkey, req.MimeType, req.Extension); err != nil {
695 return nil, status.Errorf(codes.Internal, "save blob failed: %v", err)
696 }
697 return &orlydbv1.Empty{}, nil
698 }
699
700 func (s *DatabaseService) SaveBlobMetadata(ctx context.Context, req *orlydbv1.SaveBlobMetadataRequest) (*orlydbv1.Empty, error) {
701 if err := s.db.SaveBlobMetadata(req.Sha256Hash, req.Size, req.Pubkey, req.MimeType, req.Extension); err != nil {
702 return nil, status.Errorf(codes.Internal, "save blob metadata failed: %v", err)
703 }
704 return &orlydbv1.Empty{}, nil
705 }
706
707 func (s *DatabaseService) GetBlob(ctx context.Context, req *orlydbv1.GetBlobRequest) (*orlydbv1.GetBlobResponse, error) {
708 data, metadata, err := s.db.GetBlob(req.Sha256Hash)
709 if err != nil {
710 // Return not found as a response, not an error
711 return &orlydbv1.GetBlobResponse{Found: false}, nil
712 }
713 return &orlydbv1.GetBlobResponse{
714 Found: true,
715 Data: data,
716 Metadata: orlydbv1.BlobMetadataToProto(metadata),
717 }, nil
718 }
719
720 func (s *DatabaseService) HasBlob(ctx context.Context, req *orlydbv1.HasBlobRequest) (*orlydbv1.HasBlobResponse, error) {
721 exists, err := s.db.HasBlob(req.Sha256Hash)
722 if err != nil {
723 return nil, status.Errorf(codes.Internal, "has blob failed: %v", err)
724 }
725 return &orlydbv1.HasBlobResponse{Exists: exists}, nil
726 }
727
728 func (s *DatabaseService) DeleteBlob(ctx context.Context, req *orlydbv1.DeleteBlobRequest) (*orlydbv1.Empty, error) {
729 if err := s.db.DeleteBlob(req.Sha256Hash, req.Pubkey); err != nil {
730 return nil, status.Errorf(codes.Internal, "delete blob failed: %v", err)
731 }
732 return &orlydbv1.Empty{}, nil
733 }
734
735 func (s *DatabaseService) ListBlobs(ctx context.Context, req *orlydbv1.ListBlobsRequest) (*orlydbv1.ListBlobsResponse, error) {
736 descriptors, err := s.db.ListBlobs(req.Pubkey, req.Since, req.Until)
737 if err != nil {
738 return nil, status.Errorf(codes.Internal, "list blobs failed: %v", err)
739 }
740 return &orlydbv1.ListBlobsResponse{
741 Descriptors: orlydbv1.BlobDescriptorListToProto(descriptors),
742 }, nil
743 }
744
745 func (s *DatabaseService) GetBlobMetadata(ctx context.Context, req *orlydbv1.GetBlobMetadataRequest) (*orlydbv1.BlobMetadata, error) {
746 metadata, err := s.db.GetBlobMetadata(req.Sha256Hash)
747 if err != nil {
748 return nil, status.Errorf(codes.NotFound, "blob metadata not found: %v", err)
749 }
750 return orlydbv1.BlobMetadataToProto(metadata), nil
751 }
752
753 func (s *DatabaseService) GetTotalBlobStorageUsed(ctx context.Context, req *orlydbv1.GetTotalBlobStorageUsedRequest) (*orlydbv1.GetTotalBlobStorageUsedResponse, error) {
754 totalMB, err := s.db.GetTotalBlobStorageUsed(req.Pubkey)
755 if err != nil {
756 return nil, status.Errorf(codes.Internal, "get total blob storage used failed: %v", err)
757 }
758 return &orlydbv1.GetTotalBlobStorageUsedResponse{TotalMb: totalMB}, nil
759 }
760
761 func (s *DatabaseService) SaveBlobReport(ctx context.Context, req *orlydbv1.SaveBlobReportRequest) (*orlydbv1.Empty, error) {
762 if err := s.db.SaveBlobReport(req.Sha256Hash, req.ReportData); err != nil {
763 return nil, status.Errorf(codes.Internal, "save blob report failed: %v", err)
764 }
765 return &orlydbv1.Empty{}, nil
766 }
767
768 func (s *DatabaseService) ListAllBlobUserStats(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ListAllBlobUserStatsResponse, error) {
769 stats, err := s.db.ListAllBlobUserStats()
770 if err != nil {
771 return nil, status.Errorf(codes.Internal, "list all blob user stats failed: %v", err)
772 }
773 return &orlydbv1.ListAllBlobUserStatsResponse{
774 Stats: orlydbv1.UserBlobStatsListToProto(stats),
775 }, nil
776 }
777
778 func (s *DatabaseService) ReconcileBlobMetadata(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ReconcileBlobMetadataResponse, error) {
779 reconciled, err := s.db.ReconcileBlobMetadata()
780 if err != nil {
781 return nil, status.Errorf(codes.Internal, "reconcile blob metadata failed: %v", err)
782 }
783 return &orlydbv1.ReconcileBlobMetadataResponse{
784 Reconciled: int32(reconciled),
785 }, nil
786 }
787
788 func (s *DatabaseService) ListAllBlobs(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ListBlobsResponse, error) {
789 descriptors, err := s.db.ListAllBlobs()
790 if err != nil {
791 return nil, status.Errorf(codes.Internal, "list all blobs failed: %v", err)
792 }
793 return &orlydbv1.ListBlobsResponse{
794 Descriptors: orlydbv1.BlobDescriptorListToProto(descriptors),
795 }, nil
796 }
797
798 func (s *DatabaseService) GetThumbnail(ctx context.Context, req *orlydbv1.GetThumbnailRequest) (*orlydbv1.GetThumbnailResponse, error) {
799 data, err := s.db.GetThumbnail(req.Key)
800 if err != nil {
801 // Not found is a valid response, not an error
802 return &orlydbv1.GetThumbnailResponse{Found: false}, nil
803 }
804 return &orlydbv1.GetThumbnailResponse{
805 Found: true,
806 Data: data,
807 }, nil
808 }
809
810 func (s *DatabaseService) SaveThumbnail(ctx context.Context, req *orlydbv1.SaveThumbnailRequest) (*orlydbv1.Empty, error) {
811 if err := s.db.SaveThumbnail(req.Key, req.Data); err != nil {
812 return nil, status.Errorf(codes.Internal, "save thumbnail failed: %v", err)
813 }
814 return &orlydbv1.Empty{}, nil
815 }
816
817 // === Cypher Query ===
818
819 const (
820 cypherDefaultTimeout = 30 * time.Second
821 cypherMaxTimeout = 120 * time.Second
822 )
823
824 func (s *DatabaseService) ExecuteCypherRead(ctx context.Context, req *orlydbv1.CypherReadRequest) (*orlydbv1.CypherReadResponse, error) {
825 executor, ok := s.db.(store.CypherExecutor)
826 if !ok {
827 return nil, status.Errorf(codes.Unimplemented, "database backend does not support Cypher queries")
828 }
829
830 // Decode JSON-encoded params
831 params := make(map[string]any, len(req.Params))
832 for k, v := range req.Params {
833 var decoded any
834 if err := json.Unmarshal(v, &decoded); err != nil {
835 return nil, status.Errorf(codes.InvalidArgument, "invalid JSON for param %q: %v", k, err)
836 }
837 params[k] = decoded
838 }
839
840 // Enforce timeout
841 timeout := cypherDefaultTimeout
842 if req.TimeoutSeconds > 0 {
843 timeout = time.Duration(req.TimeoutSeconds) * time.Second
844 if timeout > cypherMaxTimeout {
845 timeout = cypherMaxTimeout
846 }
847 }
848 ctx, cancel := context.WithTimeout(ctx, timeout)
849 defer cancel()
850
851 records, err := executor.ExecuteCypherRead(ctx, req.Cypher, params)
852 if err != nil {
853 return nil, status.Errorf(codes.Internal, "cypher query failed: %v", err)
854 }
855
856 // Encode each record as JSON bytes
857 encoded := make([][]byte, 0, len(records))
858 for _, rec := range records {
859 b, err := json.Marshal(rec)
860 if err != nil {
861 return nil, status.Errorf(codes.Internal, "failed to marshal record: %v", err)
862 }
863 encoded = append(encoded, b)
864 }
865
866 return &orlydbv1.CypherReadResponse{Records: encoded}, nil
867 }
868
869 // === Helper Methods ===
870
871 // streamEvents is a helper to stream events in batches.
872 type eventStreamer interface {
873 Send(*orlydbv1.EventBatch) error
874 Context() context.Context
875 }
876
877 func (s *DatabaseService) streamEvents(events []*orlydbv1.Event, stream eventStreamer) error {
878 batchSize := s.cfg.StreamBatchSize
879 if batchSize == 0 {
880 batchSize = 100
881 }
882
883 for i := 0; i < len(events); i += batchSize {
884 end := i + batchSize
885 if end > len(events) {
886 end = len(events)
887 }
888
889 batch := &orlydbv1.EventBatch{
890 Events: events[i:end],
891 }
892
893 if err := stream.Send(batch); err != nil {
894 return err
895 }
896 }
897
898 return nil
899 }
900