worker.mx raw

   1  // Package worker provides the store — owns all storage and event processing.
   2  // Direct function calls, no channels. Correct for moxie's sequential model.
   3  package worker
   4  
   5  import (
   6  	"fmt"
   7  	"os"
   8  
   9  	"smesh.lol/pkg/acl"
  10  	"smesh.lol/pkg/nostr/envelope"
  11  	"smesh.lol/pkg/nostr/event"
  12  	"smesh.lol/pkg/nostr/filter"
  13  	"smesh.lol/pkg/relay/pipeline"
  14  	"smesh.lol/pkg/store"
  15  )
  16  
  17  // Store owns the database and event pipeline.
  18  type Store struct {
  19  	eng  *store.Engine
  20  	pipe *pipeline.Pipeline
  21  }
  22  
  23  // Open creates a Store rooted at the given data directory.
  24  func Open(dataDir string) (*Store, error) {
  25  	eng, err := store.Open(dataDir | "/db")
  26  	if err != nil {
  27  		return nil, fmt.Errorf("store: %w", err)
  28  	}
  29  	a := acl.Open{}
  30  	pipe := pipeline.New(eng, a, nil, pipeline.DefaultConfig())
  31  	return &Store{eng: eng, pipe: pipe}, nil
  32  }
  33  
  34  // Close flushes and closes the database.
  35  func (s *Store) Close() {
  36  	s.eng.Flush()
  37  	s.eng.Close()
  38  }
  39  
  40  // IngestResult holds the outcome of an event ingestion.
  41  type IngestResult struct {
  42  	OK      bool
  43  	EventID []byte
  44  	Reason  []byte
  45  	Event   *event.E // non-nil if accepted (for broadcast)
  46  }
  47  
  48  // Ingest validates and stores an event.
  49  func (s *Store) Ingest(msg []byte) IngestResult {
  50  	_, rem, err := envelope.Identify(msg)
  51  	if err != nil {
  52  		return IngestResult{Reason: []byte("invalid: parse error")}
  53  	}
  54  	var es envelope.EventSubmission
  55  	if _, err := es.Unmarshal(rem); err != nil {
  56  		return IngestResult{Reason: []byte("invalid: parse error")}
  57  	}
  58  
  59  	result := s.pipe.Ingest(es.E)
  60  	ir := IngestResult{
  61  		OK:      result.OK,
  62  		EventID: es.E.ID,
  63  		Reason:  result.Reason,
  64  	}
  65  	if result.OK {
  66  		ir.Event = es.E
  67  	}
  68  	return ir
  69  }
  70  
  71  // Query runs filters and returns matching events plus EOSE marker.
  72  func (s *Store) Query(msg []byte) [][]byte {
  73  	_, rem, err := envelope.Identify(msg)
  74  	if err != nil {
  75  		return nil
  76  	}
  77  	var req envelope.Req
  78  	if _, err := req.Unmarshal(rem); err != nil {
  79  		return nil
  80  	}
  81  
  82  	filters := filter.S(*req.Filters)
  83  	seen := map[string]bool{}
  84  	var out [][]byte
  85  	for _, f := range filters {
  86  		events, err := s.eng.QueryEvents(f)
  87  		if err != nil {
  88  			continue
  89  		}
  90  		for _, ev := range events {
  91  			k := string(ev.ID)
  92  			if seen[k] {
  93  				continue
  94  			}
  95  			seen[k] = true
  96  			er := &envelope.EventResult{
  97  				Subscription: req.Subscription,
  98  				Event:        ev,
  99  			}
 100  			out = append(out, er.Marshal(nil))
 101  		}
 102  	}
 103  	out = append(out, (&envelope.EOSE{Subscription: req.Subscription}).Marshal(nil))
 104  	return out
 105  }
 106  
 107  // Count returns the count of events matching the filters.
 108  func (s *Store) Count(msg []byte) []byte {
 109  	_, rem, err := envelope.Identify(msg)
 110  	if err != nil {
 111  		return nil
 112  	}
 113  	var cr envelope.CountRequest
 114  	if _, err := cr.Unmarshal(rem); err != nil {
 115  		return nil
 116  	}
 117  	var total int
 118  	for _, f := range cr.Filters {
 119  		evs, err := s.eng.QueryEvents(f)
 120  		if err != nil {
 121  			continue
 122  		}
 123  		total += len(evs)
 124  	}
 125  	return (&envelope.CountResponse{
 126  		Subscription: cr.Subscription,
 127  		Count:        total,
 128  	}).Marshal(nil)
 129  }
 130  
 131  // Engine returns the underlying storage engine for direct queries.
 132  func (s *Store) Engine() *store.Engine { return s.eng }
 133  
 134  // Flush forces a WAL flush.
 135  func (s *Store) Flush() {
 136  	s.eng.Flush()
 137  }
 138  
 139  func init() {
 140  	// Suppress unused import warnings.
 141  	_ = fmt.Sprintf
 142  	_ = os.Stderr
 143  }
 144