embedded.go raw

   1  package negentropy
   2  
   3  import (
   4  	"context"
   5  	"encoding/hex"
   6  	"fmt"
   7  
   8  	"next.orly.dev/pkg/lol/log"
   9  
  10  	"next.orly.dev/pkg/nostr/encoders/filter"
  11  	"next.orly.dev/pkg/nostr/encoders/kind"
  12  	"next.orly.dev/pkg/nostr/encoders/tag"
  13  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  14  	negentropylib "next.orly.dev/pkg/nostr/negentropy"
  15  	"next.orly.dev/pkg/database"
  16  	negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
  17  	commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
  18  )
  19  
// EmbeddedHandler wraps the negentropy Manager to implement the Handler interface.
// This allows negentropy to run embedded in monolithic mode, in-process, without
// going through gRPC.
type EmbeddedHandler struct {
	mgr   *Manager          // session bookkeeping and background sync; every session call is delegated to it
	db    database.Database // event store queried to build per-session negentropy storage
	ready chan struct{}     // readiness signal; closed by NewEmbeddedHandler before it returns
}
  27  
  28  // NewEmbeddedHandler creates a new embedded negentropy handler.
  29  func NewEmbeddedHandler(db database.Database, cfg *Config) *EmbeddedHandler {
  30  	h := &EmbeddedHandler{
  31  		mgr:   NewManager(db, cfg),
  32  		db:    db,
  33  		ready: make(chan struct{}),
  34  	}
  35  	close(h.ready) // Immediately ready in embedded mode
  36  	return h
  37  }
  38  
// Start starts the background sync loop by delegating to the Manager.
func (h *EmbeddedHandler) Start() {
	h.mgr.Start()
}
  43  
// Stop stops the background sync by delegating to the Manager.
func (h *EmbeddedHandler) Stop() {
	h.mgr.Stop()
}
  48  
// Ready returns a channel that closes when the handler is ready.
// NewEmbeddedHandler closes this channel before returning, so a receive on it
// does not block for handlers built through that constructor.
func (h *EmbeddedHandler) Ready() <-chan struct{} {
	return h.ready
}
  53  
  54  // Close cleans up resources.
  55  func (h *EmbeddedHandler) Close() error {
  56  	h.mgr.Stop()
  57  	return nil
  58  }
  59  
  60  // HandleNegOpen processes a NEG-OPEN message from a client.
  61  func (h *EmbeddedHandler) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, protoFilter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
  62  	// Open a session for this client
  63  	session := h.mgr.OpenSession(connectionID, subscriptionID)
  64  
  65  	// Build storage from local events matching the filter
  66  	storage, err := h.buildStorageForFilter(ctx, protoFilter)
  67  	if err != nil {
  68  		log.E.F("NEG-OPEN: failed to build storage: %v", err)
  69  		return nil, nil, nil, false, fmt.Sprintf("failed to build storage: %v", err), nil
  70  	}
  71  
  72  	log.D.F("NEG-OPEN: built storage with %d events", storage.Size())
  73  
  74  	// Create negentropy instance for this session
  75  	neg := negentropylib.New(storage, negentropylib.DefaultFrameSizeLimit)
  76  
  77  	// Store in session for later use
  78  	session.SetNegentropy(neg, storage)
  79  
  80  	// If we have an initial message from client, process it
  81  	var respMsg []byte
  82  	var complete bool
  83  	if len(initialMessage) > 0 {
  84  		respMsg, complete, err = neg.Reconcile(initialMessage)
  85  		if err != nil {
  86  			log.E.F("NEG-OPEN: reconcile failed: %v", err)
  87  			return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil
  88  		}
  89  		log.D.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg))
  90  		// Debug: dump first bytes and initial message first bytes
  91  		if len(respMsg) > 0 {
  92  			end := 64
  93  			if end > len(respMsg) {
  94  				end = len(respMsg)
  95  			}
  96  			log.D.F("NEG-OPEN: initial msg first 64 bytes: %x", initialMessage[:min(64, len(initialMessage))])
  97  			log.D.F("NEG-OPEN: response first 64 bytes: %x", respMsg[:end])
  98  		}
  99  	} else {
 100  		// No initial message, start as server (initiator)
 101  		respMsg, err = neg.Start()
 102  		if err != nil {
 103  			log.E.F("NEG-OPEN: failed to start: %v", err)
 104  			return nil, nil, nil, false, fmt.Sprintf("failed to start: %v", err), nil
 105  		}
 106  		log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg))
 107  	}
 108  
 109  	// Collect IDs we have that client needs (to send as events)
 110  	haveIDs := neg.CollectHaves()
 111  	var haveIDBytes [][]byte
 112  	for _, id := range haveIDs {
 113  		if decoded, err := hex.DecodeString(id); err == nil {
 114  			haveIDBytes = append(haveIDBytes, decoded)
 115  		}
 116  	}
 117  
 118  	// Collect IDs we need from client
 119  	needIDs := neg.CollectHaveNots()
 120  	var needIDBytes [][]byte
 121  	for _, id := range needIDs {
 122  		if decoded, err := hex.DecodeString(id); err == nil {
 123  			needIDBytes = append(needIDBytes, decoded)
 124  		}
 125  	}
 126  
 127  	log.D.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d",
 128  		complete, len(haveIDs), len(needIDs), len(respMsg))
 129  
 130  	return respMsg, haveIDBytes, needIDBytes, complete, "", nil
 131  }
 132  
 133  // HandleNegMsg processes a NEG-MSG message from a client.
 134  func (h *EmbeddedHandler) HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
 135  	// Update session activity
 136  	h.mgr.UpdateSessionActivity(connectionID, subscriptionID)
 137  
 138  	// Look up session
 139  	session, ok := h.mgr.GetSession(connectionID, subscriptionID)
 140  	if !ok {
 141  		return nil, nil, nil, false, "session not found", nil
 142  	}
 143  
 144  	neg := session.GetNegentropy()
 145  	if neg == nil {
 146  		return nil, nil, nil, false, "session has no negentropy state", nil
 147  	}
 148  
 149  	// Process the message
 150  	respMsg, complete, err := neg.Reconcile(message)
 151  	if err != nil {
 152  		log.E.F("NEG-MSG: reconcile failed: %v", err)
 153  		return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil
 154  	}
 155  
 156  	// Collect IDs we have that client needs
 157  	haveIDs := neg.CollectHaves()
 158  	var haveIDBytes [][]byte
 159  	for _, id := range haveIDs {
 160  		if decoded, err := hex.DecodeString(id); err == nil {
 161  			haveIDBytes = append(haveIDBytes, decoded)
 162  		}
 163  	}
 164  
 165  	// Collect IDs we need from client
 166  	needIDs := neg.CollectHaveNots()
 167  	var needIDBytes [][]byte
 168  	for _, id := range needIDs {
 169  		if decoded, err := hex.DecodeString(id); err == nil {
 170  			needIDBytes = append(needIDBytes, decoded)
 171  		}
 172  	}
 173  
 174  	log.D.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d",
 175  		complete, len(haveIDs), len(needIDs), len(respMsg))
 176  
 177  	return respMsg, haveIDBytes, needIDBytes, complete, "", nil
 178  }
 179  
// HandleNegClose processes a NEG-CLOSE message from a client by closing the
// session identified by connectionID and subscriptionID. ctx is not used, and
// the returned error is always nil.
func (h *EmbeddedHandler) HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error {
	h.mgr.CloseSession(connectionID, subscriptionID)
	return nil
}
 185  
 186  // ListSessions returns active client negentropy sessions.
 187  func (h *EmbeddedHandler) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) {
 188  	sessions := h.mgr.ListSessions()
 189  
 190  	result := make([]*negentropyiface.ClientSession, 0, len(sessions))
 191  	for _, sess := range sessions {
 192  		result = append(result, &negentropyiface.ClientSession{
 193  			SubscriptionID: sess.SubscriptionID,
 194  			ConnectionID:   sess.ConnectionID,
 195  			CreatedAt:      sess.CreatedAt.Unix(),
 196  			LastActivity:   sess.LastActivity.Unix(),
 197  			RoundCount:     sess.RoundCount,
 198  		})
 199  	}
 200  	return result, nil
 201  }
 202  
 203  // CloseSession forcefully closes a client session.
 204  func (h *EmbeddedHandler) CloseSession(ctx context.Context, connectionID, subscriptionID string) error {
 205  	if connectionID == "" {
 206  		// Close all sessions with this subscription ID
 207  		sessions := h.mgr.ListSessions()
 208  		for _, sess := range sessions {
 209  			if sess.SubscriptionID == subscriptionID {
 210  				h.mgr.CloseSession(sess.ConnectionID, sess.SubscriptionID)
 211  			}
 212  		}
 213  	} else {
 214  		h.mgr.CloseSession(connectionID, subscriptionID)
 215  	}
 216  	return nil
 217  }
 218  
 219  // buildStorageForFilter creates a negentropy Vector from local events matching the filter.
 220  func (h *EmbeddedHandler) buildStorageForFilter(ctx context.Context, protoFilter *commonv1.Filter) (*negentropylib.Vector, error) {
 221  	storage := negentropylib.NewVector()
 222  
 223  	// Convert proto filter to nostr filter
 224  	f := protoToFilter(protoFilter)
 225  
 226  	// If no filter provided, use a reasonable limit
 227  	if f == nil {
 228  		limit := uint(1000000)
 229  		f = &filter.F{Limit: &limit}
 230  	}
 231  	if f.Limit == nil {
 232  		limit := uint(1000000)
 233  		f.Limit = &limit
 234  	}
 235  
 236  	// Query events from database
 237  	idPkTs, err := h.db.QueryForIds(ctx, f)
 238  	if err != nil {
 239  		return nil, fmt.Errorf("failed to query events: %w", err)
 240  	}
 241  
 242  	for _, item := range idPkTs {
 243  		storage.Insert(item.Ts, item.IDHex())
 244  	}
 245  
 246  	storage.Seal()
 247  	return storage, nil
 248  }
 249  
 250  // protoToFilter converts a proto filter to a nostr filter.
 251  func protoToFilter(pf *commonv1.Filter) *filter.F {
 252  	if pf == nil {
 253  		return nil
 254  	}
 255  
 256  	f := &filter.F{}
 257  
 258  	// Convert IDs (binary 32-byte event IDs)
 259  	if len(pf.Ids) > 0 {
 260  		f.Ids = &tag.T{T: pf.Ids}
 261  	}
 262  
 263  	// Convert Kinds (uint32 → kind.K with uint16)
 264  	if len(pf.Kinds) > 0 {
 265  		ks := kind.NewWithCap(len(pf.Kinds))
 266  		for _, k := range pf.Kinds {
 267  			ks.K = append(ks.K, kind.New(k))
 268  		}
 269  		f.Kinds = ks
 270  	}
 271  
 272  	// Convert Authors (binary 32-byte pubkeys)
 273  	if len(pf.Authors) > 0 {
 274  		f.Authors = &tag.T{T: pf.Authors}
 275  	}
 276  
 277  	// Convert Since
 278  	if pf.Since != nil {
 279  		f.Since = timestamp.New(*pf.Since)
 280  	}
 281  
 282  	// Convert Until
 283  	if pf.Until != nil {
 284  		f.Until = timestamp.New(*pf.Until)
 285  	}
 286  
 287  	// Convert Limit
 288  	if pf.Limit != nil {
 289  		limit := uint(*pf.Limit)
 290  		f.Limit = &limit
 291  	}
 292  
 293  	return f
 294  }
 295