main.mx raw
// Crawl is the standalone multi-relay crawler.
// It queries known relays for directory events, discovers new relays from the
// relay lists it receives, and publishes the collected events to a local relay.
4 package main
5
6 import (
7 "fmt"
8 "os"
9 "time"
10
11 "smesh.lol/pkg/nostr/envelope"
12 "smesh.lol/pkg/nostr/ws"
13 )
14
var (
	// crawlSeeds lists the relays every crawl starts from; main registers
	// each of them in the relay table before the first pass.
	crawlSeeds = []string{
		"wss://relay.damus.io",
		"wss://nos.lol",
	}

	// crawlLog receives every clog line. main assigns it (the log file, or
	// os.Stderr when that cannot be opened) before its first clog call.
	crawlLog *os.File
)

// crawlKindsFilter is the JSON "kinds" array spliced into every REQ.
// Of these, 10002 and 10050 are the kinds crawlExtractRelays mines for relay
// URLs; the rest are presumably the other directory-style kinds (profiles,
// follow lists, deletions, reports, mute lists) — confirm against the NIPs.
const crawlKindsFilter = `[0,3,5,1984,10000,10002,10050]`

// clog writes one log line to crawlLog: an HH:MM:SS timestamp, a space, the
// fmt-formatted message, and a newline. Write errors are ignored.
func clog(format string, args ...any) {
	stamp := time.Now().Format("15:04:05")
	line := stamp + " " + format + "\n"
	fmt.Fprintf(crawlLog, line, args...)
}
28
// relayDB tracks every relay URL the crawler knows about, each with a score
// that decides crawl order (higher first).
type relayDB struct {
	// score maps a relay URL to its accumulated weight: main seeds start at
	// 100, and crawlExtractRelays adds 1 per relay-list mention.
	score map[string]int
	// NOTE(review): order is never read or written in this file; it looks
	// like a leftover and is a candidate for removal.
	order []string
}

// newRelayDB returns an empty relayDB whose score map is ready for writes.
func newRelayDB() *relayDB {
	db := &relayDB{}
	db.score = map[string]int{}
	return db
}

// add credits url with weight points, registering it the first time it is seen.
func (db *relayDB) add(url string, weight int) {
	db.score[url] = db.score[url] + weight
}

// sorted returns every known URL ordered by descending score. Equal scores
// keep map iteration order, so ties come out in an unspecified order that can
// differ between calls.
func (db *relayDB) sorted() []string {
	out := []string{}
	for url, sc := range db.score {
		// Append, then bubble the newcomer left past every strictly
		// lower-scored entry (an online, stable insertion sort).
		out = append(out, url)
		for k := len(out) - 1; k > 0 && db.score[out[k-1]] < sc; k-- {
			out[k-1], out[k] = out[k], out[k-1]
		}
	}
	return out
}
53 }
54
55 func main() {
56 var err error
57 crawlLog, err = os.OpenFile("/tmp/smesh-crawl.log",
58 os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
59 if err != nil {
60 crawlLog = os.Stderr
61 }
62
63 localURL := "ws://127.0.0.1:3334"
64 args := os.Args[1:]
65 if len(args) >= 1 {
66 localURL = args[0]
67 }
68 clog("started pid=%d local=%s", os.Getpid(), localURL)
69
70 db := newRelayDB()
71 for _, s := range crawlSeeds {
72 db.add(s, 100)
73 }
74
75 pass := 0
76 for {
77 pass++
78 clog("=== pass %d, %d relays known ===", pass, len(db.score))
79 ok := crawlPass(localURL, db)
80 if ok {
81 clog("pass complete, sleeping 5m")
82 time.Sleep(5 * time.Minute)
83 } else {
84 clog("pass failed, retrying in 30s")
85 time.Sleep(30 * time.Second)
86 }
87 }
88 }
89
90 func crawlPass(localURL string, db *relayDB) bool {
91 relays := db.sorted()
92 if len(relays) == 0 {
93 clog("no relays known")
94 return false
95 }
96 totalEvents := 0
97 for i, relayURL := range relays {
98 clog("[%d/%d] crawling %s (score %d)", i+1, len(relays), relayURL, db.score[relayURL])
99 events := crawlRelay(relayURL)
100 if len(events) == 0 {
101 clog(" %s → 0 events", relayURL)
102 time.Sleep(1 * time.Second)
103 continue
104 }
105 clog(" %s → %d events", relayURL, len(events))
106 for _, raw := range events {
107 crawlExtractRelays(raw, db)
108 }
109 published := crawlPublishBatch(localURL, events)
110 clog(" published %d/%d to local", published, len(events))
111 totalEvents += published
112 time.Sleep(1 * time.Second)
113 }
114 clog("total %d events from %d relays", totalEvents, len(relays))
115 return true
116 }
117
118 func crawlRelay(relayURL string) [][]byte {
119 remote, err := ws.Dial(relayURL)
120 if err != nil {
121 clog(" dial %s FAILED: %v", relayURL, err)
122 return nil
123 }
124 defer remote.Close()
125
126 reqJSON := []byte(`["REQ","cr",{"kinds":` | crawlKindsFilter | `,"limit":200}]`)
127 if err := remote.WriteText(reqJSON); err != nil {
128 clog(" write REQ to %s failed: %v", relayURL, err)
129 return nil
130 }
131
132 var events [][]byte
133 for {
134 op, payload, err := remote.ReadMessage()
135 if err != nil {
136 break
137 }
138 if op != ws.OpText {
139 continue
140 }
141 label, rem, _ := envelope.Identify(payload)
142 if label == envelope.EOSELabel {
143 break
144 }
145 if label == envelope.EventLabel {
146 var er envelope.EventResult
147 if _, err := er.Unmarshal(rem); err == nil && er.Event != nil {
148 es := &envelope.EventSubmission{E: er.Event}
149 events = append(events, es.Marshal(nil))
150 }
151 }
152 }
153 return events
154 }
155
156 func crawlExtractRelays(raw []byte, db *relayDB) {
157 _, rem, err := envelope.Identify(raw)
158 if err != nil {
159 return
160 }
161 var es envelope.EventSubmission
162 if _, err := es.Unmarshal(rem); err != nil || es.E == nil {
163 return
164 }
165 ev := es.E
166 if (ev.Kind != 10002 && ev.Kind != 10050) || ev.Tags == nil {
167 return
168 }
169 for _, t := range ev.Tags.GetAll([]byte("r")) {
170 if t.Len() >= 2 {
171 url := string(t.Value())
172 if len(url) > 5 && (hasPrefix(url, "wss://") || hasPrefix(url, "ws://")) {
173 db.add(url, 1)
174 }
175 }
176 }
177 }
178
179 func crawlPublishBatch(localURL string, events [][]byte) int {
180 local, err := ws.Dial(localURL)
181 if err != nil {
182 clog(" local connect failed: %v", err)
183 return 0
184 }
185 defer local.Close()
186 for _, evBytes := range events {
187 local.WriteText(evBytes)
188 }
189 count := 0
190 for count < len(events) {
191 _, _, err := local.ReadMessage()
192 if err != nil {
193 break
194 }
195 count++
196 }
197 return count
198 }
199
// hasPrefix reports whether s begins with prefix (same contract as
// strings.HasPrefix). An empty prefix matches every s.
func hasPrefix(s, prefix string) bool {
	n := len(prefix)
	if n > len(s) {
		return false
	}
	return s[:n] == prefix
}
202 }
203