1 package app
2 3 import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8 9 "github.com/gorilla/websocket"
10 "next.orly.dev/pkg/lol/log"
11 "next.orly.dev/pkg/acl"
12 "next.orly.dev/pkg/nostr/encoders/event"
13 "next.orly.dev/pkg/nostr/encoders/filter"
14 "next.orly.dev/pkg/nostr/encoders/hex"
15 "next.orly.dev/pkg/nostr/encoders/kind"
16 "next.orly.dev/pkg/interfaces/publisher"
17 "next.orly.dev/pkg/interfaces/typer"
18 "next.orly.dev/pkg/policy"
19 "next.orly.dev/pkg/protocol/publish"
20 "next.orly.dev/pkg/utils"
21 )
// Type is the type name returned by the Type methods of both W and P.
const Type = "socketapi"
24 25 // WriteChanMap maps websocket connections to their write channels
26 type WriteChanMap map[*websocket.Conn]chan publish.WriteRequest
27 28 type Subscription struct {
29 remote string
30 AuthedPubkey []byte
31 Receiver event.C // Channel for delivering events to this subscription
32 AuthRequired bool // Whether ACL requires authentication for privileged events
33 *filter.S
34 }
35 36 // Map is a map of filters associated with a collection of ws.Listener
37 // connections.
38 type Map map[*websocket.Conn]map[string]Subscription
39 40 type W struct {
41 *websocket.Conn
42 43 remote string
44 45 // If Cancel is true, this is a close command.
46 Cancel bool
47 48 // Id is the subscription Id. If Cancel is true, cancel the named
49 // subscription, otherwise, cancel the publisher for the socket.
50 Id string
51 52 // The Receiver holds the event channel for receiving notifications or data
53 // relevant to this WebSocket connection.
54 Receiver event.C
55 56 // Filters holds a collection of filters used to match or process events
57 // associated with this WebSocket connection. It is used to determine which
58 // notifications or data should be received by the subscriber.
59 Filters *filter.S
60 61 // AuthedPubkey is the authenticated pubkey associated with the listener (if any).
62 AuthedPubkey []byte
63 64 // AuthRequired indicates whether the ACL in operation requires auth. If
65 // this is set to true, the publisher will not publish privileged or other
66 // restricted events to non-authed listeners, otherwise, it will.
67 AuthRequired bool
68 }
69 70 func (w *W) Type() (typeName string) { return Type }
71 72 // P is a structure that manages subscriptions and associated filters for
73 // websocket listeners. It uses a mutex to synchronize access to a map storing
74 // subscriber connections and their filter configurations.
75 type P struct {
76 c context.Context
77 // Mx is the mutex for the Map.
78 Mx sync.RWMutex
79 // Map is the map of subscribers and subscriptions from the websocket api.
80 Map
81 // WriteChans maps websocket connections to their write channels
82 WriteChans WriteChanMap
83 // ChannelMembership is used for NIRC channel access control (kinds 40-44)
84 ChannelMembership *ChannelMembership
85 }
86 87 var _ publisher.I = &P{}
88 89 func NewPublisher(c context.Context) (publisher *P) {
90 return &P{
91 c: c,
92 Map: make(Map),
93 WriteChans: make(WriteChanMap, 100),
94 }
95 }
96 97 func (p *P) Type() (typeName string) { return Type }
98 99 // Receive handles incoming messages to manage websocket listener subscriptions
100 // and associated filters.
101 //
102 // # Parameters
103 //
104 // - msg (publisher.Message): The incoming message to process; expected to be of
105 // type *W to trigger subscription management actions.
106 //
107 // # Expected behaviour
108 //
109 // - Checks if the message is of type *W.
110 //
111 // - If Cancel is true, removes a subscriber by ID or the entire listener.
112 //
113 // - Otherwise, adds the subscription to the map under a mutex lock.
114 //
115 // - Logs actions related to subscription creation or removal.
116 func (p *P) Receive(msg typer.T) {
117 if m, ok := msg.(*W); ok {
118 if m.Cancel {
119 if m.Id == "" {
120 p.removeSubscriber(m.Conn)
121 } else {
122 p.removeSubscriberId(m.Conn, m.Id)
123 }
124 return
125 }
126 p.Mx.Lock()
127 defer p.Mx.Unlock()
128 if subs, ok := p.Map[m.Conn]; !ok {
129 subs = make(map[string]Subscription)
130 subs[m.Id] = Subscription{
131 S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
132 Receiver: m.Receiver, AuthRequired: m.AuthRequired,
133 }
134 p.Map[m.Conn] = subs
135 } else {
136 subs[m.Id] = Subscription{
137 S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
138 Receiver: m.Receiver, AuthRequired: m.AuthRequired,
139 }
140 }
141 }
142 }
143 144 // Deliver processes and distributes an event to all matching subscribers based on their filter configurations.
145 //
146 // # Parameters
147 //
148 // - ev (*event.E): The event to be delivered to subscribed clients.
149 //
150 // # Expected behaviour
151 //
152 // Delivers the event to all subscribers whose filters match the event. It
153 // applies authentication checks if required by the server and skips delivery
154 // for unauthenticated users when events are privileged.
155 func (p *P) Deliver(ev *event.E) {
156 // Snapshot the deliveries under read lock to avoid holding locks during I/O
157 p.Mx.RLock()
158 type delivery struct {
159 w *websocket.Conn
160 id string
161 sub Subscription
162 }
163 var deliveries []delivery
164 for w, subs := range p.Map {
165 for id, subscriber := range subs {
166 if subscriber.Match(ev) {
167 deliveries = append(
168 deliveries, delivery{w: w, id: id, sub: subscriber},
169 )
170 }
171 }
172 }
173 p.Mx.RUnlock()
174 if len(deliveries) > 0 {
175 log.D.C(
176 func() string {
177 return fmt.Sprintf(
178 "delivering event %0x to websocket subscribers %d", ev.ID,
179 len(deliveries),
180 )
181 },
182 )
183 }
184 // Track subscriptions that timeout so we can remove them afterward
185 type stuckSub struct {
186 w *websocket.Conn
187 id string
188 }
189 var stuckSubs []stuckSub
190 191 for _, d := range deliveries {
192 // If the event is privileged, enforce that the subscriber's authed pubkey matches
193 // either the event pubkey or appears in any 'p' tag of the event.
194 // Channel kinds always require auth check; other privileged kinds only when ACL is active.
195 isChannel := kind.IsChannelKind(ev.Kind)
196 if kind.IsPrivileged(ev.Kind) && (d.sub.AuthRequired || isChannel) {
197 pk := d.sub.AuthedPubkey
198 199 // Channel kinds (40-44) use channel membership instead of p-tag involvement
200 var allowed bool
201 if kind.IsChannelKind(ev.Kind) && p.ChannelMembership != nil {
202 channelID := ExtractChannelIDFromEvent(ev)
203 allowed = p.ChannelMembership.IsChannelMemberByID(channelID, ev.Kind, pk, p.c)
204 } else {
205 // Use centralized IsPartyInvolved function for consistent privilege checking
206 allowed = policy.IsPartyInvolved(ev, pk)
207 }
208 209 if !allowed {
210 log.D.F(
211 "subscription delivery DENIED for privileged event %s to %s (not authenticated or not a party involved)",
212 hex.Enc(ev.ID), d.sub.remote,
213 )
214 // Skip delivery for this subscriber
215 continue
216 }
217 }
218 219 // For non-channel, non-privileged events: check if they reference channel
220 // events via e-tags. Reactions, reposts, zaps, reports, deletions that
221 // target channel messages must only be delivered to channel members.
222 if !kind.IsChannelKind(ev.Kind) && !kind.IsPrivileged(ev.Kind) && p.ChannelMembership != nil {
223 if channelIDHex, isChannel := p.ChannelMembership.ReferencesChannelEvent(ev, p.c); isChannel {
224 pk := d.sub.AuthedPubkey
225 if !p.ChannelMembership.IsChannelMemberByID(channelIDHex, ev.Kind, pk, p.c) {
226 log.D.F(
227 "subscription delivery DENIED for channel-referencing event %s kind %d to %s (not a member of channel %s)",
228 hex.Enc(ev.ID), ev.Kind, d.sub.remote, channelIDHex,
229 )
230 continue
231 }
232 }
233 }
234 235 // Check for private tags - only deliver to authorized users
236 if ev.Tags != nil && ev.Tags.Len() > 0 {
237 hasPrivateTag := false
238 var privatePubkey []byte
239 240 for _, t := range *ev.Tags {
241 if t.Len() >= 2 {
242 keyBytes := t.Key()
243 if len(keyBytes) == 7 && string(keyBytes) == "private" {
244 hasPrivateTag = true
245 privatePubkey = t.Value()
246 break
247 }
248 }
249 }
250 251 if hasPrivateTag {
252 canSeePrivate := p.canSeePrivateEvent(
253 d.sub.AuthedPubkey, privatePubkey, d.sub.remote,
254 )
255 if !canSeePrivate {
256 log.D.F(
257 "subscription delivery DENIED for private event %s to %s (unauthorized)",
258 hex.Enc(ev.ID), d.sub.remote,
259 )
260 continue
261 }
262 log.D.F(
263 "subscription delivery ALLOWED for private event %s to %s (authorized)",
264 hex.Enc(ev.ID), d.sub.remote,
265 )
266 }
267 }
268 269 // Send event to the subscription's receiver channel
270 // The consumer goroutine (in handle-req.go) will read from this channel
271 // and forward it to the client via the write channel
272 log.D.F(
273 "attempting delivery of event %s (kind=%d) to subscription %s @ %s",
274 hex.Enc(ev.ID), ev.Kind, d.id, d.sub.remote,
275 )
276 277 // Check if receiver channel exists
278 if d.sub.Receiver == nil {
279 log.E.F(
280 "subscription %s has nil receiver channel for %s", d.id,
281 d.sub.remote,
282 )
283 continue
284 }
285 286 // Send to receiver channel - non-blocking with timeout
287 select {
288 case <-p.c.Done():
289 continue
290 case d.sub.Receiver <- ev:
291 log.D.F(
292 "subscription delivery QUEUED: event=%s to=%s sub=%s",
293 hex.Enc(ev.ID), d.sub.remote, d.id,
294 )
295 case <-time.After(DefaultWriteTimeout):
296 log.W.F(
297 "subscription delivery TIMEOUT: event=%s to=%s sub=%s — removing stuck subscription",
298 hex.Enc(ev.ID), d.sub.remote, d.id,
299 )
300 stuckSubs = append(stuckSubs, stuckSub{w: d.w, id: d.id})
301 }
302 }
303 304 // Remove stuck subscriptions to prevent repeated timeouts
305 if len(stuckSubs) > 0 {
306 p.Mx.Lock()
307 for _, s := range stuckSubs {
308 if subs, ok := p.Map[s.w]; ok {
309 delete(subs, s.id)
310 if len(subs) == 0 {
311 delete(p.Map, s.w)
312 }
313 }
314 }
315 p.Mx.Unlock()
316 }
317 }
318 319 // removeSubscriberId removes a specific subscription from a subscriber
320 // websocket.
321 func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
322 p.Mx.Lock()
323 defer p.Mx.Unlock()
324 var subs map[string]Subscription
325 var ok bool
326 if subs, ok = p.Map[ws]; ok {
327 delete(subs, id)
328 // Check the actual map after deletion, not the original reference
329 if len(p.Map[ws]) == 0 {
330 delete(p.Map, ws)
331 // Don't remove write channel here - it's tied to the connection, not subscriptions
332 // The write channel will be removed when the connection closes (in handle-websocket.go defer)
333 // This allows new subscriptions to be created on the same connection
334 }
335 }
336 }
337 338 // SetWriteChan stores the write channel for a websocket connection
339 // If writeChan is nil, the entry is removed from the map
340 func (p *P) SetWriteChan(
341 conn *websocket.Conn, writeChan chan publish.WriteRequest,
342 ) {
343 p.Mx.Lock()
344 defer p.Mx.Unlock()
345 if writeChan == nil {
346 delete(p.WriteChans, conn)
347 } else {
348 p.WriteChans[conn] = writeChan
349 }
350 }
351 352 // GetWriteChan returns the write channel for a websocket connection
353 func (p *P) GetWriteChan(conn *websocket.Conn) (
354 chan publish.WriteRequest, bool,
355 ) {
356 p.Mx.RLock()
357 defer p.Mx.RUnlock()
358 ch, ok := p.WriteChans[conn]
359 return ch, ok
360 }
361 362 // removeSubscriber removes a websocket from the P collection.
363 func (p *P) removeSubscriber(ws *websocket.Conn) {
364 p.Mx.Lock()
365 defer p.Mx.Unlock()
366 clear(p.Map[ws])
367 delete(p.Map, ws)
368 delete(p.WriteChans, ws)
369 }
370 371 // HasActiveNIP46Signer checks if there's an active subscription for kind 24133
372 // where the given pubkey is involved (either as author filter or in #p tag filter).
373 // This is used to authenticate clients by proving a signer is connected for that pubkey.
374 func (p *P) HasActiveNIP46Signer(signerPubkey []byte) bool {
375 const kindNIP46 = 24133
376 p.Mx.RLock()
377 defer p.Mx.RUnlock()
378 379 for _, subs := range p.Map {
380 for _, sub := range subs {
381 if sub.S == nil {
382 continue
383 }
384 for _, f := range *sub.S {
385 if f == nil || f.Kinds == nil {
386 continue
387 }
388 // Check if filter is for kind 24133
389 hasNIP46Kind := false
390 for _, k := range f.Kinds.K {
391 if k.K == kindNIP46 {
392 hasNIP46Kind = true
393 break
394 }
395 }
396 if !hasNIP46Kind {
397 continue
398 }
399 // Check if the signer pubkey matches the #p tag filter
400 if f.Tags != nil {
401 pTag := f.Tags.GetFirst([]byte("p"))
402 if pTag != nil && pTag.Len() >= 2 {
403 for i := 1; i < pTag.Len(); i++ {
404 tagValue := pTag.T[i]
405 // Compare - handle both binary and hex formats
406 if len(tagValue) == 32 && len(signerPubkey) == 32 {
407 if utils.FastEqual(tagValue, signerPubkey) {
408 return true
409 }
410 } else if len(tagValue) == 64 && len(signerPubkey) == 32 {
411 // tagValue is hex, signerPubkey is binary
412 if string(tagValue) == hex.Enc(signerPubkey) {
413 return true
414 }
415 } else if len(tagValue) == 32 && len(signerPubkey) == 64 {
416 // tagValue is binary, signerPubkey is hex
417 if hex.Enc(tagValue) == string(signerPubkey) {
418 return true
419 }
420 } else if utils.FastEqual(tagValue, signerPubkey) {
421 return true
422 }
423 }
424 }
425 }
426 }
427 }
428 }
429 return false
430 }
431 432 // canSeePrivateEvent checks if the authenticated user can see an event with a private tag
433 func (p *P) canSeePrivateEvent(
434 authedPubkey, privatePubkey []byte, remote string,
435 ) (canSee bool) {
436 // If no authenticated user, deny access
437 if len(authedPubkey) == 0 {
438 return false
439 }
440 441 // If the authenticated user matches the private tag pubkey, allow access
442 if len(privatePubkey) > 0 && utils.FastEqual(authedPubkey, privatePubkey) {
443 return true
444 }
445 446 // Check if user is an admin or owner (they can see all private events)
447 accessLevel := acl.Registry.GetAccessLevel(authedPubkey, remote)
448 if accessLevel == "admin" || accessLevel == "owner" {
449 return true
450 }
451 452 // Default deny
453 return false
454 }
455