watermark.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package y
7
8 import (
9 "container/heap"
10 "context"
11 "sync/atomic"
12
13 "github.com/dgraph-io/ristretto/v2/z"
14 )
15
// uint64Heap is a min-heap of uint64 values. It implements heap.Interface so it
// can be driven by the container/heap functions (heap.Init, heap.Push,
// heap.Pop), which maintain the heap ordering.
type uint64Heap []uint64

// Len reports how many values are stored.
func (u uint64Heap) Len() int { return len(u) }

// Less orders values ascending, which makes this a min-heap.
func (u uint64Heap) Less(i, j int) bool { return u[i] < u[j] }

// Swap exchanges the values at positions i and j.
func (u uint64Heap) Swap(i, j int) { u[i], u[j] = u[j], u[i] }

// Push appends x, which must be a uint64, to the end of the slice. It is meant
// to be invoked by heap.Push rather than called directly.
func (u *uint64Heap) Push(x any) {
	*u = append(*u, x.(uint64))
}

// Pop removes and returns the final element of the slice. heap.Pop moves the
// minimum into that slot before calling it.
func (u *uint64Heap) Pop() any {
	last := len(*u) - 1
	top := (*u)[last]
	*u = (*u)[:last]
	return top
}
29
// mark is a message sent over WaterMark.markCh to the process goroutine. It
// carries one or more indices along with a done flag that says whether they
// are beginning or finishing, or it carries a waiter that wants to be woken
// once the watermark reaches a certain index. The three shapes are:
//   - (index, waiter): close waiter once DoneUntil() >= index.
//   - (index, done):   a single Begin (done == false) or Done (done == true).
//   - (indices, done): the same as above, applied to every element of indices.
type mark struct {
	index   uint64        // Single index for a Begin/Done/wait mark.
	waiter  chan struct{} // Non-nil only for wait requests.
	indices []uint64      // Batch of indices, as sent by BeginMany/DoneMany.
	done    bool          // true for Done, false for Begin.
}
40
41 // WaterMark is used to keep track of the minimum un-finished index. Typically, an index k becomes
42 // finished or "done" according to a WaterMark once Done(k) has been called
43 // 1. as many times as Begin(k) has, AND
44 // 2. a positive number of times.
45 //
46 // An index may also become "done" by calling SetDoneUntil at a time such that it is not
47 // inter-mingled with Begin/Done calls.
48 //
49 // Since doneUntil and lastIndex addresses are passed to sync/atomic packages, we ensure that they
50 // are 64-bit aligned by putting them at the beginning of the structure.
51 type WaterMark struct {
52 doneUntil atomic.Uint64
53 lastIndex atomic.Uint64
54 Name string
55 markCh chan mark
56 }
57
58 // Init initializes a WaterMark struct. MUST be called before using it.
59 func (w *WaterMark) Init(closer *z.Closer) {
60 w.markCh = make(chan mark, 100)
61 go w.process(closer)
62 }
63
64 // Begin sets the last index to the given value.
65 func (w *WaterMark) Begin(index uint64) {
66 w.lastIndex.Store(index)
67 w.markCh <- mark{index: index, done: false}
68 }
69
70 // BeginMany works like Begin but accepts multiple indices.
71 func (w *WaterMark) BeginMany(indices []uint64) {
72 w.lastIndex.Store(indices[len(indices)-1])
73 w.markCh <- mark{index: 0, indices: indices, done: false}
74 }
75
76 // Done sets a single index as done.
77 func (w *WaterMark) Done(index uint64) {
78 w.markCh <- mark{index: index, done: true}
79 }
80
81 // DoneMany works like Done but accepts multiple indices.
82 func (w *WaterMark) DoneMany(indices []uint64) {
83 w.markCh <- mark{index: 0, indices: indices, done: true}
84 }
85
86 // DoneUntil returns the maximum index that has the property that all indices
87 // less than or equal to it are done.
88 func (w *WaterMark) DoneUntil() uint64 {
89 return w.doneUntil.Load()
90 }
91
92 // SetDoneUntil sets the maximum index that has the property that all indices
93 // less than or equal to it are done.
94 func (w *WaterMark) SetDoneUntil(val uint64) {
95 w.doneUntil.Store(val)
96 }
97
98 // LastIndex returns the last index for which Begin has been called.
99 func (w *WaterMark) LastIndex() uint64 {
100 return w.lastIndex.Load()
101 }
102
// WaitForMark blocks until DoneUntil() >= index or ctx is done, whichever comes
// first. It returns nil once the mark is reached and ctx.Err() otherwise.
//
// Both registering the waiter on markCh and waiting for it honour ctx. As a
// result, a full markCh, or one that is no longer being drained, cannot block
// the caller past cancellation.
func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error {
	if w.DoneUntil() >= index {
		return nil
	}
	waitCh := make(chan struct{})
	select {
	case w.markCh <- mark{index: index, waiter: waitCh}:
	case <-ctx.Done():
		return ctx.Err()
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-waitCh:
		return nil
	}
}
118
// process consumes markCh and maintains doneUntil. It is not thread-safe: it
// owns the pending and waiters maps without locking, so exactly one process
// goroutine may run per WaterMark (Init starts it). One is sufficient because
// all of its work is purely in-memory and CPU-bound.
//
// Each index has to emit at least one Begin mark, in serial order, otherwise
// waiters can get blocked indefinitely. Example: with the watermark at 100 and
// a waiter at 101, if no mark is ever emitted for 101 the waiter gets stuck
// indefinitely. process cannot tell whether the task at 101 decided not to
// emit a mark or simply has not been scheduled yet.
//
// process returns once closer has been closed and calls closer.Done on exit.
func (w *WaterMark) process(closer *z.Closer) {
	defer closer.Done()

	// indices is a min-heap of every index that currently has an entry in pending.
	var indices uint64Heap
	// pending maps an index (e.g. a raft proposal index) to its number of Begin
	// calls minus its number of Done calls.
	pending := make(map[uint64]int)
	// waiters maps an index to the channels to close once doneUntil >= index.
	waiters := make(map[uint64][]chan struct{})

	heap.Init(&indices)

	// notifyAndRemove closes every waiter channel registered at idx and deletes
	// the map entry to release its memory.
	notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
		for _, ch := range toNotify {
			close(ch)
		}
		delete(waiters, idx)
	}

	// processOne applies one Begin (done == false) or Done (done == true) for
	// index. It then advances doneUntil past every fully-done index at the front
	// of the heap and wakes the waiters that the advance satisfies.
	processOne := func(index uint64, done bool) {
		// Push each index onto the heap only the first time it is seen; later
		// marks for the same index only adjust its pending count.
		prev, present := pending[index]
		if !present {
			heap.Push(&indices, index)
		}

		delta := 1
		if done {
			delta = -1
		}
		pending[index] = prev + delta

		// A mark for an index below doneUntil is a caller bug; fail loudly.
		doneUntil := w.DoneUntil()
		if doneUntil > index {
			AssertTruef(false, "Name: %s doneUntil: %d. Index: %d", w.Name, doneUntil, index)
		}

		// Pop indices in ascending order, stopping at the first one that still
		// has outstanding Begin calls.
		until := doneUntil
		for len(indices) > 0 {
			lowest := indices[0]
			if outstanding := pending[lowest]; outstanding > 0 {
				break // len(indices) will be > 0.
			}
			// Even if Done was called more often than Begin, making the count
			// negative, we should still pop the index.
			heap.Pop(&indices)
			delete(pending, lowest)
			until = lowest
		}

		if until != doneUntil {
			AssertTrue(w.doneUntil.CompareAndSwap(doneUntil, until))
		}

		if until-doneUntil <= uint64(len(waiters)) {
			// Issue #908 showed that if doneUntil is close to 2^60 while until is
			// zero, walking every integer in between hogs the CPU in a busy-wait
			// loop. So only take this path if until - doneUntil is at most the
			// number of waiters.
			//
			// The loop tests idx < until and increments inside the body. With
			// the form "idx <= until; idx++", idx would wrap to 0 when until is
			// math.MaxUint64 and the loop would never end.
			for idx := doneUntil; idx < until; {
				idx++
				if toNotify, ok := waiters[idx]; ok {
					notifyAndRemove(idx, toNotify)
				}
			}
		} else {
			for idx, toNotify := range waiters {
				if idx <= until {
					notifyAndRemove(idx, toNotify)
				}
			}
		}
	}

	for {
		select {
		case <-closer.HasBeenClosed():
			return
		case m := <-w.markCh:
			if m.waiter != nil {
				// Wait request: answer immediately if already satisfied,
				// otherwise park the channel until doneUntil reaches m.index.
				if w.doneUntil.Load() >= m.index {
					close(m.waiter)
				} else {
					// append on a missing key starts a new slice.
					waiters[m.index] = append(waiters[m.index], m.waiter)
				}
				continue
			}
			// A Begin/Done mark carries either a single index, which may be
			// zero, or a batch in m.indices with m.index left at zero.
			if m.index > 0 || len(m.indices) == 0 {
				processOne(m.index, m.done)
			}
			for _, index := range m.indices {
				processOne(index, m.done)
			}
		}
	}
}
231