client.mx raw

   1  package ws
   2  
   3  import (
   4  	"fmt"
   5  	"sync"
   6  
   7  	"smesh.lol/pkg/nostr/envelope"
   8  	"smesh.lol/pkg/nostr/event"
   9  	"smesh.lol/pkg/nostr/filter"
  10  	"smesh.lol/pkg/nostr/text"
  11  )
  12  
  13  // Client is a Nostr relay client.
  14  type Client struct {
  15  	ws   *Conn
  16  	url  string
  17  	subs map[string]*Sub
  18  	mu   sync.Mutex
  19  	idN  int64
  20  	done chan struct{}
  21  	OKs  chan *envelope.OK // buffered channel for OK responses
  22  	Err  error             // set when the read loop exits
  23  }
  24  
  25  // Sub is an active subscription.
  26  type Sub struct {
  27  	ID     string
  28  	Events chan *event.E
  29  	EOSE   chan struct{}
  30  	eosed  bool
  31  }
  32  
  33  // Connect dials a Nostr relay. Call RunReadLoop() to start processing
  34  // incoming messages (blocks until the connection closes).
  35  func Connect(rawURL string) (*Client, error) {
  36  	ws, err := Dial(rawURL)
  37  	if err != nil {
  38  		return nil, err
  39  	}
  40  	c := &Client{
  41  		ws:   ws,
  42  		url:  rawURL,
  43  		subs: map[string]*Sub{},
  44  		done: chan struct{}{},
  45  		OKs:  chan *envelope.OK{64},
  46  	}
  47  	return c, nil
  48  }
  49  
  50  // RunReadLoop reads messages from the relay and dispatches them to
  51  // subscriptions. Blocks until the connection closes.
  52  func (c *Client) RunReadLoop() {
  53  	defer close(c.done)
  54  	for {
  55  		op, payload, err := c.ws.ReadMessage()
  56  		if err != nil {
  57  			c.Err = err
  58  			return
  59  		}
  60  		if op == OpClose {
  61  			return
  62  		}
  63  		if op != OpText {
  64  			continue
  65  		}
  66  		c.dispatch(payload)
  67  	}
  68  }
  69  
  70  // Subscribe sends a REQ and returns a subscription that delivers events.
  71  func (c *Client) Subscribe(filters ...*filter.F) (*Sub, error) {
  72  	c.mu.Lock()
  73  	c.idN++
  74  	id := string(append([]byte(nil), fmt.Sprintf("s%d", c.idN)...))
  75  	sub := &Sub{
  76  		ID:     id,
  77  		Events: chan *event.E{256},
  78  		EOSE:   chan struct{}{1},
  79  	}
  80  	c.subs[id] = sub
  81  	c.mu.Unlock()
  82  
  83  	fs := filter.S(filters)
  84  	req := &envelope.Req{
  85  		Subscription: []byte(id),
  86  		Filters:      &fs,
  87  	}
  88  	if err := c.ws.WriteText(req.Marshal(nil)); err != nil {
  89  		c.mu.Lock()
  90  		delete(c.subs, id)
  91  		c.mu.Unlock()
  92  		return nil, err
  93  	}
  94  	return sub, nil
  95  }
  96  
  97  // Publish sends an event to the relay.
  98  func (c *Client) Publish(ev *event.E) error {
  99  	es := &envelope.EventSubmission{E: ev}
 100  	return c.ws.WriteText(es.Marshal(nil))
 101  }
 102  
 103  // Unsubscribe sends CLOSE and removes the subscription.
 104  func (c *Client) Unsubscribe(sub *Sub) error {
 105  	cl := &envelope.Close{ID: []byte(sub.ID)}
 106  	if err := c.ws.WriteText(cl.Marshal(nil)); err != nil {
 107  		return err
 108  	}
 109  	c.mu.Lock()
 110  	delete(c.subs, sub.ID)
 111  	c.mu.Unlock()
 112  	return nil
 113  }
 114  
 115  // Close shuts down the client.
 116  func (c *Client) Close() error {
 117  	c.mu.Lock()
 118  	for _, sub := range c.subs {
 119  		close(sub.Events)
 120  	}
 121  	c.subs = nil
 122  	c.mu.Unlock()
 123  	return c.ws.Close()
 124  }
 125  
 126  // Done returns a channel closed when the read loop exits.
 127  func (c *Client) Done() <-chan struct{} { return c.done }
 128  
 129  func (c *Client) dispatch(msg []byte) {
 130  	label, rem, err := envelope.Identify(msg)
 131  	if err != nil {
 132  		return
 133  	}
 134  	switch label {
 135  	case envelope.EventLabel:
 136  		c.handleEvent(rem)
 137  	case envelope.EOSELabel:
 138  		c.handleEOSE(rem)
 139  	case envelope.OKLabel:
 140  		c.handleOK(rem)
 141  	case envelope.ClosedLabel:
 142  		c.handleClosed(rem)
 143  	}
 144  }
 145  
 146  func (c *Client) handleEvent(rem []byte) {
 147  	subID, rem, err := text.UnmarshalQuoted(rem)
 148  	if err != nil {
 149  		return
 150  	}
 151  	// skip comma
 152  	for len(rem) > 0 && (rem[0] == ',' || rem[0] == ' ') {
 153  		rem = rem[1:]
 154  	}
 155  	ev := event.New()
 156  	if _, err = ev.Unmarshal(rem); err != nil {
 157  		return
 158  	}
 159  	c.mu.Lock()
 160  	sub := c.subs[string(subID)]
 161  	c.mu.Unlock()
 162  	if sub != nil {
 163  		select {
 164  		case sub.Events <- ev:
 165  		default:
 166  		}
 167  	}
 168  }
 169  
 170  func (c *Client) handleEOSE(rem []byte) {
 171  	subID, _, err := text.UnmarshalQuoted(rem)
 172  	if err != nil {
 173  		return
 174  	}
 175  	c.mu.Lock()
 176  	sub := c.subs[string(subID)]
 177  	c.mu.Unlock()
 178  	if sub != nil && !sub.eosed {
 179  		sub.eosed = true
 180  		select {
 181  		case sub.EOSE <- struct{}{}:
 182  		default:
 183  		}
 184  	}
 185  }
 186  
 187  func (c *Client) handleOK(rem []byte) {
 188  	var ok envelope.OK
 189  	if _, err := ok.Unmarshal(rem); err != nil {
 190  		return
 191  	}
 192  	select {
 193  	case c.OKs <- &ok:
 194  	default:
 195  	}
 196  }
 197  
 198  func (c *Client) handleClosed(rem []byte) {
 199  	subID, _, err := text.UnmarshalQuoted(rem)
 200  	if err != nil {
 201  		return
 202  	}
 203  	c.mu.Lock()
 204  	sub := c.subs[string(subID)]
 205  	if sub != nil {
 206  		delete(c.subs, string(subID))
 207  	}
 208  	c.mu.Unlock()
 209  	if sub != nil {
 210  		close(sub.Events)
 211  	}
 212  }
 213