package server import ( "bytes" "time" "smesh.lol/pkg/access" "smesh.lol/pkg/nostr/envelope" "smesh.lol/pkg/nostr/event" "smesh.lol/pkg/nostr/filter" ) // --- Subscription and Nostr message handling --- func (s *Server) handleReq(fd int, msg []byte) { c := s.conns[fd] if c == nil { return } _, rem, _ := envelope.Identify(msg) filter.Tainted = false var req envelope.Req if _, err := req.Unmarshal(rem); err != nil { return } if filter.Tainted { c.jailed = true return } id := string(req.Subscription) cfg := s.cfg authed := len(c.authedPubkey) > 0 wl := s.t.ConnIsWhitelisted(fd) if cfg.AuthRequired && !authed && !wl { cl := &envelope.Closed{ Subscription: []byte(id), Reason: []byte("auth-required: authentication required"), } s.t.SendWS(fd, cl.Marshal(nil)) return } maxSubs := cfg.MaxSubscriptions if maxSubs > 0 { if _, exists := c.subs[id]; !exists && len(c.subs) >= maxSubs { cl := &envelope.Closed{ Subscription: []byte(id), Reason: []byte("error: too many subscriptions"), } s.t.SendWS(fd, cl.Marshal(nil)) return } } filters := filter.S(*req.Filters) reqCopy := []byte{:len(msg)} copy(reqCopy, msg) c.subs[id] = &sub{id: id, filters: filters, rawReq: reqCopy} s.bcast.OnSubscribe(int32(fd), []byte(id), msg) needsFilter := cfg.RelayURL != "" && !cfg.PrivilegedOpen && !wl limit := cfg.QueryResultLimit if limit <= 0 { limit = 256 } var events []*event.E subID := req.Subscription seen := map[string]bool{} hasSearch := false for _, f := range filters { if len(f.Search) > 0 { hasSearch = true break } } if hasSearch { for _, f := range filters { if len(f.Search) == 0 { continue } results := s.eng.Search(f.Search, limit) for _, ev := range results { if seen[string(ev.ID)] { continue } seen[string(ev.ID)] = true if f.Matches(ev) { events = append(events, ev) } } } } else { for _, f := range filters { evs, err := s.eng.QueryEvents(f) if err != nil { continue } for _, ev := range evs { if seen[string(ev.ID)] { continue } seen[string(ev.ID)] = true events = append(events, ev) } } } count := 0 for _, ev := range events { if count >= limit { break } if needsFilter && !s.canSeeEventFD(fd, ev) { continue } er := &envelope.EventResult{Subscription: subID, Event: ev} if err := s.t.SendWSErr(fd, er.Marshal(nil)); err != nil { s.t.CloseConn(fd) return } count++ } s.t.SendWS(fd, (&envelope.EOSE{Subscription: subID}).Marshal(nil)) } func (s *Server) handleClose(fd int, msg []byte) { c := s.conns[fd] if c == nil { return } _, rem, _ := envelope.Identify(msg) var cl envelope.Close if _, err := cl.Unmarshal(rem); err != nil { return } delete(c.subs, string(cl.ID)) s.bcast.OnUnsubscribe(int32(fd), cl.ID) } func (s *Server) handleCount(fd int, msg []byte) { c := s.conns[fd] if c == nil { return } if s.cfg.AuthRequired && len(c.authedPubkey) == 0 && !s.t.ConnIsWhitelisted(fd) { return } _, rem, err := envelope.Identify(msg) if err != nil { return } filter.Tainted = false var cr envelope.CountRequest if _, err := cr.Unmarshal(rem); err != nil { return } if filter.Tainted { c.jailed = true return } var total int for _, f := range cr.Filters { evs, err := s.eng.QueryEvents(f) if err != nil { continue } total += len(evs) } resp := (&envelope.CountResponse{ Subscription: cr.Subscription, Count: total, }).Marshal(nil) s.t.SendWS(fd, resp) } func (s *Server) handleAuth(fd int, msg []byte) { c := s.conns[fd] if c == nil { return } _, rem, _ := envelope.Identify(msg) var auth envelope.AuthResponse if _, err := auth.Unmarshal(rem); err != nil || auth.Event == nil { ok := &envelope.OK{ EventID: []byte{:32}, OK: false, Reason: []byte("error: failed to parse auth event"), } s.t.SendWS(fd, ok.Marshal(nil)) return } if auth.Event.Kind != 22242 { s.authFail(fd, auth.Event.ID, "error: wrong event kind for auth") return } ct := auth.Event.Tags.GetFirst([]byte("challenge")) if ct == nil || !bytes.Equal(ct.Value(), c.challenge) { s.authFail(fd, auth.Event.ID, "error: wrong challenge") return } rt := auth.Event.Tags.GetFirst([]byte("relay")) if rt == nil || len(rt.Value()) == 0 { s.authFail(fd, auth.Event.ID, "error: missing relay tag") return } if !relayURLMatch([]byte(s.cfg.RelayURL), rt.Value()) { s.authFail(fd, auth.Event.ID, "error: relay URL mismatch") return } now := time.Now().Unix() if auth.Event.CreatedAt > now+600 || auth.Event.CreatedAt < now-600 { s.authFail(fd, auth.Event.ID, "error: timestamp out of range") return } valid, err := auth.Event.Verify() if err != nil || !valid { s.authFail(fd, auth.Event.ID, "error: invalid signature") return } c.authedPubkey = []byte{:len(auth.Event.Pubkey)} copy(c.authedPubkey, auth.Event.Pubkey) s.bcast.OnAuth(int32(fd), c.authedPubkey) ok := &envelope.OK{EventID: auth.Event.ID, OK: true} s.t.SendWS(fd, ok.Marshal(nil)) } func (s *Server) authFail(fd int, id []byte, reason string) { ok := &envelope.OK{EventID: id, OK: false, Reason: []byte(reason)} s.t.SendWS(fd, ok.Marshal(nil)) } func (s *Server) canSeeEventFD(fd int, ev *event.E) bool { c := s.conns[fd] var authedPubkey []byte if c != nil { authedPubkey = c.authedPubkey } return access.CanSee(len(authedPubkey) > 0, authedPubkey, ev, s.cfg.NIP70Enforce, s.cfg.MarmotOpen) } func relayURLMatch(expected, found []byte) bool { if bytes.Equal(expected, found) { return true } e := makeCopy(expected) f := makeCopy(found) toLower(e) toLower(f) if len(e) > 0 && e[len(e)-1] == '/' { e = e[:len(e)-1] } if len(f) > 0 && f[len(f)-1] == '/' { f = f[:len(f)-1] } return bytes.Equal(stripScheme(e), stripScheme(f)) } func stripScheme(u []byte) []byte { if bytes.HasPrefix(u, []byte("wss://")) { return u[6:] } if bytes.HasPrefix(u, []byte("ws://")) { return u[5:] } return u }