main.mx raw

   1  // Package sync is the standalone relay-to-relay sync client.
   2  // Subscribes to a remote relay and forwards all events to a local relay.
   3  package main
   4  
   5  import (
   6  	"fmt"
   7  	"os"
   8  	"time"
   9  
  10  	"smesh.lol/pkg/nostr/envelope"
  11  	"smesh.lol/pkg/nostr/filter"
  12  	"smesh.lol/pkg/nostr/ws"
  13  )
  14  
  15  func main() {
  16  	args := os.Args[1:]
  17  	if len(args) < 1 {
  18  		fmt.Fprintln(os.Stderr, "usage: smesh-sync <remote-relay-url> [local-relay-url]")
  19  		os.Exit(1)
  20  	}
  21  	remoteURL := args[0]
  22  	localURL := "ws://127.0.0.1:3334"
  23  	if len(args) >= 2 {
  24  		localURL = args[1]
  25  	}
  26  
  27  	for {
  28  		syncOnce(remoteURL, localURL)
  29  		fmt.Fprintln(os.Stderr, "sync: disconnected, reconnecting in 30s...")
  30  		time.Sleep(30 * time.Second)
  31  	}
  32  }
  33  
  34  func syncOnce(remoteURL, localURL string) {
  35  	fmt.Fprintf(os.Stderr, "sync: connecting to remote %s\n", remoteURL)
  36  	remote, err := ws.Dial(remoteURL)
  37  	if err != nil {
  38  		fmt.Fprintf(os.Stderr, "sync: remote connect error: %v\n", err)
  39  		return
  40  	}
  41  	defer remote.Close()
  42  
  43  	fmt.Fprintf(os.Stderr, "sync: connecting to local %s\n", localURL)
  44  	local, err := ws.Dial(localURL)
  45  	if err != nil {
  46  		fmt.Fprintf(os.Stderr, "sync: local connect error: %v\n", err)
  47  		return
  48  	}
  49  	defer local.Close()
  50  
  51  	f := &filter.F{}
  52  	req := &envelope.Req{
  53  		Subscription: []byte("sync"),
  54  		Filters:      &filter.S{f},
  55  	}
  56  	if err := remote.WriteText(req.Marshal(nil)); err != nil {
  57  		fmt.Fprintf(os.Stderr, "sync: subscribe error: %v\n", err)
  58  		return
  59  	}
  60  
  61  	var forwarded int64
  62  	eosed := false
  63  
  64  	for {
  65  		op, payload, err := remote.ReadMessage()
  66  		if err != nil {
  67  			fmt.Fprintf(os.Stderr, "sync: read error (%d forwarded): %v\n", forwarded, err)
  68  			return
  69  		}
  70  		if op == ws.OpClose {
  71  			fmt.Fprintf(os.Stderr, "sync: remote closed (%d forwarded)\n", forwarded)
  72  			return
  73  		}
  74  		if op != ws.OpText {
  75  			continue
  76  		}
  77  
  78  		label, rem, _ := envelope.Identify(payload)
  79  		switch label {
  80  		case envelope.EventLabel:
  81  			var es envelope.EventSubmission
  82  			if _, err := es.Unmarshal(rem); err != nil {
  83  				continue
  84  			}
  85  			fwd := &envelope.EventSubmission{E: es.E}
  86  			if err := local.WriteText(fwd.Marshal(nil)); err != nil {
  87  				fmt.Fprintf(os.Stderr, "sync: local publish error: %v\n", err)
  88  				return
  89  			}
  90  			if _, _, err := local.ReadMessage(); err != nil {
  91  				fmt.Fprintf(os.Stderr, "sync: local read error: %v\n", err)
  92  				return
  93  			}
  94  			forwarded++
  95  			if forwarded%1000 == 0 {
  96  				fmt.Fprintf(os.Stderr, "sync: %d events forwarded\n", forwarded)
  97  			}
  98  		case envelope.EOSELabel:
  99  			if !eosed {
 100  				eosed = true
 101  				fmt.Fprintf(os.Stderr, "sync: EOSE — historical sync complete (%d forwarded). streaming live...\n", forwarded)
 102  			}
 103  		}
 104  	}
 105  }
 106