relay.go raw

   1  package bridge
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"strings"
   7  	"sync"
   8  	"time"
   9  
  10  	"next.orly.dev/pkg/nostr/encoders/event"
  11  	"next.orly.dev/pkg/nostr/encoders/filter"
  12  	"next.orly.dev/pkg/nostr/interfaces/signer"
  13  	"next.orly.dev/pkg/nostr/ws"
  14  	"next.orly.dev/pkg/lol/log"
  15  )
  16  
  17  // RelayConn wraps a WebSocket relay connection with auto-reconnect.
  18  // It satisfies the RelayConnection interface used by the bridge's Marmot
  19  // client for standalone mode (connecting to an external relay).
  20  type RelayConn struct {
  21  	url    string
  22  	sign   signer.I
  23  	conn   *ws.Client
  24  	mu     sync.RWMutex
  25  	ctx    context.Context
  26  	cancel context.CancelFunc
  27  	authed bool
  28  }
  29  
  30  // NewRelayConn creates a new relay connection wrapper.
  31  // The signer is used for NIP-42 authentication when the relay requires it.
  32  func NewRelayConn(url string, sign signer.I) *RelayConn {
  33  	return &RelayConn{url: url, sign: sign}
  34  }
  35  
  36  // Connect establishes the WebSocket connection to the relay and
  37  // pre-authenticates via NIP-42 so that subscriptions have proper access.
  38  // In monolithic mode, the relay may not be listening yet, so Connect
  39  // retries with exponential backoff for up to 30 seconds.
  40  func (rc *RelayConn) Connect(ctx context.Context) error {
  41  	rc.ctx, rc.cancel = context.WithCancel(ctx)
  42  
  43  	delay := time.Second
  44  	maxDelay := 10 * time.Second
  45  	timeout := 5 * time.Minute
  46  	deadline := time.Now().Add(timeout)
  47  
  48  	var conn *ws.Client
  49  	var err error
  50  
  51  	for {
  52  		conn, err = ws.RelayConnect(rc.ctx, rc.url)
  53  		if err == nil {
  54  			break
  55  		}
  56  
  57  		if time.Now().After(deadline) {
  58  			return fmt.Errorf("connect to relay %s after %v: %w", rc.url, timeout, err)
  59  		}
  60  
  61  		log.D.F("bridge waiting for relay %s: %v (retrying in %v)", rc.url, err, delay)
  62  
  63  		select {
  64  		case <-time.After(delay):
  65  			if delay < maxDelay {
  66  				delay *= 2
  67  			}
  68  		case <-rc.ctx.Done():
  69  			return fmt.Errorf("connect to relay %s: %w", rc.url, rc.ctx.Err())
  70  		}
  71  	}
  72  
  73  	rc.mu.Lock()
  74  	rc.conn = conn
  75  	rc.authed = false
  76  	rc.mu.Unlock()
  77  
  78  	log.I.F("bridge connected to relay: %s", rc.url)
  79  
  80  	// Pre-authenticate so subscriptions get proper access level
  81  	if rc.sign != nil {
  82  		if err := rc.preAuth(conn); err != nil {
  83  			log.W.F("bridge pre-auth failed: %v (will retry on publish)", err)
  84  		}
  85  	}
  86  
  87  	return nil
  88  }
  89  
  90  // preAuth waits briefly for the relay's AUTH challenge, then authenticates.
  91  func (rc *RelayConn) preAuth(conn *ws.Client) error {
  92  	// Give the relay time to send the AUTH challenge
  93  	time.Sleep(200 * time.Millisecond)
  94  
  95  	if err := conn.Auth(rc.ctx, rc.sign); err != nil {
  96  		return fmt.Errorf("auth: %w", err)
  97  	}
  98  
  99  	rc.mu.Lock()
 100  	rc.authed = true
 101  	rc.mu.Unlock()
 102  
 103  	log.I.F("bridge pre-authenticated with relay")
 104  	return nil
 105  }
 106  
 107  // Reconnect attempts to reconnect with exponential backoff.
 108  func (rc *RelayConn) Reconnect() error {
 109  	delay := time.Second
 110  	maxDelay := 30 * time.Second
 111  
 112  	for {
 113  		select {
 114  		case <-rc.ctx.Done():
 115  			return rc.ctx.Err()
 116  		default:
 117  		}
 118  
 119  		conn, err := ws.RelayConnect(rc.ctx, rc.url)
 120  		if err == nil {
 121  			rc.mu.Lock()
 122  			rc.conn = conn
 123  			rc.authed = false
 124  			rc.mu.Unlock()
 125  			log.I.F("bridge reconnected to relay: %s", rc.url)
 126  
 127  			// Pre-authenticate after reconnect
 128  			if rc.sign != nil {
 129  				if err := rc.preAuth(conn); err != nil {
 130  					log.W.F("bridge pre-auth after reconnect failed: %v", err)
 131  				}
 132  			}
 133  
 134  			return nil
 135  		}
 136  
 137  		log.W.F("bridge relay reconnect failed: %v, retrying in %v", err, delay)
 138  		select {
 139  		case <-time.After(delay):
 140  			if delay < maxDelay {
 141  				delay *= 2
 142  			}
 143  		case <-rc.ctx.Done():
 144  			return rc.ctx.Err()
 145  		}
 146  	}
 147  }
 148  
 149  // Publish sends an event to the relay. If the relay responds with
 150  // auth-required, the bridge authenticates via NIP-42 and retries once.
 151  func (rc *RelayConn) Publish(ctx context.Context, ev *event.E) error {
 152  	rc.mu.RLock()
 153  	conn := rc.conn
 154  	rc.mu.RUnlock()
 155  
 156  	if conn == nil {
 157  		return fmt.Errorf("not connected to relay")
 158  	}
 159  
 160  	err := conn.Publish(ctx, ev)
 161  	if err == nil {
 162  		return nil
 163  	}
 164  
 165  	// Check if the error is auth-required
 166  	if !strings.Contains(err.Error(), "auth-required") {
 167  		return err
 168  	}
 169  
 170  	// Authenticate and retry
 171  	if rc.sign == nil {
 172  		return fmt.Errorf("auth required but no signer configured")
 173  	}
 174  
 175  	log.D.F("relay requires auth, authenticating...")
 176  
 177  	// Give the relay a moment to send the challenge
 178  	time.Sleep(100 * time.Millisecond)
 179  
 180  	if authErr := conn.Auth(ctx, rc.sign); authErr != nil {
 181  		return fmt.Errorf("auth failed: %w", authErr)
 182  	}
 183  
 184  	rc.mu.Lock()
 185  	rc.authed = true
 186  	rc.mu.Unlock()
 187  
 188  	log.I.F("bridge authenticated with relay")
 189  
 190  	// Retry the publish
 191  	return conn.Publish(ctx, ev)
 192  }
 193  
 194  // Subscribe creates a subscription on the relay and returns a stream of events.
 195  func (rc *RelayConn) Subscribe(ctx context.Context, ff *filter.S) (*WsEventStream, error) {
 196  	rc.mu.RLock()
 197  	conn := rc.conn
 198  	rc.mu.RUnlock()
 199  
 200  	if conn == nil {
 201  		return nil, fmt.Errorf("not connected to relay")
 202  	}
 203  
 204  	sub, err := conn.Subscribe(ctx, ff)
 205  	if err != nil {
 206  		return nil, err
 207  	}
 208  
 209  	return &WsEventStream{sub: sub}, nil
 210  }
 211  
 212  // Close closes the relay connection.
 213  func (rc *RelayConn) Close() {
 214  	if rc.cancel != nil {
 215  		rc.cancel()
 216  	}
 217  	rc.mu.Lock()
 218  	if rc.conn != nil {
 219  		rc.conn.Close()
 220  		rc.conn = nil
 221  	}
 222  	rc.mu.Unlock()
 223  }
 224  
 225  // WsEventStream wraps a ws.Subscription to deliver events.
 226  type WsEventStream struct {
 227  	sub *ws.Subscription
 228  }
 229  
 230  // Events returns the channel of events.
 231  func (s *WsEventStream) Events() <-chan *event.E {
 232  	return s.sub.Events
 233  }
 234  
 235  // Close unsubscribes from the relay.
 236  func (s *WsEventStream) Close() {
 237  	s.sub.Unsub()
 238  }
 239