conn.mx raw

   1  package relay
   2  
   3  import (
   4  	"smesh.lol/web/common/jsbridge/ws"
   5  	"smesh.lol/web/common/nostr"
   6  )
   7  
   8  // State constants for connection readiness.
   9  const (
  10  	StateConnecting = 0
  11  	StateOpen       = 1
  12  	StateClosed     = 2
  13  )
  14  
// Conn is a single relay WebSocket connection.
//
// It tracks the socket handle, a coarse connection state, the subscriptions
// registered on it, and the callbacks that incoming relay messages are
// dispatched to.
type Conn struct {
	URL    string           // relay address handed to ws.Dial
	wsConn ws.Conn          // handle returned by the most recent ws.Dial call
	state  int              // one of StateConnecting, StateOpen, StateClosed
	subs   map[string]*Sub  // subscriptions keyed by subscription ID

	// One-shot readiness callback installed through OnReady. It is invoked
	// with true when the socket opens or false when it closes/errors, and is
	// cleared after firing.
	onReady func(bool)

	onEvent func(string, *nostr.Event) // connection-wide EVENT handler (see SetOnEvent)
	onEOSE  func(string)               // connection-wide EOSE handler (see SetOnEOSE)
	onOK    func(string, bool, string) // OK handler (see SetOnOK)
	onAuth  func(string)               // AUTH handler (see SetOnAuth)

	closing        bool     // true when Close() was called intentionally
	pendingPublish []string // EVENT messages queued while the connection is not open

	// ScheduleReconnect is set by the consumer to provide a delayed callback.
	// When a connection drops with active subscriptions, this is called with
	// the reconnect function. The consumer wraps it in a timer (e.g. 5s delay).
	ScheduleReconnect func(fn func())
}
  38  
  39  // Dial opens a connection to a relay.
  40  // Call OnReady to receive the open/fail notification, then Start to begin processing.
  41  func Dial(url string) *Conn {
  42  	c := &Conn{
  43  		URL:   url,
  44  		state: StateConnecting,
  45  		subs:  map[string]*Sub{},
  46  	}
  47  
  48  	c.dial()
  49  	return c
  50  }
  51  
  52  func (c *Conn) dial() {
  53  	c.wsConn = ws.Dial(
  54  		c.URL,
  55  		func(connID int, data string) {
  56  			c.handleMessage(data)
  57  		},
  58  		func(connID int) {
  59  			c.state = StateOpen
  60  			if c.onReady != nil {
  61  				c.onReady(true)
  62  				c.onReady = nil
  63  			}
  64  			c.flushSubs()
  65  			c.flushPublish()
  66  		},
  67  		func(connID int, code int, reason string) {
  68  			c.state = StateClosed
  69  			if c.onReady != nil {
  70  				c.onReady(false)
  71  				c.onReady = nil
  72  			}
  73  			c.maybeReconnect()
  74  		},
  75  		func(connID int) {
  76  			c.state = StateClosed
  77  			if c.onReady != nil {
  78  				c.onReady(false)
  79  				c.onReady = nil
  80  			}
  81  			c.maybeReconnect()
  82  		},
  83  	)
  84  }
  85  
  86  func (c *Conn) maybeReconnect() {
  87  	if c.closing || len(c.subs) == 0 || c.ScheduleReconnect == nil {
  88  		return
  89  	}
  90  	c.state = StateConnecting // prevent pool from creating a duplicate
  91  	c.ScheduleReconnect(func() {
  92  		if c.closing {
  93  			return
  94  		}
  95  		c.dial()
  96  	})
  97  }
  98  
  99  // OnReady sets a callback that fires once when the connection opens (true) or fails (false).
 100  func (c *Conn) OnReady(fn func(bool)) {
 101  	if c.state == StateOpen {
 102  		fn(true)
 103  		return
 104  	}
 105  	if c.state == StateClosed {
 106  		fn(false)
 107  		return
 108  	}
 109  	c.onReady = fn
 110  }
 111  
// IsOpen reports whether the connection state is StateOpen, i.e. the socket's
// open callback has fired and no close, error, or Close() has happened since.
func (c *Conn) IsOpen() bool {
	return c.state == StateOpen
}
 116  
 117  func (c *Conn) handleMessage(msg string) {
 118  	label, subID, payload := nostr.ParseRelayMessage(msg)
 119  
 120  	switch label {
 121  	case "EVENT":
 122  		ev := nostr.ParseEvent(payload)
 123  		if ev == nil {
 124  			return
 125  		}
 126  		if sub, ok := c.subs[subID]; ok {
 127  			if sub.OnEvent != nil {
 128  				sub.OnEvent(ev)
 129  			}
 130  		}
 131  		if c.onEvent != nil {
 132  			c.onEvent(subID, ev)
 133  		}
 134  
 135  	case "EOSE":
 136  		if sub, ok := c.subs[subID]; ok {
 137  			sub.gotEOSE = true
 138  			if sub.OnEOSE != nil {
 139  				sub.OnEOSE()
 140  			}
 141  		}
 142  		if c.onEOSE != nil {
 143  			c.onEOSE(subID)
 144  		}
 145  
 146  	case "OK":
 147  		ok := len(payload) > 0 && payload[0] == 't'
 148  		msg := ""
 149  		idx := indexOf(payload, ':')
 150  		if idx >= 0 && idx+1 < len(payload) {
 151  			msg = payload[idx+1:]
 152  		}
 153  		if c.onOK != nil {
 154  			c.onOK(subID, ok, msg)
 155  		}
 156  
 157  	case "AUTH":
 158  		if c.onAuth != nil {
 159  			c.onAuth(payload)
 160  		}
 161  
 162  	case "NOTICE":
 163  		_ = payload
 164  	}
 165  }
 166  
 167  // Subscribe sends a REQ and tracks the subscription.
 168  // If the connection is still opening, the REQ is deferred to flushSubs().
 169  func (c *Conn) Subscribe(id string, filters []*nostr.Filter) *Sub {
 170  	sub := &Sub{
 171  		ID:      id,
 172  		Filters: filters,
 173  		conn:    c,
 174  	}
 175  	c.subs[id] = sub
 176  
 177  	if c.state == StateOpen {
 178  		msg := "[\"REQ\",\"" + id + "\""
 179  		for _, f := range filters {
 180  			msg += "," + f.Serialize()
 181  		}
 182  		msg += "]"
 183  		ws.Send(c.wsConn, msg)
 184  	}
 185  	// else: flushSubs() sends all stored subs when connection opens.
 186  
 187  	return sub
 188  }
 189  
 190  // Publish sends an EVENT message. Queues if the connection is still opening.
 191  func (c *Conn) Publish(ev *nostr.Event) {
 192  	msg := "[\"EVENT\"," + eventJSON(ev) + "]"
 193  	if c.state != StateOpen {
 194  		c.pendingPublish = append(c.pendingPublish, msg)
 195  		return
 196  	}
 197  	ws.Send(c.wsConn, msg)
 198  }
 199  
 200  // CloseSubscription sends a CLOSE message.
 201  func (c *Conn) CloseSubscription(id string) {
 202  	delete(c.subs, id)
 203  	msg := "[\"CLOSE\",\"" + id + "\"]"
 204  	ws.Send(c.wsConn, msg)
 205  }
 206  
// Send passes msg, a raw JSON message string, straight to ws.Send on the
// current socket handle. It does not inspect c.state and does not queue, so
// the caller is responsible for only calling it when the connection is usable.
func (c *Conn) Send(msg string) {
	ws.Send(c.wsConn, msg)
}
 211  
 212  // Close closes the connection intentionally (no reconnect).
 213  func (c *Conn) Close() {
 214  	c.closing = true
 215  	c.state = StateClosed
 216  	ws.Close(c.wsConn)
 217  }
 218  
// SetOnEvent sets the connection-wide event handler. It is called for every
// successfully parsed EVENT message with the subscription ID and the event,
// after any per-subscription OnEvent callback. Passing nil removes the handler.
func (c *Conn) SetOnEvent(fn func(string, *nostr.Event)) {
	c.onEvent = fn
}
 223  
// SetOnEOSE sets the connection-wide EOSE handler. It is called with the
// subscription ID for every EOSE message, after any per-subscription OnEOSE
// callback. Passing nil removes the handler.
func (c *Conn) SetOnEOSE(fn func(string)) {
	c.onEOSE = fn
}
 228  
// SetOnOK sets the handler for OK responses. It receives the identifier that
// nostr.ParseRelayMessage returns for the frame (presumably the event ID for
// OK messages — confirm against the parser), whether the payload indicated
// acceptance, and the text that followed the first ':' in the payload.
func (c *Conn) SetOnOK(fn func(string, bool, string)) {
	c.onOK = fn
}
 233  
// SetOnAuth sets the handler for AUTH messages. It receives the payload that
// nostr.ParseRelayMessage extracted from the frame. Passing nil removes the
// handler.
func (c *Conn) SetOnAuth(fn func(string)) {
	c.onAuth = fn
}
 238  
 239  // flushPublish sends queued EVENT messages that arrived while connecting.
 240  func (c *Conn) flushPublish() {
 241  	for _, msg := range c.pendingPublish {
 242  		ws.Send(c.wsConn, msg)
 243  	}
 244  	c.pendingPublish = nil
 245  }
 246  
 247  // flushSubs re-sends REQ for all stored subscriptions (used after WS opens).
 248  func (c *Conn) flushSubs() {
 249  	for _, sub := range c.subs {
 250  		msg := "[\"REQ\",\"" + sub.ID + "\""
 251  		for _, f := range sub.Filters {
 252  			msg += "," + f.Serialize()
 253  		}
 254  		msg += "]"
 255  		ws.Send(c.wsConn, msg)
 256  	}
 257  }
 258  
 259  func eventJSON(ev *nostr.Event) string {
 260  	buf := []byte{:0:512}
 261  	buf = append(buf, '{')
 262  	buf = append(buf, "\"id\":\""...)
 263  	buf = append(buf, ev.ID...)
 264  	buf = append(buf, "\",\"pubkey\":\""...)
 265  	buf = append(buf, ev.PubKey...)
 266  	buf = append(buf, "\",\"created_at\":"...)
 267  	buf = append(buf, itoa(ev.CreatedAt)...)
 268  	buf = append(buf, ",\"kind\":"...)
 269  	buf = append(buf, itoa(int64(ev.Kind))...)
 270  	buf = append(buf, ",\"tags\":"...)
 271  	buf = serializeTags(buf, ev.Tags)
 272  	buf = append(buf, ",\"content\":\""...)
 273  	buf = appendEscaped(buf, ev.Content)
 274  	buf = append(buf, "\",\"sig\":\""...)
 275  	buf = append(buf, ev.Sig...)
 276  	buf = append(buf, '"', '}')
 277  	return string(buf)
 278  }
 279  
 280  func serializeTags(buf []byte, tags nostr.Tags) []byte {
 281  	buf = append(buf, '[')
 282  	for i, tag := range tags {
 283  		if i > 0 {
 284  			buf = append(buf, ',')
 285  		}
 286  		buf = append(buf, '[')
 287  		for j, s := range tag {
 288  			if j > 0 {
 289  				buf = append(buf, ',')
 290  			}
 291  			buf = append(buf, '"')
 292  			buf = appendEscaped(buf, s)
 293  			buf = append(buf, '"')
 294  		}
 295  		buf = append(buf, ']')
 296  	}
 297  	buf = append(buf, ']')
 298  	return buf
 299  }
 300  
 301  func appendEscaped(buf []byte, s string) []byte {
 302  	for i := 0; i < len(s); i++ {
 303  		c := s[i]
 304  		switch c {
 305  		case '"':
 306  			buf = append(buf, '\\', '"')
 307  		case '\\':
 308  			buf = append(buf, '\\', '\\')
 309  		case '\n':
 310  			buf = append(buf, '\\', 'n')
 311  		case '\r':
 312  			buf = append(buf, '\\', 'r')
 313  		case '\t':
 314  			buf = append(buf, '\\', 't')
 315  		default:
 316  			buf = append(buf, c)
 317  		}
 318  	}
 319  	return buf
 320  }
 321  
 322  func itoa(n int64) string {
 323  	if n == 0 {
 324  		return "0"
 325  	}
 326  	neg := false
 327  	if n < 0 {
 328  		neg = true
 329  		n = -n
 330  	}
 331  	var b [20]byte
 332  	i := len(b)
 333  	for n > 0 {
 334  		i--
 335  		b[i] = byte('0' + n%10)
 336  		n /= 10
 337  	}
 338  	if neg {
 339  		i--
 340  		b[i] = '-'
 341  	}
 342  	return string(b[i:])
 343  }
 344  
 345  func indexOf(s string, c byte) int {
 346  	for i := 0; i < len(s); i++ {
 347  		if s[i] == c {
 348  			return i
 349  		}
 350  	}
 351  	return -1
 352  }
 353