mls.go raw

   1  package bridge
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"path/filepath"
   7  	"time"
   8  
   9  	"git.smesh.lol/orly/pkg/lol/log"
  10  	"git.smesh.lol/orly/pkg/nostr/encoders/event"
  11  	"git.smesh.lol/orly/pkg/nostr/encoders/filter"
  12  	"git.smesh.lol/orly/pkg/nostr/encoders/hex"
  13  	"git.smesh.lol/orly/pkg/nostr/protocol/marmot"
  14  )
  15  
  16  // relayAdapter adapts *RelayConn to the marmot.RelayConnection interface.
  17  type relayAdapter struct {
  18  	relay *RelayConn
  19  }
  20  
  21  func (a *relayAdapter) Publish(ctx context.Context, ev *event.E) error {
  22  	return a.relay.Publish(ctx, ev)
  23  }
  24  
  25  func (a *relayAdapter) Subscribe(ctx context.Context, ff *filter.S) (marmot.EventStream, error) {
  26  	sub, err := a.relay.Subscribe(ctx, ff)
  27  	if err != nil {
  28  		return nil, err
  29  	}
  30  	return sub, nil
  31  }
  32  
  33  // initMLS initializes the MLS client for the bridge.
  34  func (b *Bridge) initMLS() error {
  35  	groupDir := filepath.Join(b.cfg.DataDir, "mls-groups")
  36  	store, err := marmot.NewFileGroupStore(groupDir)
  37  	if err != nil {
  38  		return fmt.Errorf("create MLS group store: %w", err)
  39  	}
  40  
  41  	relayURL := b.cfg.PublicRelayURL
  42  	if relayURL == "" {
  43  		relayURL = b.cfg.RelayURL
  44  	}
  45  
  46  	adapter := &relayAdapter{relay: b.relay}
  47  	client, err := marmot.NewClient(&marmot.LocalCrypto{Sign: b.sign}, store, adapter, relayURL)
  48  	if err != nil {
  49  		return fmt.Errorf("create MLS client: %w", err)
  50  	}
  51  
  52  	client.OnDM(b.handleMLSDM)
  53  	client.OnGroupJoined(b.handleMLSGroupJoined)
  54  	b.mlsClient = client
  55  
  56  	// Publish our key package so MLS peers can find us
  57  	if err := client.PublishKeyPackage(b.ctx); err != nil {
  58  		log.W.F("publish MLS key package: %v", err)
  59  	} else {
  60  		log.I.F("published MLS key package (kind 443)")
  61  	}
  62  
  63  	// Publish key package relay list
  64  	if relayURL != "" {
  65  		if err := client.PublishKeyPackageRelays(b.ctx, []string{relayURL}); err != nil {
  66  			log.W.F("publish MLS key package relays: %v", err)
  67  		} else {
  68  			log.I.F("published MLS key package relay list (kind 10051)")
  69  		}
  70  	}
  71  
  72  	// Start MLS subscription loop in background
  73  	b.wg.Add(1)
  74  	go b.mlsWatchLoop()
  75  
  76  	return nil
  77  }
  78  
  79  // handleMLSDM is the callback for incoming decrypted MLS DMs.
  80  func (b *Bridge) handleMLSDM(senderPub []byte, plaintext []byte) {
  81  	senderPubHex := hex.Enc(senderPub)
  82  
  83  	bridgePubHex := hex.Enc(b.sign.Pub())
  84  	if senderPubHex == bridgePubHex {
  85  		return
  86  	}
  87  
  88  	log.I.F("received MLS DM from %s: %d bytes", senderPubHex, len(plaintext))
  89  
  90  	if b.router != nil {
  91  		b.router.RouteDM(b.ctx, senderPubHex, string(plaintext))
  92  	}
  93  }
  94  
  95  // handleMLSGroupJoined is called when a new peer establishes a group.
  96  // No welcome message — the peer's first DM will get a response (command
  97  // result, email processing, or auto-help for unrecognized input).
  98  func (b *Bridge) handleMLSGroupJoined(peerPub []byte) {
  99  	peerHex := hex.Enc(peerPub)
 100  	log.I.F("new MLS group with %s", peerHex)
 101  }
 102  
 103  // sendMLSDM sends an MLS-encrypted DM to the given recipient.
 104  func (b *Bridge) sendMLSDM(pubkeyHex, content string) error {
 105  	if b.mlsClient == nil {
 106  		return fmt.Errorf("MLS client not initialized")
 107  	}
 108  
 109  	recipientPub, err := hex.Dec(pubkeyHex)
 110  	if err != nil {
 111  		return fmt.Errorf("decode recipient pubkey: %w", err)
 112  	}
 113  
 114  	return b.mlsClient.SendDM(b.ctx, recipientPub, []byte(content))
 115  }
 116  
 117  // mlsKeyPackageEvent returns a signed kind 443 event for broadcasting.
 118  func (b *Bridge) mlsKeyPackageEvent() (*event.E, error) {
 119  	return b.mlsClient.KeyPackageEvent()
 120  }
 121  
 122  // mlsKeyPackageRelaysEvent returns a signed kind 10051 event for broadcasting.
 123  func (b *Bridge) mlsKeyPackageRelaysEvent(relayURL string) (*event.E, error) {
 124  	return b.mlsClient.KeyPackageRelaysEvent([]string{relayURL})
 125  }
 126  
 127  // mlsWatchLoop subscribes to MLS events (kind 1059 welcomes + kind 445 group
 128  // messages) and dispatches them to the MLS client. Reconnects with updated
 129  // filters as new groups are established.
 130  func (b *Bridge) mlsWatchLoop() {
 131  	defer b.wg.Done()
 132  
 133  	delay := time.Second
 134  	for {
 135  		if err := b.mlsSubscribeAndProcess(); err != nil {
 136  			if b.ctx.Err() != nil {
 137  				return
 138  			}
 139  			log.W.F("MLS subscription error: %v, retrying in %v", err, delay)
 140  			select {
 141  			case <-time.After(delay):
 142  				if delay < 30*time.Second {
 143  					delay *= 2
 144  				}
 145  			case <-b.ctx.Done():
 146  				return
 147  			}
 148  		} else {
 149  			delay = time.Second // reset on success
 150  		}
 151  	}
 152  }
 153  
 154  func (b *Bridge) mlsSubscribeAndProcess() error {
 155  	ff := b.mlsClient.SubscriptionFilters()
 156  
 157  	sub, err := b.relay.Subscribe(b.ctx, ff)
 158  	if err != nil {
 159  		return fmt.Errorf("MLS subscribe: %w", err)
 160  	}
 161  
 162  	groupIDs := b.mlsClient.ActiveGroupIDs()
 163  	log.I.F("MLS subscribed (welcomes + %d active groups)", len(groupIDs))
 164  
 165  	for {
 166  		select {
 167  		case <-b.ctx.Done():
 168  			sub.Close()
 169  			return nil
 170  		case <-b.mlsClient.GroupsChanged():
 171  			// Open new subscription BEFORE closing old — no gap where events are lost.
 172  			// seenIDs deduplicates any overlap; Since=now-2days catches stragglers.
 173  			newFF := b.mlsClient.SubscriptionFilters()
 174  			newSub, err := b.relay.Subscribe(b.ctx, newFF)
 175  			if err != nil {
 176  				sub.Close()
 177  				return fmt.Errorf("MLS re-subscribe: %w", err)
 178  			}
 179  			sub.Close()
 180  			sub = newSub
 181  			groupIDs = b.mlsClient.ActiveGroupIDs()
 182  			log.I.F("MLS group added, re-subscribed (welcomes + %d active groups)", len(groupIDs))
 183  		case ev, ok := <-sub.Events():
 184  			if !ok {
 185  				sub.Close()
 186  				return fmt.Errorf("MLS event channel closed")
 187  			}
 188  			if err := b.mlsClient.HandleEvent(b.ctx, ev); err != nil {
 189  				log.W.F("MLS event handling: %v", err)
 190  			}
 191  		}
 192  	}
 193  }
 194