main.mx raw

// Command crawl is the standalone multi-relay crawler.
// It subscribes to directory events from known relays, discovers new relays,
// and publishes the collected events to a local relay.
   4  package main
   5  
   6  import (
   7  	"fmt"
   8  	"os"
   9  	"time"
  10  
  11  	"smesh.lol/pkg/nostr/envelope"
  12  	"smesh.lol/pkg/nostr/ws"
  13  )
  14  
  15  var crawlSeeds = []string{
  16  	"wss://relay.damus.io",
  17  	"wss://nos.lol",
  18  }
  19  
  20  const crawlKindsFilter = `[0,3,5,1984,10000,10002,10050]`
  21  
  22  var crawlLog *os.File
  23  
  24  func clog(format string, args ...any) {
  25  	ts := time.Now().Format("15:04:05")
  26  	fmt.Fprintf(crawlLog, ts+" "+format+"\n", args...)
  27  }
  28  
  29  type relayDB struct {
  30  	score map[string]int
  31  	order []string
  32  }
  33  
  34  func newRelayDB() *relayDB {
  35  	return &relayDB{score: map[string]int{}}
  36  }
  37  
  38  func (db *relayDB) add(url string, weight int) {
  39  	db.score[url] += weight
  40  }
  41  
  42  func (db *relayDB) sorted() []string {
  43  	urls := []string{:0:len(db.score)}
  44  	for u := range db.score {
  45  		urls = append(urls, u)
  46  	}
  47  	for i := 1; i < len(urls); i++ {
  48  		for j := i; j > 0 && db.score[urls[j]] > db.score[urls[j-1]]; j-- {
  49  			urls[j], urls[j-1] = urls[j-1], urls[j]
  50  		}
  51  	}
  52  	return urls
  53  }
  54  
  55  func main() {
  56  	var err error
  57  	crawlLog, err = os.OpenFile("/tmp/smesh-crawl.log",
  58  		os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
  59  	if err != nil {
  60  		crawlLog = os.Stderr
  61  	}
  62  
  63  	localURL := "ws://127.0.0.1:3334"
  64  	args := os.Args[1:]
  65  	if len(args) >= 1 {
  66  		localURL = args[0]
  67  	}
  68  	clog("started pid=%d local=%s", os.Getpid(), localURL)
  69  
  70  	db := newRelayDB()
  71  	for _, s := range crawlSeeds {
  72  		db.add(s, 100)
  73  	}
  74  
  75  	pass := 0
  76  	for {
  77  		pass++
  78  		clog("=== pass %d, %d relays known ===", pass, len(db.score))
  79  		ok := crawlPass(localURL, db)
  80  		if ok {
  81  			clog("pass complete, sleeping 5m")
  82  			time.Sleep(5 * time.Minute)
  83  		} else {
  84  			clog("pass failed, retrying in 30s")
  85  			time.Sleep(30 * time.Second)
  86  		}
  87  	}
  88  }
  89  
  90  func crawlPass(localURL string, db *relayDB) bool {
  91  	relays := db.sorted()
  92  	if len(relays) == 0 {
  93  		clog("no relays known")
  94  		return false
  95  	}
  96  	totalEvents := 0
  97  	for i, relayURL := range relays {
  98  		clog("[%d/%d] crawling %s (score %d)", i+1, len(relays), relayURL, db.score[relayURL])
  99  		events := crawlRelay(relayURL)
 100  		if len(events) == 0 {
 101  			clog("  %s → 0 events", relayURL)
 102  			time.Sleep(1 * time.Second)
 103  			continue
 104  		}
 105  		clog("  %s → %d events", relayURL, len(events))
 106  		for _, raw := range events {
 107  			crawlExtractRelays(raw, db)
 108  		}
 109  		published := crawlPublishBatch(localURL, events)
 110  		clog("  published %d/%d to local", published, len(events))
 111  		totalEvents += published
 112  		time.Sleep(1 * time.Second)
 113  	}
 114  	clog("total %d events from %d relays", totalEvents, len(relays))
 115  	return true
 116  }
 117  
 118  func crawlRelay(relayURL string) [][]byte {
 119  	remote, err := ws.Dial(relayURL)
 120  	if err != nil {
 121  		clog("  dial %s FAILED: %v", relayURL, err)
 122  		return nil
 123  	}
 124  	defer remote.Close()
 125  
 126  	reqJSON := []byte(`["REQ","cr",{"kinds":` | crawlKindsFilter | `,"limit":200}]`)
 127  	if err := remote.WriteText(reqJSON); err != nil {
 128  		clog("  write REQ to %s failed: %v", relayURL, err)
 129  		return nil
 130  	}
 131  
 132  	var events [][]byte
 133  	for {
 134  		op, payload, err := remote.ReadMessage()
 135  		if err != nil {
 136  			break
 137  		}
 138  		if op != ws.OpText {
 139  			continue
 140  		}
 141  		label, rem, _ := envelope.Identify(payload)
 142  		if label == envelope.EOSELabel {
 143  			break
 144  		}
 145  		if label == envelope.EventLabel {
 146  			var er envelope.EventResult
 147  			if _, err := er.Unmarshal(rem); err == nil && er.Event != nil {
 148  				es := &envelope.EventSubmission{E: er.Event}
 149  				events = append(events, es.Marshal(nil))
 150  			}
 151  		}
 152  	}
 153  	return events
 154  }
 155  
 156  func crawlExtractRelays(raw []byte, db *relayDB) {
 157  	_, rem, err := envelope.Identify(raw)
 158  	if err != nil {
 159  		return
 160  	}
 161  	var es envelope.EventSubmission
 162  	if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
 163  		return
 164  	}
 165  	ev := es.E
 166  	if (ev.Kind != 10002 && ev.Kind != 10050) || ev.Tags == nil {
 167  		return
 168  	}
 169  	for _, t := range ev.Tags.GetAll([]byte("r")) {
 170  		if t.Len() >= 2 {
 171  			url := string(t.Value())
 172  			if len(url) > 5 && (hasPrefix(url, "wss://") || hasPrefix(url, "ws://")) {
 173  				db.add(url, 1)
 174  			}
 175  		}
 176  	}
 177  }
 178  
 179  func crawlPublishBatch(localURL string, events [][]byte) int {
 180  	local, err := ws.Dial(localURL)
 181  	if err != nil {
 182  		clog("  local connect failed: %v", err)
 183  		return 0
 184  	}
 185  	defer local.Close()
 186  	for _, evBytes := range events {
 187  		local.WriteText(evBytes)
 188  	}
 189  	count := 0
 190  	for count < len(events) {
 191  		_, _, err := local.ReadMessage()
 192  		if err != nil {
 193  			break
 194  		}
 195  		count++
 196  	}
 197  	return count
 198  }
 199  
 200  func hasPrefix(s, prefix string) bool {
 201  	return len(s) >= len(prefix) && s[:len(prefix)] == prefix
 202  }
 203