connection.go raw

   1  package archive
   2  
   3  import (
   4  	"context"
   5  	"sync"
   6  	"time"
   7  
   8  	"next.orly.dev/pkg/nostr/encoders/event"
   9  	"next.orly.dev/pkg/nostr/encoders/filter"
  10  	"next.orly.dev/pkg/nostr/ws"
  11  	"next.orly.dev/pkg/lol/log"
  12  )
  13  
  14  // RelayConnection manages a single archive relay connection.
  15  type RelayConnection struct {
  16  	url    string
  17  	client *ws.Client
  18  	ctx    context.Context
  19  	cancel context.CancelFunc
  20  
  21  	// Connection state
  22  	mu             sync.RWMutex
  23  	lastConnect    time.Time
  24  	reconnectDelay time.Duration
  25  	connected      bool
  26  }
  27  
  28  const (
  29  	// Initial delay between reconnection attempts
  30  	initialReconnectDelay = 5 * time.Second
  31  	// Maximum delay between reconnection attempts
  32  	maxReconnectDelay = 5 * time.Minute
  33  	// Connection timeout
  34  	connectTimeout = 10 * time.Second
  35  	// Query timeout (per query, not global)
  36  	queryTimeout = 30 * time.Second
  37  )
  38  
  39  // NewRelayConnection creates a new relay connection.
  40  func NewRelayConnection(parentCtx context.Context, url string) *RelayConnection {
  41  	ctx, cancel := context.WithCancel(parentCtx)
  42  	return &RelayConnection{
  43  		url:            url,
  44  		ctx:            ctx,
  45  		cancel:         cancel,
  46  		reconnectDelay: initialReconnectDelay,
  47  	}
  48  }
  49  
  50  // Connect establishes a connection to the archive relay.
  51  func (rc *RelayConnection) Connect() error {
  52  	rc.mu.Lock()
  53  	defer rc.mu.Unlock()
  54  
  55  	if rc.connected && rc.client != nil {
  56  		return nil
  57  	}
  58  
  59  	connectCtx, cancel := context.WithTimeout(rc.ctx, connectTimeout)
  60  	defer cancel()
  61  
  62  	client, err := ws.RelayConnect(connectCtx, rc.url)
  63  	if err != nil {
  64  		rc.reconnectDelay = min(rc.reconnectDelay*2, maxReconnectDelay)
  65  		return err
  66  	}
  67  
  68  	rc.client = client
  69  	rc.connected = true
  70  	rc.lastConnect = time.Now()
  71  	rc.reconnectDelay = initialReconnectDelay
  72  
  73  	log.D.F("archive: connected to %s", rc.url)
  74  
  75  	return nil
  76  }
  77  
  78  // Query executes a query against the archive relay.
  79  // Returns a slice of events matching the filter.
  80  func (rc *RelayConnection) Query(ctx context.Context, f *filter.F) ([]*event.E, error) {
  81  	rc.mu.RLock()
  82  	client := rc.client
  83  	connected := rc.connected
  84  	rc.mu.RUnlock()
  85  
  86  	if !connected || client == nil {
  87  		if err := rc.Connect(); err != nil {
  88  			return nil, err
  89  		}
  90  		rc.mu.RLock()
  91  		client = rc.client
  92  		rc.mu.RUnlock()
  93  	}
  94  
  95  	// Create query context with timeout
  96  	queryCtx, cancel := context.WithTimeout(ctx, queryTimeout)
  97  	defer cancel()
  98  
  99  	// Subscribe to the filter
 100  	sub, err := client.Subscribe(queryCtx, filter.NewS(f))
 101  	if err != nil {
 102  		rc.handleDisconnection()
 103  		return nil, err
 104  	}
 105  	defer sub.Unsub()
 106  
 107  	// Collect events until EOSE or timeout
 108  	var events []*event.E
 109  
 110  	for {
 111  		select {
 112  		case <-queryCtx.Done():
 113  			return events, nil
 114  		case <-sub.EndOfStoredEvents:
 115  			return events, nil
 116  		case ev := <-sub.Events:
 117  			if ev == nil {
 118  				return events, nil
 119  			}
 120  			events = append(events, ev)
 121  		}
 122  	}
 123  }
 124  
 125  // handleDisconnection marks the connection as disconnected.
 126  func (rc *RelayConnection) handleDisconnection() {
 127  	rc.mu.Lock()
 128  	defer rc.mu.Unlock()
 129  
 130  	rc.connected = false
 131  	if rc.client != nil {
 132  		rc.client.Close()
 133  		rc.client = nil
 134  	}
 135  }
 136  
 137  // IsConnected returns whether the relay is currently connected.
 138  func (rc *RelayConnection) IsConnected() bool {
 139  	rc.mu.RLock()
 140  	defer rc.mu.RUnlock()
 141  
 142  	if !rc.connected || rc.client == nil {
 143  		return false
 144  	}
 145  
 146  	// Check if client is still connected
 147  	return rc.client.IsConnected()
 148  }
 149  
 150  // Close closes the relay connection.
 151  func (rc *RelayConnection) Close() {
 152  	rc.cancel()
 153  
 154  	rc.mu.Lock()
 155  	defer rc.mu.Unlock()
 156  
 157  	rc.connected = false
 158  	if rc.client != nil {
 159  		rc.client.Close()
 160  		rc.client = nil
 161  	}
 162  }
 163  
 164  // URL returns the relay URL.
 165  func (rc *RelayConnection) URL() string {
 166  	return rc.url
 167  }
 168  
 169  // min returns the smaller of two durations.
 170  func min(a, b time.Duration) time.Duration {
 171  	if a < b {
 172  		return a
 173  	}
 174  	return b
 175  }
 176