package ws

import (
	"fmt"
	"sync"

	"smesh.lol/pkg/nostr/envelope"
	"smesh.lol/pkg/nostr/event"
	"smesh.lol/pkg/nostr/filter"
	"smesh.lol/pkg/nostr/text"
)

// Client is a Nostr relay client.
type Client struct {
	ws   *Conn           // underlying connection used for all reads and writes
	url  string          // URL passed to Connect
	subs map[string]*Sub // active subscriptions keyed by ID; nil after Close
	mu   sync.Mutex      // guards subs and idN
	idN  int64           // counter used to generate subscription IDs
	done chan struct{}   // closed when RunReadLoop returns

	OKs chan *envelope.OK // buffered channel for OK responses
	// Err is the read error that terminated the read loop. It stays nil when
	// the loop ended because an OpClose message was received.
	Err error
}

// Sub is an active subscription.
type Sub struct {
	ID     string        // subscription ID sent in the REQ
	Events chan *event.E // events delivered for this subscription
	EOSE   chan struct{} // receives one value when the first EOSE arrives
	eosed  bool          // true once EOSE has been signalled
}

// Connect dials a Nostr relay. Call RunReadLoop() to start processing
// incoming messages (blocks until the connection closes).
func Connect(rawURL string) (*Client, error) {
	ws, err := Dial(rawURL)
	if err != nil {
		return nil, err
	}
	c := &Client{
		ws:   ws,
		url:  rawURL,
		subs: map[string]*Sub{},
		done: chan struct{}{},
		OKs:  chan *envelope.OK{64},
	}
	return c, nil
}

// RunReadLoop reads messages from the relay and dispatches them to
// subscriptions. Blocks until the connection closes.
//
// The loop ends when reading fails (the error is stored in c.Err) or when an
// OpClose message is received. Messages that are not OpText are ignored. The
// channel returned by Done is closed on exit.
func (c *Client) RunReadLoop() {
	defer close(c.done)
	for {
		op, payload, err := c.ws.ReadMessage()
		if err != nil {
			c.Err = err
			return
		}
		if op == OpClose {
			return
		}
		if op != OpText {
			continue
		}
		c.dispatch(payload)
	}
}

// Subscribe sends a REQ and returns a subscription that delivers events.
//
// The subscription is registered before the REQ is written so that responses
// cannot arrive for an unknown ID; it is unregistered again if the write
// fails. An error is returned, and nothing is sent, when the client has
// already been closed.
func (c *Client) Subscribe(filters ...*filter.F) (*Sub, error) {
	c.mu.Lock()
	// Close sets subs to nil; writing to a nil map would panic.
	if c.subs == nil {
		c.mu.Unlock()
		return nil, fmt.Errorf("ws: client is closed")
	}
	c.idN++
	id := string(append([]byte(nil), fmt.Sprintf("s%d", c.idN)...))
	sub := &Sub{
		ID:     id,
		Events: chan *event.E{256},
		EOSE:   chan struct{}{1},
	}
	c.subs[id] = sub
	c.mu.Unlock()

	fs := filter.S(filters)
	req := &envelope.Req{
		Subscription: []byte(id),
		Filters:      &fs,
	}
	if err := c.ws.WriteText(req.Marshal(nil)); err != nil {
		c.mu.Lock()
		delete(c.subs, id)
		c.mu.Unlock()
		return nil, err
	}
	return sub, nil
}

// Publish sends an event to the relay. It returns the write error, if any;
// the relay's OK response, when one arrives, is delivered on c.OKs.
func (c *Client) Publish(ev *event.E) error {
	es := &envelope.EventSubmission{E: ev}
	return c.ws.WriteText(es.Marshal(nil))
}

// Unsubscribe sends CLOSE and removes the subscription.
func (c *Client) Unsubscribe(sub *Sub) error { cl := &envelope.Close{ID: []byte(sub.ID)} if err := c.ws.WriteText(cl.Marshal(nil)); err != nil { return err } c.mu.Lock() delete(c.subs, sub.ID) c.mu.Unlock() return nil } // Close shuts down the client. func (c *Client) Close() error { c.mu.Lock() for _, sub := range c.subs { close(sub.Events) } c.subs = nil c.mu.Unlock() return c.ws.Close() } // Done returns a channel closed when the read loop exits. func (c *Client) Done() <-chan struct{} { return c.done } func (c *Client) dispatch(msg []byte) { label, rem, err := envelope.Identify(msg) if err != nil { return } switch label { case envelope.EventLabel: c.handleEvent(rem) case envelope.EOSELabel: c.handleEOSE(rem) case envelope.OKLabel: c.handleOK(rem) case envelope.ClosedLabel: c.handleClosed(rem) } } func (c *Client) handleEvent(rem []byte) { subID, rem, err := text.UnmarshalQuoted(rem) if err != nil { return } // skip comma for len(rem) > 0 && (rem[0] == ',' || rem[0] == ' ') { rem = rem[1:] } ev := event.New() if _, err = ev.Unmarshal(rem); err != nil { return } c.mu.Lock() sub := c.subs[string(subID)] c.mu.Unlock() if sub != nil { select { case sub.Events <- ev: default: } } } func (c *Client) handleEOSE(rem []byte) { subID, _, err := text.UnmarshalQuoted(rem) if err != nil { return } c.mu.Lock() sub := c.subs[string(subID)] c.mu.Unlock() if sub != nil && !sub.eosed { sub.eosed = true select { case sub.EOSE <- struct{}{}: default: } } } func (c *Client) handleOK(rem []byte) { var ok envelope.OK if _, err := ok.Unmarshal(rem); err != nil { return } select { case c.OKs <- &ok: default: } } func (c *Client) handleClosed(rem []byte) { subID, _, err := text.UnmarshalQuoted(rem) if err != nil { return } c.mu.Lock() sub := c.subs[string(subID)] if sub != nil { delete(c.subs, string(subID)) } c.mu.Unlock() if sub != nil { close(sub.Events) } }