publisher.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"sync"
  10  	"sync/atomic"
  11  
  12  	"github.com/dgraph-io/badger/v4/pb"
  13  	"github.com/dgraph-io/badger/v4/trie"
  14  	"github.com/dgraph-io/badger/v4/y"
  15  	"github.com/dgraph-io/ristretto/v2/z"
  16  )
  17  
  18  type subscriber struct {
  19  	id        uint64
  20  	matches   []pb.Match
  21  	sendCh    chan *pb.KVList
  22  	subCloser *z.Closer
  23  	// this will be atomic pointer which will be used to
  24  	// track whether the subscriber is active or not
  25  	active *atomic.Uint64
  26  }
  27  
  28  type publisher struct {
  29  	sync.Mutex
  30  	pubCh       chan requests
  31  	subscribers map[uint64]subscriber
  32  	nextID      uint64
  33  	indexer     *trie.Trie
  34  }
  35  
  36  func newPublisher() *publisher {
  37  	return &publisher{
  38  		pubCh:       make(chan requests, 1000),
  39  		subscribers: make(map[uint64]subscriber),
  40  		nextID:      0,
  41  		indexer:     trie.NewTrie(),
  42  	}
  43  }
  44  
  45  func (p *publisher) listenForUpdates(c *z.Closer) {
  46  	defer func() {
  47  		p.cleanSubscribers()
  48  		c.Done()
  49  	}()
  50  	slurp := func(batch requests) {
  51  		for {
  52  			select {
  53  			case reqs := <-p.pubCh:
  54  				batch = append(batch, reqs...)
  55  			default:
  56  				p.publishUpdates(batch)
  57  				return
  58  			}
  59  		}
  60  	}
  61  	for {
  62  		select {
  63  		case <-c.HasBeenClosed():
  64  			return
  65  		case reqs := <-p.pubCh:
  66  			slurp(reqs)
  67  		}
  68  	}
  69  }
  70  
  71  func (p *publisher) publishUpdates(reqs requests) {
  72  	p.Lock()
  73  	defer func() {
  74  		p.Unlock()
  75  		// Release all the request.
  76  		reqs.DecrRef()
  77  	}()
  78  	batchedUpdates := make(map[uint64]*pb.KVList)
  79  	for _, req := range reqs {
  80  		for _, e := range req.Entries {
  81  			ids := p.indexer.Get(e.Key)
  82  			if len(ids) == 0 {
  83  				continue
  84  			}
  85  			k := y.SafeCopy(nil, e.Key)
  86  			kv := &pb.KV{
  87  				Key:       y.ParseKey(k),
  88  				Value:     y.SafeCopy(nil, e.Value),
  89  				Meta:      []byte{e.UserMeta},
  90  				ExpiresAt: e.ExpiresAt,
  91  				Version:   y.ParseTs(k),
  92  			}
  93  			for id := range ids {
  94  				if _, ok := batchedUpdates[id]; !ok {
  95  					batchedUpdates[id] = &pb.KVList{}
  96  				}
  97  				batchedUpdates[id].Kv = append(batchedUpdates[id].Kv, kv)
  98  			}
  99  		}
 100  	}
 101  
 102  	for id, kvs := range batchedUpdates {
 103  		if p.subscribers[id].active.Load() == 1 {
 104  			p.subscribers[id].sendCh <- kvs
 105  		}
 106  	}
 107  }
 108  
 109  func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (subscriber, error) {
 110  	p.Lock()
 111  	defer p.Unlock()
 112  	ch := make(chan *pb.KVList, 1000)
 113  	id := p.nextID
 114  	// Increment next ID.
 115  	p.nextID++
 116  	s := subscriber{
 117  		id:        id,
 118  		matches:   matches,
 119  		sendCh:    ch,
 120  		subCloser: c,
 121  		active:    new(atomic.Uint64),
 122  	}
 123  	s.active.Store(1)
 124  
 125  	p.subscribers[id] = s
 126  	for _, m := range matches {
 127  		if err := p.indexer.AddMatch(m, id); err != nil {
 128  			return subscriber{}, err
 129  		}
 130  	}
 131  	return s, nil
 132  }
 133  
 134  // cleanSubscribers stops all the subscribers. Ideally, It should be called while closing DB.
 135  func (p *publisher) cleanSubscribers() {
 136  	p.Lock()
 137  	defer p.Unlock()
 138  	for id, s := range p.subscribers {
 139  		for _, m := range s.matches {
 140  			_ = p.indexer.DeleteMatch(m, id)
 141  		}
 142  		delete(p.subscribers, id)
 143  		s.subCloser.SignalAndWait()
 144  	}
 145  }
 146  
 147  func (p *publisher) deleteSubscriber(id uint64) {
 148  	p.Lock()
 149  	defer p.Unlock()
 150  	if s, ok := p.subscribers[id]; ok {
 151  		for _, m := range s.matches {
 152  			_ = p.indexer.DeleteMatch(m, id)
 153  		}
 154  	}
 155  	delete(p.subscribers, id)
 156  }
 157  
 158  func (p *publisher) sendUpdates(reqs requests) {
 159  	if p.noOfSubscribers() != 0 {
 160  		reqs.IncrRef()
 161  		p.pubCh <- reqs
 162  	}
 163  }
 164  
 165  func (p *publisher) noOfSubscribers() int {
 166  	p.Lock()
 167  	defer p.Unlock()
 168  	return len(p.subscribers)
 169  }
 170