scheduler.go raw

   1  package grapevine
   2  
   3  import (
   4  	"context"
   5  	"sync"
   6  	"time"
   7  
   8  	"next.orly.dev/pkg/lol/log"
   9  )
  10  
  11  // Scheduler runs periodic GrapeVine score computation for configured observers.
  12  type Scheduler struct {
  13  	engine    *Engine
  14  	observers []string // hex pubkeys
  15  	interval  time.Duration
  16  	mu        sync.Mutex
  17  	computing map[string]bool
  18  }
  19  
  20  // NewScheduler creates a new scheduler.
  21  func NewScheduler(engine *Engine, observers []string, interval time.Duration) *Scheduler {
  22  	return &Scheduler{
  23  		engine:    engine,
  24  		observers: observers,
  25  		interval:  interval,
  26  		computing: make(map[string]bool),
  27  	}
  28  }
  29  
  30  // Start runs the periodic computation loop. Blocks until ctx is cancelled.
  31  func (s *Scheduler) Start(ctx context.Context) {
  32  	log.I.F("grapevine: scheduler started for %d observers, interval %v", len(s.observers), s.interval)
  33  
  34  	// Immediate first run
  35  	s.runAll()
  36  
  37  	ticker := time.NewTicker(s.interval)
  38  	defer ticker.Stop()
  39  
  40  	for {
  41  		select {
  42  		case <-ctx.Done():
  43  			log.I.F("grapevine: scheduler stopped")
  44  			return
  45  		case <-ticker.C:
  46  			s.runAll()
  47  		}
  48  	}
  49  }
  50  
  51  // TriggerCompute starts an async computation for a single observer.
  52  // Returns immediately. No-op if already computing for that observer.
  53  func (s *Scheduler) TriggerCompute(observerHex string) bool {
  54  	s.mu.Lock()
  55  	if s.computing[observerHex] {
  56  		s.mu.Unlock()
  57  		return false
  58  	}
  59  	s.computing[observerHex] = true
  60  	s.mu.Unlock()
  61  
  62  	go func() {
  63  		defer func() {
  64  			s.mu.Lock()
  65  			delete(s.computing, observerHex)
  66  			s.mu.Unlock()
  67  		}()
  68  		if _, err := s.engine.Compute(observerHex); err != nil {
  69  			log.E.F("grapevine: compute failed for %s: %v", observerHex[:12], err)
  70  		}
  71  	}()
  72  	return true
  73  }
  74  
  75  func (s *Scheduler) runAll() {
  76  	for _, obs := range s.observers {
  77  		s.mu.Lock()
  78  		if s.computing[obs] {
  79  			s.mu.Unlock()
  80  			log.D.F("grapevine: skipping %s, already computing", obs[:12])
  81  			continue
  82  		}
  83  		s.computing[obs] = true
  84  		s.mu.Unlock()
  85  
  86  		func(observerHex string) {
  87  			defer func() {
  88  				s.mu.Lock()
  89  				delete(s.computing, observerHex)
  90  				s.mu.Unlock()
  91  			}()
  92  			if _, err := s.engine.Compute(observerHex); err != nil {
  93  				log.E.F("grapevine: scheduled compute failed for %s: %v", observerHex[:12], err)
  94  			}
  95  		}(obs)
  96  	}
  97  }
  98