router.go raw

   1  package main
   2  
   3  import (
   4  	"common/helpers"
   5  	"common/jsbridge/sw"
   6  	"common/nostr"
   7  )
   8  
   9  // Subscription Router — central dispatcher for relay operations.
  10  // Owns client subscriptions, filter matching, message dispatch.
  11  
  12  var (
  13  	clientSubs map[string]*clientSub
  14  	proxySubs  map[string]*proxySub
  15  )
  16  
  17  type clientSub struct {
  18  	filters   []*nostr.Filter
  19  	filterRaw string
  20  	clientID  string
  21  }
  22  
  23  type proxySub struct {
  24  	remoteIDs  map[string]bool
  25  	relayCount int
  26  	eoseCount  int
  27  	timer      sw.Timer
  28  	done       bool
  29  }
  30  
  31  func initRouter() {
  32  	clientSubs = make(map[string]*clientSub)
  33  	proxySubs = make(map[string]*proxySub)
  34  }
  35  
  36  // --- REQ / CLOSE / EVENT ---
  37  
  38  func routerReq(clientID, subID, filterRaw string) {
  39  	f := nostr.ParseFilter(filterRaw)
  40  	if f == nil {
  41  		sw.Log("relay-sw: REQ parse filter FAILED")
  42  		return
  43  	}
  44  	clientSubs[subID] = &clientSub{filters: []*nostr.Filter{f}, filterRaw: filterRaw, clientID: clientID}
  45  
  46  	cacheQuery(filterRaw, func(eventsJSON string) {
  47  		events := nostr.ParseEventsJSON(eventsJSON)
  48  		for _, ev := range events {
  49  			fwd(clientID, "[\"EVENT\","+jstr(subID)+","+ev.ToJSON()+"]")
  50  		}
  51  		fwd(clientID, "[\"EOSE\","+jstr(subID)+"]")
  52  	})
  53  }
  54  
// routerClose handles a client CLOSE for subID: it forgets the client
// subscription and tears down any proxy subscription with the same ID.
func routerClose(subID string) {
	// Remove the client entry first: routerCleanupProxy only forwards a
	// final EOSE when it still finds a clientSubs entry, so a closed
	// subscription receives no further messages.
	delete(clientSubs, subID)
	routerCleanupProxy(subID)
}
  59  
  60  func routerPublish(clientID, eventRaw string) {
  61  	ev := nostr.ParseEvent(eventRaw)
  62  	if ev == nil {
  63  		return
  64  	}
  65  
  66  	cacheStore(eventRaw, func(saved bool) {
  67  		if saved {
  68  			pushToMatchingSubs(ev)
  69  		}
  70  	})
  71  
  72  	relayPublish(ev)
  73  	fwd(clientID, "[\"OK\","+jstr(ev.ID)+",true,\"\"]")
  74  }
  75  
  76  func routerPublishToRelays(eventRaw string, relayURLs []string) {
  77  	ev := nostr.ParseEvent(eventRaw)
  78  	if ev == nil {
  79  		sw.Log("relay: publishToRelays PARSE FAILED len=" + helpers.Itoa(int64(len(eventRaw))))
  80  		return
  81  	}
  82  	sw.Log("relay: publishToRelays kind=" + helpers.Itoa(int64(ev.Kind)) + " id=" + ev.ID[:16] + " relays=" + helpers.Itoa(int64(len(relayURLs))))
  83  	cacheStore(eventRaw, func(saved bool) {
  84  		if saved {
  85  			pushToMatchingSubs(ev)
  86  		}
  87  	})
  88  	for _, url := range relayURLs {
  89  		c := getConn(url)
  90  		sw.Log("relay: publish to " + url + " open=" + boolStr(c.IsOpen()))
  91  		c.Publish(ev)
  92  	}
  93  }
  94  
  95  // --- PROXY subscriptions ---
  96  
  97  func routerProxy(clientID, subID, filterRaw string, relayURLs []string) {
  98  	routerCleanupProxy(subID)
  99  
 100  	filters := parseFilters(filterRaw)
 101  	if len(filters) == 0 {
 102  		sw.Log("relay-sw: PROXY parse filter FAILED")
 103  		return
 104  	}
 105  	clientSubs[subID] = &clientSub{filters: filters, filterRaw: filterRaw, clientID: clientID}
 106  
 107  	// Skip IDB cache for PROXY subscriptions — marmot needs fresh relay data.
 108  	// IDB may hold stale key packages from a previous bridge session (bridge
 109  	// restarts generate a new kpp without a version bump, so the epoch check
 110  	// doesn't flush them). Serving a stale key package causes the WASM to
 111  	// create a Welcome with the wrong KeyPackageRef → bridge can't decrypt.
 112  
 113  	remoteIDs := make(map[string]bool)
 114  	base := "p_" + subID + "_"
 115  
 116  	proxySubs[subID] = &proxySub{
 117  		remoteIDs:  remoteIDs,
 118  		relayCount: len(relayURLs),
 119  	}
 120  
 121  	for _, url := range relayURLs {
 122  		suffix := urlSuffix(url)
 123  		rSubID := base + suffix
 124  		remoteIDs[rSubID] = true
 125  		c := getConn(url)
 126  		c.Subscribe(rSubID, filters)
 127  	}
 128  
 129  	proxyID := subID
 130  	proxySubs[subID].timer = sw.SetTimeout(10000, func() {
 131  		info, ok := proxySubs[proxyID]
 132  		if ok && !info.done {
 133  			info.done = true
 134  			if cs, ok := clientSubs[proxyID]; ok {
 135  				fwd(cs.clientID, "[\"EOSE\","+jstr(proxyID)+"]")
 136  			}
 137  		}
 138  	})
 139  }
 140  
 141  func routerCleanupProxy(proxyID string) {
 142  	info, ok := proxySubs[proxyID]
 143  	if !ok {
 144  		return
 145  	}
 146  	sw.ClearTimeout(info.timer)
 147  
 148  	if !info.done {
 149  		if cs, ok := clientSubs[proxyID]; ok {
 150  			fwd(cs.clientID, "[\"EOSE\","+jstr(proxyID)+"]")
 151  		}
 152  	}
 153  
 154  	for rSubID := range info.remoteIDs {
 155  		for _, url := range rpool.URLs() {
 156  			c := rpool.Get(url)
 157  			if c != nil && c.IsOpen() {
 158  				c.CloseSubscription(rSubID)
 159  			}
 160  		}
 161  	}
 162  	delete(proxySubs, proxyID)
 163  	delete(clientSubs, proxyID)
 164  }
 165  
 166  // --- Relay event callbacks ---
 167  
 168  func routerOnRelayEvent(relayURL string, ev *nostr.Event) {
 169  	evJSON := ev.ToJSON()
 170  
 171  	pushToMatchingSubs(ev)
 172  
 173  	cacheStore(evJSON, func(saved bool) {
 174  		if saved {
 175  			relayPublishExcept(ev, relayURL)
 176  		}
 177  	})
 178  }
 179  
 180  func routerOnRelayEOSE(subID string) {
 181  	for proxyID, info := range proxySubs {
 182  		if info.remoteIDs[subID] {
 183  			info.eoseCount++
 184  			if !info.done {
 185  				sw.ClearTimeout(info.timer)
 186  				pid := proxyID
 187  				info.timer = sw.SetTimeout(3000, func() {
 188  					inf, ok := proxySubs[pid]
 189  					if ok && !inf.done {
 190  						inf.done = true
 191  						if cs, ok := clientSubs[pid]; ok {
 192  							fwd(cs.clientID, "[\"EOSE\","+jstr(pid)+"]")
 193  						}
 194  					}
 195  				})
 196  			}
 197  		}
 198  	}
 199  }
 200  
 201  func pushToMatchingSubs(ev *nostr.Event) {
 202  	matched := 0
 203  	for subID, cs := range clientSubs {
 204  		for _, f := range cs.filters {
 205  			if f.Matches(ev) {
 206  				matched++
 207  				fwd(cs.clientID, "[\"EVENT\","+jstr(subID)+","+ev.ToJSON()+"]")
 208  				break // one match per sub is enough
 209  			}
 210  		}
 211  	}
 212  	_ = matched
 213  }
 214  
 215  // parseFilters parses a JSON filter string that may be a single object
 216  // or an array of objects. Returns a slice of parsed filters.
 217  func parseFilters(raw string) []*nostr.Filter {
 218  	// Trim whitespace
 219  	s := raw
 220  	for len(s) > 0 && (s[0] == ' ' || s[0] == '\t' || s[0] == '\n') {
 221  		s = s[1:]
 222  	}
 223  	if len(s) == 0 {
 224  		return nil
 225  	}
 226  	// Single filter object
 227  	if s[0] == '{' {
 228  		f := nostr.ParseFilter(s)
 229  		if f == nil {
 230  			return nil
 231  		}
 232  		return []*nostr.Filter{f}
 233  	}
 234  	// Array of filter objects
 235  	if s[0] != '[' {
 236  		return nil
 237  	}
 238  	s = s[1:] // skip '['
 239  	var filters []*nostr.Filter
 240  	for {
 241  		for len(s) > 0 && (s[0] == ' ' || s[0] == ',' || s[0] == '\t' || s[0] == '\n') {
 242  			s = s[1:]
 243  		}
 244  		if len(s) == 0 || s[0] == ']' {
 245  			break
 246  		}
 247  		if s[0] != '{' {
 248  			break
 249  		}
 250  		// Find matching '}'
 251  		depth := 0
 252  		end := 0
 253  		for i := 0; i < len(s); i++ {
 254  			if s[i] == '{' {
 255  				depth++
 256  			} else if s[i] == '}' {
 257  				depth--
 258  				if depth == 0 {
 259  					end = i + 1
 260  					break
 261  				}
 262  			}
 263  		}
 264  		if end == 0 {
 265  			break
 266  		}
 267  		f := nostr.ParseFilter(s[:end])
 268  		if f != nil {
 269  			filters = append(filters, f)
 270  		}
 271  		s = s[end:]
 272  	}
 273  	return filters
 274  }
 275  
 276  // --- Signing ---
 277  
 278  func routerSign(clientID, requestID, eventRaw string) {
 279  	if myPubkey == "" {
 280  		fwd(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+",\"no key\"]")
 281  		return
 282  	}
 283  	ev := nostr.ParseEvent(eventRaw)
 284  	if ev == nil {
 285  		fwd(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+",\"parse error\"]")
 286  		return
 287  	}
 288  	// Proxy signing through crypto SW -> signer extension.
 289  	cryptoProxy("signEvent", "", eventRaw, func(signedJSON, errMsg string) {
 290  		if errMsg != "" || signedJSON == "" {
 291  			fwd(clientID, "[\"SIGN_ERROR\","+jstr(requestID)+","+jstr(errMsg)+"]")
 292  			return
 293  		}
 294  		fwd(clientID, "[\"SIGNED\","+jstr(requestID)+","+signedJSON+"]")
 295  	})
 296  }
 297  
 298  // --- DM routing ---
 299  
 300  func routerSaveDMRecord(rec *DMRecord) {
 301  	dmJSON := rec.ToJSON()
 302  	cacheSaveDM(dmJSON, func(result string) {
 303  		if result != "duplicate" {
 304  			fwdAll("[\"DM_RECEIVED\"," + dmJSON + "]")
 305  		}
 306  	})
 307  }
 308  
 309  func routerDMList(clientID string) {
 310  	cacheGetConversationList(func(listJSON string) {
 311  		fwd(clientID, "[\"DM_LIST\","+listJSON+"]")
 312  	})
 313  }
 314  
 315  func routerDMHistory(clientID, peer string, limit int, until int64) {
 316  	if limit <= 0 {
 317  		limit = 50
 318  	}
 319  	cacheQueryDMs(peer, limit, until, func(msgsJSON string) {
 320  		fwd(clientID, "[\"DM_HISTORY\","+jstr(peer)+","+msgsJSON+"]")
 321  	})
 322  }
 323  
 324  // --- Broadcast ---
 325  
 326  func routerBroadcast(clientID, pubkey string, relayURLs []string) {
 327  	filterJSON := "{\"authors\":[" + jstr(pubkey) + "],\"kinds\":[0,3,10002,10050,10051]}"
 328  	cacheQuery(filterJSON, func(eventsJSON string) {
 329  		events := nostr.ParseEventsJSON(eventsJSON)
 330  		byKind := make(map[int]*nostr.Event)
 331  		for _, ev := range events {
 332  			if prev, ok := byKind[ev.Kind]; !ok || ev.CreatedAt > prev.CreatedAt {
 333  				byKind[ev.Kind] = ev
 334  			}
 335  		}
 336  
 337  		userRelays := relayURLs
 338  		if relayEv, ok := byKind[10002]; ok {
 339  			userRelays = nil
 340  			for _, t := range relayEv.Tags.GetAll("r") {
 341  				userRelays = append(userRelays, t.Value())
 342  			}
 343  		}
 344  		if len(userRelays) == 0 {
 345  			userRelays = writeRelays
 346  		}
 347  
 348  		if _, ok := byKind[10050]; !ok && myPubkey != "" && len(userRelays) > 0 {
 349  			createRelayListEventAsync(10050, userRelays, func(ev *nostr.Event) {
 350  				if ev != nil {
 351  					cacheStore(ev.ToJSON(), func(_ bool) {})
 352  					for _, url := range relayURLs {
 353  						getConn(url).Publish(ev)
 354  					}
 355  				}
 356  			})
 357  		}
 358  		if _, ok := byKind[10051]; !ok && myPubkey != "" && len(userRelays) > 0 {
 359  			createRelayListEventAsync(10051, userRelays, func(ev *nostr.Event) {
 360  				if ev != nil {
 361  					cacheStore(ev.ToJSON(), func(_ bool) {})
 362  					for _, url := range relayURLs {
 363  						getConn(url).Publish(ev)
 364  					}
 365  				}
 366  			})
 367  		}
 368  
 369  		count := 0
 370  		for _, ev := range byKind {
 371  			for _, url := range relayURLs {
 372  				getConn(url).Publish(ev)
 373  			}
 374  			count++
 375  		}
 376  		fwd(clientID, "[\"BROADCAST_DONE\","+helpers.Itoa(int64(count))+","+helpers.Itoa(int64(len(relayURLs)))+"]")
 377  	})
 378  }
 379  
 380  func createRelayListEventAsync(kind int, relays []string, cb func(ev *nostr.Event)) {
 381  	tagKey := "relay"
 382  	var tags nostr.Tags
 383  	for _, r := range relays {
 384  		tags = append(tags, nostr.Tag{tagKey, r})
 385  	}
 386  	ev := &nostr.Event{
 387  		Kind:      kind,
 388  		PubKey:    myPubkey,
 389  		Content:   "",
 390  		Tags:      tags,
 391  		CreatedAt: sw.NowSeconds(),
 392  	}
 393  	cryptoProxy("signEvent", "", ev.ToJSON(), func(signedJSON, errMsg string) {
 394  		if errMsg != "" || signedJSON == "" {
 395  			cb(nil)
 396  			return
 397  		}
 398  		signed := nostr.ParseEvent(signedJSON)
 399  		cb(signed)
 400  	})
 401  }
 402