engine.go raw

   1  package grapevine
   2  
import (
	"encoding/json"
	"math"
	"sort"
	"time"

	"next.orly.dev/pkg/lol/log"
	"next.orly.dev/pkg/nostr/encoders/hex"
)
  11  
// Engine computes GrapeVine influence and WoT intersection scores.
//
// It reads the follow graph through a GraphSource, persists and retrieves
// serialized results through a ScoreStore, and is tuned by a Config.
type Engine struct {
	// graph supplies the outbound follow traversal and the per-pubkey
	// follower / follows lookups used during scoring.
	graph GraphSource
	// store saves computed score sets and serves them back as raw JSON.
	store ScoreStore
	// cfg holds the scoring parameters read by Compute (MaxDepth, Rigor,
	// Cycles, AttenuationFactor, FollowConfidence).
	cfg   Config
}
  18  
  19  // NewEngine creates a new scoring engine.
  20  func NewEngine(graph GraphSource, store ScoreStore, cfg Config) *Engine {
  21  	return &Engine{graph: graph, store: store, cfg: cfg}
  22  }
  23  
  24  // Compute calculates influence and WoT scores for the given observer.
  25  // The result is persisted via the ScoreStore and also returned.
  26  func (e *Engine) Compute(observerHex string) (*ScoreSet, error) {
  27  	start := time.Now()
  28  
  29  	// Decode observer hex to binary for graph traversal
  30  	seedPubkey, err := hex.Dec(observerHex)
  31  	if err != nil {
  32  		return nil, err
  33  	}
  34  
  35  	// BFS outward from observer to get the full hop set
  36  	pubkeysByDepth, allHop, err := e.graph.TraverseFollowsOutbound(seedPubkey, e.cfg.MaxDepth)
  37  	if err != nil {
  38  		return nil, err
  39  	}
  40  	_ = pubkeysByDepth // depth info captured in allHop values
  41  
  42  	if len(allHop) == 0 {
  43  		return &ScoreSet{
  44  			ObserverHex:  observerHex,
  45  			ComputedAt:   time.Now(),
  46  			ComputeMs:    time.Since(start).Milliseconds(),
  47  			TotalPubkeys: 0,
  48  		}, nil
  49  	}
  50  
  51  	log.I.F("grapevine: computing scores for observer %s, hop set size: %d", observerHex[:12], len(allHop))
  52  
  53  	// Initialize influence scores
  54  	infScores := make(map[string]float64, len(allHop))
  55  	avgScores := make(map[string]float64, len(allHop))
  56  	certaintyScores := make(map[string]float64, len(allHop))
  57  	inputScores := make(map[string]float64, len(allHop))
  58  
  59  	// Observer starts at 1.0, everyone else at 0.0
  60  	infScores[observerHex] = 1.0
  61  	avgScores[observerHex] = 1.0
  62  	inputScores[observerHex] = 9999
  63  	certaintyScores[observerHex] = 1.0
  64  
  65  	// Cache follower lookups within this computation since the graph is static
  66  	followerCache := make(map[string][]string, len(allHop))
  67  	getFollowers := func(pk string) []string {
  68  		if cached, ok := followerCache[pk]; ok {
  69  			return cached
  70  		}
  71  		followers, err := e.graph.GetFollowerPubkeys(pk)
  72  		if err != nil {
  73  			followers = nil
  74  		}
  75  		followerCache[pk] = followers
  76  		return followers
  77  	}
  78  
  79  	// Convergence loop
  80  	rigority := -math.Log(e.cfg.Rigor)
  81  	for cycle := 0; cycle < e.cfg.Cycles; cycle++ {
  82  		for ratee := range allHop {
  83  			if ratee == observerHex {
  84  				continue
  85  			}
  86  
  87  			sumOfWeights := 0.0
  88  			sumOfProducts := 0.0
  89  
  90  			followers := getFollowers(ratee)
  91  			for _, rater := range followers {
  92  				if rater == ratee {
  93  					continue
  94  				}
  95  				// Only consider raters within the hop set
  96  				if _, inHop := allHop[rater]; !inHop {
  97  					continue
  98  				}
  99  
 100  				rating := 1.0 // followInterpretationScore
 101  				weight := e.cfg.AttenuationFactor * infScores[rater] * e.cfg.FollowConfidence
 102  				if rater == observerHex {
 103  					// No attenuation for observer's own follows
 104  					weight = infScores[rater] * e.cfg.FollowConfidence
 105  				}
 106  
 107  				// TODO: mutes - skip muted pubkeys, apply muteInterpretationScore
 108  				// TODO: reports - apply report penalty weight
 109  
 110  				product := weight * rating
 111  				sumOfWeights += weight
 112  				sumOfProducts += product
 113  			}
 114  
 115  			if sumOfWeights > 0 {
 116  				average := sumOfProducts / sumOfWeights
 117  				input := sumOfWeights
 118  				certainty := 1 - math.Exp(-input*rigority)
 119  				influence := average * certainty
 120  
 121  				infScores[ratee] = influence
 122  				avgScores[ratee] = average
 123  				certaintyScores[ratee] = certainty
 124  				inputScores[ratee] = input
 125  			}
 126  		}
 127  		log.D.F("grapevine: convergence cycle %d/%d complete", cycle+1, e.cfg.Cycles)
 128  	}
 129  
 130  	// WoT intersection scores: for each target, count how many of
 131  	// observer's direct follows also follow the target
 132  	observerFollows, err := e.graph.GetFollowsPubkeys(observerHex)
 133  	if err != nil {
 134  		observerFollows = nil
 135  	}
 136  	observerFollowSet := make(map[string]struct{}, len(observerFollows))
 137  	for _, f := range observerFollows {
 138  		observerFollowSet[f] = struct{}{}
 139  	}
 140  
 141  	wotScores := make(map[string]int, len(allHop))
 142  	for target := range allHop {
 143  		followers := getFollowers(target)
 144  		count := 0
 145  		for _, follower := range followers {
 146  			if _, ok := observerFollowSet[follower]; ok {
 147  				count++
 148  			}
 149  		}
 150  		wotScores[target] = count
 151  	}
 152  
 153  	// Build score entries
 154  	scores := make([]ScoreEntry, 0, len(allHop))
 155  	for pk, depth := range allHop {
 156  		inf := infScores[pk]
 157  		if inf <= 0 && wotScores[pk] <= 0 && pk != observerHex {
 158  			continue // skip zero-score entries to keep payload smaller
 159  		}
 160  		scores = append(scores, ScoreEntry{
 161  			PubkeyHex: pk,
 162  			Influence: inf,
 163  			Average:   avgScores[pk],
 164  			Certainty: certaintyScores[pk],
 165  			Input:     inputScores[pk],
 166  			WoTScore:  wotScores[pk],
 167  			Depth:     depth,
 168  		})
 169  	}
 170  
 171  	set := &ScoreSet{
 172  		ObserverHex:  observerHex,
 173  		Scores:       scores,
 174  		ComputedAt:   time.Now(),
 175  		ComputeMs:    time.Since(start).Milliseconds(),
 176  		TotalPubkeys: len(allHop),
 177  	}
 178  
 179  	// Marshal for storage
 180  	setData, err := json.Marshal(set)
 181  	if err != nil {
 182  		return set, err
 183  	}
 184  	entries := make(map[string][]byte, len(scores))
 185  	for i := range scores {
 186  		entryData, err := json.Marshal(&scores[i])
 187  		if err != nil {
 188  			continue
 189  		}
 190  		entries[scores[i].PubkeyHex] = entryData
 191  	}
 192  	if err := e.store.SaveScoreSet(set.ObserverHex, setData, entries); err != nil {
 193  		return set, err
 194  	}
 195  
 196  	log.I.F("grapevine: computed %d scores for observer %s in %dms",
 197  		len(scores), observerHex[:12], set.ComputeMs)
 198  
 199  	return set, nil
 200  }
 201  
 202  // GetScores returns the stored score set for an observer, or nil if not computed.
 203  func (e *Engine) GetScores(observerHex string) (*ScoreSet, error) {
 204  	data, err := e.store.GetScoreSet(observerHex)
 205  	if err != nil || data == nil {
 206  		return nil, err
 207  	}
 208  	var set ScoreSet
 209  	if err := json.Unmarshal(data, &set); err != nil {
 210  		return nil, err
 211  	}
 212  	return &set, nil
 213  }
 214  
 215  // GetScore returns a single stored score entry, or nil if not found.
 216  func (e *Engine) GetScore(observerHex, targetHex string) (*ScoreEntry, error) {
 217  	data, err := e.store.GetScore(observerHex, targetHex)
 218  	if err != nil || data == nil {
 219  		return nil, err
 220  	}
 221  	var entry ScoreEntry
 222  	if err := json.Unmarshal(data, &entry); err != nil {
 223  		return nil, err
 224  	}
 225  	return &entry, nil
 226  }
 227