fetcher.go raw

   1  package nostr
   2  
   3  import (
   4  	"context"
   5  	"encoding/json"
   6  	"log"
   7  	"sync"
   8  	"time"
   9  
  10  	"next.orly.dev/pkg/nostr/encoders/event"
  11  	"next.orly.dev/pkg/nostr/encoders/filter"
  12  	"next.orly.dev/pkg/nostr/encoders/hex"
  13  	"next.orly.dev/pkg/nostr/encoders/kind"
  14  	"next.orly.dev/pkg/nostr/encoders/tag"
  15  	"next.orly.dev/pkg/nostr/ws"
  16  )
  17  
  18  const (
  19  	// FetchTimeout is how long to wait for relay responses
  20  	FetchTimeout = 10 * time.Second
  21  	// CacheTTL is how long to cache relay lists and profiles
  22  	CacheTTL = 24 * time.Hour
  23  )
  24  
  25  // Fetcher handles fetching relay lists and profiles from Nostr relays
  26  type Fetcher struct {
  27  	fallbackRelays []string
  28  	relayCache     map[string]*relayListCacheEntry
  29  	profileCache   map[string]*profileCacheEntry
  30  	mu             sync.RWMutex
  31  }
  32  
  33  type relayListCacheEntry struct {
  34  	Relays    []Nip65Relay
  35  	FetchedAt time.Time
  36  }
  37  
  38  type profileCacheEntry struct {
  39  	Profile   *ProfileMetadata
  40  	FetchedAt time.Time
  41  }
  42  
  43  // NewFetcher creates a new Fetcher with the given fallback relays
  44  func NewFetcher(fallbackRelays []string) *Fetcher {
  45  	return &Fetcher{
  46  		fallbackRelays: fallbackRelays,
  47  		relayCache:     make(map[string]*relayListCacheEntry),
  48  		profileCache:   make(map[string]*profileCacheEntry),
  49  	}
  50  }
  51  
  52  // newPubkeyFilter builds a filter for a given kind and hex pubkey with a limit.
  53  func newPubkeyFilter(k int, pubkey string, limit uint) *filter.F {
  54  	pubkeyBytes, err := hex.Dec(pubkey)
  55  	if err != nil {
  56  		return nil
  57  	}
  58  	lim := limit
  59  	return &filter.F{
  60  		Kinds:   kind.NewS(kind.New(k)),
  61  		Authors: tag.NewFromBytesSlice(pubkeyBytes),
  62  		Limit:   &lim,
  63  	}
  64  }
  65  
  66  // FetchRelayList fetches a user's NIP-65 relay list (kind 10002)
  67  func (f *Fetcher) FetchRelayList(ctx context.Context, pubkey string) []Nip65Relay {
  68  	// Check cache first
  69  	f.mu.RLock()
  70  	if entry, ok := f.relayCache[pubkey]; ok {
  71  		if time.Since(entry.FetchedAt) < CacheTTL {
  72  			f.mu.RUnlock()
  73  			return entry.Relays
  74  		}
  75  	}
  76  	f.mu.RUnlock()
  77  
  78  	// Fetch from relays
  79  	relays := f.doFetchRelayList(ctx, pubkey)
  80  
  81  	// Cache result
  82  	f.mu.Lock()
  83  	f.relayCache[pubkey] = &relayListCacheEntry{
  84  		Relays:    relays,
  85  		FetchedAt: time.Now(),
  86  	}
  87  	f.mu.Unlock()
  88  
  89  	return relays
  90  }
  91  
  92  func (f *Fetcher) doFetchRelayList(ctx context.Context, pubkey string) []Nip65Relay {
  93  	ctx, cancel := context.WithTimeout(ctx, FetchTimeout)
  94  	defer cancel()
  95  
  96  	ff := newPubkeyFilter(10002, pubkey, 10)
  97  	if ff == nil {
  98  		return nil
  99  	}
 100  
 101  	events := f.queryRelays(ctx, f.fallbackRelays, ff)
 102  	if len(events) == 0 {
 103  		return nil
 104  	}
 105  
 106  	// Get the most recent event
 107  	var latest *event.E
 108  	for _, ev := range events {
 109  		if latest == nil || ev.CreatedAt > latest.CreatedAt {
 110  			latest = ev
 111  		}
 112  	}
 113  
 114  	// Parse relay tags
 115  	var relays []Nip65Relay
 116  	if latest.Tags != nil {
 117  		for _, t := range *latest.Tags {
 118  			ss := t.ToSliceOfStrings()
 119  			if len(ss) >= 2 && ss[0] == "r" {
 120  				relay := Nip65Relay{
 121  					URL:   ss[1],
 122  					Read:  true,
 123  					Write: true,
 124  				}
 125  				if len(ss) >= 3 {
 126  					switch ss[2] {
 127  					case "read":
 128  						relay.Write = false
 129  					case "write":
 130  						relay.Read = false
 131  					}
 132  				}
 133  				relays = append(relays, relay)
 134  			}
 135  		}
 136  	}
 137  
 138  	return relays
 139  }
 140  
 141  // FetchProfile fetches a user's profile metadata (kind 0)
 142  // It first fetches the user's relay list, then queries those relays + fallbacks
 143  func (f *Fetcher) FetchProfile(ctx context.Context, pubkey string) *ProfileMetadata {
 144  	// Check cache first
 145  	f.mu.RLock()
 146  	if entry, ok := f.profileCache[pubkey]; ok {
 147  		if time.Since(entry.FetchedAt) < CacheTTL {
 148  			f.mu.RUnlock()
 149  			return entry.Profile
 150  		}
 151  	}
 152  	f.mu.RUnlock()
 153  
 154  	// First, get the user's relay list
 155  	userRelays := f.FetchRelayList(ctx, pubkey)
 156  
 157  	// Build relay list: user's read relays + fallbacks
 158  	relayURLs := make([]string, 0, len(userRelays)+len(f.fallbackRelays))
 159  	seen := make(map[string]bool)
 160  
 161  	// Add user's read relays first (more likely to have their profile)
 162  	for _, r := range userRelays {
 163  		if r.Read && !seen[r.URL] {
 164  			relayURLs = append(relayURLs, r.URL)
 165  			seen[r.URL] = true
 166  		}
 167  	}
 168  
 169  	// Add fallback relays
 170  	for _, url := range f.fallbackRelays {
 171  		if !seen[url] {
 172  			relayURLs = append(relayURLs, url)
 173  			seen[url] = true
 174  		}
 175  	}
 176  
 177  	// Fetch profile
 178  	profile := f.doFetchProfile(ctx, pubkey, relayURLs)
 179  
 180  	// Cache result (even if nil)
 181  	f.mu.Lock()
 182  	f.profileCache[pubkey] = &profileCacheEntry{
 183  		Profile:   profile,
 184  		FetchedAt: time.Now(),
 185  	}
 186  	f.mu.Unlock()
 187  
 188  	return profile
 189  }
 190  
 191  func (f *Fetcher) doFetchProfile(ctx context.Context, pubkey string, relayURLs []string) *ProfileMetadata {
 192  	ctx, cancel := context.WithTimeout(ctx, FetchTimeout)
 193  	defer cancel()
 194  
 195  	ff := newPubkeyFilter(0, pubkey, 10)
 196  	if ff == nil {
 197  		return nil
 198  	}
 199  
 200  	events := f.queryRelays(ctx, relayURLs, ff)
 201  	if len(events) == 0 {
 202  		return nil
 203  	}
 204  
 205  	// Get the most recent event
 206  	var latest *event.E
 207  	for _, ev := range events {
 208  		if latest == nil || ev.CreatedAt > latest.CreatedAt {
 209  			latest = ev
 210  		}
 211  	}
 212  
 213  	// Parse profile content
 214  	var content map[string]interface{}
 215  	if err := json.Unmarshal(latest.Content, &content); err != nil {
 216  		log.Printf("Failed to parse profile content for %s: %v", pubkey, err)
 217  		return nil
 218  	}
 219  
 220  	profile := &ProfileMetadata{
 221  		Pubkey: pubkey,
 222  	}
 223  
 224  	if v, ok := content["name"].(string); ok {
 225  		profile.Name = v
 226  	}
 227  	if v, ok := content["display_name"].(string); ok {
 228  		profile.DisplayName = v
 229  	}
 230  	if v, ok := content["displayName"].(string); ok && profile.DisplayName == "" {
 231  		profile.DisplayName = v
 232  	}
 233  	if v, ok := content["picture"].(string); ok {
 234  		profile.Picture = v
 235  	}
 236  	if v, ok := content["banner"].(string); ok {
 237  		profile.Banner = v
 238  	}
 239  	if v, ok := content["about"].(string); ok {
 240  		profile.About = v
 241  	}
 242  	if v, ok := content["website"].(string); ok {
 243  		profile.Website = v
 244  	}
 245  	if v, ok := content["nip05"].(string); ok {
 246  		profile.Nip05 = v
 247  	}
 248  	if v, ok := content["lud06"].(string); ok {
 249  		profile.Lud06 = v
 250  	}
 251  	if v, ok := content["lud16"].(string); ok {
 252  		profile.Lud16 = v
 253  	}
 254  
 255  	return profile
 256  }
 257  
 258  // queryRelays queries multiple relays and collects events
 259  func (f *Fetcher) queryRelays(ctx context.Context, relayURLs []string, ff *filter.F) []*event.E {
 260  	var (
 261  		events   []*event.E
 262  		eventsMu sync.Mutex
 263  		wg       sync.WaitGroup
 264  	)
 265  
 266  	// Query each relay concurrently
 267  	for _, url := range relayURLs {
 268  		wg.Add(1)
 269  		go func(relayURL string) {
 270  			defer wg.Done()
 271  
 272  			relay, err := ws.RelayConnect(ctx, relayURL)
 273  			if err != nil {
 274  				// Silently skip failed relays
 275  				return
 276  			}
 277  			defer relay.Close()
 278  
 279  			sub, err := relay.Subscribe(ctx, filter.NewS(ff))
 280  			if err != nil {
 281  				return
 282  			}
 283  			defer sub.Unsub()
 284  
 285  			for {
 286  				select {
 287  				case ev, ok := <-sub.Events:
 288  					if !ok {
 289  						return
 290  					}
 291  					eventsMu.Lock()
 292  					events = append(events, ev)
 293  					eventsMu.Unlock()
 294  				case <-sub.EndOfStoredEvents:
 295  					return
 296  				case <-ctx.Done():
 297  					return
 298  				}
 299  			}
 300  		}(url)
 301  	}
 302  
 303  	// Wait for all queries to complete or timeout
 304  	done := make(chan struct{})
 305  	go func() {
 306  		wg.Wait()
 307  		close(done)
 308  	}()
 309  
 310  	select {
 311  	case <-done:
 312  	case <-ctx.Done():
 313  	}
 314  
 315  	return events
 316  }
 317  
 318  // GetCachedProfile returns a cached profile if available and not expired
 319  func (f *Fetcher) GetCachedProfile(pubkey string) *ProfileMetadata {
 320  	f.mu.RLock()
 321  	defer f.mu.RUnlock()
 322  
 323  	if entry, ok := f.profileCache[pubkey]; ok {
 324  		if time.Since(entry.FetchedAt) < CacheTTL {
 325  			return entry.Profile
 326  		}
 327  	}
 328  	return nil
 329  }
 330  
 331  // GetCachedRelayList returns a cached relay list if available and not expired
 332  func (f *Fetcher) GetCachedRelayList(pubkey string) []Nip65Relay {
 333  	f.mu.RLock()
 334  	defer f.mu.RUnlock()
 335  
 336  	if entry, ok := f.relayCache[pubkey]; ok {
 337  		if time.Since(entry.FetchedAt) < CacheTTL {
 338  			return entry.Relays
 339  		}
 340  	}
 341  	return nil
 342  }
 343