client.go raw

   1  // Package grpc provides a gRPC client that implements the database.Database interface.
   2  // This allows the relay to use a remote database server via gRPC.
   3  package grpc
   4  
   5  import (
   6  	"context"
   7  	"encoding/json"
   8  	"errors"
   9  	"fmt"
  10  	"io"
  11  	"time"
  12  
  13  	"google.golang.org/grpc"
  14  	"google.golang.org/grpc/credentials/insecure"
  15  	"next.orly.dev/pkg/lol/chk"
  16  	"next.orly.dev/pkg/lol/log"
  17  
  18  	"next.orly.dev/pkg/nostr/encoders/event"
  19  	"next.orly.dev/pkg/nostr/encoders/filter"
  20  	"next.orly.dev/pkg/nostr/encoders/tag"
  21  	"next.orly.dev/pkg/database"
  22  	indextypes "next.orly.dev/pkg/database/indexes/types"
  23  	"next.orly.dev/pkg/interfaces/store"
  24  	orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1"
  25  )
  26  
  27  // Client implements the database.Database interface via gRPC.
  28  type Client struct {
  29  	conn   *grpc.ClientConn
  30  	client orlydbv1.DatabaseServiceClient
  31  	ready  chan struct{}
  32  	path   string
  33  }
  34  
  35  // Verify Client implements database.Database at compile time.
  36  var _ database.Database = (*Client)(nil)
  37  
  38  // Verify Client implements CypherExecutor at compile time.
  39  var _ store.CypherExecutor = (*Client)(nil)
  40  
  41  // ClientConfig holds configuration for the gRPC client.
  42  type ClientConfig struct {
  43  	ServerAddress  string
  44  	ConnectTimeout time.Duration
  45  }
  46  
  47  // New creates a new gRPC database client.
  48  func New(ctx context.Context, cfg *ClientConfig) (*Client, error) {
  49  	timeout := cfg.ConnectTimeout
  50  	if timeout == 0 {
  51  		timeout = 10 * time.Second
  52  	}
  53  
  54  	dialCtx, cancel := context.WithTimeout(ctx, timeout)
  55  	defer cancel()
  56  
  57  	conn, err := grpc.DialContext(dialCtx, cfg.ServerAddress,
  58  		grpc.WithTransportCredentials(insecure.NewCredentials()),
  59  		grpc.WithDefaultCallOptions(
  60  			grpc.MaxCallRecvMsgSize(64<<20), // 64MB
  61  			grpc.MaxCallSendMsgSize(64<<20), // 64MB
  62  		),
  63  	)
  64  	if err != nil {
  65  		return nil, err
  66  	}
  67  
  68  	c := &Client{
  69  		conn:   conn,
  70  		client: orlydbv1.NewDatabaseServiceClient(conn),
  71  		ready:  make(chan struct{}),
  72  		path:   "grpc://" + cfg.ServerAddress,
  73  	}
  74  
  75  	// Check if server is ready
  76  	go c.waitForReady(ctx)
  77  
  78  	return c, nil
  79  }
  80  
  81  func (c *Client) waitForReady(ctx context.Context) {
  82  	for {
  83  		select {
  84  		case <-ctx.Done():
  85  			return
  86  		default:
  87  			resp, err := c.client.Ready(ctx, &orlydbv1.Empty{})
  88  			if err == nil && resp.Ready {
  89  				close(c.ready)
  90  				log.I.F("gRPC database client connected and ready")
  91  				return
  92  			}
  93  			time.Sleep(100 * time.Millisecond)
  94  		}
  95  	}
  96  }
  97  
  98  // === Lifecycle Methods ===
  99  
 100  func (c *Client) Path() string {
 101  	// Get the actual database path from the server so blossom can find blob files
 102  	resp, err := c.client.GetPath(context.Background(), &orlydbv1.Empty{})
 103  	if err != nil {
 104  		log.W.F("failed to get path from database server: %v, using local path", err)
 105  		return c.path
 106  	}
 107  	return resp.Path
 108  }
 109  
 110  func (c *Client) Init(path string) error {
 111  	// Not applicable for remote database
 112  	return nil
 113  }
 114  
 115  func (c *Client) Sync() error {
 116  	_, err := c.client.Sync(context.Background(), &orlydbv1.Empty{})
 117  	return err
 118  }
 119  
 120  func (c *Client) Close() error {
 121  	return c.conn.Close()
 122  }
 123  
 124  func (c *Client) Wipe() error {
 125  	// Not implemented for remote database (dangerous operation)
 126  	return nil
 127  }
 128  
 129  func (c *Client) SetLogLevel(level string) {
 130  	_, _ = c.client.SetLogLevel(context.Background(), &orlydbv1.SetLogLevelRequest{Level: level})
 131  }
 132  
 133  func (c *Client) Ready() <-chan struct{} {
 134  	return c.ready
 135  }
 136  
 137  // === Event Storage ===
 138  
 139  func (c *Client) SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error) {
 140  	resp, err := c.client.SaveEvent(ctx, &orlydbv1.SaveEventRequest{
 141  		Event: orlydbv1.EventToProto(ev),
 142  	})
 143  	if err != nil {
 144  		return false, err
 145  	}
 146  	return resp.Exists, nil
 147  }
 148  
 149  func (c *Client) GetSerialsFromFilter(f *filter.F) (serials indextypes.Uint40s, err error) {
 150  	resp, err := c.client.GetSerialsFromFilter(context.Background(), &orlydbv1.GetSerialsFromFilterRequest{
 151  		Filter: orlydbv1.FilterToProto(f),
 152  	})
 153  	if err != nil {
 154  		return nil, err
 155  	}
 156  	return protoToUint40s(resp), nil
 157  }
 158  
 159  func (c *Client) WouldReplaceEvent(ev *event.E) (bool, indextypes.Uint40s, error) {
 160  	resp, err := c.client.WouldReplaceEvent(context.Background(), &orlydbv1.WouldReplaceEventRequest{
 161  		Event: orlydbv1.EventToProto(ev),
 162  	})
 163  	if err != nil {
 164  		return false, nil, err
 165  	}
 166  	serials := make(indextypes.Uint40s, 0, len(resp.ReplacedSerials))
 167  	for _, s := range resp.ReplacedSerials {
 168  		u := &indextypes.Uint40{}
 169  		_ = u.Set(s)
 170  		serials = append(serials, u)
 171  	}
 172  	return resp.WouldReplace, serials, nil
 173  }
 174  
 175  // === Event Queries ===
 176  
 177  func (c *Client) QueryEvents(ctx context.Context, f *filter.F) (evs event.S, err error) {
 178  	stream, err := c.client.QueryEvents(ctx, &orlydbv1.QueryEventsRequest{
 179  		Filter: orlydbv1.FilterToProto(f),
 180  	})
 181  	if err != nil {
 182  		return nil, err
 183  	}
 184  	return c.collectStreamedEvents(stream)
 185  }
 186  
 187  func (c *Client) QueryAllVersions(ctx context.Context, f *filter.F) (evs event.S, err error) {
 188  	stream, err := c.client.QueryAllVersions(ctx, &orlydbv1.QueryEventsRequest{
 189  		Filter: orlydbv1.FilterToProto(f),
 190  	})
 191  	if err != nil {
 192  		return nil, err
 193  	}
 194  	return c.collectStreamedEvents(stream)
 195  }
 196  
 197  func (c *Client) QueryEventsWithOptions(ctx context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (evs event.S, err error) {
 198  	stream, err := c.client.QueryEventsWithOptions(ctx, &orlydbv1.QueryEventsWithOptionsRequest{
 199  		Filter:              orlydbv1.FilterToProto(f),
 200  		IncludeDeleteEvents: includeDeleteEvents,
 201  		ShowAllVersions:     showAllVersions,
 202  	})
 203  	if err != nil {
 204  		return nil, err
 205  	}
 206  	return c.collectStreamedEvents(stream)
 207  }
 208  
 209  func (c *Client) QueryDeleteEventsByTargetId(ctx context.Context, targetEventId []byte) (evs event.S, err error) {
 210  	stream, err := c.client.QueryDeleteEventsByTargetId(ctx, &orlydbv1.QueryDeleteEventsByTargetIdRequest{
 211  		TargetEventId: targetEventId,
 212  	})
 213  	if err != nil {
 214  		return nil, err
 215  	}
 216  	return c.collectStreamedEvents(stream)
 217  }
 218  
 219  func (c *Client) QueryForSerials(ctx context.Context, f *filter.F) (serials indextypes.Uint40s, err error) {
 220  	resp, err := c.client.QueryForSerials(ctx, &orlydbv1.QueryEventsRequest{
 221  		Filter: orlydbv1.FilterToProto(f),
 222  	})
 223  	if err != nil {
 224  		return nil, err
 225  	}
 226  	return protoToUint40s(resp), nil
 227  }
 228  
 229  func (c *Client) QueryForIds(ctx context.Context, f *filter.F) (idPkTs []*store.IdPkTs, err error) {
 230  	resp, err := c.client.QueryForIds(ctx, &orlydbv1.QueryEventsRequest{
 231  		Filter: orlydbv1.FilterToProto(f),
 232  	})
 233  	if err != nil {
 234  		return nil, err
 235  	}
 236  	return orlydbv1.ProtoToIdPkTsList(resp), nil
 237  }
 238  
 239  func (c *Client) CountEvents(ctx context.Context, f *filter.F) (count int, approximate bool, err error) {
 240  	resp, err := c.client.CountEvents(ctx, &orlydbv1.QueryEventsRequest{
 241  		Filter: orlydbv1.FilterToProto(f),
 242  	})
 243  	if err != nil {
 244  		return 0, false, err
 245  	}
 246  	return int(resp.Count), resp.Approximate, nil
 247  }
 248  
 249  // === Event Retrieval by Serial ===
 250  
 251  func (c *Client) FetchEventBySerial(ser *indextypes.Uint40) (ev *event.E, err error) {
 252  	resp, err := c.client.FetchEventBySerial(context.Background(), &orlydbv1.FetchEventBySerialRequest{
 253  		Serial: ser.Get(),
 254  	})
 255  	if err != nil {
 256  		return nil, err
 257  	}
 258  	if !resp.Found {
 259  		return nil, nil
 260  	}
 261  	return orlydbv1.ProtoToEvent(resp.Event), nil
 262  }
 263  
 264  func (c *Client) FetchEventsBySerials(serials []*indextypes.Uint40) (events map[uint64]*event.E, err error) {
 265  	serialList := make([]uint64, 0, len(serials))
 266  	for _, s := range serials {
 267  		serialList = append(serialList, s.Get())
 268  	}
 269  	resp, err := c.client.FetchEventsBySerials(context.Background(), &orlydbv1.FetchEventsBySerialRequest{
 270  		Serials: serialList,
 271  	})
 272  	if err != nil {
 273  		return nil, err
 274  	}
 275  	return orlydbv1.ProtoToEventMap(resp), nil
 276  }
 277  
 278  func (c *Client) GetSerialById(id []byte) (ser *indextypes.Uint40, err error) {
 279  	resp, err := c.client.GetSerialById(context.Background(), &orlydbv1.GetSerialByIdRequest{
 280  		Id: id,
 281  	})
 282  	if err != nil {
 283  		return nil, err
 284  	}
 285  	if !resp.Found {
 286  		return nil, nil
 287  	}
 288  	u := &indextypes.Uint40{}
 289  	_ = u.Set(resp.Serial)
 290  	return u, nil
 291  }
 292  
 293  func (c *Client) GetSerialsByIds(ids *tag.T) (serials map[string]*indextypes.Uint40, err error) {
 294  	idList := make([][]byte, 0, len(ids.T))
 295  	for _, id := range ids.T {
 296  		idList = append(idList, id)
 297  	}
 298  	resp, err := c.client.GetSerialsByIds(context.Background(), &orlydbv1.GetSerialsByIdsRequest{
 299  		Ids: idList,
 300  	})
 301  	if err != nil {
 302  		return nil, err
 303  	}
 304  	result := make(map[string]*indextypes.Uint40)
 305  	for k, v := range resp.Serials {
 306  		u := &indextypes.Uint40{}
 307  		_ = u.Set(v)
 308  		result[k] = u
 309  	}
 310  	return result, nil
 311  }
 312  
 313  func (c *Client) GetSerialsByIdsWithFilter(ids *tag.T, fn func(ev *event.E, ser *indextypes.Uint40) bool) (serials map[string]*indextypes.Uint40, err error) {
 314  	// Note: Filter function cannot be passed over gRPC, so we just get all serials
 315  	return c.GetSerialsByIds(ids)
 316  }
 317  
 318  func (c *Client) GetSerialsByRange(idx database.Range) (serials indextypes.Uint40s, err error) {
 319  	resp, err := c.client.GetSerialsByRange(context.Background(), &orlydbv1.GetSerialsByRangeRequest{
 320  		Range: orlydbv1.RangeToProto(idx),
 321  	})
 322  	if err != nil {
 323  		return nil, err
 324  	}
 325  	return protoToUint40s(resp), nil
 326  }
 327  
 328  func (c *Client) GetFullIdPubkeyBySerial(ser *indextypes.Uint40) (fidpk *store.IdPkTs, err error) {
 329  	resp, err := c.client.GetFullIdPubkeyBySerial(context.Background(), &orlydbv1.GetFullIdPubkeyBySerialRequest{
 330  		Serial: ser.Get(),
 331  	})
 332  	if err != nil {
 333  		return nil, err
 334  	}
 335  	return orlydbv1.ProtoToIdPkTs(resp), nil
 336  }
 337  
 338  func (c *Client) GetFullIdPubkeyBySerials(sers []*indextypes.Uint40) (fidpks []*store.IdPkTs, err error) {
 339  	serialList := make([]uint64, 0, len(sers))
 340  	for _, s := range sers {
 341  		serialList = append(serialList, s.Get())
 342  	}
 343  	resp, err := c.client.GetFullIdPubkeyBySerials(context.Background(), &orlydbv1.GetFullIdPubkeyBySerialsRequest{
 344  		Serials: serialList,
 345  	})
 346  	if err != nil {
 347  		return nil, err
 348  	}
 349  	return orlydbv1.ProtoToIdPkTsList(resp), nil
 350  }
 351  
 352  // === Event Deletion ===
 353  
 354  func (c *Client) DeleteEvent(ctx context.Context, eid []byte) error {
 355  	_, err := c.client.DeleteEvent(ctx, &orlydbv1.DeleteEventRequest{
 356  		EventId: eid,
 357  	})
 358  	return err
 359  }
 360  
 361  func (c *Client) DeleteEventBySerial(ctx context.Context, ser *indextypes.Uint40, ev *event.E) error {
 362  	_, err := c.client.DeleteEventBySerial(ctx, &orlydbv1.DeleteEventBySerialRequest{
 363  		Serial: ser.Get(),
 364  		Event:  orlydbv1.EventToProto(ev),
 365  	})
 366  	return err
 367  }
 368  
 369  func (c *Client) DeleteExpired() {
 370  	_, _ = c.client.DeleteExpired(context.Background(), &orlydbv1.Empty{})
 371  }
 372  
 373  func (c *Client) ProcessDelete(ev *event.E, admins [][]byte) error {
 374  	_, err := c.client.ProcessDelete(context.Background(), &orlydbv1.ProcessDeleteRequest{
 375  		Event:  orlydbv1.EventToProto(ev),
 376  		Admins: admins,
 377  	})
 378  	return err
 379  }
 380  
 381  func (c *Client) CheckForDeleted(ev *event.E, admins [][]byte) error {
 382  	_, err := c.client.CheckForDeleted(context.Background(), &orlydbv1.CheckForDeletedRequest{
 383  		Event:  orlydbv1.EventToProto(ev),
 384  		Admins: admins,
 385  	})
 386  	return err
 387  }
 388  
 389  // === Import/Export ===
 390  
 391  func (c *Client) Import(rr io.Reader) {
 392  	stream, err := c.client.Import(context.Background())
 393  	if chk.E(err) {
 394  		return
 395  	}
 396  
 397  	buf := make([]byte, 64*1024)
 398  	for {
 399  		n, err := rr.Read(buf)
 400  		if err == io.EOF {
 401  			break
 402  		}
 403  		if chk.E(err) {
 404  			return
 405  		}
 406  		if err := stream.Send(&orlydbv1.ImportChunk{Data: buf[:n]}); chk.E(err) {
 407  			return
 408  		}
 409  	}
 410  
 411  	_, _ = stream.CloseAndRecv()
 412  }
 413  
 414  func (c *Client) Export(ctx context.Context, w io.Writer, pubkeys ...[]byte) {
 415  	stream, err := c.client.Export(ctx, &orlydbv1.ExportRequest{
 416  		Pubkeys: pubkeys,
 417  	})
 418  	if chk.E(err) {
 419  		return
 420  	}
 421  
 422  	for {
 423  		chunk, err := stream.Recv()
 424  		if err == io.EOF {
 425  			return
 426  		}
 427  		if chk.E(err) {
 428  			return
 429  		}
 430  		if _, err := w.Write(chunk.Data); chk.E(err) {
 431  			return
 432  		}
 433  	}
 434  }
 435  
 436  func (c *Client) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
 437  	c.Import(rr)
 438  	return nil
 439  }
 440  
 441  func (c *Client) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }) error {
 442  	_, err := c.client.ImportEventsFromStrings(ctx, &orlydbv1.ImportEventsFromStringsRequest{
 443  		EventJsons:  eventJSONs,
 444  		CheckPolicy: policyManager != nil,
 445  	})
 446  	return err
 447  }
 448  
 449  // === Relay Identity ===
 450  
 451  func (c *Client) GetRelayIdentitySecret() (skb []byte, err error) {
 452  	resp, err := c.client.GetRelayIdentitySecret(context.Background(), &orlydbv1.Empty{})
 453  	if err != nil {
 454  		return nil, err
 455  	}
 456  	return resp.SecretKey, nil
 457  }
 458  
 459  func (c *Client) SetRelayIdentitySecret(skb []byte) error {
 460  	_, err := c.client.SetRelayIdentitySecret(context.Background(), &orlydbv1.SetRelayIdentitySecretRequest{
 461  		SecretKey: skb,
 462  	})
 463  	return err
 464  }
 465  
 466  func (c *Client) GetOrCreateRelayIdentitySecret() (skb []byte, err error) {
 467  	resp, err := c.client.GetOrCreateRelayIdentitySecret(context.Background(), &orlydbv1.Empty{})
 468  	if err != nil {
 469  		return nil, err
 470  	}
 471  	return resp.SecretKey, nil
 472  }
 473  
 474  // === Markers ===
 475  
 476  func (c *Client) SetMarker(key string, value []byte) error {
 477  	_, err := c.client.SetMarker(context.Background(), &orlydbv1.SetMarkerRequest{
 478  		Key:   key,
 479  		Value: value,
 480  	})
 481  	return err
 482  }
 483  
 484  func (c *Client) GetMarker(key string) (value []byte, err error) {
 485  	resp, err := c.client.GetMarker(context.Background(), &orlydbv1.GetMarkerRequest{
 486  		Key: key,
 487  	})
 488  	if err != nil {
 489  		return nil, err
 490  	}
 491  	if !resp.Found {
 492  		return nil, nil
 493  	}
 494  	return resp.Value, nil
 495  }
 496  
 497  func (c *Client) HasMarker(key string) bool {
 498  	resp, err := c.client.HasMarker(context.Background(), &orlydbv1.HasMarkerRequest{
 499  		Key: key,
 500  	})
 501  	if err != nil {
 502  		return false
 503  	}
 504  	return resp.Exists
 505  }
 506  
 507  func (c *Client) DeleteMarker(key string) error {
 508  	_, err := c.client.DeleteMarker(context.Background(), &orlydbv1.DeleteMarkerRequest{
 509  		Key: key,
 510  	})
 511  	return err
 512  }
 513  
 514  // === Subscriptions ===
 515  
 516  func (c *Client) GetSubscription(pubkey []byte) (*database.Subscription, error) {
 517  	resp, err := c.client.GetSubscription(context.Background(), &orlydbv1.GetSubscriptionRequest{
 518  		Pubkey: pubkey,
 519  	})
 520  	if err != nil {
 521  		return nil, err
 522  	}
 523  	return orlydbv1.ProtoToSubscription(resp), nil
 524  }
 525  
 526  func (c *Client) IsSubscriptionActive(pubkey []byte) (bool, error) {
 527  	resp, err := c.client.IsSubscriptionActive(context.Background(), &orlydbv1.IsSubscriptionActiveRequest{
 528  		Pubkey: pubkey,
 529  	})
 530  	if err != nil {
 531  		return false, err
 532  	}
 533  	return resp.Active, nil
 534  }
 535  
 536  func (c *Client) ExtendSubscription(pubkey []byte, days int) error {
 537  	_, err := c.client.ExtendSubscription(context.Background(), &orlydbv1.ExtendSubscriptionRequest{
 538  		Pubkey: pubkey,
 539  		Days:   int32(days),
 540  	})
 541  	return err
 542  }
 543  
 544  func (c *Client) RecordPayment(pubkey []byte, amount int64, invoice, preimage string) error {
 545  	_, err := c.client.RecordPayment(context.Background(), &orlydbv1.RecordPaymentRequest{
 546  		Pubkey:   pubkey,
 547  		Amount:   amount,
 548  		Invoice:  invoice,
 549  		Preimage: preimage,
 550  	})
 551  	return err
 552  }
 553  
 554  func (c *Client) GetPaymentHistory(pubkey []byte) ([]database.Payment, error) {
 555  	resp, err := c.client.GetPaymentHistory(context.Background(), &orlydbv1.GetPaymentHistoryRequest{
 556  		Pubkey: pubkey,
 557  	})
 558  	if err != nil {
 559  		return nil, err
 560  	}
 561  	return orlydbv1.ProtoToPaymentList(resp), nil
 562  }
 563  
 564  func (c *Client) ExtendBlossomSubscription(pubkey []byte, tier string, storageMB int64, daysExtended int) error {
 565  	_, err := c.client.ExtendBlossomSubscription(context.Background(), &orlydbv1.ExtendBlossomSubscriptionRequest{
 566  		Pubkey:       pubkey,
 567  		Tier:         tier,
 568  		StorageMb:    storageMB,
 569  		DaysExtended: int32(daysExtended),
 570  	})
 571  	return err
 572  }
 573  
 574  func (c *Client) GetBlossomStorageQuota(pubkey []byte) (quotaMB int64, err error) {
 575  	resp, err := c.client.GetBlossomStorageQuota(context.Background(), &orlydbv1.GetBlossomStorageQuotaRequest{
 576  		Pubkey: pubkey,
 577  	})
 578  	if err != nil {
 579  		return 0, err
 580  	}
 581  	return resp.QuotaMb, nil
 582  }
 583  
 584  func (c *Client) IsFirstTimeUser(pubkey []byte) (bool, error) {
 585  	resp, err := c.client.IsFirstTimeUser(context.Background(), &orlydbv1.IsFirstTimeUserRequest{
 586  		Pubkey: pubkey,
 587  	})
 588  	if err != nil {
 589  		return false, err
 590  	}
 591  	return resp.FirstTime, nil
 592  }
 593  
 594  // === NIP-43 ===
 595  
 596  func (c *Client) AddNIP43Member(pubkey []byte, inviteCode string) error {
 597  	_, err := c.client.AddNIP43Member(context.Background(), &orlydbv1.AddNIP43MemberRequest{
 598  		Pubkey:     pubkey,
 599  		InviteCode: inviteCode,
 600  	})
 601  	return err
 602  }
 603  
 604  func (c *Client) RemoveNIP43Member(pubkey []byte) error {
 605  	_, err := c.client.RemoveNIP43Member(context.Background(), &orlydbv1.RemoveNIP43MemberRequest{
 606  		Pubkey: pubkey,
 607  	})
 608  	return err
 609  }
 610  
 611  func (c *Client) IsNIP43Member(pubkey []byte) (isMember bool, err error) {
 612  	resp, err := c.client.IsNIP43Member(context.Background(), &orlydbv1.IsNIP43MemberRequest{
 613  		Pubkey: pubkey,
 614  	})
 615  	if err != nil {
 616  		return false, err
 617  	}
 618  	return resp.IsMember, nil
 619  }
 620  
 621  func (c *Client) GetNIP43Membership(pubkey []byte) (*database.NIP43Membership, error) {
 622  	resp, err := c.client.GetNIP43Membership(context.Background(), &orlydbv1.GetNIP43MembershipRequest{
 623  		Pubkey: pubkey,
 624  	})
 625  	if err != nil {
 626  		return nil, err
 627  	}
 628  	return orlydbv1.ProtoToNIP43Membership(resp), nil
 629  }
 630  
 631  func (c *Client) GetAllNIP43Members() ([][]byte, error) {
 632  	resp, err := c.client.GetAllNIP43Members(context.Background(), &orlydbv1.Empty{})
 633  	if err != nil {
 634  		return nil, err
 635  	}
 636  	return resp.Pubkeys, nil
 637  }
 638  
 639  func (c *Client) StoreInviteCode(code string, expiresAt time.Time) error {
 640  	_, err := c.client.StoreInviteCode(context.Background(), &orlydbv1.StoreInviteCodeRequest{
 641  		Code:      code,
 642  		ExpiresAt: expiresAt.Unix(),
 643  	})
 644  	return err
 645  }
 646  
 647  func (c *Client) ValidateInviteCode(code string) (valid bool, err error) {
 648  	resp, err := c.client.ValidateInviteCode(context.Background(), &orlydbv1.ValidateInviteCodeRequest{
 649  		Code: code,
 650  	})
 651  	if err != nil {
 652  		return false, err
 653  	}
 654  	return resp.Valid, nil
 655  }
 656  
 657  func (c *Client) DeleteInviteCode(code string) error {
 658  	_, err := c.client.DeleteInviteCode(context.Background(), &orlydbv1.DeleteInviteCodeRequest{
 659  		Code: code,
 660  	})
 661  	return err
 662  }
 663  
 664  func (c *Client) PublishNIP43MembershipEvent(kind int, pubkey []byte) error {
 665  	_, err := c.client.PublishNIP43MembershipEvent(context.Background(), &orlydbv1.PublishNIP43MembershipEventRequest{
 666  		Kind:   int32(kind),
 667  		Pubkey: pubkey,
 668  	})
 669  	return err
 670  }
 671  
 672  // === Migrations ===
 673  
 674  func (c *Client) RunMigrations() {
 675  	_, _ = c.client.RunMigrations(context.Background(), &orlydbv1.Empty{})
 676  }
 677  
 678  // === Query Cache ===
 679  
 680  func (c *Client) GetCachedJSON(f *filter.F) ([][]byte, bool) {
 681  	resp, err := c.client.GetCachedJSON(context.Background(), &orlydbv1.GetCachedJSONRequest{
 682  		Filter: orlydbv1.FilterToProto(f),
 683  	})
 684  	if err != nil {
 685  		return nil, false
 686  	}
 687  	return resp.JsonItems, resp.Found
 688  }
 689  
 690  func (c *Client) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
 691  	_, _ = c.client.CacheMarshaledJSON(context.Background(), &orlydbv1.CacheMarshaledJSONRequest{
 692  		Filter:    orlydbv1.FilterToProto(f),
 693  		JsonItems: marshaledJSON,
 694  	})
 695  }
 696  
 697  func (c *Client) GetCachedEvents(f *filter.F) (event.S, bool) {
 698  	resp, err := c.client.GetCachedEvents(context.Background(), &orlydbv1.GetCachedEventsRequest{
 699  		Filter: orlydbv1.FilterToProto(f),
 700  	})
 701  	if err != nil {
 702  		return nil, false
 703  	}
 704  	return orlydbv1.ProtoToEvents(resp.Events), resp.Found
 705  }
 706  
 707  func (c *Client) CacheEvents(f *filter.F, events event.S) {
 708  	_, _ = c.client.CacheEvents(context.Background(), &orlydbv1.CacheEventsRequest{
 709  		Filter: orlydbv1.FilterToProto(f),
 710  		Events: orlydbv1.EventsToProto(events),
 711  	})
 712  }
 713  
 714  func (c *Client) InvalidateQueryCache() {
 715  	_, _ = c.client.InvalidateQueryCache(context.Background(), &orlydbv1.Empty{})
 716  }
 717  
 718  // === Access Tracking ===
 719  
 720  func (c *Client) RecordEventAccess(serial uint64, connectionID string) error {
 721  	_, err := c.client.RecordEventAccess(context.Background(), &orlydbv1.RecordEventAccessRequest{
 722  		Serial:       serial,
 723  		ConnectionId: connectionID,
 724  	})
 725  	return err
 726  }
 727  
 728  func (c *Client) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
 729  	resp, err := c.client.GetEventAccessInfo(context.Background(), &orlydbv1.GetEventAccessInfoRequest{
 730  		Serial: serial,
 731  	})
 732  	if err != nil {
 733  		return 0, 0, err
 734  	}
 735  	return resp.LastAccess, resp.AccessCount, nil
 736  }
 737  
 738  func (c *Client) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
 739  	resp, err := c.client.GetLeastAccessedEvents(context.Background(), &orlydbv1.GetLeastAccessedEventsRequest{
 740  		Limit:     int32(limit),
 741  		MinAgeSec: minAgeSec,
 742  	})
 743  	if err != nil {
 744  		return nil, err
 745  	}
 746  	return resp.Serials, nil
 747  }
 748  
 749  // === Blob Storage (Blossom) ===
 750  
 751  func (c *Client) SaveBlob(sha256Hash []byte, data []byte, pubkey []byte, mimeType string, extension string) error {
 752  	_, err := c.client.SaveBlob(context.Background(), &orlydbv1.SaveBlobRequest{
 753  		Sha256Hash: sha256Hash,
 754  		Data:       data,
 755  		Pubkey:     pubkey,
 756  		MimeType:   mimeType,
 757  		Extension:  extension,
 758  	})
 759  	return err
 760  }
 761  
 762  func (c *Client) SaveBlobMetadata(sha256Hash []byte, size int64, pubkey []byte, mimeType string, extension string) error {
 763  	_, err := c.client.SaveBlobMetadata(context.Background(), &orlydbv1.SaveBlobMetadataRequest{
 764  		Sha256Hash: sha256Hash,
 765  		Size:       size,
 766  		Pubkey:     pubkey,
 767  		MimeType:   mimeType,
 768  		Extension:  extension,
 769  	})
 770  	return err
 771  }
 772  
 773  func (c *Client) GetBlob(sha256Hash []byte) (data []byte, metadata *database.BlobMetadata, err error) {
 774  	resp, err := c.client.GetBlob(context.Background(), &orlydbv1.GetBlobRequest{
 775  		Sha256Hash: sha256Hash,
 776  	})
 777  	if err != nil {
 778  		return nil, nil, err
 779  	}
 780  	if !resp.Found {
 781  		return nil, nil, nil
 782  	}
 783  	return resp.Data, orlydbv1.ProtoToBlobMetadata(resp.Metadata), nil
 784  }
 785  
 786  func (c *Client) HasBlob(sha256Hash []byte) (exists bool, err error) {
 787  	resp, err := c.client.HasBlob(context.Background(), &orlydbv1.HasBlobRequest{
 788  		Sha256Hash: sha256Hash,
 789  	})
 790  	if err != nil {
 791  		return false, err
 792  	}
 793  	return resp.Exists, nil
 794  }
 795  
 796  func (c *Client) DeleteBlob(sha256Hash []byte, pubkey []byte) error {
 797  	_, err := c.client.DeleteBlob(context.Background(), &orlydbv1.DeleteBlobRequest{
 798  		Sha256Hash: sha256Hash,
 799  		Pubkey:     pubkey,
 800  	})
 801  	return err
 802  }
 803  
 804  func (c *Client) ListBlobs(pubkey []byte, since, until int64) ([]*database.BlobDescriptor, error) {
 805  	resp, err := c.client.ListBlobs(context.Background(), &orlydbv1.ListBlobsRequest{
 806  		Pubkey: pubkey,
 807  		Since:  since,
 808  		Until:  until,
 809  	})
 810  	if err != nil {
 811  		return nil, err
 812  	}
 813  	return orlydbv1.ProtoToBlobDescriptorList(resp.Descriptors), nil
 814  }
 815  
 816  func (c *Client) GetBlobMetadata(sha256Hash []byte) (*database.BlobMetadata, error) {
 817  	resp, err := c.client.GetBlobMetadata(context.Background(), &orlydbv1.GetBlobMetadataRequest{
 818  		Sha256Hash: sha256Hash,
 819  	})
 820  	if err != nil {
 821  		return nil, err
 822  	}
 823  	return orlydbv1.ProtoToBlobMetadata(resp), nil
 824  }
 825  
 826  func (c *Client) GetTotalBlobStorageUsed(pubkey []byte) (totalMB int64, err error) {
 827  	resp, err := c.client.GetTotalBlobStorageUsed(context.Background(), &orlydbv1.GetTotalBlobStorageUsedRequest{
 828  		Pubkey: pubkey,
 829  	})
 830  	if err != nil {
 831  		return 0, err
 832  	}
 833  	return resp.TotalMb, nil
 834  }
 835  
 836  func (c *Client) SaveBlobReport(sha256Hash []byte, reportData []byte) error {
 837  	_, err := c.client.SaveBlobReport(context.Background(), &orlydbv1.SaveBlobReportRequest{
 838  		Sha256Hash: sha256Hash,
 839  		ReportData: reportData,
 840  	})
 841  	return err
 842  }
 843  
 844  func (c *Client) ListAllBlobUserStats() ([]*database.UserBlobStats, error) {
 845  	resp, err := c.client.ListAllBlobUserStats(context.Background(), &orlydbv1.Empty{})
 846  	if err != nil {
 847  		return nil, err
 848  	}
 849  	return orlydbv1.ProtoToUserBlobStatsList(resp.Stats), nil
 850  }
 851  
 852  func (c *Client) ReconcileBlobMetadata() (reconciled int, err error) {
 853  	resp, err := c.client.ReconcileBlobMetadata(context.Background(), &orlydbv1.Empty{})
 854  	if err != nil {
 855  		return 0, err
 856  	}
 857  	return int(resp.Reconciled), nil
 858  }
 859  
 860  func (c *Client) ListAllBlobs() ([]*database.BlobDescriptor, error) {
 861  	resp, err := c.client.ListAllBlobs(context.Background(), &orlydbv1.Empty{})
 862  	if err != nil {
 863  		return nil, err
 864  	}
 865  	return orlydbv1.ProtoToBlobDescriptorList(resp.Descriptors), nil
 866  }
 867  
 868  func (c *Client) GetThumbnail(key string) ([]byte, error) {
 869  	resp, err := c.client.GetThumbnail(context.Background(), &orlydbv1.GetThumbnailRequest{
 870  		Key: key,
 871  	})
 872  	if err != nil {
 873  		return nil, err
 874  	}
 875  	if !resp.Found {
 876  		return nil, errors.New("thumbnail not found")
 877  	}
 878  	return resp.Data, nil
 879  }
 880  
 881  func (c *Client) SaveThumbnail(key string, data []byte) error {
 882  	_, err := c.client.SaveThumbnail(context.Background(), &orlydbv1.SaveThumbnailRequest{
 883  		Key:  key,
 884  		Data: data,
 885  	})
 886  	return err
 887  }
 888  
 889  // === Utility ===
 890  
 891  func (c *Client) EventIdsBySerial(start uint64, count int) (evs []uint64, err error) {
 892  	resp, err := c.client.EventIdsBySerial(context.Background(), &orlydbv1.EventIdsBySerialRequest{
 893  		Start: start,
 894  		Count: int32(count),
 895  	})
 896  	if err != nil {
 897  		return nil, err
 898  	}
 899  	return resp.EventIds, nil
 900  }
 901  
 902  // === Helper Methods ===
 903  
 904  type eventStream interface {
 905  	Recv() (*orlydbv1.EventBatch, error)
 906  }
 907  
 908  func (c *Client) collectStreamedEvents(stream eventStream) (event.S, error) {
 909  	var result event.S
 910  	for {
 911  		batch, err := stream.Recv()
 912  		if err == io.EOF {
 913  			return result, nil
 914  		}
 915  		if err != nil {
 916  			return nil, err
 917  		}
 918  		for _, ev := range batch.Events {
 919  			result = append(result, orlydbv1.ProtoToEvent(ev))
 920  		}
 921  	}
 922  }
 923  
 924  func protoToUint40s(resp *orlydbv1.SerialList) indextypes.Uint40s {
 925  	if resp == nil {
 926  		return nil
 927  	}
 928  	result := make(indextypes.Uint40s, 0, len(resp.Serials))
 929  	for _, s := range resp.Serials {
 930  		u := &indextypes.Uint40{}
 931  		_ = u.Set(s)
 932  		result = append(result, u)
 933  	}
 934  	return result
 935  }
 936  
 937  // === Cypher Query ===
 938  
 939  // ExecuteCypherRead implements store.CypherExecutor by proxying through gRPC.
 940  func (c *Client) ExecuteCypherRead(ctx context.Context, cypher string, params map[string]any) ([]map[string]any, error) {
 941  	// Encode params as JSON bytes for proto transport
 942  	protoParams := make(map[string][]byte, len(params))
 943  	for k, v := range params {
 944  		b, err := json.Marshal(v)
 945  		if err != nil {
 946  			return nil, fmt.Errorf("failed to marshal param %q: %w", k, err)
 947  		}
 948  		protoParams[k] = b
 949  	}
 950  
 951  	resp, err := c.client.ExecuteCypherRead(ctx, &orlydbv1.CypherReadRequest{
 952  		Cypher: cypher,
 953  		Params: protoParams,
 954  	})
 955  	if err != nil {
 956  		return nil, err
 957  	}
 958  	if resp.Error != "" {
 959  		return nil, fmt.Errorf("%s", resp.Error)
 960  	}
 961  
 962  	// Decode JSON records
 963  	records := make([]map[string]any, 0, len(resp.Records))
 964  	for _, raw := range resp.Records {
 965  		var rec map[string]any
 966  		if err := json.Unmarshal(raw, &rec); err != nil {
 967  			return nil, fmt.Errorf("failed to unmarshal record: %w", err)
 968  		}
 969  		records = append(records, rec)
 970  	}
 971  
 972  	return records, nil
 973  }
 974  
 975  // === Paid ACL ===
 976  
 977  func (c *Client) SavePaidSubscription(sub *database.PaidSubscription) error {
 978  	_, err := c.client.SavePaidSubscription(context.Background(), orlydbv1.PaidSubscriptionToProto(sub))
 979  	return err
 980  }
 981  
 982  func (c *Client) GetPaidSubscription(pubkeyHex string) (*database.PaidSubscription, error) {
 983  	resp, err := c.client.GetPaidSubscription(context.Background(), &orlydbv1.GetPaidSubscriptionRequest{
 984  		PubkeyHex: pubkeyHex,
 985  	})
 986  	if err != nil {
 987  		return nil, err
 988  	}
 989  	return orlydbv1.ProtoToPaidSubscription(resp), nil
 990  }
 991  
 992  func (c *Client) DeletePaidSubscription(pubkeyHex string) error {
 993  	_, err := c.client.DeletePaidSubscription(context.Background(), &orlydbv1.DeletePaidSubscriptionRequest{
 994  		PubkeyHex: pubkeyHex,
 995  	})
 996  	return err
 997  }
 998  
 999  func (c *Client) ListPaidSubscriptions() ([]*database.PaidSubscription, error) {
1000  	resp, err := c.client.ListPaidSubscriptions(context.Background(), &orlydbv1.Empty{})
1001  	if err != nil {
1002  		return nil, err
1003  	}
1004  	return orlydbv1.ProtoToPaidSubscriptionList(resp), nil
1005  }
1006  
1007  func (c *Client) ClaimAlias(alias, pubkeyHex string) error {
1008  	_, err := c.client.ClaimAlias(context.Background(), &orlydbv1.ClaimAliasRequest{
1009  		Alias:     alias,
1010  		PubkeyHex: pubkeyHex,
1011  	})
1012  	return err
1013  }
1014  
1015  func (c *Client) GetAliasByPubkey(pubkeyHex string) (string, error) {
1016  	resp, err := c.client.GetAliasByPubkey(context.Background(), &orlydbv1.GetAliasByPubkeyRequest{
1017  		PubkeyHex: pubkeyHex,
1018  	})
1019  	if err != nil {
1020  		return "", err
1021  	}
1022  	return resp.Alias, nil
1023  }
1024  
1025  func (c *Client) GetPubkeyByAlias(alias string) (string, error) {
1026  	resp, err := c.client.GetPubkeyByAlias(context.Background(), &orlydbv1.GetPubkeyByAliasRequest{
1027  		Alias: alias,
1028  	})
1029  	if err != nil {
1030  		return "", err
1031  	}
1032  	return resp.PubkeyHex, nil
1033  }
1034  
1035  func (c *Client) IsAliasTaken(alias string) (bool, error) {
1036  	resp, err := c.client.IsAliasTaken(context.Background(), &orlydbv1.IsAliasTakenRequest{
1037  		Alias: alias,
1038  	})
1039  	if err != nil {
1040  		return false, err
1041  	}
1042  	return resp.Taken, nil
1043  }
1044  
1045  // NRC (Nostr Relay Connect) stubs - not supported in gRPC client
1046  // NRC management must be done on the database server side
1047  
1048  var errNRCNotSupportedGRPC = fmt.Errorf("NRC not supported in gRPC client - manage directly on database server")
1049  
1050  func (c *Client) CreateNRCConnection(label string, createdBy []byte) (*database.NRCConnection, error) {
1051  	return nil, errNRCNotSupportedGRPC
1052  }
1053  
1054  func (c *Client) GetNRCConnection(id string) (*database.NRCConnection, error) {
1055  	return nil, errNRCNotSupportedGRPC
1056  }
1057  
1058  func (c *Client) GetNRCConnectionByDerivedPubkey(derivedPubkey []byte) (*database.NRCConnection, error) {
1059  	return nil, errNRCNotSupportedGRPC
1060  }
1061  
1062  func (c *Client) SaveNRCConnection(conn *database.NRCConnection) error {
1063  	return errNRCNotSupportedGRPC
1064  }
1065  
1066  func (c *Client) DeleteNRCConnection(id string) error {
1067  	return errNRCNotSupportedGRPC
1068  }
1069  
1070  func (c *Client) GetAllNRCConnections() ([]*database.NRCConnection, error) {
1071  	return nil, errNRCNotSupportedGRPC
1072  }
1073  
1074  func (c *Client) GetNRCAuthorizedSecrets() (map[string]string, error) {
1075  	return nil, errNRCNotSupportedGRPC
1076  }
1077  
1078  func (c *Client) UpdateNRCConnectionLastUsed(id string) error {
1079  	return errNRCNotSupportedGRPC
1080  }
1081  
1082  func (c *Client) GetNRCConnectionURI(conn *database.NRCConnection, relayPubkey []byte, rendezvousURL string) (string, error) {
1083  	return "", errNRCNotSupportedGRPC
1084  }
1085