bridge.go raw

   1  package bridge
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"os"
   7  	"strings"
   8  	"sync"
   9  	"time"
  10  
  11  	"next.orly.dev/pkg/nostr/crypto/encryption"
  12  	"next.orly.dev/pkg/nostr/encoders/bech32encoding"
  13  	"next.orly.dev/pkg/nostr/encoders/event"
  14  	"next.orly.dev/pkg/nostr/encoders/filter"
  15  	"next.orly.dev/pkg/nostr/encoders/hex"
  16  	"next.orly.dev/pkg/nostr/encoders/kind"
  17  	"next.orly.dev/pkg/nostr/encoders/tag"
  18  	"next.orly.dev/pkg/nostr/encoders/timestamp"
  19  	"next.orly.dev/pkg/nostr/interfaces/signer"
  20  	"next.orly.dev/pkg/lol/chk"
  21  	"next.orly.dev/pkg/lol/log"
  22  
  23  	aclgrpc "next.orly.dev/pkg/acl/grpc"
  24  	bridgesmtp "next.orly.dev/pkg/bridge/smtp"
  25  )
  26  
// dmFormat tracks which DM protocol a sender last used. The bridge records
// one value per sender pubkey (see recordSenderFormat) and consults it when
// replying (see makeSendDM).
type dmFormat int

// The declaration order matters: dmFormatKind4 is the iota zero value, and it
// is also the format getSenderFormat reports for senders it has never seen.
const (
	dmFormatKind4    dmFormat = iota // NIP-04 style (kind 4)
	dmFormatGiftWrap                 // NIP-17 gift wrap (kind 1059)
)
  34  
// Bridge is the Nostr-Email bridge. It manages identity, relay connection,
// Marmot DM handling, and SMTP transport.
//
// A Bridge is created with New and is inert until Start is called; Stop tears
// down whatever Start brought up. The struct contains a mutex and a WaitGroup,
// so it must be used through a pointer and never copied.
type Bridge struct {
	// cfg is the configuration handed to New.
	cfg    *Config
	// sign and source are filled in by Start from ResolveIdentity.
	sign   signer.I
	source IdentitySource
	// relay is set by Start only when cfg.RelayURL is non-empty; the DM send
	// callback treats a nil relay as "no relay connection".
	relay  *RelayConn

	// router is built by initComponents and receives every decrypted DM.
	router     *Router
	// smtpServer is created by startSMTPServer (only when cfg.Domain is set).
	smtpServer *bridgesmtp.Server

	// aclClient is set by Start only when cfg.ACLGRPCServer is non-empty.
	aclClient *aclgrpc.Client

	// senderFormats tracks the DM format last used by each sender so the
	// bridge replies in the same format. Protected by senderFmtMu.
	senderFormats map[string]dmFormat
	senderFmtMu   sync.RWMutex

	// ctx is derived from the context passed to Start; cancel ends it and is
	// invoked by Stop. wg tracks the relayWatchLoop goroutine.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	// dbGetter is a function that returns the relay identity secret key
	// from the database. Non-nil in monolithic mode, nil in standalone.
	dbGetter func() ([]byte, error)
}
  61  
  62  // New creates a new Bridge. The dbGetter is optional — pass nil for standalone
  63  // mode (the bridge will fall back to file-based identity).
  64  func New(cfg *Config, dbGetter func() ([]byte, error)) *Bridge {
  65  	return &Bridge{
  66  		cfg:           cfg,
  67  		dbGetter:      dbGetter,
  68  		senderFormats: make(map[string]dmFormat),
  69  	}
  70  }
  71  
  72  // Start initializes the bridge: resolves identity, connects to relay, and
  73  // begins listening for events.
  74  func (b *Bridge) Start(ctx context.Context) error {
  75  	b.ctx, b.cancel = context.WithCancel(ctx)
  76  
  77  	// Ensure data directory exists
  78  	if err := os.MkdirAll(b.cfg.DataDir, 0700); err != nil {
  79  		return fmt.Errorf("create bridge data dir: %w", err)
  80  	}
  81  
  82  	// Resolve identity
  83  	sign, source, err := ResolveIdentity(b.cfg.NSEC, b.dbGetter, b.cfg.DataDir)
  84  	if err != nil {
  85  		return fmt.Errorf("resolve identity: %w", err)
  86  	}
  87  	b.sign = sign
  88  	b.source = source
  89  
  90  	pubHex := hex.Enc(b.sign.Pub())
  91  	npub, _ := bech32encoding.BinToNpub(b.sign.Pub())
  92  	sourceStr := identitySourceString(source)
  93  	log.I.F("bridge identity: %s (%s) [source: %s]", string(npub), string(pubHex), sourceStr)
  94  
  95  	if b.cfg.Domain != "" {
  96  		log.I.F("bridge email domain: %s", b.cfg.Domain)
  97  	}
  98  
  99  	// Connect to ACL gRPC server if configured
 100  	if b.cfg.ACLGRPCServer != "" {
 101  		aclClient, err := aclgrpc.New(b.ctx, &aclgrpc.ClientConfig{
 102  			ServerAddress:  b.cfg.ACLGRPCServer,
 103  			ConnectTimeout: 15 * time.Second,
 104  		})
 105  		if err != nil {
 106  			return fmt.Errorf("connect to ACL gRPC server: %w", err)
 107  		}
 108  		b.aclClient = aclClient
 109  		log.I.F("bridge connected to ACL gRPC server at %s", b.cfg.ACLGRPCServer)
 110  	}
 111  
 112  	// Build the sendDM callback
 113  	sendDM := b.makeSendDM()
 114  
 115  	// Initialize components
 116  	if err := b.initComponents(sendDM); err != nil {
 117  		return fmt.Errorf("init components: %w", err)
 118  	}
 119  
 120  	// Start SMTP server for inbound email
 121  	if b.cfg.Domain != "" {
 122  		if err := b.startSMTPServer(sendDM); err != nil {
 123  			return fmt.Errorf("start SMTP server: %w", err)
 124  		}
 125  	}
 126  
 127  	// Connect to relay (standalone mode)
 128  	if b.cfg.RelayURL != "" {
 129  		b.relay = NewRelayConn(b.cfg.RelayURL, b.sign)
 130  		if err := b.relay.Connect(b.ctx); err != nil {
 131  			return fmt.Errorf("relay connection: %w", err)
 132  		}
 133  
 134  		// Start subscription loop in background
 135  		b.wg.Add(1)
 136  		go b.relayWatchLoop()
 137  
 138  		// Publish kind 0 profile if template exists
 139  		if err := b.publishProfile(); err != nil {
 140  			log.W.F("publish bridge profile: %v", err)
 141  		}
 142  	}
 143  
 144  	log.I.F("bridge started")
 145  	return nil
 146  }
 147  
 148  // initComponents sets up the router, outbound processor, subscription handler,
 149  // and payment processor.
 150  func (b *Bridge) initComponents(sendDM func(string, string) error) error {
 151  	// SMTP client for outbound email
 152  	smtpCfg := bridgesmtp.ClientConfig{
 153  		FromDomain:    b.cfg.Domain,
 154  		RelayHost:     b.cfg.SMTPRelayHost,
 155  		RelayPort:     b.cfg.SMTPRelayPort,
 156  		RelayUsername: b.cfg.SMTPRelayUsername,
 157  		RelayPassword: b.cfg.SMTPRelayPassword,
 158  	}
 159  
 160  	// Load DKIM signer if configured
 161  	if b.cfg.DKIMKeyPath != "" {
 162  		dkim, err := bridgesmtp.NewDKIMSigner(b.cfg.Domain, b.cfg.DKIMSelector, b.cfg.DKIMKeyPath)
 163  		if err != nil {
 164  			log.W.F("DKIM signer init failed, continuing without DKIM: %v", err)
 165  		} else {
 166  			smtpCfg.DKIMSigner = dkim
 167  		}
 168  	}
 169  
 170  	smtpClient := bridgesmtp.NewClient(smtpCfg)
 171  
 172  	// Rate limiter
 173  	rateLimiter := NewRateLimiter(DefaultRateLimitConfig())
 174  
 175  	// Subscription store + handler
 176  	subStore, err := NewFileSubscriptionStore(b.cfg.DataDir)
 177  	if err != nil {
 178  		return fmt.Errorf("subscription store: %w", err)
 179  	}
 180  
 181  	var payments *PaymentProcessor
 182  	if b.cfg.NWCURI != "" {
 183  		payments, err = NewPaymentProcessor(b.cfg.NWCURI, b.cfg.MonthlyPriceSats)
 184  		if err != nil {
 185  			log.W.F("payment processor init failed: %v", err)
 186  		}
 187  	}
 188  
 189  	aliasPriceSats := b.cfg.AliasPriceSats
 190  	if aliasPriceSats == 0 && b.cfg.MonthlyPriceSats > 0 {
 191  		aliasPriceSats = b.cfg.MonthlyPriceSats * 2
 192  	}
 193  
 194  	subHandler := NewSubscriptionHandler(subStore, payments, sendDM, b.cfg.MonthlyPriceSats, b.aclClient, aliasPriceSats)
 195  	outbound := NewOutboundProcessor(smtpClient, rateLimiter, subHandler, b.cfg.Domain, sendDM, b.aclClient)
 196  	b.router = NewRouter(subHandler, outbound, sendDM)
 197  
 198  	return nil
 199  }
 200  
 201  // startSMTPServer starts the inbound SMTP server that receives forwarded mail.
 202  func (b *Bridge) startSMTPServer(sendDM func(string, string) error) error {
 203  	listenAddr := fmt.Sprintf("%s:%d", b.cfg.SMTPHost, b.cfg.SMTPPort)
 204  	cfg := bridgesmtp.ServerConfig{
 205  		Domain:          b.cfg.Domain,
 206  		ListenAddr:      listenAddr,
 207  		MaxMessageBytes: 25 * 1024 * 1024,
 208  		MaxRecipients:   10,
 209  		ReadTimeout:     60 * time.Second,
 210  		WriteTimeout:    60 * time.Second,
 211  	}
 212  
 213  	// Inbound processor (no Blossom uploader for now)
 214  	inbound := NewInboundProcessor(nil, b.cfg.ComposeURL, sendDM)
 215  
 216  	handler := func(email *bridgesmtp.InboundEmail) error {
 217  		for _, to := range email.To {
 218  			pubkeyHex, err := resolveRecipientPubkey(to, b.cfg.Domain, b.aclClient)
 219  			if err != nil {
 220  				log.W.F("cannot resolve recipient %s: %v", to, err)
 221  				continue
 222  			}
 223  			if err := inbound.ProcessInbound(email, pubkeyHex); err != nil {
 224  				log.E.F("inbound processing failed for %s: %v", to, err)
 225  			}
 226  		}
 227  		return nil
 228  	}
 229  
 230  	b.smtpServer = bridgesmtp.NewServer(cfg, handler)
 231  	return b.smtpServer.Start()
 232  }
 233  
 234  // Stop gracefully shuts down the bridge.
 235  func (b *Bridge) Stop() {
 236  	log.I.F("bridge stopping")
 237  	if b.cancel != nil {
 238  		b.cancel()
 239  	}
 240  	if b.smtpServer != nil {
 241  		b.smtpServer.Stop(context.Background())
 242  	}
 243  	if b.relay != nil {
 244  		b.relay.Close()
 245  	}
 246  	if b.aclClient != nil {
 247  		b.aclClient.Close()
 248  	}
 249  	b.wg.Wait()
 250  	log.I.F("bridge stopped")
 251  }
 252  
// Signer returns the bridge's identity signer. The signer is assigned by
// Start, so the result is nil until identity resolution has succeeded.
func (b *Bridge) Signer() signer.I {
	return b.sign
}
 257  
// IdentitySource returns how the identity was resolved. The value is assigned
// by Start alongside the signer; before that it is the type's zero value.
func (b *Bridge) IdentitySource() IdentitySource {
	return b.source
}
 262  
 263  // relayWatchLoop subscribes to kind 4 DMs addressed to the bridge and routes
 264  // them through the router. It reconnects and resubscribes on disconnection.
 265  func (b *Bridge) relayWatchLoop() {
 266  	defer b.wg.Done()
 267  
 268  	if b.relay == nil || b.sign == nil {
 269  		<-b.ctx.Done()
 270  		return
 271  	}
 272  
 273  	for {
 274  		if err := b.subscribeAndProcess(); err != nil {
 275  			if b.ctx.Err() != nil {
 276  				return // context cancelled, clean exit
 277  			}
 278  			log.W.F("subscription loop error: %v, reconnecting...", err)
 279  			if err := b.relay.Reconnect(); err != nil {
 280  				if b.ctx.Err() != nil {
 281  					return
 282  				}
 283  				log.E.F("reconnect failed: %v", err)
 284  			}
 285  		}
 286  	}
 287  }
 288  
 289  // subscribeAndProcess creates a subscription for DMs to the bridge and
 290  // processes events until the channel closes or context is cancelled.
 291  // Subscribes to both kind 4 (NIP-04) and kind 1059 (NIP-17 gift wrap).
 292  func (b *Bridge) subscribeAndProcess() error {
 293  	bridgePubHex := hex.Enc(b.sign.Pub())
 294  
 295  	ff := filter.NewS(
 296  		&filter.F{
 297  			Kinds: kind.NewS(kind.New(4), kind.New(1059)),
 298  			Tags: tag.NewS(
 299  				tag.NewFromAny("p", bridgePubHex),
 300  			),
 301  			Since: &timestamp.T{V: time.Now().Unix()},
 302  		},
 303  	)
 304  
 305  	sub, err := b.relay.Subscribe(b.ctx, ff)
 306  	if err != nil {
 307  		return fmt.Errorf("subscribe: %w", err)
 308  	}
 309  	defer sub.Close()
 310  
 311  	log.I.F("subscribed to DMs (kind 4 + 1059) for bridge pubkey %s", bridgePubHex)
 312  
 313  	for {
 314  		select {
 315  		case <-b.ctx.Done():
 316  			return nil
 317  		case ev, ok := <-sub.Events():
 318  			if !ok {
 319  				return fmt.Errorf("event channel closed")
 320  			}
 321  			switch ev.Kind {
 322  			case 1059:
 323  				b.handleGiftWrapEvent(ev)
 324  			default:
 325  				b.handleDMEvent(ev)
 326  			}
 327  		}
 328  	}
 329  }
 330  
 331  // handleDMEvent decrypts a kind 4 DM event and routes it.
 332  func (b *Bridge) handleDMEvent(ev *event.E) {
 333  	senderPubHex := hex.Enc(ev.Pubkey[:])
 334  
 335  	// Derive conversation key for NIP-44 decryption
 336  	conversationKey, err := encryption.GenerateConversationKey(
 337  		b.sign.Sec(), ev.Pubkey[:],
 338  	)
 339  	if err != nil {
 340  		log.E.F("ECDH failed for sender %s: %v", senderPubHex, err)
 341  		return
 342  	}
 343  
 344  	decrypted, err := encryption.Decrypt(conversationKey, string(ev.Content))
 345  	if err != nil {
 346  		log.W.F("DM decryption failed from %s: %v", senderPubHex, err)
 347  		return
 348  	}
 349  
 350  	log.D.F("received kind 4 DM from %s: %d bytes", senderPubHex, len(decrypted))
 351  
 352  	b.recordSenderFormat(senderPubHex, dmFormatKind4)
 353  
 354  	if b.router != nil {
 355  		b.router.RouteDM(b.ctx, senderPubHex, decrypted)
 356  	}
 357  }
 358  
 359  // handleGiftWrapEvent unwraps a NIP-17 gift-wrapped DM (kind 1059) and routes it.
 360  func (b *Bridge) handleGiftWrapEvent(ev *event.E) {
 361  	dm, err := unwrapGiftWrap(ev, b.sign)
 362  	if err != nil {
 363  		log.W.F("gift wrap unwrap failed: %v", err)
 364  		return
 365  	}
 366  
 367  	log.D.F("received NIP-17 DM from %s: %d bytes", dm.SenderPubHex, len(dm.Content))
 368  
 369  	b.recordSenderFormat(dm.SenderPubHex, dmFormatGiftWrap)
 370  
 371  	if b.router != nil {
 372  		b.router.RouteDM(b.ctx, dm.SenderPubHex, dm.Content)
 373  	}
 374  }
 375  
 376  // recordSenderFormat tracks the DM format a sender used.
 377  func (b *Bridge) recordSenderFormat(pubkeyHex string, format dmFormat) {
 378  	b.senderFmtMu.Lock()
 379  	b.senderFormats[pubkeyHex] = format
 380  	b.senderFmtMu.Unlock()
 381  }
 382  
 383  // getSenderFormat returns the last DM format used by a sender.
 384  // Returns dmFormatKind4 as default for unknown senders (backward compatible).
 385  func (b *Bridge) getSenderFormat(pubkeyHex string) dmFormat {
 386  	b.senderFmtMu.RLock()
 387  	defer b.senderFmtMu.RUnlock()
 388  	f, ok := b.senderFormats[pubkeyHex]
 389  	if !ok {
 390  		return dmFormatKind4
 391  	}
 392  	return f
 393  }
 394  
 395  // makeSendDM returns a callback that encrypts and publishes DMs to the given
 396  // pubkey via the relay. Replies in the same format the sender used: kind 4
 397  // for NIP-04 senders, kind 1059 gift wrap for NIP-17 senders. For unknown
 398  // senders (inbound email), sends kind 4 as the default.
 399  func (b *Bridge) makeSendDM() func(pubkeyHex string, content string) error {
 400  	return func(pubkeyHex string, content string) error {
 401  		if b.relay == nil {
 402  			return fmt.Errorf("no relay connection")
 403  		}
 404  
 405  		format := b.getSenderFormat(pubkeyHex)
 406  
 407  		switch format {
 408  		case dmFormatGiftWrap:
 409  			if err := b.sendGiftWrapDM(pubkeyHex, content); err != nil {
 410  				return fmt.Errorf("gift wrap DM: %w", err)
 411  			}
 412  			log.D.F("sent NIP-17 DM to %s (%d bytes)", pubkeyHex, len(content))
 413  		default:
 414  			if err := b.sendKind4DM(pubkeyHex, content); err != nil {
 415  				return fmt.Errorf("kind 4 DM: %w", err)
 416  			}
 417  			log.D.F("sent kind 4 DM to %s (%d bytes)", pubkeyHex, len(content))
 418  		}
 419  
 420  		return nil
 421  	}
 422  }
 423  
 424  // sendKind4DM sends a kind 4 encrypted DM using NIP-44.
 425  func (b *Bridge) sendKind4DM(pubkeyHex, content string) error {
 426  	recipientPub, err := hex.Dec(pubkeyHex)
 427  	if err != nil {
 428  		return fmt.Errorf("decode recipient pubkey: %w", err)
 429  	}
 430  
 431  	conversationKey, err := encryption.GenerateConversationKey(
 432  		b.sign.Sec(), recipientPub,
 433  	)
 434  	if err != nil {
 435  		return fmt.Errorf("ECDH: %w", err)
 436  	}
 437  
 438  	encrypted, err := encryption.Encrypt(conversationKey, []byte(content), nil)
 439  	if err != nil {
 440  		return fmt.Errorf("encrypt: %w", err)
 441  	}
 442  
 443  	ev := &event.E{
 444  		Content:   []byte(encrypted),
 445  		CreatedAt: time.Now().Unix(),
 446  		Kind:      4,
 447  		Tags: tag.NewS(
 448  			tag.NewFromAny("p", pubkeyHex),
 449  		),
 450  	}
 451  	if err := ev.Sign(b.sign); chk.E(err) {
 452  		return fmt.Errorf("sign: %w", err)
 453  	}
 454  	return b.relay.Publish(b.ctx, ev)
 455  }
 456  
 457  // sendGiftWrapDM sends a NIP-17 gift-wrapped DM (kind 1059).
 458  func (b *Bridge) sendGiftWrapDM(pubkeyHex, content string) error {
 459  	gw, err := wrapGiftWrap(pubkeyHex, content, b.sign)
 460  	if err != nil {
 461  		return fmt.Errorf("wrap: %w", err)
 462  	}
 463  	return b.relay.Publish(b.ctx, gw)
 464  }
 465  
 466  // resolveRecipientPubkey extracts the Nostr pubkey hex from an email address
 467  // local part. The local part can be an npub (bech32), a hex pubkey, or an alias.
 468  // When aclClient is non-nil, alias lookup is attempted.
 469  func resolveRecipientPubkey(emailAddr, domain string, aclClient *aclgrpc.Client) (string, error) {
 470  	parts := strings.SplitN(emailAddr, "@", 2)
 471  	if len(parts) != 2 {
 472  		return "", fmt.Errorf("invalid email address: %s", emailAddr)
 473  	}
 474  	local := strings.ToLower(parts[0])
 475  
 476  	// Try npub first
 477  	if strings.HasPrefix(local, "npub1") {
 478  		pubkey, err := bech32encoding.NpubToBytes([]byte(local))
 479  		if err != nil {
 480  			return "", fmt.Errorf("invalid npub: %w", err)
 481  		}
 482  		return hex.Enc(pubkey), nil
 483  	}
 484  
 485  	// Try full 64-char hex pubkey
 486  	if len(local) == 64 {
 487  		_, err := hex.Dec(local)
 488  		if err == nil {
 489  			return local, nil
 490  		}
 491  	}
 492  
 493  	// Try alias lookup via ACL
 494  	if aclClient != nil {
 495  		pubkey, err := aclClient.GetPubkeyByAlias(local)
 496  		if err == nil && pubkey != "" {
 497  			return pubkey, nil
 498  		}
 499  	}
 500  
 501  	return "", fmt.Errorf("cannot resolve pubkey from local part %q (must be npub, 64-char hex, or alias)", local)
 502  }
 503  
 504  func identitySourceString(s IdentitySource) string {
 505  	switch s {
 506  	case IdentityFromConfig:
 507  		return "config"
 508  	case IdentityFromDB:
 509  		return "database"
 510  	case IdentityFromFile:
 511  		return "file"
 512  	default:
 513  		return "unknown"
 514  	}
 515  }
 516