scheduler.go raw
1 package grapevine
2
3 import (
4 "context"
5 "sync"
6 "time"
7
8 "next.orly.dev/pkg/lol/log"
9 )
10
11 // Scheduler runs periodic GrapeVine score computation for configured observers.
12 type Scheduler struct {
13 engine *Engine
14 observers []string // hex pubkeys
15 interval time.Duration
16 mu sync.Mutex
17 computing map[string]bool
18 }
19
20 // NewScheduler creates a new scheduler.
21 func NewScheduler(engine *Engine, observers []string, interval time.Duration) *Scheduler {
22 return &Scheduler{
23 engine: engine,
24 observers: observers,
25 interval: interval,
26 computing: make(map[string]bool),
27 }
28 }
29
30 // Start runs the periodic computation loop. Blocks until ctx is cancelled.
31 func (s *Scheduler) Start(ctx context.Context) {
32 log.I.F("grapevine: scheduler started for %d observers, interval %v", len(s.observers), s.interval)
33
34 // Immediate first run
35 s.runAll()
36
37 ticker := time.NewTicker(s.interval)
38 defer ticker.Stop()
39
40 for {
41 select {
42 case <-ctx.Done():
43 log.I.F("grapevine: scheduler stopped")
44 return
45 case <-ticker.C:
46 s.runAll()
47 }
48 }
49 }
50
51 // TriggerCompute starts an async computation for a single observer.
52 // Returns immediately. No-op if already computing for that observer.
53 func (s *Scheduler) TriggerCompute(observerHex string) bool {
54 s.mu.Lock()
55 if s.computing[observerHex] {
56 s.mu.Unlock()
57 return false
58 }
59 s.computing[observerHex] = true
60 s.mu.Unlock()
61
62 go func() {
63 defer func() {
64 s.mu.Lock()
65 delete(s.computing, observerHex)
66 s.mu.Unlock()
67 }()
68 if _, err := s.engine.Compute(observerHex); err != nil {
69 log.E.F("grapevine: compute failed for %s: %v", observerHex[:12], err)
70 }
71 }()
72 return true
73 }
74
75 func (s *Scheduler) runAll() {
76 for _, obs := range s.observers {
77 s.mu.Lock()
78 if s.computing[obs] {
79 s.mu.Unlock()
80 log.D.F("grapevine: skipping %s, already computing", obs[:12])
81 continue
82 }
83 s.computing[obs] = true
84 s.mu.Unlock()
85
86 func(observerHex string) {
87 defer func() {
88 s.mu.Lock()
89 delete(s.computing, observerHex)
90 s.mu.Unlock()
91 }()
92 if _, err := s.engine.Compute(observerHex); err != nil {
93 log.E.F("grapevine: scheduled compute failed for %s: %v", observerHex[:12], err)
94 }
95 }(obs)
96 }
97 }
98