main.mx raw
// Command smesh-sync is the standalone relay-to-relay sync client.
// It subscribes to a remote relay and forwards all events to a local relay.
3 package main
4
5 import (
6 "fmt"
7 "os"
8 "time"
9
10 "smesh.lol/pkg/nostr/envelope"
11 "smesh.lol/pkg/nostr/filter"
12 "smesh.lol/pkg/nostr/ws"
13 )
14
15 func main() {
16 args := os.Args[1:]
17 if len(args) < 1 {
18 fmt.Fprintln(os.Stderr, "usage: smesh-sync <remote-relay-url> [local-relay-url]")
19 os.Exit(1)
20 }
21 remoteURL := args[0]
22 localURL := "ws://127.0.0.1:3334"
23 if len(args) >= 2 {
24 localURL = args[1]
25 }
26
27 for {
28 syncOnce(remoteURL, localURL)
29 fmt.Fprintln(os.Stderr, "sync: disconnected, reconnecting in 30s...")
30 time.Sleep(30 * time.Second)
31 }
32 }
33
34 func syncOnce(remoteURL, localURL string) {
35 fmt.Fprintf(os.Stderr, "sync: connecting to remote %s\n", remoteURL)
36 remote, err := ws.Dial(remoteURL)
37 if err != nil {
38 fmt.Fprintf(os.Stderr, "sync: remote connect error: %v\n", err)
39 return
40 }
41 defer remote.Close()
42
43 fmt.Fprintf(os.Stderr, "sync: connecting to local %s\n", localURL)
44 local, err := ws.Dial(localURL)
45 if err != nil {
46 fmt.Fprintf(os.Stderr, "sync: local connect error: %v\n", err)
47 return
48 }
49 defer local.Close()
50
51 f := &filter.F{}
52 req := &envelope.Req{
53 Subscription: []byte("sync"),
54 Filters: &filter.S{f},
55 }
56 if err := remote.WriteText(req.Marshal(nil)); err != nil {
57 fmt.Fprintf(os.Stderr, "sync: subscribe error: %v\n", err)
58 return
59 }
60
61 var forwarded int64
62 eosed := false
63
64 for {
65 op, payload, err := remote.ReadMessage()
66 if err != nil {
67 fmt.Fprintf(os.Stderr, "sync: read error (%d forwarded): %v\n", forwarded, err)
68 return
69 }
70 if op == ws.OpClose {
71 fmt.Fprintf(os.Stderr, "sync: remote closed (%d forwarded)\n", forwarded)
72 return
73 }
74 if op != ws.OpText {
75 continue
76 }
77
78 label, rem, _ := envelope.Identify(payload)
79 switch label {
80 case envelope.EventLabel:
81 var es envelope.EventSubmission
82 if _, err := es.Unmarshal(rem); err != nil {
83 continue
84 }
85 fwd := &envelope.EventSubmission{E: es.E}
86 if err := local.WriteText(fwd.Marshal(nil)); err != nil {
87 fmt.Fprintf(os.Stderr, "sync: local publish error: %v\n", err)
88 return
89 }
90 if _, _, err := local.ReadMessage(); err != nil {
91 fmt.Fprintf(os.Stderr, "sync: local read error: %v\n", err)
92 return
93 }
94 forwarded++
95 if forwarded%1000 == 0 {
96 fmt.Fprintf(os.Stderr, "sync: %d events forwarded\n", forwarded)
97 }
98 case envelope.EOSELabel:
99 if !eosed {
100 eosed = true
101 fmt.Fprintf(os.Stderr, "sync: EOSE — historical sync complete (%d forwarded). streaming live...\n", forwarded)
102 }
103 }
104 }
105 }
106