mls.go raw
1 package bridge
2
3 import (
4 "context"
5 "fmt"
6 "path/filepath"
7 "time"
8
9 "git.smesh.lol/orly/pkg/lol/log"
10 "git.smesh.lol/orly/pkg/nostr/encoders/event"
11 "git.smesh.lol/orly/pkg/nostr/encoders/filter"
12 "git.smesh.lol/orly/pkg/nostr/encoders/hex"
13 "git.smesh.lol/orly/pkg/nostr/protocol/marmot"
14 )
15
16 // relayAdapter adapts *RelayConn to the marmot.RelayConnection interface.
17 type relayAdapter struct {
18 relay *RelayConn
19 }
20
21 func (a *relayAdapter) Publish(ctx context.Context, ev *event.E) error {
22 return a.relay.Publish(ctx, ev)
23 }
24
25 func (a *relayAdapter) Subscribe(ctx context.Context, ff *filter.S) (marmot.EventStream, error) {
26 sub, err := a.relay.Subscribe(ctx, ff)
27 if err != nil {
28 return nil, err
29 }
30 return sub, nil
31 }
32
33 // initMLS initializes the MLS client for the bridge.
34 func (b *Bridge) initMLS() error {
35 groupDir := filepath.Join(b.cfg.DataDir, "mls-groups")
36 store, err := marmot.NewFileGroupStore(groupDir)
37 if err != nil {
38 return fmt.Errorf("create MLS group store: %w", err)
39 }
40
41 relayURL := b.cfg.PublicRelayURL
42 if relayURL == "" {
43 relayURL = b.cfg.RelayURL
44 }
45
46 adapter := &relayAdapter{relay: b.relay}
47 client, err := marmot.NewClient(&marmot.LocalCrypto{Sign: b.sign}, store, adapter, relayURL)
48 if err != nil {
49 return fmt.Errorf("create MLS client: %w", err)
50 }
51
52 client.OnDM(b.handleMLSDM)
53 client.OnGroupJoined(b.handleMLSGroupJoined)
54 b.mlsClient = client
55
56 // Publish our key package so MLS peers can find us
57 if err := client.PublishKeyPackage(b.ctx); err != nil {
58 log.W.F("publish MLS key package: %v", err)
59 } else {
60 log.I.F("published MLS key package (kind 443)")
61 }
62
63 // Publish key package relay list
64 if relayURL != "" {
65 if err := client.PublishKeyPackageRelays(b.ctx, []string{relayURL}); err != nil {
66 log.W.F("publish MLS key package relays: %v", err)
67 } else {
68 log.I.F("published MLS key package relay list (kind 10051)")
69 }
70 }
71
72 // Start MLS subscription loop in background
73 b.wg.Add(1)
74 go b.mlsWatchLoop()
75
76 return nil
77 }
78
79 // handleMLSDM is the callback for incoming decrypted MLS DMs.
80 func (b *Bridge) handleMLSDM(senderPub []byte, plaintext []byte) {
81 senderPubHex := hex.Enc(senderPub)
82
83 bridgePubHex := hex.Enc(b.sign.Pub())
84 if senderPubHex == bridgePubHex {
85 return
86 }
87
88 log.I.F("received MLS DM from %s: %d bytes", senderPubHex, len(plaintext))
89
90 if b.router != nil {
91 b.router.RouteDM(b.ctx, senderPubHex, string(plaintext))
92 }
93 }
94
95 // handleMLSGroupJoined is called when a new peer establishes a group.
96 // No welcome message — the peer's first DM will get a response (command
97 // result, email processing, or auto-help for unrecognized input).
98 func (b *Bridge) handleMLSGroupJoined(peerPub []byte) {
99 peerHex := hex.Enc(peerPub)
100 log.I.F("new MLS group with %s", peerHex)
101 }
102
103 // sendMLSDM sends an MLS-encrypted DM to the given recipient.
104 func (b *Bridge) sendMLSDM(pubkeyHex, content string) error {
105 if b.mlsClient == nil {
106 return fmt.Errorf("MLS client not initialized")
107 }
108
109 recipientPub, err := hex.Dec(pubkeyHex)
110 if err != nil {
111 return fmt.Errorf("decode recipient pubkey: %w", err)
112 }
113
114 return b.mlsClient.SendDM(b.ctx, recipientPub, []byte(content))
115 }
116
117 // mlsKeyPackageEvent returns a signed kind 443 event for broadcasting.
118 func (b *Bridge) mlsKeyPackageEvent() (*event.E, error) {
119 return b.mlsClient.KeyPackageEvent()
120 }
121
122 // mlsKeyPackageRelaysEvent returns a signed kind 10051 event for broadcasting.
123 func (b *Bridge) mlsKeyPackageRelaysEvent(relayURL string) (*event.E, error) {
124 return b.mlsClient.KeyPackageRelaysEvent([]string{relayURL})
125 }
126
127 // mlsWatchLoop subscribes to MLS events (kind 1059 welcomes + kind 445 group
128 // messages) and dispatches them to the MLS client. Reconnects with updated
129 // filters as new groups are established.
130 func (b *Bridge) mlsWatchLoop() {
131 defer b.wg.Done()
132
133 delay := time.Second
134 for {
135 if err := b.mlsSubscribeAndProcess(); err != nil {
136 if b.ctx.Err() != nil {
137 return
138 }
139 log.W.F("MLS subscription error: %v, retrying in %v", err, delay)
140 select {
141 case <-time.After(delay):
142 if delay < 30*time.Second {
143 delay *= 2
144 }
145 case <-b.ctx.Done():
146 return
147 }
148 } else {
149 delay = time.Second // reset on success
150 }
151 }
152 }
153
154 func (b *Bridge) mlsSubscribeAndProcess() error {
155 ff := b.mlsClient.SubscriptionFilters()
156
157 sub, err := b.relay.Subscribe(b.ctx, ff)
158 if err != nil {
159 return fmt.Errorf("MLS subscribe: %w", err)
160 }
161
162 groupIDs := b.mlsClient.ActiveGroupIDs()
163 log.I.F("MLS subscribed (welcomes + %d active groups)", len(groupIDs))
164
165 for {
166 select {
167 case <-b.ctx.Done():
168 sub.Close()
169 return nil
170 case <-b.mlsClient.GroupsChanged():
171 // Open new subscription BEFORE closing old — no gap where events are lost.
172 // seenIDs deduplicates any overlap; Since=now-2days catches stragglers.
173 newFF := b.mlsClient.SubscriptionFilters()
174 newSub, err := b.relay.Subscribe(b.ctx, newFF)
175 if err != nil {
176 sub.Close()
177 return fmt.Errorf("MLS re-subscribe: %w", err)
178 }
179 sub.Close()
180 sub = newSub
181 groupIDs = b.mlsClient.ActiveGroupIDs()
182 log.I.F("MLS group added, re-subscribed (welcomes + %d active groups)", len(groupIDs))
183 case ev, ok := <-sub.Events():
184 if !ok {
185 sub.Close()
186 return fmt.Errorf("MLS event channel closed")
187 }
188 if err := b.mlsClient.HandleEvent(b.ctx, ev); err != nil {
189 log.W.F("MLS event handling: %v", err)
190 }
191 }
192 }
193 }
194