service.go raw
1 package main
2
3 import (
4 "context"
5 "io"
6
7 "google.golang.org/grpc/codes"
8 "google.golang.org/grpc/status"
9 "next.orly.dev/pkg/lol/chk"
10 "next.orly.dev/pkg/lol/log"
11
12 "next.orly.dev/pkg/database"
13 orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
14 )
15
// DatabaseService implements the orlydbv1.DatabaseServiceServer interface.
// Each RPC handler converts its protobuf request, delegates to the wrapped
// database, and converts the result back to protobuf.
type DatabaseService struct {
	// NOTE(review): presumably the generated stub that supplies default
	// handlers for RPCs not implemented in this file - confirm against the
	// generated code.
	orlydbv1.UnimplementedDatabaseServiceServer
	// db is the database every handler delegates to.
	db database.Database
	// cfg is the service configuration; streamEvents reads StreamBatchSize
	// from it.
	cfg *Config
}
22
23 // NewDatabaseService creates a new database service.
24 func NewDatabaseService(db database.Database, cfg *Config) *DatabaseService {
25 return &DatabaseService{
26 db: db,
27 cfg: cfg,
28 }
29 }
30
31 // === Lifecycle Methods ===
32
33 func (s *DatabaseService) GetPath(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PathResponse, error) {
34 return &orlydbv1.PathResponse{Path: s.db.Path()}, nil
35 }
36
37 func (s *DatabaseService) Sync(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
38 if err := s.db.Sync(); err != nil {
39 return nil, status.Errorf(codes.Internal, "sync failed: %v", err)
40 }
41 return &orlydbv1.Empty{}, nil
42 }
43
44 func (s *DatabaseService) Ready(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ReadyResponse, error) {
45 // Check if ready channel is closed
46 select {
47 case <-s.db.Ready():
48 return &orlydbv1.ReadyResponse{Ready: true}, nil
49 default:
50 return &orlydbv1.ReadyResponse{Ready: false}, nil
51 }
52 }
53
54 func (s *DatabaseService) SetLogLevel(ctx context.Context, req *orlydbv1.SetLogLevelRequest) (*orlydbv1.Empty, error) {
55 s.db.SetLogLevel(req.Level)
56 return &orlydbv1.Empty{}, nil
57 }
58
59 // === Event Storage ===
60
61 func (s *DatabaseService) SaveEvent(ctx context.Context, req *orlydbv1.SaveEventRequest) (*orlydbv1.SaveEventResponse, error) {
62 ev := orlydbv1.ProtoToEvent(req.Event)
63 exists, err := s.db.SaveEvent(ctx, ev)
64 if err != nil {
65 return nil, status.Errorf(codes.Internal, "save event failed: %v", err)
66 }
67 return &orlydbv1.SaveEventResponse{Exists: exists}, nil
68 }
69
70 func (s *DatabaseService) GetSerialsFromFilter(ctx context.Context, req *orlydbv1.GetSerialsFromFilterRequest) (*orlydbv1.SerialList, error) {
71 f := orlydbv1.ProtoToFilter(req.Filter)
72 serials, err := s.db.GetSerialsFromFilter(f)
73 if err != nil {
74 return nil, status.Errorf(codes.Internal, "get serials failed: %v", err)
75 }
76 return orlydbv1.Uint40sToProto(serials), nil
77 }
78
79 func (s *DatabaseService) WouldReplaceEvent(ctx context.Context, req *orlydbv1.WouldReplaceEventRequest) (*orlydbv1.WouldReplaceEventResponse, error) {
80 ev := orlydbv1.ProtoToEvent(req.Event)
81 wouldReplace, replacedSerials, err := s.db.WouldReplaceEvent(ev)
82 if err != nil {
83 return nil, status.Errorf(codes.Internal, "would replace check failed: %v", err)
84 }
85 resp := &orlydbv1.WouldReplaceEventResponse{
86 WouldReplace: wouldReplace,
87 }
88 for _, ser := range replacedSerials {
89 resp.ReplacedSerials = append(resp.ReplacedSerials, ser.Get())
90 }
91 return resp, nil
92 }
93
94 // === Event Queries (Streaming) ===
95
96 func (s *DatabaseService) QueryEvents(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryEventsServer) error {
97 f := orlydbv1.ProtoToFilter(req.Filter)
98 events, err := s.db.QueryEvents(stream.Context(), f)
99 if err != nil {
100 return status.Errorf(codes.Internal, "query events failed: %v", err)
101 }
102 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
103 }
104
105 func (s *DatabaseService) QueryAllVersions(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryAllVersionsServer) error {
106 f := orlydbv1.ProtoToFilter(req.Filter)
107 events, err := s.db.QueryAllVersions(stream.Context(), f)
108 if err != nil {
109 return status.Errorf(codes.Internal, "query all versions failed: %v", err)
110 }
111 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
112 }
113
114 func (s *DatabaseService) QueryEventsWithOptions(req *orlydbv1.QueryEventsWithOptionsRequest, stream orlydbv1.DatabaseService_QueryEventsWithOptionsServer) error {
115 f := orlydbv1.ProtoToFilter(req.Filter)
116 events, err := s.db.QueryEventsWithOptions(stream.Context(), f, req.IncludeDeleteEvents, req.ShowAllVersions)
117 if err != nil {
118 return status.Errorf(codes.Internal, "query events with options failed: %v", err)
119 }
120 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
121 }
122
123 func (s *DatabaseService) QueryDeleteEventsByTargetId(req *orlydbv1.QueryDeleteEventsByTargetIdRequest, stream orlydbv1.DatabaseService_QueryDeleteEventsByTargetIdServer) error {
124 events, err := s.db.QueryDeleteEventsByTargetId(stream.Context(), req.TargetEventId)
125 if err != nil {
126 return status.Errorf(codes.Internal, "query delete events failed: %v", err)
127 }
128 return s.streamEvents(orlydbv1.EventsToProto(events), stream)
129 }
130
131 func (s *DatabaseService) QueryForSerials(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.SerialList, error) {
132 f := orlydbv1.ProtoToFilter(req.Filter)
133 serials, err := s.db.QueryForSerials(ctx, f)
134 if err != nil {
135 return nil, status.Errorf(codes.Internal, "query for serials failed: %v", err)
136 }
137 return orlydbv1.Uint40sToProto(serials), nil
138 }
139
140 func (s *DatabaseService) QueryForIds(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.IdPkTsList, error) {
141 f := orlydbv1.ProtoToFilter(req.Filter)
142 idPkTs, err := s.db.QueryForIds(ctx, f)
143 if err != nil {
144 return nil, status.Errorf(codes.Internal, "query for ids failed: %v", err)
145 }
146 return orlydbv1.IdPkTsListToProto(idPkTs), nil
147 }
148
149 func (s *DatabaseService) CountEvents(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.CountEventsResponse, error) {
150 f := orlydbv1.ProtoToFilter(req.Filter)
151 count, approximate, err := s.db.CountEvents(ctx, f)
152 if err != nil {
153 return nil, status.Errorf(codes.Internal, "count events failed: %v", err)
154 }
155 return &orlydbv1.CountEventsResponse{
156 Count: int32(count),
157 Approximate: approximate,
158 }, nil
159 }
160
161 // === Event Retrieval by Serial ===
162
163 func (s *DatabaseService) FetchEventBySerial(ctx context.Context, req *orlydbv1.FetchEventBySerialRequest) (*orlydbv1.FetchEventBySerialResponse, error) {
164 ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
165 ev, err := s.db.FetchEventBySerial(ser)
166 if err != nil {
167 return nil, status.Errorf(codes.Internal, "fetch event by serial failed: %v", err)
168 }
169 return &orlydbv1.FetchEventBySerialResponse{
170 Event: orlydbv1.EventToProto(ev),
171 Found: ev != nil,
172 }, nil
173 }
174
175 func (s *DatabaseService) FetchEventsBySerials(ctx context.Context, req *orlydbv1.FetchEventsBySerialRequest) (*orlydbv1.EventMap, error) {
176 serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
177 events, err := s.db.FetchEventsBySerials(serials)
178 if err != nil {
179 return nil, status.Errorf(codes.Internal, "fetch events by serials failed: %v", err)
180 }
181 return orlydbv1.EventMapToProto(events), nil
182 }
183
184 func (s *DatabaseService) GetSerialById(ctx context.Context, req *orlydbv1.GetSerialByIdRequest) (*orlydbv1.GetSerialByIdResponse, error) {
185 ser, err := s.db.GetSerialById(req.Id)
186 if err != nil {
187 return nil, status.Errorf(codes.Internal, "get serial by id failed: %v", err)
188 }
189 if ser == nil {
190 return &orlydbv1.GetSerialByIdResponse{Found: false}, nil
191 }
192 return &orlydbv1.GetSerialByIdResponse{
193 Serial: ser.Get(),
194 Found: true,
195 }, nil
196 }
197
198 func (s *DatabaseService) GetSerialsByIds(ctx context.Context, req *orlydbv1.GetSerialsByIdsRequest) (*orlydbv1.SerialMap, error) {
199 // Convert request IDs to tag format
200 ids := orlydbv1.BytesToTag(req.Ids)
201 serials, err := s.db.GetSerialsByIds(ids)
202 if err != nil {
203 return nil, status.Errorf(codes.Internal, "get serials by ids failed: %v", err)
204 }
205 result := &orlydbv1.SerialMap{
206 Serials: make(map[string]uint64),
207 }
208 for k, v := range serials {
209 if v != nil {
210 result.Serials[k] = v.Get()
211 }
212 }
213 return result, nil
214 }
215
216 func (s *DatabaseService) GetSerialsByRange(ctx context.Context, req *orlydbv1.GetSerialsByRangeRequest) (*orlydbv1.SerialList, error) {
217 r := orlydbv1.ProtoToRange(req.Range)
218 serials, err := s.db.GetSerialsByRange(r)
219 if err != nil {
220 return nil, status.Errorf(codes.Internal, "get serials by range failed: %v", err)
221 }
222 return orlydbv1.Uint40sToProto(serials), nil
223 }
224
225 func (s *DatabaseService) GetFullIdPubkeyBySerial(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialRequest) (*orlydbv1.IdPkTs, error) {
226 ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
227 idPkTs, err := s.db.GetFullIdPubkeyBySerial(ser)
228 if err != nil {
229 return nil, status.Errorf(codes.Internal, "get full id pubkey by serial failed: %v", err)
230 }
231 return orlydbv1.IdPkTsToProto(idPkTs), nil
232 }
233
234 func (s *DatabaseService) GetFullIdPubkeyBySerials(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialsRequest) (*orlydbv1.IdPkTsList, error) {
235 serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials})
236 idPkTs, err := s.db.GetFullIdPubkeyBySerials(serials)
237 if err != nil {
238 return nil, status.Errorf(codes.Internal, "get full id pubkey by serials failed: %v", err)
239 }
240 return orlydbv1.IdPkTsListToProto(idPkTs), nil
241 }
242
243 // === Event Deletion ===
244
245 func (s *DatabaseService) DeleteEvent(ctx context.Context, req *orlydbv1.DeleteEventRequest) (*orlydbv1.Empty, error) {
246 if err := s.db.DeleteEvent(ctx, req.EventId); err != nil {
247 return nil, status.Errorf(codes.Internal, "delete event failed: %v", err)
248 }
249 return &orlydbv1.Empty{}, nil
250 }
251
252 func (s *DatabaseService) DeleteEventBySerial(ctx context.Context, req *orlydbv1.DeleteEventBySerialRequest) (*orlydbv1.Empty, error) {
253 ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial})
254 ev := orlydbv1.ProtoToEvent(req.Event)
255 if err := s.db.DeleteEventBySerial(ctx, ser, ev); err != nil {
256 return nil, status.Errorf(codes.Internal, "delete event by serial failed: %v", err)
257 }
258 return &orlydbv1.Empty{}, nil
259 }
260
261 func (s *DatabaseService) DeleteExpired(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
262 s.db.DeleteExpired()
263 return &orlydbv1.Empty{}, nil
264 }
265
266 func (s *DatabaseService) ProcessDelete(ctx context.Context, req *orlydbv1.ProcessDeleteRequest) (*orlydbv1.Empty, error) {
267 ev := orlydbv1.ProtoToEvent(req.Event)
268 if err := s.db.ProcessDelete(ev, req.Admins); err != nil {
269 return nil, status.Errorf(codes.Internal, "process delete failed: %v", err)
270 }
271 return &orlydbv1.Empty{}, nil
272 }
273
274 func (s *DatabaseService) CheckForDeleted(ctx context.Context, req *orlydbv1.CheckForDeletedRequest) (*orlydbv1.Empty, error) {
275 ev := orlydbv1.ProtoToEvent(req.Event)
276 if err := s.db.CheckForDeleted(ev, req.Admins); err != nil {
277 return nil, status.Errorf(codes.Internal, "check for deleted failed: %v", err)
278 }
279 return &orlydbv1.Empty{}, nil
280 }
281
282 // === Import/Export ===
283
284 func (s *DatabaseService) Import(stream orlydbv1.DatabaseService_ImportServer) error {
285 pr, pw := io.Pipe()
286
287 // Goroutine to read from gRPC stream and write to pipe
288 go func() {
289 defer pw.Close()
290 for {
291 chunk, err := stream.Recv()
292 if err == io.EOF {
293 return
294 }
295 if err != nil {
296 log.E.F("import stream error: %v", err)
297 pw.CloseWithError(err)
298 return
299 }
300 if _, err := pw.Write(chunk.Data); chk.E(err) {
301 return
302 }
303 }
304 }()
305
306 // Import from pipe
307 s.db.Import(pr)
308
309 return stream.SendAndClose(&orlydbv1.ImportResponse{
310 EventsImported: 0, // TODO: Track count
311 EventsSkipped: 0,
312 })
313 }
314
315 func (s *DatabaseService) Export(req *orlydbv1.ExportRequest, stream orlydbv1.DatabaseService_ExportServer) error {
316 pr, pw := io.Pipe()
317
318 // Goroutine to export to pipe
319 go func() {
320 defer pw.Close()
321 s.db.Export(stream.Context(), pw, req.Pubkeys...)
322 }()
323
324 // Read from pipe and send to stream
325 buf := make([]byte, 64*1024) // 64KB chunks
326 for {
327 n, err := pr.Read(buf)
328 if err == io.EOF {
329 return nil
330 }
331 if err != nil {
332 return status.Errorf(codes.Internal, "export failed: %v", err)
333 }
334 if err := stream.Send(&orlydbv1.ExportChunk{Data: buf[:n]}); err != nil {
335 return err
336 }
337 }
338 }
339
340 func (s *DatabaseService) ImportEventsFromStrings(ctx context.Context, req *orlydbv1.ImportEventsFromStringsRequest) (*orlydbv1.ImportResponse, error) {
341 // Note: We can't pass policy manager over gRPC, so we pass nil
342 if err := s.db.ImportEventsFromStrings(ctx, req.EventJsons, nil); err != nil {
343 return nil, status.Errorf(codes.Internal, "import events from strings failed: %v", err)
344 }
345 return &orlydbv1.ImportResponse{
346 EventsImported: int64(len(req.EventJsons)),
347 }, nil
348 }
349
350 // === Relay Identity ===
351
352 func (s *DatabaseService) GetRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
353 secret, err := s.db.GetRelayIdentitySecret()
354 if err != nil {
355 return nil, status.Errorf(codes.Internal, "get relay identity secret failed: %v", err)
356 }
357 return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
358 }
359
360 func (s *DatabaseService) SetRelayIdentitySecret(ctx context.Context, req *orlydbv1.SetRelayIdentitySecretRequest) (*orlydbv1.Empty, error) {
361 if err := s.db.SetRelayIdentitySecret(req.SecretKey); err != nil {
362 return nil, status.Errorf(codes.Internal, "set relay identity secret failed: %v", err)
363 }
364 return &orlydbv1.Empty{}, nil
365 }
366
367 func (s *DatabaseService) GetOrCreateRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) {
368 secret, err := s.db.GetOrCreateRelayIdentitySecret()
369 if err != nil {
370 return nil, status.Errorf(codes.Internal, "get or create relay identity secret failed: %v", err)
371 }
372 return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil
373 }
374
375 // === Markers ===
376
377 func (s *DatabaseService) SetMarker(ctx context.Context, req *orlydbv1.SetMarkerRequest) (*orlydbv1.Empty, error) {
378 if err := s.db.SetMarker(req.Key, req.Value); err != nil {
379 return nil, status.Errorf(codes.Internal, "set marker failed: %v", err)
380 }
381 return &orlydbv1.Empty{}, nil
382 }
383
384 func (s *DatabaseService) GetMarker(ctx context.Context, req *orlydbv1.GetMarkerRequest) (*orlydbv1.GetMarkerResponse, error) {
385 value, err := s.db.GetMarker(req.Key)
386 if err != nil {
387 return nil, status.Errorf(codes.Internal, "get marker failed: %v", err)
388 }
389 return &orlydbv1.GetMarkerResponse{
390 Value: value,
391 Found: value != nil,
392 }, nil
393 }
394
395 func (s *DatabaseService) HasMarker(ctx context.Context, req *orlydbv1.HasMarkerRequest) (*orlydbv1.HasMarkerResponse, error) {
396 exists := s.db.HasMarker(req.Key)
397 return &orlydbv1.HasMarkerResponse{Exists: exists}, nil
398 }
399
400 func (s *DatabaseService) DeleteMarker(ctx context.Context, req *orlydbv1.DeleteMarkerRequest) (*orlydbv1.Empty, error) {
401 if err := s.db.DeleteMarker(req.Key); err != nil {
402 return nil, status.Errorf(codes.Internal, "delete marker failed: %v", err)
403 }
404 return &orlydbv1.Empty{}, nil
405 }
406
407 // === Subscriptions ===
408
409 func (s *DatabaseService) GetSubscription(ctx context.Context, req *orlydbv1.GetSubscriptionRequest) (*orlydbv1.Subscription, error) {
410 sub, err := s.db.GetSubscription(req.Pubkey)
411 if err != nil {
412 return nil, status.Errorf(codes.Internal, "get subscription failed: %v", err)
413 }
414 return orlydbv1.SubscriptionToProto(sub, req.Pubkey), nil
415 }
416
417 func (s *DatabaseService) IsSubscriptionActive(ctx context.Context, req *orlydbv1.IsSubscriptionActiveRequest) (*orlydbv1.IsSubscriptionActiveResponse, error) {
418 active, err := s.db.IsSubscriptionActive(req.Pubkey)
419 if err != nil {
420 return nil, status.Errorf(codes.Internal, "is subscription active failed: %v", err)
421 }
422 return &orlydbv1.IsSubscriptionActiveResponse{Active: active}, nil
423 }
424
425 func (s *DatabaseService) ExtendSubscription(ctx context.Context, req *orlydbv1.ExtendSubscriptionRequest) (*orlydbv1.Empty, error) {
426 if err := s.db.ExtendSubscription(req.Pubkey, int(req.Days)); err != nil {
427 return nil, status.Errorf(codes.Internal, "extend subscription failed: %v", err)
428 }
429 return &orlydbv1.Empty{}, nil
430 }
431
432 func (s *DatabaseService) RecordPayment(ctx context.Context, req *orlydbv1.RecordPaymentRequest) (*orlydbv1.Empty, error) {
433 if err := s.db.RecordPayment(req.Pubkey, req.Amount, req.Invoice, req.Preimage); err != nil {
434 return nil, status.Errorf(codes.Internal, "record payment failed: %v", err)
435 }
436 return &orlydbv1.Empty{}, nil
437 }
438
439 func (s *DatabaseService) GetPaymentHistory(ctx context.Context, req *orlydbv1.GetPaymentHistoryRequest) (*orlydbv1.PaymentList, error) {
440 payments, err := s.db.GetPaymentHistory(req.Pubkey)
441 if err != nil {
442 return nil, status.Errorf(codes.Internal, "get payment history failed: %v", err)
443 }
444 return orlydbv1.PaymentListToProto(payments), nil
445 }
446
447 func (s *DatabaseService) ExtendBlossomSubscription(ctx context.Context, req *orlydbv1.ExtendBlossomSubscriptionRequest) (*orlydbv1.Empty, error) {
448 if err := s.db.ExtendBlossomSubscription(req.Pubkey, req.Tier, req.StorageMb, int(req.DaysExtended)); err != nil {
449 return nil, status.Errorf(codes.Internal, "extend blossom subscription failed: %v", err)
450 }
451 return &orlydbv1.Empty{}, nil
452 }
453
454 func (s *DatabaseService) GetBlossomStorageQuota(ctx context.Context, req *orlydbv1.GetBlossomStorageQuotaRequest) (*orlydbv1.GetBlossomStorageQuotaResponse, error) {
455 quota, err := s.db.GetBlossomStorageQuota(req.Pubkey)
456 if err != nil {
457 return nil, status.Errorf(codes.Internal, "get blossom storage quota failed: %v", err)
458 }
459 return &orlydbv1.GetBlossomStorageQuotaResponse{QuotaMb: quota}, nil
460 }
461
462 func (s *DatabaseService) IsFirstTimeUser(ctx context.Context, req *orlydbv1.IsFirstTimeUserRequest) (*orlydbv1.IsFirstTimeUserResponse, error) {
463 firstTime, err := s.db.IsFirstTimeUser(req.Pubkey)
464 if err != nil {
465 return nil, status.Errorf(codes.Internal, "is first time user failed: %v", err)
466 }
467 return &orlydbv1.IsFirstTimeUserResponse{FirstTime: firstTime}, nil
468 }
469
470 // === NIP-43 ===
471
472 func (s *DatabaseService) AddNIP43Member(ctx context.Context, req *orlydbv1.AddNIP43MemberRequest) (*orlydbv1.Empty, error) {
473 if err := s.db.AddNIP43Member(req.Pubkey, req.InviteCode); err != nil {
474 return nil, status.Errorf(codes.Internal, "add NIP-43 member failed: %v", err)
475 }
476 return &orlydbv1.Empty{}, nil
477 }
478
479 func (s *DatabaseService) RemoveNIP43Member(ctx context.Context, req *orlydbv1.RemoveNIP43MemberRequest) (*orlydbv1.Empty, error) {
480 if err := s.db.RemoveNIP43Member(req.Pubkey); err != nil {
481 return nil, status.Errorf(codes.Internal, "remove NIP-43 member failed: %v", err)
482 }
483 return &orlydbv1.Empty{}, nil
484 }
485
486 func (s *DatabaseService) IsNIP43Member(ctx context.Context, req *orlydbv1.IsNIP43MemberRequest) (*orlydbv1.IsNIP43MemberResponse, error) {
487 isMember, err := s.db.IsNIP43Member(req.Pubkey)
488 if err != nil {
489 return nil, status.Errorf(codes.Internal, "is NIP-43 member failed: %v", err)
490 }
491 return &orlydbv1.IsNIP43MemberResponse{IsMember: isMember}, nil
492 }
493
494 func (s *DatabaseService) GetNIP43Membership(ctx context.Context, req *orlydbv1.GetNIP43MembershipRequest) (*orlydbv1.NIP43Membership, error) {
495 membership, err := s.db.GetNIP43Membership(req.Pubkey)
496 if err != nil {
497 return nil, status.Errorf(codes.Internal, "get NIP-43 membership failed: %v", err)
498 }
499 return orlydbv1.NIP43MembershipToProto(membership), nil
500 }
501
502 func (s *DatabaseService) GetAllNIP43Members(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PubkeyList, error) {
503 members, err := s.db.GetAllNIP43Members()
504 if err != nil {
505 return nil, status.Errorf(codes.Internal, "get all NIP-43 members failed: %v", err)
506 }
507 return &orlydbv1.PubkeyList{Pubkeys: members}, nil
508 }
509
510 func (s *DatabaseService) StoreInviteCode(ctx context.Context, req *orlydbv1.StoreInviteCodeRequest) (*orlydbv1.Empty, error) {
511 expiresAt := orlydbv1.TimeFromUnix(req.ExpiresAt)
512 if err := s.db.StoreInviteCode(req.Code, expiresAt); err != nil {
513 return nil, status.Errorf(codes.Internal, "store invite code failed: %v", err)
514 }
515 return &orlydbv1.Empty{}, nil
516 }
517
518 func (s *DatabaseService) ValidateInviteCode(ctx context.Context, req *orlydbv1.ValidateInviteCodeRequest) (*orlydbv1.ValidateInviteCodeResponse, error) {
519 valid, err := s.db.ValidateInviteCode(req.Code)
520 if err != nil {
521 return nil, status.Errorf(codes.Internal, "validate invite code failed: %v", err)
522 }
523 return &orlydbv1.ValidateInviteCodeResponse{Valid: valid}, nil
524 }
525
526 func (s *DatabaseService) DeleteInviteCode(ctx context.Context, req *orlydbv1.DeleteInviteCodeRequest) (*orlydbv1.Empty, error) {
527 if err := s.db.DeleteInviteCode(req.Code); err != nil {
528 return nil, status.Errorf(codes.Internal, "delete invite code failed: %v", err)
529 }
530 return &orlydbv1.Empty{}, nil
531 }
532
533 func (s *DatabaseService) PublishNIP43MembershipEvent(ctx context.Context, req *orlydbv1.PublishNIP43MembershipEventRequest) (*orlydbv1.Empty, error) {
534 if err := s.db.PublishNIP43MembershipEvent(int(req.Kind), req.Pubkey); err != nil {
535 return nil, status.Errorf(codes.Internal, "publish NIP-43 membership event failed: %v", err)
536 }
537 return &orlydbv1.Empty{}, nil
538 }
539
540 // === Query Cache ===
541
542 func (s *DatabaseService) GetCachedJSON(ctx context.Context, req *orlydbv1.GetCachedJSONRequest) (*orlydbv1.GetCachedJSONResponse, error) {
543 f := orlydbv1.ProtoToFilter(req.Filter)
544 jsonItems, found := s.db.GetCachedJSON(f)
545 return &orlydbv1.GetCachedJSONResponse{
546 JsonItems: jsonItems,
547 Found: found,
548 }, nil
549 }
550
551 func (s *DatabaseService) CacheMarshaledJSON(ctx context.Context, req *orlydbv1.CacheMarshaledJSONRequest) (*orlydbv1.Empty, error) {
552 f := orlydbv1.ProtoToFilter(req.Filter)
553 s.db.CacheMarshaledJSON(f, req.JsonItems)
554 return &orlydbv1.Empty{}, nil
555 }
556
557 func (s *DatabaseService) GetCachedEvents(ctx context.Context, req *orlydbv1.GetCachedEventsRequest) (*orlydbv1.GetCachedEventsResponse, error) {
558 f := orlydbv1.ProtoToFilter(req.Filter)
559 events, found := s.db.GetCachedEvents(f)
560 return &orlydbv1.GetCachedEventsResponse{
561 Events: orlydbv1.EventsToProto(events),
562 Found: found,
563 }, nil
564 }
565
566 func (s *DatabaseService) CacheEvents(ctx context.Context, req *orlydbv1.CacheEventsRequest) (*orlydbv1.Empty, error) {
567 f := orlydbv1.ProtoToFilter(req.Filter)
568 events := orlydbv1.ProtoToEvents(req.Events)
569 s.db.CacheEvents(f, events)
570 return &orlydbv1.Empty{}, nil
571 }
572
573 func (s *DatabaseService) InvalidateQueryCache(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
574 s.db.InvalidateQueryCache()
575 return &orlydbv1.Empty{}, nil
576 }
577
578 // === Access Tracking ===
579
580 func (s *DatabaseService) RecordEventAccess(ctx context.Context, req *orlydbv1.RecordEventAccessRequest) (*orlydbv1.Empty, error) {
581 if err := s.db.RecordEventAccess(req.Serial, req.ConnectionId); err != nil {
582 return nil, status.Errorf(codes.Internal, "record event access failed: %v", err)
583 }
584 return &orlydbv1.Empty{}, nil
585 }
586
587 func (s *DatabaseService) GetEventAccessInfo(ctx context.Context, req *orlydbv1.GetEventAccessInfoRequest) (*orlydbv1.GetEventAccessInfoResponse, error) {
588 lastAccess, accessCount, err := s.db.GetEventAccessInfo(req.Serial)
589 if err != nil {
590 return nil, status.Errorf(codes.Internal, "get event access info failed: %v", err)
591 }
592 return &orlydbv1.GetEventAccessInfoResponse{
593 LastAccess: lastAccess,
594 AccessCount: accessCount,
595 }, nil
596 }
597
598 func (s *DatabaseService) GetLeastAccessedEvents(ctx context.Context, req *orlydbv1.GetLeastAccessedEventsRequest) (*orlydbv1.SerialList, error) {
599 serials, err := s.db.GetLeastAccessedEvents(int(req.Limit), req.MinAgeSec)
600 if err != nil {
601 return nil, status.Errorf(codes.Internal, "get least accessed events failed: %v", err)
602 }
603 return &orlydbv1.SerialList{Serials: serials}, nil
604 }
605
606 // === Utility ===
607
608 func (s *DatabaseService) EventIdsBySerial(ctx context.Context, req *orlydbv1.EventIdsBySerialRequest) (*orlydbv1.EventIdsBySerialResponse, error) {
609 eventIds, err := s.db.EventIdsBySerial(req.Start, int(req.Count))
610 if err != nil {
611 return nil, status.Errorf(codes.Internal, "event ids by serial failed: %v", err)
612 }
613 return &orlydbv1.EventIdsBySerialResponse{EventIds: eventIds}, nil
614 }
615
616 func (s *DatabaseService) RunMigrations(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) {
617 s.db.RunMigrations()
618 return &orlydbv1.Empty{}, nil
619 }
620
621 // === Blob Storage (Blossom) ===
622
623 func (s *DatabaseService) SaveBlob(ctx context.Context, req *orlydbv1.SaveBlobRequest) (*orlydbv1.Empty, error) {
624 if err := s.db.SaveBlob(req.Sha256Hash, req.Data, req.Pubkey, req.MimeType, req.Extension); err != nil {
625 return nil, status.Errorf(codes.Internal, "save blob failed: %v", err)
626 }
627 return &orlydbv1.Empty{}, nil
628 }
629
630 func (s *DatabaseService) SaveBlobMetadata(ctx context.Context, req *orlydbv1.SaveBlobMetadataRequest) (*orlydbv1.Empty, error) {
631 if err := s.db.SaveBlobMetadata(req.Sha256Hash, req.Size, req.Pubkey, req.MimeType, req.Extension); err != nil {
632 return nil, status.Errorf(codes.Internal, "save blob metadata failed: %v", err)
633 }
634 return &orlydbv1.Empty{}, nil
635 }
636
637 func (s *DatabaseService) GetBlob(ctx context.Context, req *orlydbv1.GetBlobRequest) (*orlydbv1.GetBlobResponse, error) {
638 data, metadata, err := s.db.GetBlob(req.Sha256Hash)
639 if err != nil {
640 // Return not found as a response, not an error
641 return &orlydbv1.GetBlobResponse{Found: false}, nil
642 }
643 return &orlydbv1.GetBlobResponse{
644 Found: true,
645 Data: data,
646 Metadata: orlydbv1.BlobMetadataToProto(metadata),
647 }, nil
648 }
649
650 func (s *DatabaseService) HasBlob(ctx context.Context, req *orlydbv1.HasBlobRequest) (*orlydbv1.HasBlobResponse, error) {
651 exists, err := s.db.HasBlob(req.Sha256Hash)
652 if err != nil {
653 return nil, status.Errorf(codes.Internal, "has blob failed: %v", err)
654 }
655 return &orlydbv1.HasBlobResponse{Exists: exists}, nil
656 }
657
658 func (s *DatabaseService) DeleteBlob(ctx context.Context, req *orlydbv1.DeleteBlobRequest) (*orlydbv1.Empty, error) {
659 if err := s.db.DeleteBlob(req.Sha256Hash, req.Pubkey); err != nil {
660 return nil, status.Errorf(codes.Internal, "delete blob failed: %v", err)
661 }
662 return &orlydbv1.Empty{}, nil
663 }
664
665 func (s *DatabaseService) ListBlobs(ctx context.Context, req *orlydbv1.ListBlobsRequest) (*orlydbv1.ListBlobsResponse, error) {
666 descriptors, err := s.db.ListBlobs(req.Pubkey, req.Since, req.Until)
667 if err != nil {
668 return nil, status.Errorf(codes.Internal, "list blobs failed: %v", err)
669 }
670 return &orlydbv1.ListBlobsResponse{
671 Descriptors: orlydbv1.BlobDescriptorListToProto(descriptors),
672 }, nil
673 }
674
675 func (s *DatabaseService) GetBlobMetadata(ctx context.Context, req *orlydbv1.GetBlobMetadataRequest) (*orlydbv1.BlobMetadata, error) {
676 metadata, err := s.db.GetBlobMetadata(req.Sha256Hash)
677 if err != nil {
678 return nil, status.Errorf(codes.NotFound, "blob metadata not found: %v", err)
679 }
680 return orlydbv1.BlobMetadataToProto(metadata), nil
681 }
682
683 func (s *DatabaseService) GetTotalBlobStorageUsed(ctx context.Context, req *orlydbv1.GetTotalBlobStorageUsedRequest) (*orlydbv1.GetTotalBlobStorageUsedResponse, error) {
684 totalMB, err := s.db.GetTotalBlobStorageUsed(req.Pubkey)
685 if err != nil {
686 return nil, status.Errorf(codes.Internal, "get total blob storage used failed: %v", err)
687 }
688 return &orlydbv1.GetTotalBlobStorageUsedResponse{TotalMb: totalMB}, nil
689 }
690
691 func (s *DatabaseService) SaveBlobReport(ctx context.Context, req *orlydbv1.SaveBlobReportRequest) (*orlydbv1.Empty, error) {
692 if err := s.db.SaveBlobReport(req.Sha256Hash, req.ReportData); err != nil {
693 return nil, status.Errorf(codes.Internal, "save blob report failed: %v", err)
694 }
695 return &orlydbv1.Empty{}, nil
696 }
697
698 func (s *DatabaseService) ListAllBlobUserStats(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ListAllBlobUserStatsResponse, error) {
699 stats, err := s.db.ListAllBlobUserStats()
700 if err != nil {
701 return nil, status.Errorf(codes.Internal, "list all blob user stats failed: %v", err)
702 }
703 return &orlydbv1.ListAllBlobUserStatsResponse{
704 Stats: orlydbv1.UserBlobStatsListToProto(stats),
705 }, nil
706 }
707
708 // === Helper Methods ===
709
// eventStreamer is the part of a server-side event stream that the batching
// helper streamEvents accepts, so one helper can serve every streaming query
// handler in this file.
type eventStreamer interface {
	// Send transmits one batch of events on the stream.
	Send(*orlydbv1.EventBatch) error
	// Context returns the context associated with the stream.
	Context() context.Context
}
715
716 func (s *DatabaseService) streamEvents(events []*orlydbv1.Event, stream eventStreamer) error {
717 batchSize := s.cfg.StreamBatchSize
718 if batchSize == 0 {
719 batchSize = 100
720 }
721
722 for i := 0; i < len(events); i += batchSize {
723 end := i + batchSize
724 if end > len(events) {
725 end = len(events)
726 }
727
728 batch := &orlydbv1.EventBatch{
729 Events: events[i:end],
730 }
731
732 if err := stream.Send(batch); err != nil {
733 return err
734 }
735 }
736
737 return nil
738 }
739