watermark.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package y
   7  
   8  import (
   9  	"container/heap"
  10  	"context"
  11  	"sync/atomic"
  12  
  13  	"github.com/dgraph-io/ristretto/v2/z"
  14  )
  15  
  16  type uint64Heap []uint64
  17  
  18  func (u uint64Heap) Len() int            { return len(u) }
  19  func (u uint64Heap) Less(i, j int) bool  { return u[i] < u[j] }
  20  func (u uint64Heap) Swap(i, j int)       { u[i], u[j] = u[j], u[i] }
  21  func (u *uint64Heap) Push(x interface{}) { *u = append(*u, x.(uint64)) }
  22  func (u *uint64Heap) Pop() interface{} {
  23  	old := *u
  24  	n := len(old)
  25  	x := old[n-1]
  26  	*u = old[0 : n-1]
  27  	return x
  28  }
  29  
// mark is the message sent over WaterMark.markCh. It contains one or more
// indices, along with a done boolean to indicate the status of the index:
// begin or done. It can instead carry a waiter, which is waiting for the
// watermark to reach >= a certain index.
type mark struct {
	// Either this is an (index, waiter) pair or (index, done) or (indices, done).

	// index is the single index this mark refers to. Batch marks leave it at 0
	// and use indices instead.
	index uint64
	// waiter, when non-nil, makes this a wait request: the channel is closed
	// once the watermark has reached index.
	waiter chan struct{}
	// indices holds the batch of indices for BeginMany/DoneMany marks.
	indices []uint64
	done    bool // Set to true if the index is done.
}
  40  
// WaterMark is used to keep track of the minimum un-finished index.  Typically, an index k becomes
// finished or "done" according to a WaterMark once Done(k) has been called
//  1. as many times as Begin(k) has, AND
//  2. a positive number of times.
//
// An index may also become "done" by calling SetDoneUntil at a time such that it is not
// inter-mingled with Begin/Done calls.
//
// doneUntil and lastIndex are atomic.Uint64 values: they are read by callers through DoneUntil
// and LastIndex while other goroutines update them. They are kept at the beginning of the
// structure for historical reasons; per the sync/atomic documentation atomic.Uint64 is 64-bit
// aligned on its own, so the position should no longer be needed for alignment.
type WaterMark struct {
	// doneUntil is the current watermark: every index <= doneUntil is done.
	doneUntil atomic.Uint64
	// lastIndex is the index most recently passed to Begin/BeginMany.
	lastIndex atomic.Uint64
	// Name identifies this WaterMark in assertion messages.
	Name string
	// markCh carries begin/done marks and wait requests to the processing
	// goroutine. It is created by Init.
	markCh chan mark
}
  57  
  58  // Init initializes a WaterMark struct. MUST be called before using it.
  59  func (w *WaterMark) Init(closer *z.Closer) {
  60  	w.markCh = make(chan mark, 100)
  61  	go w.process(closer)
  62  }
  63  
  64  // Begin sets the last index to the given value.
  65  func (w *WaterMark) Begin(index uint64) {
  66  	w.lastIndex.Store(index)
  67  	w.markCh <- mark{index: index, done: false}
  68  }
  69  
  70  // BeginMany works like Begin but accepts multiple indices.
  71  func (w *WaterMark) BeginMany(indices []uint64) {
  72  	w.lastIndex.Store(indices[len(indices)-1])
  73  	w.markCh <- mark{index: 0, indices: indices, done: false}
  74  }
  75  
  76  // Done sets a single index as done.
  77  func (w *WaterMark) Done(index uint64) {
  78  	w.markCh <- mark{index: index, done: true}
  79  }
  80  
  81  // DoneMany works like Done but accepts multiple indices.
  82  func (w *WaterMark) DoneMany(indices []uint64) {
  83  	w.markCh <- mark{index: 0, indices: indices, done: true}
  84  }
  85  
  86  // DoneUntil returns the maximum index that has the property that all indices
  87  // less than or equal to it are done.
  88  func (w *WaterMark) DoneUntil() uint64 {
  89  	return w.doneUntil.Load()
  90  }
  91  
  92  // SetDoneUntil sets the maximum index that has the property that all indices
  93  // less than or equal to it are done.
  94  func (w *WaterMark) SetDoneUntil(val uint64) {
  95  	w.doneUntil.Store(val)
  96  }
  97  
  98  // LastIndex returns the last index for which Begin has been called.
  99  func (w *WaterMark) LastIndex() uint64 {
 100  	return w.lastIndex.Load()
 101  }
 102  
 103  // WaitForMark waits until the given index is marked as done.
 104  func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error {
 105  	if w.DoneUntil() >= index {
 106  		return nil
 107  	}
 108  	waitCh := make(chan struct{})
 109  	w.markCh <- mark{index: index, waiter: waitCh}
 110  
 111  	select {
 112  	case <-ctx.Done():
 113  		return ctx.Err()
 114  	case <-waitCh:
 115  		return nil
 116  	}
 117  }
 118  
 119  // process is used to process the Mark channel. This is not thread-safe,
 120  // so only run one goroutine for process. One is sufficient, because
 121  // all goroutine ops use purely memory and cpu.
 122  // Each index has to emit atleast one begin watermark in serial order otherwise waiters
 123  // can get blocked idefinitely. Example: We had an watermark at 100 and a waiter at 101,
 124  // if no watermark is emitted at index 101 then waiter would get stuck indefinitely as it
 125  // can't decide whether the task at 101 has decided not to emit watermark or it didn't get
 126  // scheduled yet.
 127  func (w *WaterMark) process(closer *z.Closer) {
 128  	defer closer.Done()
 129  
 130  	var indices uint64Heap
 131  	// pending maps raft proposal index to the number of pending mutations for this proposal.
 132  	pending := make(map[uint64]int)
 133  	waiters := make(map[uint64][]chan struct{})
 134  
 135  	heap.Init(&indices)
 136  
 137  	processOne := func(index uint64, done bool) {
 138  		// If not already done, then set. Otherwise, don't undo a done entry.
 139  		prev, present := pending[index]
 140  		if !present {
 141  			heap.Push(&indices, index)
 142  		}
 143  
 144  		delta := 1
 145  		if done {
 146  			delta = -1
 147  		}
 148  		pending[index] = prev + delta
 149  
 150  		// Update mark by going through all indices in order; and checking if they have
 151  		// been done. Stop at the first index, which isn't done.
 152  		doneUntil := w.DoneUntil()
 153  		if doneUntil > index {
 154  			AssertTruef(false, "Name: %s doneUntil: %d. Index: %d", w.Name, doneUntil, index)
 155  		}
 156  
 157  		until := doneUntil
 158  		loops := 0
 159  
 160  		for len(indices) > 0 {
 161  			min := indices[0]
 162  			if done := pending[min]; done > 0 {
 163  				break // len(indices) will be > 0.
 164  			}
 165  			// Even if done is called multiple times causing it to become
 166  			// negative, we should still pop the index.
 167  			heap.Pop(&indices)
 168  			delete(pending, min)
 169  			until = min
 170  			loops++
 171  		}
 172  
 173  		if until != doneUntil {
 174  			AssertTrue(w.doneUntil.CompareAndSwap(doneUntil, until))
 175  		}
 176  
 177  		notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
 178  			for _, ch := range toNotify {
 179  				close(ch)
 180  			}
 181  			delete(waiters, idx) // Release the memory back.
 182  		}
 183  
 184  		if until-doneUntil <= uint64(len(waiters)) {
 185  			// Issue #908 showed that if doneUntil is close to 2^60, while until is zero, this loop
 186  			// can hog up CPU just iterating over integers creating a busy-wait loop. So, only do
 187  			// this path if until - doneUntil is less than the number of waiters.
 188  			for idx := doneUntil + 1; idx <= until; idx++ {
 189  				if toNotify, ok := waiters[idx]; ok {
 190  					notifyAndRemove(idx, toNotify)
 191  				}
 192  			}
 193  		} else {
 194  			for idx, toNotify := range waiters {
 195  				if idx <= until {
 196  					notifyAndRemove(idx, toNotify)
 197  				}
 198  			}
 199  		} // end of notifying waiters.
 200  	}
 201  
 202  	for {
 203  		select {
 204  		case <-closer.HasBeenClosed():
 205  			return
 206  		case mark := <-w.markCh:
 207  			if mark.waiter != nil {
 208  				doneUntil := w.doneUntil.Load()
 209  				if doneUntil >= mark.index {
 210  					close(mark.waiter)
 211  				} else {
 212  					ws, ok := waiters[mark.index]
 213  					if !ok {
 214  						waiters[mark.index] = []chan struct{}{mark.waiter}
 215  					} else {
 216  						waiters[mark.index] = append(ws, mark.waiter)
 217  					}
 218  				}
 219  			} else {
 220  				// it is possible that mark.index is zero. We need to handle that case as well.
 221  				if mark.index > 0 || (mark.index == 0 && len(mark.indices) == 0) {
 222  					processOne(mark.index, mark.done)
 223  				}
 224  				for _, index := range mark.indices {
 225  					processOne(index, mark.done)
 226  				}
 227  			}
 228  		}
 229  	}
 230  }
 231