publisher.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "sync"
10 "sync/atomic"
11
12 "github.com/dgraph-io/badger/v4/pb"
13 "github.com/dgraph-io/badger/v4/trie"
14 "github.com/dgraph-io/badger/v4/y"
15 "github.com/dgraph-io/ristretto/v2/z"
16 )
17
18 type subscriber struct {
19 id uint64
20 matches []pb.Match
21 sendCh chan *pb.KVList
22 subCloser *z.Closer
23 // this will be atomic pointer which will be used to
24 // track whether the subscriber is active or not
25 active *atomic.Uint64
26 }
27
28 type publisher struct {
29 sync.Mutex
30 pubCh chan requests
31 subscribers map[uint64]subscriber
32 nextID uint64
33 indexer *trie.Trie
34 }
35
36 func newPublisher() *publisher {
37 return &publisher{
38 pubCh: make(chan requests, 1000),
39 subscribers: make(map[uint64]subscriber),
40 nextID: 0,
41 indexer: trie.NewTrie(),
42 }
43 }
44
45 func (p *publisher) listenForUpdates(c *z.Closer) {
46 defer func() {
47 p.cleanSubscribers()
48 c.Done()
49 }()
50 slurp := func(batch requests) {
51 for {
52 select {
53 case reqs := <-p.pubCh:
54 batch = append(batch, reqs...)
55 default:
56 p.publishUpdates(batch)
57 return
58 }
59 }
60 }
61 for {
62 select {
63 case <-c.HasBeenClosed():
64 return
65 case reqs := <-p.pubCh:
66 slurp(reqs)
67 }
68 }
69 }
70
71 func (p *publisher) publishUpdates(reqs requests) {
72 p.Lock()
73 defer func() {
74 p.Unlock()
75 // Release all the request.
76 reqs.DecrRef()
77 }()
78 batchedUpdates := make(map[uint64]*pb.KVList)
79 for _, req := range reqs {
80 for _, e := range req.Entries {
81 ids := p.indexer.Get(e.Key)
82 if len(ids) == 0 {
83 continue
84 }
85 k := y.SafeCopy(nil, e.Key)
86 kv := &pb.KV{
87 Key: y.ParseKey(k),
88 Value: y.SafeCopy(nil, e.Value),
89 Meta: []byte{e.UserMeta},
90 ExpiresAt: e.ExpiresAt,
91 Version: y.ParseTs(k),
92 }
93 for id := range ids {
94 if _, ok := batchedUpdates[id]; !ok {
95 batchedUpdates[id] = &pb.KVList{}
96 }
97 batchedUpdates[id].Kv = append(batchedUpdates[id].Kv, kv)
98 }
99 }
100 }
101
102 for id, kvs := range batchedUpdates {
103 if p.subscribers[id].active.Load() == 1 {
104 p.subscribers[id].sendCh <- kvs
105 }
106 }
107 }
108
109 func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (subscriber, error) {
110 p.Lock()
111 defer p.Unlock()
112 ch := make(chan *pb.KVList, 1000)
113 id := p.nextID
114 // Increment next ID.
115 p.nextID++
116 s := subscriber{
117 id: id,
118 matches: matches,
119 sendCh: ch,
120 subCloser: c,
121 active: new(atomic.Uint64),
122 }
123 s.active.Store(1)
124
125 p.subscribers[id] = s
126 for _, m := range matches {
127 if err := p.indexer.AddMatch(m, id); err != nil {
128 return subscriber{}, err
129 }
130 }
131 return s, nil
132 }
133
134 // cleanSubscribers stops all the subscribers. Ideally, It should be called while closing DB.
135 func (p *publisher) cleanSubscribers() {
136 p.Lock()
137 defer p.Unlock()
138 for id, s := range p.subscribers {
139 for _, m := range s.matches {
140 _ = p.indexer.DeleteMatch(m, id)
141 }
142 delete(p.subscribers, id)
143 s.subCloser.SignalAndWait()
144 }
145 }
146
147 func (p *publisher) deleteSubscriber(id uint64) {
148 p.Lock()
149 defer p.Unlock()
150 if s, ok := p.subscribers[id]; ok {
151 for _, m := range s.matches {
152 _ = p.indexer.DeleteMatch(m, id)
153 }
154 }
155 delete(p.subscribers, id)
156 }
157
158 func (p *publisher) sendUpdates(reqs requests) {
159 if p.noOfSubscribers() != 0 {
160 reqs.IncrRef()
161 p.pubCh <- reqs
162 }
163 }
164
165 func (p *publisher) noOfSubscribers() int {
166 p.Lock()
167 defer p.Unlock()
168 return len(p.subscribers)
169 }
170