bridge.go raw
1 package bridge
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "strings"
8 "sync"
9 "time"
10
11 "next.orly.dev/pkg/nostr/crypto/encryption"
12 "next.orly.dev/pkg/nostr/encoders/bech32encoding"
13 "next.orly.dev/pkg/nostr/encoders/event"
14 "next.orly.dev/pkg/nostr/encoders/filter"
15 "next.orly.dev/pkg/nostr/encoders/hex"
16 "next.orly.dev/pkg/nostr/encoders/kind"
17 "next.orly.dev/pkg/nostr/encoders/tag"
18 "next.orly.dev/pkg/nostr/encoders/timestamp"
19 "next.orly.dev/pkg/nostr/interfaces/signer"
20 "next.orly.dev/pkg/lol/chk"
21 "next.orly.dev/pkg/lol/log"
22
23 aclgrpc "next.orly.dev/pkg/acl/grpc"
24 bridgesmtp "next.orly.dev/pkg/bridge/smtp"
25 )
26
// dmFormat identifies the direct-message protocol a sender most recently
// used when talking to the bridge.
type dmFormat int

const (
	// dmFormatKind4 is a kind 4 direct message (NIP-04 style). It is the
	// zero value of dmFormat.
	dmFormatKind4 dmFormat = iota
	// dmFormatGiftWrap is a NIP-17 gift-wrapped direct message (kind 1059).
	dmFormatGiftWrap
)
34
35 // Bridge is the Nostr-Email bridge. It manages identity, relay connection,
36 // Marmot DM handling, and SMTP transport.
37 type Bridge struct {
38 cfg *Config
39 sign signer.I
40 source IdentitySource
41 relay *RelayConn
42
43 router *Router
44 smtpServer *bridgesmtp.Server
45
46 aclClient *aclgrpc.Client
47
48 // senderFormats tracks the DM format last used by each sender so the
49 // bridge replies in the same format. Protected by senderFmtMu.
50 senderFormats map[string]dmFormat
51 senderFmtMu sync.RWMutex
52
53 ctx context.Context
54 cancel context.CancelFunc
55 wg sync.WaitGroup
56
57 // dbGetter is a function that returns the relay identity secret key
58 // from the database. Non-nil in monolithic mode, nil in standalone.
59 dbGetter func() ([]byte, error)
60 }
61
62 // New creates a new Bridge. The dbGetter is optional — pass nil for standalone
63 // mode (the bridge will fall back to file-based identity).
64 func New(cfg *Config, dbGetter func() ([]byte, error)) *Bridge {
65 return &Bridge{
66 cfg: cfg,
67 dbGetter: dbGetter,
68 senderFormats: make(map[string]dmFormat),
69 }
70 }
71
72 // Start initializes the bridge: resolves identity, connects to relay, and
73 // begins listening for events.
74 func (b *Bridge) Start(ctx context.Context) error {
75 b.ctx, b.cancel = context.WithCancel(ctx)
76
77 // Ensure data directory exists
78 if err := os.MkdirAll(b.cfg.DataDir, 0700); err != nil {
79 return fmt.Errorf("create bridge data dir: %w", err)
80 }
81
82 // Resolve identity
83 sign, source, err := ResolveIdentity(b.cfg.NSEC, b.dbGetter, b.cfg.DataDir)
84 if err != nil {
85 return fmt.Errorf("resolve identity: %w", err)
86 }
87 b.sign = sign
88 b.source = source
89
90 pubHex := hex.Enc(b.sign.Pub())
91 npub, _ := bech32encoding.BinToNpub(b.sign.Pub())
92 sourceStr := identitySourceString(source)
93 log.I.F("bridge identity: %s (%s) [source: %s]", string(npub), string(pubHex), sourceStr)
94
95 if b.cfg.Domain != "" {
96 log.I.F("bridge email domain: %s", b.cfg.Domain)
97 }
98
99 // Connect to ACL gRPC server if configured
100 if b.cfg.ACLGRPCServer != "" {
101 aclClient, err := aclgrpc.New(b.ctx, &aclgrpc.ClientConfig{
102 ServerAddress: b.cfg.ACLGRPCServer,
103 ConnectTimeout: 15 * time.Second,
104 })
105 if err != nil {
106 return fmt.Errorf("connect to ACL gRPC server: %w", err)
107 }
108 b.aclClient = aclClient
109 log.I.F("bridge connected to ACL gRPC server at %s", b.cfg.ACLGRPCServer)
110 }
111
112 // Build the sendDM callback
113 sendDM := b.makeSendDM()
114
115 // Initialize components
116 if err := b.initComponents(sendDM); err != nil {
117 return fmt.Errorf("init components: %w", err)
118 }
119
120 // Start SMTP server for inbound email
121 if b.cfg.Domain != "" {
122 if err := b.startSMTPServer(sendDM); err != nil {
123 return fmt.Errorf("start SMTP server: %w", err)
124 }
125 }
126
127 // Connect to relay (standalone mode)
128 if b.cfg.RelayURL != "" {
129 b.relay = NewRelayConn(b.cfg.RelayURL, b.sign)
130 if err := b.relay.Connect(b.ctx); err != nil {
131 return fmt.Errorf("relay connection: %w", err)
132 }
133
134 // Start subscription loop in background
135 b.wg.Add(1)
136 go b.relayWatchLoop()
137
138 // Publish kind 0 profile if template exists
139 if err := b.publishProfile(); err != nil {
140 log.W.F("publish bridge profile: %v", err)
141 }
142 }
143
144 log.I.F("bridge started")
145 return nil
146 }
147
148 // initComponents sets up the router, outbound processor, subscription handler,
149 // and payment processor.
150 func (b *Bridge) initComponents(sendDM func(string, string) error) error {
151 // SMTP client for outbound email
152 smtpCfg := bridgesmtp.ClientConfig{
153 FromDomain: b.cfg.Domain,
154 RelayHost: b.cfg.SMTPRelayHost,
155 RelayPort: b.cfg.SMTPRelayPort,
156 RelayUsername: b.cfg.SMTPRelayUsername,
157 RelayPassword: b.cfg.SMTPRelayPassword,
158 }
159
160 // Load DKIM signer if configured
161 if b.cfg.DKIMKeyPath != "" {
162 dkim, err := bridgesmtp.NewDKIMSigner(b.cfg.Domain, b.cfg.DKIMSelector, b.cfg.DKIMKeyPath)
163 if err != nil {
164 log.W.F("DKIM signer init failed, continuing without DKIM: %v", err)
165 } else {
166 smtpCfg.DKIMSigner = dkim
167 }
168 }
169
170 smtpClient := bridgesmtp.NewClient(smtpCfg)
171
172 // Rate limiter
173 rateLimiter := NewRateLimiter(DefaultRateLimitConfig())
174
175 // Subscription store + handler
176 subStore, err := NewFileSubscriptionStore(b.cfg.DataDir)
177 if err != nil {
178 return fmt.Errorf("subscription store: %w", err)
179 }
180
181 var payments *PaymentProcessor
182 if b.cfg.NWCURI != "" {
183 payments, err = NewPaymentProcessor(b.cfg.NWCURI, b.cfg.MonthlyPriceSats)
184 if err != nil {
185 log.W.F("payment processor init failed: %v", err)
186 }
187 }
188
189 aliasPriceSats := b.cfg.AliasPriceSats
190 if aliasPriceSats == 0 && b.cfg.MonthlyPriceSats > 0 {
191 aliasPriceSats = b.cfg.MonthlyPriceSats * 2
192 }
193
194 subHandler := NewSubscriptionHandler(subStore, payments, sendDM, b.cfg.MonthlyPriceSats, b.aclClient, aliasPriceSats)
195 outbound := NewOutboundProcessor(smtpClient, rateLimiter, subHandler, b.cfg.Domain, sendDM, b.aclClient)
196 b.router = NewRouter(subHandler, outbound, sendDM)
197
198 return nil
199 }
200
201 // startSMTPServer starts the inbound SMTP server that receives forwarded mail.
202 func (b *Bridge) startSMTPServer(sendDM func(string, string) error) error {
203 listenAddr := fmt.Sprintf("%s:%d", b.cfg.SMTPHost, b.cfg.SMTPPort)
204 cfg := bridgesmtp.ServerConfig{
205 Domain: b.cfg.Domain,
206 ListenAddr: listenAddr,
207 MaxMessageBytes: 25 * 1024 * 1024,
208 MaxRecipients: 10,
209 ReadTimeout: 60 * time.Second,
210 WriteTimeout: 60 * time.Second,
211 }
212
213 // Inbound processor (no Blossom uploader for now)
214 inbound := NewInboundProcessor(nil, b.cfg.ComposeURL, sendDM)
215
216 handler := func(email *bridgesmtp.InboundEmail) error {
217 for _, to := range email.To {
218 pubkeyHex, err := resolveRecipientPubkey(to, b.cfg.Domain, b.aclClient)
219 if err != nil {
220 log.W.F("cannot resolve recipient %s: %v", to, err)
221 continue
222 }
223 if err := inbound.ProcessInbound(email, pubkeyHex); err != nil {
224 log.E.F("inbound processing failed for %s: %v", to, err)
225 }
226 }
227 return nil
228 }
229
230 b.smtpServer = bridgesmtp.NewServer(cfg, handler)
231 return b.smtpServer.Start()
232 }
233
234 // Stop gracefully shuts down the bridge.
235 func (b *Bridge) Stop() {
236 log.I.F("bridge stopping")
237 if b.cancel != nil {
238 b.cancel()
239 }
240 if b.smtpServer != nil {
241 b.smtpServer.Stop(context.Background())
242 }
243 if b.relay != nil {
244 b.relay.Close()
245 }
246 if b.aclClient != nil {
247 b.aclClient.Close()
248 }
249 b.wg.Wait()
250 log.I.F("bridge stopped")
251 }
252
253 // Signer returns the bridge's identity signer.
254 func (b *Bridge) Signer() signer.I {
255 return b.sign
256 }
257
258 // IdentitySource returns how the identity was resolved.
259 func (b *Bridge) IdentitySource() IdentitySource {
260 return b.source
261 }
262
263 // relayWatchLoop subscribes to kind 4 DMs addressed to the bridge and routes
264 // them through the router. It reconnects and resubscribes on disconnection.
265 func (b *Bridge) relayWatchLoop() {
266 defer b.wg.Done()
267
268 if b.relay == nil || b.sign == nil {
269 <-b.ctx.Done()
270 return
271 }
272
273 for {
274 if err := b.subscribeAndProcess(); err != nil {
275 if b.ctx.Err() != nil {
276 return // context cancelled, clean exit
277 }
278 log.W.F("subscription loop error: %v, reconnecting...", err)
279 if err := b.relay.Reconnect(); err != nil {
280 if b.ctx.Err() != nil {
281 return
282 }
283 log.E.F("reconnect failed: %v", err)
284 }
285 }
286 }
287 }
288
289 // subscribeAndProcess creates a subscription for DMs to the bridge and
290 // processes events until the channel closes or context is cancelled.
291 // Subscribes to both kind 4 (NIP-04) and kind 1059 (NIP-17 gift wrap).
292 func (b *Bridge) subscribeAndProcess() error {
293 bridgePubHex := hex.Enc(b.sign.Pub())
294
295 ff := filter.NewS(
296 &filter.F{
297 Kinds: kind.NewS(kind.New(4), kind.New(1059)),
298 Tags: tag.NewS(
299 tag.NewFromAny("p", bridgePubHex),
300 ),
301 Since: ×tamp.T{V: time.Now().Unix()},
302 },
303 )
304
305 sub, err := b.relay.Subscribe(b.ctx, ff)
306 if err != nil {
307 return fmt.Errorf("subscribe: %w", err)
308 }
309 defer sub.Close()
310
311 log.I.F("subscribed to DMs (kind 4 + 1059) for bridge pubkey %s", bridgePubHex)
312
313 for {
314 select {
315 case <-b.ctx.Done():
316 return nil
317 case ev, ok := <-sub.Events():
318 if !ok {
319 return fmt.Errorf("event channel closed")
320 }
321 switch ev.Kind {
322 case 1059:
323 b.handleGiftWrapEvent(ev)
324 default:
325 b.handleDMEvent(ev)
326 }
327 }
328 }
329 }
330
331 // handleDMEvent decrypts a kind 4 DM event and routes it.
332 func (b *Bridge) handleDMEvent(ev *event.E) {
333 senderPubHex := hex.Enc(ev.Pubkey[:])
334
335 // Derive conversation key for NIP-44 decryption
336 conversationKey, err := encryption.GenerateConversationKey(
337 b.sign.Sec(), ev.Pubkey[:],
338 )
339 if err != nil {
340 log.E.F("ECDH failed for sender %s: %v", senderPubHex, err)
341 return
342 }
343
344 decrypted, err := encryption.Decrypt(conversationKey, string(ev.Content))
345 if err != nil {
346 log.W.F("DM decryption failed from %s: %v", senderPubHex, err)
347 return
348 }
349
350 log.D.F("received kind 4 DM from %s: %d bytes", senderPubHex, len(decrypted))
351
352 b.recordSenderFormat(senderPubHex, dmFormatKind4)
353
354 if b.router != nil {
355 b.router.RouteDM(b.ctx, senderPubHex, decrypted)
356 }
357 }
358
359 // handleGiftWrapEvent unwraps a NIP-17 gift-wrapped DM (kind 1059) and routes it.
360 func (b *Bridge) handleGiftWrapEvent(ev *event.E) {
361 dm, err := unwrapGiftWrap(ev, b.sign)
362 if err != nil {
363 log.W.F("gift wrap unwrap failed: %v", err)
364 return
365 }
366
367 log.D.F("received NIP-17 DM from %s: %d bytes", dm.SenderPubHex, len(dm.Content))
368
369 b.recordSenderFormat(dm.SenderPubHex, dmFormatGiftWrap)
370
371 if b.router != nil {
372 b.router.RouteDM(b.ctx, dm.SenderPubHex, dm.Content)
373 }
374 }
375
376 // recordSenderFormat tracks the DM format a sender used.
377 func (b *Bridge) recordSenderFormat(pubkeyHex string, format dmFormat) {
378 b.senderFmtMu.Lock()
379 b.senderFormats[pubkeyHex] = format
380 b.senderFmtMu.Unlock()
381 }
382
383 // getSenderFormat returns the last DM format used by a sender.
384 // Returns dmFormatKind4 as default for unknown senders (backward compatible).
385 func (b *Bridge) getSenderFormat(pubkeyHex string) dmFormat {
386 b.senderFmtMu.RLock()
387 defer b.senderFmtMu.RUnlock()
388 f, ok := b.senderFormats[pubkeyHex]
389 if !ok {
390 return dmFormatKind4
391 }
392 return f
393 }
394
395 // makeSendDM returns a callback that encrypts and publishes DMs to the given
396 // pubkey via the relay. Replies in the same format the sender used: kind 4
397 // for NIP-04 senders, kind 1059 gift wrap for NIP-17 senders. For unknown
398 // senders (inbound email), sends kind 4 as the default.
399 func (b *Bridge) makeSendDM() func(pubkeyHex string, content string) error {
400 return func(pubkeyHex string, content string) error {
401 if b.relay == nil {
402 return fmt.Errorf("no relay connection")
403 }
404
405 format := b.getSenderFormat(pubkeyHex)
406
407 switch format {
408 case dmFormatGiftWrap:
409 if err := b.sendGiftWrapDM(pubkeyHex, content); err != nil {
410 return fmt.Errorf("gift wrap DM: %w", err)
411 }
412 log.D.F("sent NIP-17 DM to %s (%d bytes)", pubkeyHex, len(content))
413 default:
414 if err := b.sendKind4DM(pubkeyHex, content); err != nil {
415 return fmt.Errorf("kind 4 DM: %w", err)
416 }
417 log.D.F("sent kind 4 DM to %s (%d bytes)", pubkeyHex, len(content))
418 }
419
420 return nil
421 }
422 }
423
424 // sendKind4DM sends a kind 4 encrypted DM using NIP-44.
425 func (b *Bridge) sendKind4DM(pubkeyHex, content string) error {
426 recipientPub, err := hex.Dec(pubkeyHex)
427 if err != nil {
428 return fmt.Errorf("decode recipient pubkey: %w", err)
429 }
430
431 conversationKey, err := encryption.GenerateConversationKey(
432 b.sign.Sec(), recipientPub,
433 )
434 if err != nil {
435 return fmt.Errorf("ECDH: %w", err)
436 }
437
438 encrypted, err := encryption.Encrypt(conversationKey, []byte(content), nil)
439 if err != nil {
440 return fmt.Errorf("encrypt: %w", err)
441 }
442
443 ev := &event.E{
444 Content: []byte(encrypted),
445 CreatedAt: time.Now().Unix(),
446 Kind: 4,
447 Tags: tag.NewS(
448 tag.NewFromAny("p", pubkeyHex),
449 ),
450 }
451 if err := ev.Sign(b.sign); chk.E(err) {
452 return fmt.Errorf("sign: %w", err)
453 }
454 return b.relay.Publish(b.ctx, ev)
455 }
456
457 // sendGiftWrapDM sends a NIP-17 gift-wrapped DM (kind 1059).
458 func (b *Bridge) sendGiftWrapDM(pubkeyHex, content string) error {
459 gw, err := wrapGiftWrap(pubkeyHex, content, b.sign)
460 if err != nil {
461 return fmt.Errorf("wrap: %w", err)
462 }
463 return b.relay.Publish(b.ctx, gw)
464 }
465
466 // resolveRecipientPubkey extracts the Nostr pubkey hex from an email address
467 // local part. The local part can be an npub (bech32), a hex pubkey, or an alias.
468 // When aclClient is non-nil, alias lookup is attempted.
469 func resolveRecipientPubkey(emailAddr, domain string, aclClient *aclgrpc.Client) (string, error) {
470 parts := strings.SplitN(emailAddr, "@", 2)
471 if len(parts) != 2 {
472 return "", fmt.Errorf("invalid email address: %s", emailAddr)
473 }
474 local := strings.ToLower(parts[0])
475
476 // Try npub first
477 if strings.HasPrefix(local, "npub1") {
478 pubkey, err := bech32encoding.NpubToBytes([]byte(local))
479 if err != nil {
480 return "", fmt.Errorf("invalid npub: %w", err)
481 }
482 return hex.Enc(pubkey), nil
483 }
484
485 // Try full 64-char hex pubkey
486 if len(local) == 64 {
487 _, err := hex.Dec(local)
488 if err == nil {
489 return local, nil
490 }
491 }
492
493 // Try alias lookup via ACL
494 if aclClient != nil {
495 pubkey, err := aclClient.GetPubkeyByAlias(local)
496 if err == nil && pubkey != "" {
497 return pubkey, nil
498 }
499 }
500
501 return "", fmt.Errorf("cannot resolve pubkey from local part %q (must be npub, 64-char hex, or alias)", local)
502 }
503
504 func identitySourceString(s IdentitySource) string {
505 switch s {
506 case IdentityFromConfig:
507 return "config"
508 case IdentityFromDB:
509 return "database"
510 case IdentityFromFile:
511 return "file"
512 default:
513 return "unknown"
514 }
515 }
516