publisher.go raw

   1  package app
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"sync"
   7  	"time"
   8  
   9  	"github.com/gorilla/websocket"
  10  	"next.orly.dev/pkg/lol/log"
  11  	"next.orly.dev/pkg/acl"
  12  	"next.orly.dev/pkg/nostr/encoders/event"
  13  	"next.orly.dev/pkg/nostr/encoders/filter"
  14  	"next.orly.dev/pkg/nostr/encoders/hex"
  15  	"next.orly.dev/pkg/nostr/encoders/kind"
  16  	"next.orly.dev/pkg/interfaces/publisher"
  17  	"next.orly.dev/pkg/interfaces/typer"
  18  	"next.orly.dev/pkg/policy"
  19  	"next.orly.dev/pkg/protocol/publish"
  20  	"next.orly.dev/pkg/utils"
  21  )
  22  
  23  const Type = "socketapi"
  24  
  25  // WriteChanMap maps websocket connections to their write channels
  26  type WriteChanMap map[*websocket.Conn]chan publish.WriteRequest
  27  
  28  type Subscription struct {
  29  	remote       string
  30  	AuthedPubkey []byte
  31  	Receiver     event.C // Channel for delivering events to this subscription
  32  	AuthRequired bool    // Whether ACL requires authentication for privileged events
  33  	*filter.S
  34  }
  35  
  36  // Map is a map of filters associated with a collection of ws.Listener
  37  // connections.
  38  type Map map[*websocket.Conn]map[string]Subscription
  39  
  40  type W struct {
  41  	*websocket.Conn
  42  
  43  	remote string
  44  
  45  	// If Cancel is true, this is a close command.
  46  	Cancel bool
  47  
  48  	// Id is the subscription Id. If Cancel is true, cancel the named
  49  	// subscription, otherwise, cancel the publisher for the socket.
  50  	Id string
  51  
  52  	// The Receiver holds the event channel for receiving notifications or data
  53  	// relevant to this WebSocket connection.
  54  	Receiver event.C
  55  
  56  	// Filters holds a collection of filters used to match or process events
  57  	// associated with this WebSocket connection. It is used to determine which
  58  	// notifications or data should be received by the subscriber.
  59  	Filters *filter.S
  60  
  61  	// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
  62  	AuthedPubkey []byte
  63  
  64  	// AuthRequired indicates whether the ACL in operation requires auth. If
  65  	// this is set to true, the publisher will not publish privileged or other
  66  	// restricted events to non-authed listeners, otherwise, it will.
  67  	AuthRequired bool
  68  }
  69  
  70  func (w *W) Type() (typeName string) { return Type }
  71  
  72  // P is a structure that manages subscriptions and associated filters for
  73  // websocket listeners. It uses a mutex to synchronize access to a map storing
  74  // subscriber connections and their filter configurations.
  75  type P struct {
  76  	c context.Context
  77  	// Mx is the mutex for the Map.
  78  	Mx sync.RWMutex
  79  	// Map is the map of subscribers and subscriptions from the websocket api.
  80  	Map
  81  	// WriteChans maps websocket connections to their write channels
  82  	WriteChans WriteChanMap
  83  	// ChannelMembership is used for NIRC channel access control (kinds 40-44)
  84  	ChannelMembership *ChannelMembership
  85  }
  86  
  87  var _ publisher.I = &P{}
  88  
  89  func NewPublisher(c context.Context) (publisher *P) {
  90  	return &P{
  91  		c:          c,
  92  		Map:        make(Map),
  93  		WriteChans: make(WriteChanMap, 100),
  94  	}
  95  }
  96  
  97  func (p *P) Type() (typeName string) { return Type }
  98  
  99  // Receive handles incoming messages to manage websocket listener subscriptions
 100  // and associated filters.
 101  //
 102  // # Parameters
 103  //
 104  // - msg (publisher.Message): The incoming message to process; expected to be of
 105  // type *W to trigger subscription management actions.
 106  //
 107  // # Expected behaviour
 108  //
 109  // - Checks if the message is of type *W.
 110  //
 111  // - If Cancel is true, removes a subscriber by ID or the entire listener.
 112  //
 113  // - Otherwise, adds the subscription to the map under a mutex lock.
 114  //
 115  // - Logs actions related to subscription creation or removal.
 116  func (p *P) Receive(msg typer.T) {
 117  	if m, ok := msg.(*W); ok {
 118  		if m.Cancel {
 119  			if m.Id == "" {
 120  				p.removeSubscriber(m.Conn)
 121  			} else {
 122  				p.removeSubscriberId(m.Conn, m.Id)
 123  			}
 124  			return
 125  		}
 126  		p.Mx.Lock()
 127  		defer p.Mx.Unlock()
 128  		if subs, ok := p.Map[m.Conn]; !ok {
 129  			subs = make(map[string]Subscription)
 130  			subs[m.Id] = Subscription{
 131  				S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
 132  				Receiver: m.Receiver, AuthRequired: m.AuthRequired,
 133  			}
 134  			p.Map[m.Conn] = subs
 135  		} else {
 136  			subs[m.Id] = Subscription{
 137  				S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
 138  				Receiver: m.Receiver, AuthRequired: m.AuthRequired,
 139  			}
 140  		}
 141  	}
 142  }
 143  
 144  // Deliver processes and distributes an event to all matching subscribers based on their filter configurations.
 145  //
 146  // # Parameters
 147  //
 148  // - ev (*event.E): The event to be delivered to subscribed clients.
 149  //
 150  // # Expected behaviour
 151  //
 152  // Delivers the event to all subscribers whose filters match the event. It
 153  // applies authentication checks if required by the server and skips delivery
 154  // for unauthenticated users when events are privileged.
 155  func (p *P) Deliver(ev *event.E) {
 156  	// Snapshot the deliveries under read lock to avoid holding locks during I/O
 157  	p.Mx.RLock()
 158  	type delivery struct {
 159  		w   *websocket.Conn
 160  		id  string
 161  		sub Subscription
 162  	}
 163  	var deliveries []delivery
 164  	for w, subs := range p.Map {
 165  		for id, subscriber := range subs {
 166  			if subscriber.Match(ev) {
 167  				deliveries = append(
 168  					deliveries, delivery{w: w, id: id, sub: subscriber},
 169  				)
 170  			}
 171  		}
 172  	}
 173  	p.Mx.RUnlock()
 174  	if len(deliveries) > 0 {
 175  		log.D.C(
 176  			func() string {
 177  				return fmt.Sprintf(
 178  					"delivering event %0x to websocket subscribers %d", ev.ID,
 179  					len(deliveries),
 180  				)
 181  			},
 182  		)
 183  	}
 184  	// Track subscriptions that timeout so we can remove them afterward
 185  	type stuckSub struct {
 186  		w  *websocket.Conn
 187  		id string
 188  	}
 189  	var stuckSubs []stuckSub
 190  
 191  	for _, d := range deliveries {
 192  		// If the event is privileged, enforce that the subscriber's authed pubkey matches
 193  		// either the event pubkey or appears in any 'p' tag of the event.
 194  		// Channel kinds always require auth check; other privileged kinds only when ACL is active.
 195  		isChannel := kind.IsChannelKind(ev.Kind)
 196  		if kind.IsPrivileged(ev.Kind) && (d.sub.AuthRequired || isChannel) {
 197  			pk := d.sub.AuthedPubkey
 198  
 199  			// Channel kinds (40-44) use channel membership instead of p-tag involvement
 200  			var allowed bool
 201  			if kind.IsChannelKind(ev.Kind) && p.ChannelMembership != nil {
 202  				channelID := ExtractChannelIDFromEvent(ev)
 203  				allowed = p.ChannelMembership.IsChannelMemberByID(channelID, ev.Kind, pk, p.c)
 204  			} else {
 205  				// Use centralized IsPartyInvolved function for consistent privilege checking
 206  				allowed = policy.IsPartyInvolved(ev, pk)
 207  			}
 208  
 209  			if !allowed {
 210  				log.D.F(
 211  					"subscription delivery DENIED for privileged event %s to %s (not authenticated or not a party involved)",
 212  					hex.Enc(ev.ID), d.sub.remote,
 213  				)
 214  				// Skip delivery for this subscriber
 215  				continue
 216  			}
 217  		}
 218  
 219  		// For non-channel, non-privileged events: check if they reference channel
 220  		// events via e-tags. Reactions, reposts, zaps, reports, deletions that
 221  		// target channel messages must only be delivered to channel members.
 222  		if !kind.IsChannelKind(ev.Kind) && !kind.IsPrivileged(ev.Kind) && p.ChannelMembership != nil {
 223  			if channelIDHex, isChannel := p.ChannelMembership.ReferencesChannelEvent(ev, p.c); isChannel {
 224  				pk := d.sub.AuthedPubkey
 225  				if !p.ChannelMembership.IsChannelMemberByID(channelIDHex, ev.Kind, pk, p.c) {
 226  					log.D.F(
 227  						"subscription delivery DENIED for channel-referencing event %s kind %d to %s (not a member of channel %s)",
 228  						hex.Enc(ev.ID), ev.Kind, d.sub.remote, channelIDHex,
 229  					)
 230  					continue
 231  				}
 232  			}
 233  		}
 234  
 235  		// Check for private tags - only deliver to authorized users
 236  		if ev.Tags != nil && ev.Tags.Len() > 0 {
 237  			hasPrivateTag := false
 238  			var privatePubkey []byte
 239  
 240  			for _, t := range *ev.Tags {
 241  				if t.Len() >= 2 {
 242  					keyBytes := t.Key()
 243  					if len(keyBytes) == 7 && string(keyBytes) == "private" {
 244  						hasPrivateTag = true
 245  						privatePubkey = t.Value()
 246  						break
 247  					}
 248  				}
 249  			}
 250  
 251  			if hasPrivateTag {
 252  				canSeePrivate := p.canSeePrivateEvent(
 253  					d.sub.AuthedPubkey, privatePubkey, d.sub.remote,
 254  				)
 255  				if !canSeePrivate {
 256  					log.D.F(
 257  						"subscription delivery DENIED for private event %s to %s (unauthorized)",
 258  						hex.Enc(ev.ID), d.sub.remote,
 259  					)
 260  					continue
 261  				}
 262  				log.D.F(
 263  					"subscription delivery ALLOWED for private event %s to %s (authorized)",
 264  					hex.Enc(ev.ID), d.sub.remote,
 265  				)
 266  			}
 267  		}
 268  
 269  		// Send event to the subscription's receiver channel
 270  		// The consumer goroutine (in handle-req.go) will read from this channel
 271  		// and forward it to the client via the write channel
 272  		log.D.F(
 273  			"attempting delivery of event %s (kind=%d) to subscription %s @ %s",
 274  			hex.Enc(ev.ID), ev.Kind, d.id, d.sub.remote,
 275  		)
 276  
 277  		// Check if receiver channel exists
 278  		if d.sub.Receiver == nil {
 279  			log.E.F(
 280  				"subscription %s has nil receiver channel for %s", d.id,
 281  				d.sub.remote,
 282  			)
 283  			continue
 284  		}
 285  
 286  		// Send to receiver channel - non-blocking with timeout
 287  		select {
 288  		case <-p.c.Done():
 289  			continue
 290  		case d.sub.Receiver <- ev:
 291  			log.D.F(
 292  				"subscription delivery QUEUED: event=%s to=%s sub=%s",
 293  				hex.Enc(ev.ID), d.sub.remote, d.id,
 294  			)
 295  		case <-time.After(DefaultWriteTimeout):
 296  			log.W.F(
 297  				"subscription delivery TIMEOUT: event=%s to=%s sub=%s — removing stuck subscription",
 298  				hex.Enc(ev.ID), d.sub.remote, d.id,
 299  			)
 300  			stuckSubs = append(stuckSubs, stuckSub{w: d.w, id: d.id})
 301  		}
 302  	}
 303  
 304  	// Remove stuck subscriptions to prevent repeated timeouts
 305  	if len(stuckSubs) > 0 {
 306  		p.Mx.Lock()
 307  		for _, s := range stuckSubs {
 308  			if subs, ok := p.Map[s.w]; ok {
 309  				delete(subs, s.id)
 310  				if len(subs) == 0 {
 311  					delete(p.Map, s.w)
 312  				}
 313  			}
 314  		}
 315  		p.Mx.Unlock()
 316  	}
 317  }
 318  
 319  // removeSubscriberId removes a specific subscription from a subscriber
 320  // websocket.
 321  func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
 322  	p.Mx.Lock()
 323  	defer p.Mx.Unlock()
 324  	var subs map[string]Subscription
 325  	var ok bool
 326  	if subs, ok = p.Map[ws]; ok {
 327  		delete(subs, id)
 328  		// Check the actual map after deletion, not the original reference
 329  		if len(p.Map[ws]) == 0 {
 330  			delete(p.Map, ws)
 331  			// Don't remove write channel here - it's tied to the connection, not subscriptions
 332  			// The write channel will be removed when the connection closes (in handle-websocket.go defer)
 333  			// This allows new subscriptions to be created on the same connection
 334  		}
 335  	}
 336  }
 337  
 338  // SetWriteChan stores the write channel for a websocket connection
 339  // If writeChan is nil, the entry is removed from the map
 340  func (p *P) SetWriteChan(
 341  	conn *websocket.Conn, writeChan chan publish.WriteRequest,
 342  ) {
 343  	p.Mx.Lock()
 344  	defer p.Mx.Unlock()
 345  	if writeChan == nil {
 346  		delete(p.WriteChans, conn)
 347  	} else {
 348  		p.WriteChans[conn] = writeChan
 349  	}
 350  }
 351  
 352  // GetWriteChan returns the write channel for a websocket connection
 353  func (p *P) GetWriteChan(conn *websocket.Conn) (
 354  	chan publish.WriteRequest, bool,
 355  ) {
 356  	p.Mx.RLock()
 357  	defer p.Mx.RUnlock()
 358  	ch, ok := p.WriteChans[conn]
 359  	return ch, ok
 360  }
 361  
 362  // removeSubscriber removes a websocket from the P collection.
 363  func (p *P) removeSubscriber(ws *websocket.Conn) {
 364  	p.Mx.Lock()
 365  	defer p.Mx.Unlock()
 366  	clear(p.Map[ws])
 367  	delete(p.Map, ws)
 368  	delete(p.WriteChans, ws)
 369  }
 370  
 371  // HasActiveNIP46Signer checks if there's an active subscription for kind 24133
 372  // where the given pubkey is involved (either as author filter or in #p tag filter).
 373  // This is used to authenticate clients by proving a signer is connected for that pubkey.
 374  func (p *P) HasActiveNIP46Signer(signerPubkey []byte) bool {
 375  	const kindNIP46 = 24133
 376  	p.Mx.RLock()
 377  	defer p.Mx.RUnlock()
 378  
 379  	for _, subs := range p.Map {
 380  		for _, sub := range subs {
 381  			if sub.S == nil {
 382  				continue
 383  			}
 384  			for _, f := range *sub.S {
 385  				if f == nil || f.Kinds == nil {
 386  					continue
 387  				}
 388  				// Check if filter is for kind 24133
 389  				hasNIP46Kind := false
 390  				for _, k := range f.Kinds.K {
 391  					if k.K == kindNIP46 {
 392  						hasNIP46Kind = true
 393  						break
 394  					}
 395  				}
 396  				if !hasNIP46Kind {
 397  					continue
 398  				}
 399  				// Check if the signer pubkey matches the #p tag filter
 400  				if f.Tags != nil {
 401  					pTag := f.Tags.GetFirst([]byte("p"))
 402  					if pTag != nil && pTag.Len() >= 2 {
 403  						for i := 1; i < pTag.Len(); i++ {
 404  							tagValue := pTag.T[i]
 405  							// Compare - handle both binary and hex formats
 406  							if len(tagValue) == 32 && len(signerPubkey) == 32 {
 407  								if utils.FastEqual(tagValue, signerPubkey) {
 408  									return true
 409  								}
 410  							} else if len(tagValue) == 64 && len(signerPubkey) == 32 {
 411  								// tagValue is hex, signerPubkey is binary
 412  								if string(tagValue) == hex.Enc(signerPubkey) {
 413  									return true
 414  								}
 415  							} else if len(tagValue) == 32 && len(signerPubkey) == 64 {
 416  								// tagValue is binary, signerPubkey is hex
 417  								if hex.Enc(tagValue) == string(signerPubkey) {
 418  									return true
 419  								}
 420  							} else if utils.FastEqual(tagValue, signerPubkey) {
 421  								return true
 422  							}
 423  						}
 424  					}
 425  				}
 426  			}
 427  		}
 428  	}
 429  	return false
 430  }
 431  
 432  // canSeePrivateEvent checks if the authenticated user can see an event with a private tag
 433  func (p *P) canSeePrivateEvent(
 434  	authedPubkey, privatePubkey []byte, remote string,
 435  ) (canSee bool) {
 436  	// If no authenticated user, deny access
 437  	if len(authedPubkey) == 0 {
 438  		return false
 439  	}
 440  
 441  	// If the authenticated user matches the private tag pubkey, allow access
 442  	if len(privatePubkey) > 0 && utils.FastEqual(authedPubkey, privatePubkey) {
 443  		return true
 444  	}
 445  
 446  	// Check if user is an admin or owner (they can see all private events)
 447  	accessLevel := acl.Registry.GetAccessLevel(authedPubkey, remote)
 448  	if accessLevel == "admin" || accessLevel == "owner" {
 449  		return true
 450  	}
 451  
 452  	// Default deny
 453  	return false
 454  }
 455