client.go raw
1 // Package grpc provides a gRPC client that implements the database.Database interface.
2 // This allows the relay to use a remote database server via gRPC.
3 package grpc
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "time"
12
13 "google.golang.org/grpc"
14 "google.golang.org/grpc/credentials/insecure"
15 "next.orly.dev/pkg/lol/chk"
16 "next.orly.dev/pkg/lol/log"
17
18 "next.orly.dev/pkg/nostr/encoders/event"
19 "next.orly.dev/pkg/nostr/encoders/filter"
20 "next.orly.dev/pkg/nostr/encoders/tag"
21 "next.orly.dev/pkg/database"
22 indextypes "next.orly.dev/pkg/database/indexes/types"
23 "next.orly.dev/pkg/interfaces/store"
24 orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
25 )
26
// Client implements the database.Database interface via gRPC.
type Client struct {
	// conn is the underlying gRPC connection; Close closes it.
	conn *grpc.ClientConn
	// client is the DatabaseService stub created from conn; every method
	// of Client that talks to the server goes through it.
	client orlydbv1.DatabaseServiceClient
	// ready is closed by waitForReady once the server reports readiness.
	ready chan struct{}
	// path is "grpc://" plus the server address; Path returns it when the
	// server's own path cannot be fetched.
	path string
}
34
// Verify Client implements database.Database at compile time.
// The blank identifier means no value is kept; the assignment exists only
// so the build fails if *Client stops satisfying the interface.
var _ database.Database = (*Client)(nil)

// Verify Client implements CypherExecutor at compile time.
var _ store.CypherExecutor = (*Client)(nil)
40
// ClientConfig holds configuration for the gRPC client.
type ClientConfig struct {
	// ServerAddress is the dial target handed to grpc.DialContext by New.
	ServerAddress string
	// ConnectTimeout bounds the context used for dialing. A zero value
	// makes New use 10 seconds.
	ConnectTimeout time.Duration
}
46
47 // New creates a new gRPC database client.
48 func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
49 timeout := cfg.ConnectTimeout
50 if timeout == 0 {
51 timeout = 10 * time.Second
52 }
53
54 dialCtx, cancel := context.WithTimeout(ctx, timeout)
55 defer cancel()
56
57 conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
58 grpc.WithTransportCredentials(insecure.NewCredentials()),
59 grpc.WithDefaultCallOptions(
60 grpc.MaxCallRecvMsgSize(64<<20), // 64MB
61 grpc.MaxCallSendMsgSize(64<<20), // 64MB
62 ),
63 )
64 if err != nil {
65 return nil, err
66 }
67
68 c := &Client{
69 conn: conn,
70 client: orlydbv1.NewDatabaseServiceClient(conn),
71 ready: make(chan struct{}),
72 path: "grpc://" + cfg.ServerAddress,
73 }
74
75 // Check if server is ready
76 go c.waitForReady(ctx)
77
78 return c, nil
79 }
80
81 func (c *Client) waitForReady(ctx context.Context) {
82 for {
83 select {
84 case <-ctx.Done():
85 return
86 default:
87 resp, err := c.client.Ready(ctx, &orlydbv1.Empty{})
88 if err == nil && resp.Ready {
89 close(c.ready)
90 log.I.F("gRPC database client connected and ready")
91 return
92 }
93 time.Sleep(100 * time.Millisecond)
94 }
95 }
96 }
97
98 // === Lifecycle Methods ===
99
100 func (c *Client) Path() string {
101 // Get the actual database path from the server so blossom can find blob files
102 resp, err := c.client.GetPath(context.Background(), &orlydbv1.Empty{})
103 if err != nil {
104 log.W.F("failed to get path from database server: %v, using local path", err)
105 return c.path
106 }
107 return resp.Path
108 }
109
110 func (c *Client) Init(path string) error {
111 // Not applicable for remote database
112 return nil
113 }
114
115 func (c *Client) Sync() error {
116 _, err := c.client.Sync(context.Background(), &orlydbv1.Empty{})
117 return err
118 }
119
120 func (c *Client) Close() error {
121 return c.conn.Close()
122 }
123
124 func (c *Client) Wipe() error {
125 // Not implemented for remote database (dangerous operation)
126 return nil
127 }
128
129 func (c *Client) SetLogLevel(level string) {
130 _, _ = c.client.SetLogLevel(context.Background(), &orlydbv1.SetLogLevelRequest{Level: level})
131 }
132
133 func (c *Client) Ready() <-chan struct{} {
134 return c.ready
135 }
136
137 // === Event Storage ===
138
139 func (c *Client) SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error) {
140 resp, err := c.client.SaveEvent(ctx, &orlydbv1.SaveEventRequest{
141 Event: orlydbv1.EventToProto(ev),
142 })
143 if err != nil {
144 return false, err
145 }
146 return resp.Exists, nil
147 }
148
149 func (c *Client) GetSerialsFromFilter(f *filter.F) (serials indextypes.Uint40s, err error) {
150 resp, err := c.client.GetSerialsFromFilter(context.Background(), &orlydbv1.GetSerialsFromFilterRequest{
151 Filter: orlydbv1.FilterToProto(f),
152 })
153 if err != nil {
154 return nil, err
155 }
156 return protoToUint40s(resp), nil
157 }
158
159 func (c *Client) WouldReplaceEvent(ev *event.E) (bool, indextypes.Uint40s, error) {
160 resp, err := c.client.WouldReplaceEvent(context.Background(), &orlydbv1.WouldReplaceEventRequest{
161 Event: orlydbv1.EventToProto(ev),
162 })
163 if err != nil {
164 return false, nil, err
165 }
166 serials := make(indextypes.Uint40s, 0, len(resp.ReplacedSerials))
167 for _, s := range resp.ReplacedSerials {
168 u := &indextypes.Uint40{}
169 _ = u.Set(s)
170 serials = append(serials, u)
171 }
172 return resp.WouldReplace, serials, nil
173 }
174
175 // === Event Queries ===
176
177 func (c *Client) QueryEvents(ctx context.Context, f *filter.F) (evs event.S, err error) {
178 stream, err := c.client.QueryEvents(ctx, &orlydbv1.QueryEventsRequest{
179 Filter: orlydbv1.FilterToProto(f),
180 })
181 if err != nil {
182 return nil, err
183 }
184 return c.collectStreamedEvents(stream)
185 }
186
187 func (c *Client) QueryAllVersions(ctx context.Context, f *filter.F) (evs event.S, err error) {
188 stream, err := c.client.QueryAllVersions(ctx, &orlydbv1.QueryEventsRequest{
189 Filter: orlydbv1.FilterToProto(f),
190 })
191 if err != nil {
192 return nil, err
193 }
194 return c.collectStreamedEvents(stream)
195 }
196
197 func (c *Client) QueryEventsWithOptions(ctx context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (evs event.S, err error) {
198 stream, err := c.client.QueryEventsWithOptions(ctx, &orlydbv1.QueryEventsWithOptionsRequest{
199 Filter: orlydbv1.FilterToProto(f),
200 IncludeDeleteEvents: includeDeleteEvents,
201 ShowAllVersions: showAllVersions,
202 })
203 if err != nil {
204 return nil, err
205 }
206 return c.collectStreamedEvents(stream)
207 }
208
209 func (c *Client) QueryDeleteEventsByTargetId(ctx context.Context, targetEventId []byte) (evs event.S, err error) {
210 stream, err := c.client.QueryDeleteEventsByTargetId(ctx, &orlydbv1.QueryDeleteEventsByTargetIdRequest{
211 TargetEventId: targetEventId,
212 })
213 if err != nil {
214 return nil, err
215 }
216 return c.collectStreamedEvents(stream)
217 }
218
219 func (c *Client) QueryForSerials(ctx context.Context, f *filter.F) (serials indextypes.Uint40s, err error) {
220 resp, err := c.client.QueryForSerials(ctx, &orlydbv1.QueryEventsRequest{
221 Filter: orlydbv1.FilterToProto(f),
222 })
223 if err != nil {
224 return nil, err
225 }
226 return protoToUint40s(resp), nil
227 }
228
229 func (c *Client) QueryForIds(ctx context.Context, f *filter.F) (idPkTs []*store.IdPkTs, err error) {
230 resp, err := c.client.QueryForIds(ctx, &orlydbv1.QueryEventsRequest{
231 Filter: orlydbv1.FilterToProto(f),
232 })
233 if err != nil {
234 return nil, err
235 }
236 return orlydbv1.ProtoToIdPkTsList(resp), nil
237 }
238
239 func (c *Client) CountEvents(ctx context.Context, f *filter.F) (count int, approximate bool, err error) {
240 resp, err := c.client.CountEvents(ctx, &orlydbv1.QueryEventsRequest{
241 Filter: orlydbv1.FilterToProto(f),
242 })
243 if err != nil {
244 return 0, false, err
245 }
246 return int(resp.Count), resp.Approximate, nil
247 }
248
249 // === Event Retrieval by Serial ===
250
251 func (c *Client) FetchEventBySerial(ser *indextypes.Uint40) (ev *event.E, err error) {
252 resp, err := c.client.FetchEventBySerial(context.Background(), &orlydbv1.FetchEventBySerialRequest{
253 Serial: ser.Get(),
254 })
255 if err != nil {
256 return nil, err
257 }
258 if !resp.Found {
259 return nil, nil
260 }
261 return orlydbv1.ProtoToEvent(resp.Event), nil
262 }
263
264 func (c *Client) FetchEventsBySerials(serials []*indextypes.Uint40) (events map[uint64]*event.E, err error) {
265 serialList := make([]uint64, 0, len(serials))
266 for _, s := range serials {
267 serialList = append(serialList, s.Get())
268 }
269 resp, err := c.client.FetchEventsBySerials(context.Background(), &orlydbv1.FetchEventsBySerialRequest{
270 Serials: serialList,
271 })
272 if err != nil {
273 return nil, err
274 }
275 return orlydbv1.ProtoToEventMap(resp), nil
276 }
277
278 func (c *Client) GetSerialById(id []byte) (ser *indextypes.Uint40, err error) {
279 resp, err := c.client.GetSerialById(context.Background(), &orlydbv1.GetSerialByIdRequest{
280 Id: id,
281 })
282 if err != nil {
283 return nil, err
284 }
285 if !resp.Found {
286 return nil, nil
287 }
288 u := &indextypes.Uint40{}
289 _ = u.Set(resp.Serial)
290 return u, nil
291 }
292
293 func (c *Client) GetSerialsByIds(ids *tag.T) (serials map[string]*indextypes.Uint40, err error) {
294 idList := make([][]byte, 0, len(ids.T))
295 for _, id := range ids.T {
296 idList = append(idList, id)
297 }
298 resp, err := c.client.GetSerialsByIds(context.Background(), &orlydbv1.GetSerialsByIdsRequest{
299 Ids: idList,
300 })
301 if err != nil {
302 return nil, err
303 }
304 result := make(map[string]*indextypes.Uint40)
305 for k, v := range resp.Serials {
306 u := &indextypes.Uint40{}
307 _ = u.Set(v)
308 result[k] = u
309 }
310 return result, nil
311 }
312
313 func (c *Client) GetSerialsByIdsWithFilter(ids *tag.T, fn func(ev *event.E, ser *indextypes.Uint40) bool) (serials map[string]*indextypes.Uint40, err error) {
314 // Note: Filter function cannot be passed over gRPC, so we just get all serials
315 return c.GetSerialsByIds(ids)
316 }
317
318 func (c *Client) GetSerialsByRange(idx database.Range) (serials indextypes.Uint40s, err error) {
319 resp, err := c.client.GetSerialsByRange(context.Background(), &orlydbv1.GetSerialsByRangeRequest{
320 Range: orlydbv1.RangeToProto(idx),
321 })
322 if err != nil {
323 return nil, err
324 }
325 return protoToUint40s(resp), nil
326 }
327
328 func (c *Client) GetFullIdPubkeyBySerial(ser *indextypes.Uint40) (fidpk *store.IdPkTs, err error) {
329 resp, err := c.client.GetFullIdPubkeyBySerial(context.Background(), &orlydbv1.GetFullIdPubkeyBySerialRequest{
330 Serial: ser.Get(),
331 })
332 if err != nil {
333 return nil, err
334 }
335 return orlydbv1.ProtoToIdPkTs(resp), nil
336 }
337
338 func (c *Client) GetFullIdPubkeyBySerials(sers []*indextypes.Uint40) (fidpks []*store.IdPkTs, err error) {
339 serialList := make([]uint64, 0, len(sers))
340 for _, s := range sers {
341 serialList = append(serialList, s.Get())
342 }
343 resp, err := c.client.GetFullIdPubkeyBySerials(context.Background(), &orlydbv1.GetFullIdPubkeyBySerialsRequest{
344 Serials: serialList,
345 })
346 if err != nil {
347 return nil, err
348 }
349 return orlydbv1.ProtoToIdPkTsList(resp), nil
350 }
351
352 // === Event Deletion ===
353
354 func (c *Client) DeleteEvent(ctx context.Context, eid []byte) error {
355 _, err := c.client.DeleteEvent(ctx, &orlydbv1.DeleteEventRequest{
356 EventId: eid,
357 })
358 return err
359 }
360
361 func (c *Client) DeleteEventBySerial(ctx context.Context, ser *indextypes.Uint40, ev *event.E) error {
362 _, err := c.client.DeleteEventBySerial(ctx, &orlydbv1.DeleteEventBySerialRequest{
363 Serial: ser.Get(),
364 Event: orlydbv1.EventToProto(ev),
365 })
366 return err
367 }
368
369 func (c *Client) DeleteExpired() {
370 _, _ = c.client.DeleteExpired(context.Background(), &orlydbv1.Empty{})
371 }
372
373 func (c *Client) ProcessDelete(ev *event.E, admins [][]byte) error {
374 _, err := c.client.ProcessDelete(context.Background(), &orlydbv1.ProcessDeleteRequest{
375 Event: orlydbv1.EventToProto(ev),
376 Admins: admins,
377 })
378 return err
379 }
380
381 func (c *Client) CheckForDeleted(ev *event.E, admins [][]byte) error {
382 _, err := c.client.CheckForDeleted(context.Background(), &orlydbv1.CheckForDeletedRequest{
383 Event: orlydbv1.EventToProto(ev),
384 Admins: admins,
385 })
386 return err
387 }
388
389 // === Import/Export ===
390
391 func (c *Client) Import(rr io.Reader) {
392 stream, err := c.client.Import(context.Background())
393 if chk.E(err) {
394 return
395 }
396
397 buf := make([]byte, 64*1024)
398 for {
399 n, err := rr.Read(buf)
400 if err == io.EOF {
401 break
402 }
403 if chk.E(err) {
404 return
405 }
406 if err := stream.Send(&orlydbv1.ImportChunk{Data: buf[:n]}); chk.E(err) {
407 return
408 }
409 }
410
411 _, _ = stream.CloseAndRecv()
412 }
413
414 func (c *Client) Export(ctx context.Context, w io.Writer, pubkeys ...[]byte) {
415 stream, err := c.client.Export(ctx, &orlydbv1.ExportRequest{
416 Pubkeys: pubkeys,
417 })
418 if chk.E(err) {
419 return
420 }
421
422 for {
423 chunk, err := stream.Recv()
424 if err == io.EOF {
425 return
426 }
427 if chk.E(err) {
428 return
429 }
430 if _, err := w.Write(chunk.Data); chk.E(err) {
431 return
432 }
433 }
434 }
435
436 func (c *Client) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
437 c.Import(rr)
438 return nil
439 }
440
441 func (c *Client) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }) error {
442 _, err := c.client.ImportEventsFromStrings(ctx, &orlydbv1.ImportEventsFromStringsRequest{
443 EventJsons: eventJSONs,
444 CheckPolicy: policyManager != nil,
445 })
446 return err
447 }
448
449 // === Relay Identity ===
450
451 func (c *Client) GetRelayIdentitySecret() (skb []byte, err error) {
452 resp, err := c.client.GetRelayIdentitySecret(context.Background(), &orlydbv1.Empty{})
453 if err != nil {
454 return nil, err
455 }
456 return resp.SecretKey, nil
457 }
458
459 func (c *Client) SetRelayIdentitySecret(skb []byte) error {
460 _, err := c.client.SetRelayIdentitySecret(context.Background(), &orlydbv1.SetRelayIdentitySecretRequest{
461 SecretKey: skb,
462 })
463 return err
464 }
465
466 func (c *Client) GetOrCreateRelayIdentitySecret() (skb []byte, err error) {
467 resp, err := c.client.GetOrCreateRelayIdentitySecret(context.Background(), &orlydbv1.Empty{})
468 if err != nil {
469 return nil, err
470 }
471 return resp.SecretKey, nil
472 }
473
474 // === Markers ===
475
476 func (c *Client) SetMarker(key string, value []byte) error {
477 _, err := c.client.SetMarker(context.Background(), &orlydbv1.SetMarkerRequest{
478 Key: key,
479 Value: value,
480 })
481 return err
482 }
483
484 func (c *Client) GetMarker(key string) (value []byte, err error) {
485 resp, err := c.client.GetMarker(context.Background(), &orlydbv1.GetMarkerRequest{
486 Key: key,
487 })
488 if err != nil {
489 return nil, err
490 }
491 if !resp.Found {
492 return nil, nil
493 }
494 return resp.Value, nil
495 }
496
497 func (c *Client) HasMarker(key string) bool {
498 resp, err := c.client.HasMarker(context.Background(), &orlydbv1.HasMarkerRequest{
499 Key: key,
500 })
501 if err != nil {
502 return false
503 }
504 return resp.Exists
505 }
506
507 func (c *Client) DeleteMarker(key string) error {
508 _, err := c.client.DeleteMarker(context.Background(), &orlydbv1.DeleteMarkerRequest{
509 Key: key,
510 })
511 return err
512 }
513
514 // === Subscriptions ===
515
516 func (c *Client) GetSubscription(pubkey []byte) (*database.Subscription, error) {
517 resp, err := c.client.GetSubscription(context.Background(), &orlydbv1.GetSubscriptionRequest{
518 Pubkey: pubkey,
519 })
520 if err != nil {
521 return nil, err
522 }
523 return orlydbv1.ProtoToSubscription(resp), nil
524 }
525
526 func (c *Client) IsSubscriptionActive(pubkey []byte) (bool, error) {
527 resp, err := c.client.IsSubscriptionActive(context.Background(), &orlydbv1.IsSubscriptionActiveRequest{
528 Pubkey: pubkey,
529 })
530 if err != nil {
531 return false, err
532 }
533 return resp.Active, nil
534 }
535
536 func (c *Client) ExtendSubscription(pubkey []byte, days int) error {
537 _, err := c.client.ExtendSubscription(context.Background(), &orlydbv1.ExtendSubscriptionRequest{
538 Pubkey: pubkey,
539 Days: int32(days),
540 })
541 return err
542 }
543
544 func (c *Client) RecordPayment(pubkey []byte, amount int64, invoice, preimage string) error {
545 _, err := c.client.RecordPayment(context.Background(), &orlydbv1.RecordPaymentRequest{
546 Pubkey: pubkey,
547 Amount: amount,
548 Invoice: invoice,
549 Preimage: preimage,
550 })
551 return err
552 }
553
554 func (c *Client) GetPaymentHistory(pubkey []byte) ([]database.Payment, error) {
555 resp, err := c.client.GetPaymentHistory(context.Background(), &orlydbv1.GetPaymentHistoryRequest{
556 Pubkey: pubkey,
557 })
558 if err != nil {
559 return nil, err
560 }
561 return orlydbv1.ProtoToPaymentList(resp), nil
562 }
563
564 func (c *Client) ExtendBlossomSubscription(pubkey []byte, tier string, storageMB int64, daysExtended int) error {
565 _, err := c.client.ExtendBlossomSubscription(context.Background(), &orlydbv1.ExtendBlossomSubscriptionRequest{
566 Pubkey: pubkey,
567 Tier: tier,
568 StorageMb: storageMB,
569 DaysExtended: int32(daysExtended),
570 })
571 return err
572 }
573
574 func (c *Client) GetBlossomStorageQuota(pubkey []byte) (quotaMB int64, err error) {
575 resp, err := c.client.GetBlossomStorageQuota(context.Background(), &orlydbv1.GetBlossomStorageQuotaRequest{
576 Pubkey: pubkey,
577 })
578 if err != nil {
579 return 0, err
580 }
581 return resp.QuotaMb, nil
582 }
583
584 func (c *Client) IsFirstTimeUser(pubkey []byte) (bool, error) {
585 resp, err := c.client.IsFirstTimeUser(context.Background(), &orlydbv1.IsFirstTimeUserRequest{
586 Pubkey: pubkey,
587 })
588 if err != nil {
589 return false, err
590 }
591 return resp.FirstTime, nil
592 }
593
594 // === NIP-43 ===
595
596 func (c *Client) AddNIP43Member(pubkey []byte, inviteCode string) error {
597 _, err := c.client.AddNIP43Member(context.Background(), &orlydbv1.AddNIP43MemberRequest{
598 Pubkey: pubkey,
599 InviteCode: inviteCode,
600 })
601 return err
602 }
603
604 func (c *Client) RemoveNIP43Member(pubkey []byte) error {
605 _, err := c.client.RemoveNIP43Member(context.Background(), &orlydbv1.RemoveNIP43MemberRequest{
606 Pubkey: pubkey,
607 })
608 return err
609 }
610
611 func (c *Client) IsNIP43Member(pubkey []byte) (isMember bool, err error) {
612 resp, err := c.client.IsNIP43Member(context.Background(), &orlydbv1.IsNIP43MemberRequest{
613 Pubkey: pubkey,
614 })
615 if err != nil {
616 return false, err
617 }
618 return resp.IsMember, nil
619 }
620
621 func (c *Client) GetNIP43Membership(pubkey []byte) (*database.NIP43Membership, error) {
622 resp, err := c.client.GetNIP43Membership(context.Background(), &orlydbv1.GetNIP43MembershipRequest{
623 Pubkey: pubkey,
624 })
625 if err != nil {
626 return nil, err
627 }
628 return orlydbv1.ProtoToNIP43Membership(resp), nil
629 }
630
631 func (c *Client) GetAllNIP43Members() ([][]byte, error) {
632 resp, err := c.client.GetAllNIP43Members(context.Background(), &orlydbv1.Empty{})
633 if err != nil {
634 return nil, err
635 }
636 return resp.Pubkeys, nil
637 }
638
639 func (c *Client) StoreInviteCode(code string, expiresAt time.Time) error {
640 _, err := c.client.StoreInviteCode(context.Background(), &orlydbv1.StoreInviteCodeRequest{
641 Code: code,
642 ExpiresAt: expiresAt.Unix(),
643 })
644 return err
645 }
646
647 func (c *Client) ValidateInviteCode(code string) (valid bool, err error) {
648 resp, err := c.client.ValidateInviteCode(context.Background(), &orlydbv1.ValidateInviteCodeRequest{
649 Code: code,
650 })
651 if err != nil {
652 return false, err
653 }
654 return resp.Valid, nil
655 }
656
657 func (c *Client) DeleteInviteCode(code string) error {
658 _, err := c.client.DeleteInviteCode(context.Background(), &orlydbv1.DeleteInviteCodeRequest{
659 Code: code,
660 })
661 return err
662 }
663
664 func (c *Client) PublishNIP43MembershipEvent(kind int, pubkey []byte) error {
665 _, err := c.client.PublishNIP43MembershipEvent(context.Background(), &orlydbv1.PublishNIP43MembershipEventRequest{
666 Kind: int32(kind),
667 Pubkey: pubkey,
668 })
669 return err
670 }
671
672 // === Migrations ===
673
674 func (c *Client) RunMigrations() {
675 _, _ = c.client.RunMigrations(context.Background(), &orlydbv1.Empty{})
676 }
677
678 // === Query Cache ===
679
680 func (c *Client) GetCachedJSON(f *filter.F) ([][]byte, bool) {
681 resp, err := c.client.GetCachedJSON(context.Background(), &orlydbv1.GetCachedJSONRequest{
682 Filter: orlydbv1.FilterToProto(f),
683 })
684 if err != nil {
685 return nil, false
686 }
687 return resp.JsonItems, resp.Found
688 }
689
690 func (c *Client) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
691 _, _ = c.client.CacheMarshaledJSON(context.Background(), &orlydbv1.CacheMarshaledJSONRequest{
692 Filter: orlydbv1.FilterToProto(f),
693 JsonItems: marshaledJSON,
694 })
695 }
696
697 func (c *Client) GetCachedEvents(f *filter.F) (event.S, bool) {
698 resp, err := c.client.GetCachedEvents(context.Background(), &orlydbv1.GetCachedEventsRequest{
699 Filter: orlydbv1.FilterToProto(f),
700 })
701 if err != nil {
702 return nil, false
703 }
704 return orlydbv1.ProtoToEvents(resp.Events), resp.Found
705 }
706
707 func (c *Client) CacheEvents(f *filter.F, events event.S) {
708 _, _ = c.client.CacheEvents(context.Background(), &orlydbv1.CacheEventsRequest{
709 Filter: orlydbv1.FilterToProto(f),
710 Events: orlydbv1.EventsToProto(events),
711 })
712 }
713
714 func (c *Client) InvalidateQueryCache() {
715 _, _ = c.client.InvalidateQueryCache(context.Background(), &orlydbv1.Empty{})
716 }
717
718 // === Access Tracking ===
719
720 func (c *Client) RecordEventAccess(serial uint64, connectionID string) error {
721 _, err := c.client.RecordEventAccess(context.Background(), &orlydbv1.RecordEventAccessRequest{
722 Serial: serial,
723 ConnectionId: connectionID,
724 })
725 return err
726 }
727
728 func (c *Client) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
729 resp, err := c.client.GetEventAccessInfo(context.Background(), &orlydbv1.GetEventAccessInfoRequest{
730 Serial: serial,
731 })
732 if err != nil {
733 return 0, 0, err
734 }
735 return resp.LastAccess, resp.AccessCount, nil
736 }
737
738 func (c *Client) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
739 resp, err := c.client.GetLeastAccessedEvents(context.Background(), &orlydbv1.GetLeastAccessedEventsRequest{
740 Limit: int32(limit),
741 MinAgeSec: minAgeSec,
742 })
743 if err != nil {
744 return nil, err
745 }
746 return resp.Serials, nil
747 }
748
749 // === Blob Storage (Blossom) ===
750
751 func (c *Client) SaveBlob(sha256Hash []byte, data []byte, pubkey []byte, mimeType string, extension string) error {
752 _, err := c.client.SaveBlob(context.Background(), &orlydbv1.SaveBlobRequest{
753 Sha256Hash: sha256Hash,
754 Data: data,
755 Pubkey: pubkey,
756 MimeType: mimeType,
757 Extension: extension,
758 })
759 return err
760 }
761
762 func (c *Client) SaveBlobMetadata(sha256Hash []byte, size int64, pubkey []byte, mimeType string, extension string) error {
763 _, err := c.client.SaveBlobMetadata(context.Background(), &orlydbv1.SaveBlobMetadataRequest{
764 Sha256Hash: sha256Hash,
765 Size: size,
766 Pubkey: pubkey,
767 MimeType: mimeType,
768 Extension: extension,
769 })
770 return err
771 }
772
773 func (c *Client) GetBlob(sha256Hash []byte) (data []byte, metadata *database.BlobMetadata, err error) {
774 resp, err := c.client.GetBlob(context.Background(), &orlydbv1.GetBlobRequest{
775 Sha256Hash: sha256Hash,
776 })
777 if err != nil {
778 return nil, nil, err
779 }
780 if !resp.Found {
781 return nil, nil, nil
782 }
783 return resp.Data, orlydbv1.ProtoToBlobMetadata(resp.Metadata), nil
784 }
785
786 func (c *Client) HasBlob(sha256Hash []byte) (exists bool, err error) {
787 resp, err := c.client.HasBlob(context.Background(), &orlydbv1.HasBlobRequest{
788 Sha256Hash: sha256Hash,
789 })
790 if err != nil {
791 return false, err
792 }
793 return resp.Exists, nil
794 }
795
796 func (c *Client) DeleteBlob(sha256Hash []byte, pubkey []byte) error {
797 _, err := c.client.DeleteBlob(context.Background(), &orlydbv1.DeleteBlobRequest{
798 Sha256Hash: sha256Hash,
799 Pubkey: pubkey,
800 })
801 return err
802 }
803
804 func (c *Client) ListBlobs(pubkey []byte, since, until int64) ([]*database.BlobDescriptor, error) {
805 resp, err := c.client.ListBlobs(context.Background(), &orlydbv1.ListBlobsRequest{
806 Pubkey: pubkey,
807 Since: since,
808 Until: until,
809 })
810 if err != nil {
811 return nil, err
812 }
813 return orlydbv1.ProtoToBlobDescriptorList(resp.Descriptors), nil
814 }
815
816 func (c *Client) GetBlobMetadata(sha256Hash []byte) (*database.BlobMetadata, error) {
817 resp, err := c.client.GetBlobMetadata(context.Background(), &orlydbv1.GetBlobMetadataRequest{
818 Sha256Hash: sha256Hash,
819 })
820 if err != nil {
821 return nil, err
822 }
823 return orlydbv1.ProtoToBlobMetadata(resp), nil
824 }
825
826 func (c *Client) GetTotalBlobStorageUsed(pubkey []byte) (totalMB int64, err error) {
827 resp, err := c.client.GetTotalBlobStorageUsed(context.Background(), &orlydbv1.GetTotalBlobStorageUsedRequest{
828 Pubkey: pubkey,
829 })
830 if err != nil {
831 return 0, err
832 }
833 return resp.TotalMb, nil
834 }
835
836 func (c *Client) SaveBlobReport(sha256Hash []byte, reportData []byte) error {
837 _, err := c.client.SaveBlobReport(context.Background(), &orlydbv1.SaveBlobReportRequest{
838 Sha256Hash: sha256Hash,
839 ReportData: reportData,
840 })
841 return err
842 }
843
844 func (c *Client) ListAllBlobUserStats() ([]*database.UserBlobStats, error) {
845 resp, err := c.client.ListAllBlobUserStats(context.Background(), &orlydbv1.Empty{})
846 if err != nil {
847 return nil, err
848 }
849 return orlydbv1.ProtoToUserBlobStatsList(resp.Stats), nil
850 }
851
852 func (c *Client) ReconcileBlobMetadata() (reconciled int, err error) {
853 resp, err := c.client.ReconcileBlobMetadata(context.Background(), &orlydbv1.Empty{})
854 if err != nil {
855 return 0, err
856 }
857 return int(resp.Reconciled), nil
858 }
859
860 func (c *Client) ListAllBlobs() ([]*database.BlobDescriptor, error) {
861 resp, err := c.client.ListAllBlobs(context.Background(), &orlydbv1.Empty{})
862 if err != nil {
863 return nil, err
864 }
865 return orlydbv1.ProtoToBlobDescriptorList(resp.Descriptors), nil
866 }
867
868 func (c *Client) GetThumbnail(key string) ([]byte, error) {
869 resp, err := c.client.GetThumbnail(context.Background(), &orlydbv1.GetThumbnailRequest{
870 Key: key,
871 })
872 if err != nil {
873 return nil, err
874 }
875 if !resp.Found {
876 return nil, errors.New("thumbnail not found")
877 }
878 return resp.Data, nil
879 }
880
881 func (c *Client) SaveThumbnail(key string, data []byte) error {
882 _, err := c.client.SaveThumbnail(context.Background(), &orlydbv1.SaveThumbnailRequest{
883 Key: key,
884 Data: data,
885 })
886 return err
887 }
888
889 // === Utility ===
890
891 func (c *Client) EventIdsBySerial(start uint64, count int) (evs []uint64, err error) {
892 resp, err := c.client.EventIdsBySerial(context.Background(), &orlydbv1.EventIdsBySerialRequest{
893 Start: start,
894 Count: int32(count),
895 })
896 if err != nil {
897 return nil, err
898 }
899 return resp.EventIds, nil
900 }
901
902 // === Helper Methods ===
903
// eventStream is the receive side of a server stream of event batches. It
// lets collectStreamedEvents accept the stream returned by any of the
// query RPCs.
type eventStream interface {
	// Recv returns the next batch, or an error; io.EOF marks the end of
	// the stream for collectStreamedEvents.
	Recv() (*orlydbv1.EventBatch, error)
}
907
908 func (c *Client) collectStreamedEvents(stream eventStream) (event.S, error) {
909 var result event.S
910 for {
911 batch, err := stream.Recv()
912 if err == io.EOF {
913 return result, nil
914 }
915 if err != nil {
916 return nil, err
917 }
918 for _, ev := range batch.Events {
919 result = append(result, orlydbv1.ProtoToEvent(ev))
920 }
921 }
922 }
923
924 func protoToUint40s(resp *orlydbv1.SerialList) indextypes.Uint40s {
925 if resp == nil {
926 return nil
927 }
928 result := make(indextypes.Uint40s, 0, len(resp.Serials))
929 for _, s := range resp.Serials {
930 u := &indextypes.Uint40{}
931 _ = u.Set(s)
932 result = append(result, u)
933 }
934 return result
935 }
936
937 // === Cypher Query ===
938
939 // ExecuteCypherRead implements store.CypherExecutor by proxying through gRPC.
940 func (c *Client) ExecuteCypherRead(ctx context.Context, cypher string, params map[string]any) ([]map[string]any, error) {
941 // Encode params as JSON bytes for proto transport
942 protoParams := make(map[string][]byte, len(params))
943 for k, v := range params {
944 b, err := json.Marshal(v)
945 if err != nil {
946 return nil, fmt.Errorf("failed to marshal param %q: %w", k, err)
947 }
948 protoParams[k] = b
949 }
950
951 resp, err := c.client.ExecuteCypherRead(ctx, &orlydbv1.CypherReadRequest{
952 Cypher: cypher,
953 Params: protoParams,
954 })
955 if err != nil {
956 return nil, err
957 }
958 if resp.Error != "" {
959 return nil, fmt.Errorf("%s", resp.Error)
960 }
961
962 // Decode JSON records
963 records := make([]map[string]any, 0, len(resp.Records))
964 for _, raw := range resp.Records {
965 var rec map[string]any
966 if err := json.Unmarshal(raw, &rec); err != nil {
967 return nil, fmt.Errorf("failed to unmarshal record: %w", err)
968 }
969 records = append(records, rec)
970 }
971
972 return records, nil
973 }
974
975 // === Paid ACL ===
976
977 func (c *Client) SavePaidSubscription(sub *database.PaidSubscription) error {
978 _, err := c.client.SavePaidSubscription(context.Background(), orlydbv1.PaidSubscriptionToProto(sub))
979 return err
980 }
981
982 func (c *Client) GetPaidSubscription(pubkeyHex string) (*database.PaidSubscription, error) {
983 resp, err := c.client.GetPaidSubscription(context.Background(), &orlydbv1.GetPaidSubscriptionRequest{
984 PubkeyHex: pubkeyHex,
985 })
986 if err != nil {
987 return nil, err
988 }
989 return orlydbv1.ProtoToPaidSubscription(resp), nil
990 }
991
992 func (c *Client) DeletePaidSubscription(pubkeyHex string) error {
993 _, err := c.client.DeletePaidSubscription(context.Background(), &orlydbv1.DeletePaidSubscriptionRequest{
994 PubkeyHex: pubkeyHex,
995 })
996 return err
997 }
998
999 func (c *Client) ListPaidSubscriptions() ([]*database.PaidSubscription, error) {
1000 resp, err := c.client.ListPaidSubscriptions(context.Background(), &orlydbv1.Empty{})
1001 if err != nil {
1002 return nil, err
1003 }
1004 return orlydbv1.ProtoToPaidSubscriptionList(resp), nil
1005 }
1006
1007 func (c *Client) ClaimAlias(alias, pubkeyHex string) error {
1008 _, err := c.client.ClaimAlias(context.Background(), &orlydbv1.ClaimAliasRequest{
1009 Alias: alias,
1010 PubkeyHex: pubkeyHex,
1011 })
1012 return err
1013 }
1014
1015 func (c *Client) GetAliasByPubkey(pubkeyHex string) (string, error) {
1016 resp, err := c.client.GetAliasByPubkey(context.Background(), &orlydbv1.GetAliasByPubkeyRequest{
1017 PubkeyHex: pubkeyHex,
1018 })
1019 if err != nil {
1020 return "", err
1021 }
1022 return resp.Alias, nil
1023 }
1024
1025 func (c *Client) GetPubkeyByAlias(alias string) (string, error) {
1026 resp, err := c.client.GetPubkeyByAlias(context.Background(), &orlydbv1.GetPubkeyByAliasRequest{
1027 Alias: alias,
1028 })
1029 if err != nil {
1030 return "", err
1031 }
1032 return resp.PubkeyHex, nil
1033 }
1034
1035 func (c *Client) IsAliasTaken(alias string) (bool, error) {
1036 resp, err := c.client.IsAliasTaken(context.Background(), &orlydbv1.IsAliasTakenRequest{
1037 Alias: alias,
1038 })
1039 if err != nil {
1040 return false, err
1041 }
1042 return resp.Taken, nil
1043 }
1044
1045 // NRC (Nostr Relay Connect) stubs - not supported in gRPC client
1046 // NRC management must be done on the database server side
1047
// errNRCNotSupportedGRPC is the sentinel error returned by the NRC methods
// of Client, none of which are available over gRPC. It is built with
// errors.New because the message has no format verbs.
var errNRCNotSupportedGRPC = errors.New("NRC not supported in gRPC client - manage directly on database server")
1049
1050 func (c *Client) CreateNRCConnection(label string, createdBy []byte) (*database.NRCConnection, error) {
1051 return nil, errNRCNotSupportedGRPC
1052 }
1053
1054 func (c *Client) GetNRCConnection(id string) (*database.NRCConnection, error) {
1055 return nil, errNRCNotSupportedGRPC
1056 }
1057
1058 func (c *Client) GetNRCConnectionByDerivedPubkey(derivedPubkey []byte) (*database.NRCConnection, error) {
1059 return nil, errNRCNotSupportedGRPC
1060 }
1061
1062 func (c *Client) SaveNRCConnection(conn *database.NRCConnection) error {
1063 return errNRCNotSupportedGRPC
1064 }
1065
1066 func (c *Client) DeleteNRCConnection(id string) error {
1067 return errNRCNotSupportedGRPC
1068 }
1069
1070 func (c *Client) GetAllNRCConnections() ([]*database.NRCConnection, error) {
1071 return nil, errNRCNotSupportedGRPC
1072 }
1073
1074 func (c *Client) GetNRCAuthorizedSecrets() (map[string]string, error) {
1075 return nil, errNRCNotSupportedGRPC
1076 }
1077
1078 func (c *Client) UpdateNRCConnectionLastUsed(id string) error {
1079 return errNRCNotSupportedGRPC
1080 }
1081
1082 func (c *Client) GetNRCConnectionURI(conn *database.NRCConnection, relayPubkey []byte, rendezvousURL string) (string, error) {
1083 return "", errNRCNotSupportedGRPC
1084 }
1085