main.mx raw

   1  package main
   2  
   3  import (
   4  	"fmt"
   5  	"mime"
   6  	"os"
   7  	"time"
   8  
   9  	"smesh.lol/pkg/blossom"
  10  	"smesh.lol/pkg/nostr/envelope"
  11  	"smesh.lol/pkg/nostr/filter"
  12  	"smesh.lol/pkg/nostr/ws"
  13  	"smesh.lol/pkg/relay/server"
  14  	"smesh.lol/pkg/relay/worker"
  15  )
  16  
// version is the release string. It is printed by the "version" command,
// included in the relay's startup banner, and embedded in the /__version
// response served by runRelay.
var version = "0.1.17"
  18  
  19  func main() {
  20  	if len(os.Args) < 2 {
  21  		runRelay(os.Args[1:])
  22  		return
  23  	}
  24  	switch os.Args[1] {
  25  	case "relay":
  26  		runRelay(os.Args[2:])
  27  	case "sync":
  28  		runSync(os.Args[2:])
  29  	case "crawl":
  30  		runCrawl(os.Args[2:])
  31  	case "version", "-v", "--version":
  32  		fmt.Printf("smesh %s\n", version)
  33  	case "help", "-h", "--help":
  34  		printHelp()
  35  	default:
  36  		if len(os.Args[1]) > 0 && os.Args[1][0] == '-' {
  37  			runRelay(os.Args[1:])
  38  		} else {
  39  			fmt.Fprintf(os.Stderr, "unknown command: %s\n", os.Args[1])
  40  			printHelp()
  41  			os.Exit(1)
  42  		}
  43  	}
  44  }
  45  
  46  func runRelay(_ []string) {
  47  	mime.AddExtensionType(".mjs", "application/javascript")
  48  	listenAddr := envOr("SMESH_LISTEN", "0.0.0.0:3334")
  49  	dataDir := envOr("SMESH_DATA_DIR", "data")
  50  	blossomDir := envOr("SMESH_BLOSSOM_DIR", dataDir | "/blossom")
  51  	staticDir := envOr("SMESH_STATIC_DIR", "web/static")
  52  
  53  	// Open the store.
  54  	store, err := worker.Open(dataDir)
  55  	if err != nil {
  56  		fmt.Fprintf(os.Stderr, "store: %v\n", err)
  57  		os.Exit(1)
  58  	}
  59  	defer store.Close()
  60  
  61  	srv := server.New(store)
  62  
  63  	// Blossom file server.
  64  	bsrv, err := blossom.New(blossomDir)
  65  	if err != nil {
  66  		fmt.Fprintf(os.Stderr, "blossom: %v\n", err)
  67  		os.Exit(1)
  68  	}
  69  
  70  	srv.Fallback = func(path string, headers map[string]string) (int, map[string]string, []byte) {
  71  		switch {
  72  		case hasPrefix(path, "/blossom/"):
  73  			return bsrv.HandleRaw(path[len("/blossom"):], headers)
  74  		case path == "/__version":
  75  			// Combine the human version with the mtime of the main JS bundle so
  76  			// the SW's version poller invalidates its cache on every rebuild.
  77  			stamp := int64(0)
  78  			if info, err := os.Stat(staticDir | "/__web_app_.mjs"); err == nil {
  79  				stamp = info.ModTime().UnixNano()
  80  			}
  81  			return 200, map[string]string{
  82  				"Content-Type":                "application/json",
  83  				"Access-Control-Allow-Origin": "*",
  84  				"Cache-Control":               "no-store, no-cache, must-revalidate",
  85  				"Pragma":                      "no-cache",
  86  				"Expires":                     "0",
  87  			}, []byte(fmt.Sprintf(`{"v":"%s+%d"}`, version, stamp))
  88  		case path == "/.well-known/nostr.json":
  89  			return 200, map[string]string{
  90  				"Content-Type":                "application/json",
  91  				"Access-Control-Allow-Origin": "*",
  92  			}, []byte(`{"names":{"mleku":"4c800257a588a82849d049817c2bdaad984b25a45ad9f6dad66e47d3b47e3b2f","bridge":"cf1ae33ad5f229dabd7d733ce37b0165126aebf581e4094df9373f77e00cb696"},"relays":{"4c800257a588a82849d049817c2bdaad984b25a45ad9f6dad66e47d3b47e3b2f":["wss://smesh.lol"],"cf1ae33ad5f229dabd7d733ce37b0165126aebf581e4094df9373f77e00cb696":["wss://smesh.lol","wss://relay.orly.dev"]}}`)
  93  		default:
  94  			return serveStatic(staticDir, path)
  95  		}
  96  	}
  97  
  98  	srv.OnReady = func() {
  99  		fmt.Fprintf(os.Stderr, "smesh %s listening on %s\n", version, listenAddr)
 100  		spawnCrawler(listenAddr)
 101  	}
 102  	if err := srv.ListenAndServe(listenAddr); err != nil {
 103  		fmt.Fprintf(os.Stderr, "listen: %v\n", err)
 104  		os.Exit(1)
 105  	}
 106  }
 107  
 108  func serveStatic(dir, path string) (int, map[string]string, []byte) {
 109  	if path == "" || path == "/" {
 110  		path = "/index.html"
 111  	}
 112  	data, err := os.ReadFile(dir | path)
 113  	if err != nil {
 114  		// SPA fallback: serve index.html for unmatched paths (client-side routing).
 115  		data, err = os.ReadFile(dir | "/index.html")
 116  		if err != nil {
 117  			return 404, map[string]string{"Content-Type": "text/plain"}, []byte("404 not found\n")
 118  		}
 119  		return 200, map[string]string{"Content-Type": "text/html; charset=utf-8", "Cache-Control": "no-cache"}, data
 120  	}
 121  	ct := "application/octet-stream"
 122  	switch {
 123  	case hasSuffix(path, ".html"):
 124  		ct = "text/html; charset=utf-8"
 125  	case hasSuffix(path, ".js"), hasSuffix(path, ".mjs"):
 126  		ct = "application/javascript"
 127  	case hasSuffix(path, ".css"):
 128  		ct = "text/css"
 129  	case hasSuffix(path, ".json"):
 130  		ct = "application/json"
 131  	case hasSuffix(path, ".svg"):
 132  		ct = "image/svg+xml"
 133  	case hasSuffix(path, ".png"):
 134  		ct = "image/png"
 135  	case hasSuffix(path, ".ico"):
 136  		ct = "image/x-icon"
 137  	case hasSuffix(path, ".wasm"):
 138  		ct = "application/wasm"
 139  	case hasSuffix(path, ".webp"):
 140  		ct = "image/webp"
 141  	case hasSuffix(path, ".woff2"):
 142  		ct = "font/woff2"
 143  	}
 144  	h := map[string]string{"Content-Type": ct}
 145  	if path == "/$sw/$entry.mjs" {
 146  		h["Service-Worker-Allowed"] = "/"
 147  	}
 148  	// Always revalidate so the SW's CacheFromManifests fetch hits the network
 149  	// instead of being short-circuited by the browser HTTP cache. Without this
 150  	// the SW updates its Cache API entries from a stale browser cache and the
 151  	// hot-reload chain silently breaks.
 152  	h["Cache-Control"] = "no-cache"
 153  	return 200, h, data
 154  }
 155  
 156  // --- sync command ---
 157  
 158  func envOr(key, fallback string) string {
 159  	if v := os.Getenv(key); v != "" {
 160  		return v
 161  	}
 162  	return fallback
 163  }
 164  
 165  func runSync(args []string) {
 166  	if len(args) < 1 {
 167  		fmt.Fprintln(os.Stderr, "usage: smesh sync <remote-relay-url> [local-relay-url]")
 168  		os.Exit(1)
 169  	}
 170  	remoteURL := args[0]
 171  	localURL := "ws://127.0.0.1:3334"
 172  	if len(args) >= 2 {
 173  		localURL = args[1]
 174  	}
 175  
 176  	for {
 177  		syncOnce(remoteURL, localURL)
 178  		fmt.Fprintln(os.Stderr, "sync: disconnected, reconnecting in 30s...")
 179  		time.Sleep(30 * time.Second)
 180  	}
 181  }
 182  
 183  func syncOnce(remoteURL, localURL string) {
 184  	fmt.Fprintf(os.Stderr, "sync: connecting to remote %s\n", remoteURL)
 185  	remote, err := ws.Dial(remoteURL)
 186  	if err != nil {
 187  		fmt.Fprintf(os.Stderr, "sync: remote connect error: %v\n", err)
 188  		return
 189  	}
 190  	defer remote.Close()
 191  
 192  	fmt.Fprintf(os.Stderr, "sync: connecting to local %s\n", localURL)
 193  	local, err := ws.Dial(localURL)
 194  	if err != nil {
 195  		fmt.Fprintf(os.Stderr, "sync: local connect error: %v\n", err)
 196  		return
 197  	}
 198  	defer local.Close()
 199  
 200  	f := &filter.F{}
 201  	req := &envelope.Req{
 202  		Subscription: []byte("sync"),
 203  		Filters:      &filter.S{f},
 204  	}
 205  	if err := remote.WriteText(req.Marshal(nil)); err != nil {
 206  		fmt.Fprintf(os.Stderr, "sync: subscribe error: %v\n", err)
 207  		return
 208  	}
 209  
 210  	var forwarded int64
 211  	eosed := false
 212  
 213  	for {
 214  		op, payload, err := remote.ReadMessage()
 215  		if err != nil {
 216  			fmt.Fprintf(os.Stderr, "sync: read error (%d forwarded): %v\n", forwarded, err)
 217  			return
 218  		}
 219  		if op == ws.OpClose {
 220  			fmt.Fprintf(os.Stderr, "sync: remote closed (%d forwarded)\n", forwarded)
 221  			return
 222  		}
 223  		if op != ws.OpText {
 224  			continue
 225  		}
 226  
 227  		label, rem, _ := envelope.Identify(payload)
 228  		switch label {
 229  		case envelope.EventLabel:
 230  			var es envelope.EventSubmission
 231  			if _, err := es.Unmarshal(rem); err != nil {
 232  				continue
 233  			}
 234  			fwd := &envelope.EventSubmission{E: es.E}
 235  			if err := local.WriteText(fwd.Marshal(nil)); err != nil {
 236  				fmt.Fprintf(os.Stderr, "sync: local publish error: %v\n", err)
 237  				return
 238  			}
 239  			forwarded++
 240  			if forwarded%1000 == 0 {
 241  				fmt.Fprintf(os.Stderr, "sync: %d events forwarded\n", forwarded)
 242  			}
 243  		case envelope.EOSELabel:
 244  			if !eosed {
 245  				eosed = true
 246  				fmt.Fprintf(os.Stderr, "sync: EOSE — historical sync complete (%d forwarded). streaming live...\n", forwarded)
 247  			}
 248  		}
 249  	}
 250  }
 251  
 252  func hasPrefix(s, prefix string) bool {
 253  	return len(s) >= len(prefix) && s[:len(prefix)] == prefix
 254  }
 255  
 256  func hasSuffix(s, suffix string) bool {
 257  	return len(s) >= len(suffix) && s[len(s)-len(suffix):] == suffix
 258  }
 259  
 260  func spawnCrawler(listenAddr string) {
 261  	host := listenAddr
 262  	if hasPrefix(host, "0.0.0.0:") {
 263  		host = "127.0.0.1:" + host[len("0.0.0.0:"):]
 264  	}
 265  	cmd := os.Args[0] + " crawl ws://" + host
 266  	argv := []string{"/bin/sh", "-c", cmd}
 267  	attr := &os.ProcAttr{}
 268  	_, err := os.StartProcess("/bin/sh", argv, attr)
 269  	if err != nil {
 270  		fmt.Fprintf(os.Stderr, "crawl: spawn failed: %v\n", err)
 271  	}
 272  }
 273  
 274  // --- crawl command ---
 275  
// crawlSeeds are the relays every crawl starts from; runCrawl registers each
// of them with an initial score of 100.
var crawlSeeds = []string{
	"wss://relay.damus.io",
	"wss://nos.lol",
	"wss://relay.nostr.band",
}

// Directory event kinds to fetch.
// The value is a JSON array that crawlRelay splices verbatim into the
// "kinds" field of its REQ filter.
const crawlKindsFilter = `[0,3,5,1984,10000,10002,10050]`

// crawlLog is where clog writes. runCrawl points it at /tmp/smesh-crawl.log,
// or at os.Stderr when that file cannot be opened.
var crawlLog *os.File
 286  
 287  func clog(format string, args ...any) {
 288  	ts := time.Now().Format("15:04:05")
 289  	fmt.Fprintf(crawlLog, ts+" "+format+"\n", args...)
 290  }
 291  
// relayDB tracks known relays with frequency scores.
// Higher score = seen more often in kind 10002/10050 events = higher priority.
// Scores only ever accumulate through add; sorted derives the crawl order.
type relayDB struct {
	// score holds the accumulated weight per relay URL.
	score map[string]int // URL → frequency count
	// order is not assigned by any code visible in this file; sorted builds
	// and returns a fresh slice instead.
	order []string       // URLs sorted by descending score
}
 298  
 299  func newRelayDB() *relayDB {
 300  	return &relayDB{score: map[string]int{}}
 301  }
 302  
 303  func (db *relayDB) add(url string, weight int) {
 304  	db.score[url] += weight
 305  }
 306  
 307  // sorted returns relay URLs ordered by descending frequency.
 308  func (db *relayDB) sorted() []string {
 309  	urls := []string{:0:len(db.score)}
 310  	for u := range db.score {
 311  		urls = append(urls, u)
 312  	}
 313  	// Simple insertion sort by score descending.
 314  	for i := 1; i < len(urls); i++ {
 315  		for j := i; j > 0 && db.score[urls[j]] > db.score[urls[j-1]]; j-- {
 316  			urls[j], urls[j-1] = urls[j-1], urls[j]
 317  		}
 318  	}
 319  	return urls
 320  }
 321  
 322  func runCrawl(args []string) {
 323  	var err error
 324  	crawlLog, err = os.OpenFile("/tmp/smesh-crawl.log",
 325  		os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
 326  	if err != nil {
 327  		crawlLog = os.Stderr
 328  	}
 329  
 330  	localURL := "ws://127.0.0.1:3334"
 331  	if len(args) >= 1 {
 332  		localURL = args[0]
 333  	}
 334  	clog("started pid=%d local=%s", os.Getpid(), localURL)
 335  
 336  	db := newRelayDB()
 337  	// Seed relays get a high initial score.
 338  	for _, s := range crawlSeeds {
 339  		db.add(s, 100)
 340  	}
 341  
 342  	pass := 0
 343  	for {
 344  		pass++
 345  		clog("=== pass %d, %d relays known ===", pass, len(db.score))
 346  		ok := crawlPass(localURL, db)
 347  		if ok {
 348  			clog("pass complete, sleeping 5m")
 349  			time.Sleep(5 * time.Minute)
 350  		} else {
 351  			clog("pass failed, retrying in 30s")
 352  			time.Sleep(30 * time.Second)
 353  		}
 354  	}
 355  }
 356  
 357  func crawlPass(localURL string, db *relayDB) bool {
 358  	relays := db.sorted()
 359  	if len(relays) == 0 {
 360  		clog("no relays known")
 361  		return false
 362  	}
 363  
 364  	totalEvents := 0
 365  	for i, relayURL := range relays {
 366  		clog("[%d/%d] crawling %s (score %d)", i+1, len(relays), relayURL, db.score[relayURL])
 367  
 368  		events := crawlRelay(relayURL)
 369  		if len(events) == 0 {
 370  			clog("  %s → 0 events", relayURL)
 371  			time.Sleep(1 * time.Second)
 372  			continue
 373  		}
 374  		clog("  %s → %d events", relayURL, len(events))
 375  
 376  		// Extract new relay URLs from the events before publishing.
 377  		for _, raw := range events {
 378  			crawlExtractRelays(raw, db)
 379  		}
 380  
 381  		// Publish batch to local relay.
 382  		published := crawlPublishBatch(localURL, events)
 383  		clog("  published %d/%d to local", published, len(events))
 384  		totalEvents += published
 385  
 386  		time.Sleep(1 * time.Second)
 387  	}
 388  
 389  	clog("total %d events from %d relays", totalEvents, len(relays))
 390  	return true
 391  }
 392  
 393  // crawlRelay connects to one relay and subscribes to directory events.
 394  // Returns raw EVENT messages suitable for republishing.
 395  func crawlRelay(relayURL string) [][]byte {
 396  	remote, err := ws.Dial(relayURL)
 397  	if err != nil {
 398  		clog("  dial %s FAILED: %v", relayURL, err)
 399  		return nil
 400  	}
 401  	defer remote.Close()
 402  
 403  	reqJSON := []byte(`["REQ","cr",{"kinds":` + crawlKindsFilter + `,"limit":200}]`)
 404  	if err := remote.WriteText(reqJSON); err != nil {
 405  		clog("  write REQ to %s failed: %v", relayURL, err)
 406  		return nil
 407  	}
 408  
 409  	var events [][]byte
 410  	for {
 411  		op, payload, err := remote.ReadMessage()
 412  		if err != nil {
 413  			break
 414  		}
 415  		if op != ws.OpText {
 416  			continue
 417  		}
 418  		label, rem, _ := envelope.Identify(payload)
 419  		if label == envelope.EOSELabel {
 420  			break
 421  		}
 422  		if label == envelope.EventLabel {
 423  			var er envelope.EventResult
 424  			if _, err := er.Unmarshal(rem); err == nil && er.Event != nil {
 425  				es := &envelope.EventSubmission{E: er.Event}
 426  				events = append(events, es.Marshal(nil))
 427  			}
 428  		}
 429  		_ = rem
 430  	}
 431  	return events
 432  }
 433  
 434  // crawlExtractRelays parses an EVENT submission and adds discovered relay
 435  // URLs to the database with a frequency bump.
 436  func crawlExtractRelays(raw []byte, db *relayDB) {
 437  	_, rem, err := envelope.Identify(raw)
 438  	if err != nil {
 439  		return
 440  	}
 441  	var es envelope.EventSubmission
 442  	if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
 443  		return
 444  	}
 445  	ev := es.E
 446  	if (ev.Kind != 10002 && ev.Kind != 10050) || ev.Tags == nil {
 447  		return
 448  	}
 449  	for _, t := range ev.Tags.GetAll([]byte("r")) {
 450  		if t.Len() >= 2 {
 451  			url := string(t.Value())
 452  			if len(url) > 5 && (hasPrefix(url, "wss://") || hasPrefix(url, "ws://")) {
 453  				db.add(url, 1)
 454  			}
 455  		}
 456  	}
 457  }
 458  
 459  // crawlPublishBatch publishes a batch of EVENT messages to the local relay.
 460  func crawlPublishBatch(localURL string, events [][]byte) int {
 461  	local, err := ws.Dial(localURL)
 462  	if err != nil {
 463  		clog("  local connect failed: %v", err)
 464  		return 0
 465  	}
 466  	defer local.Close()
 467  
 468  	for _, evBytes := range events {
 469  		local.WriteText(evBytes)
 470  	}
 471  
 472  	// Drain OKs — one per event sent.
 473  	count := 0
 474  	for count < len(events) {
 475  		_, _, err := local.ReadMessage()
 476  		if err != nil {
 477  			break
 478  		}
 479  		count++
 480  	}
 481  	return count
 482  }
 483  
 484  func hexEnc(b []byte) string {
 485  	const hx = "0123456789abcdef"
 486  	out := []byte{:len(b)*2}
 487  	for i, v := range b {
 488  		out[i*2] = hx[v>>4]
 489  		out[i*2+1] = hx[v&0x0f]
 490  	}
 491  	return string(out)
 492  }
 493  
 494  func i64str(n int64) string {
 495  	if n == 0 {
 496  		return "0"
 497  	}
 498  	neg := false
 499  	if n < 0 {
 500  		neg = true
 501  		n = -n
 502  	}
 503  	var buf [20]byte
 504  	i := 19
 505  	for n > 0 {
 506  		buf[i] = byte('0' + n%10)
 507  		i--
 508  		n /= 10
 509  	}
 510  	if neg {
 511  		buf[i] = '-'
 512  		i--
 513  	}
 514  	return string(buf[i+1:])
 515  }
 516  
 517  func crawlAppendUniq(ss []string, s string) []string {
 518  	for _, x := range ss {
 519  		if x == s {
 520  			return ss
 521  		}
 522  	}
 523  	return append(ss, s)
 524  }
 525  
 526  func printHelp() {
 527  	fmt.Println(`smesh - Nostr relay
 528  
 529  Usage:
 530    smesh [command] [options]
 531  
 532  Commands:
 533    relay     Run the relay server (default)
 534    sync      Sync events from a remote relay
 535    crawl     Background metadata crawler
 536    version   Show version
 537    help      Show this help
 538  
 539  Environment:
 540    SMESH_DATA_DIR      Data directory (default: data)
 541    SMESH_LISTEN        Listen address (default: :3334)
 542    SMESH_BLOSSOM_DIR   Blossom file directory (default: data/blossom)
 543    SMESH_STATIC_DIR    Static web files directory (default: web/static)`)
 544  }
 545