txn.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bytes"
  10  	"context"
  11  	"encoding/hex"
  12  	"errors"
  13  	"fmt"
  14  	"math"
  15  	"sort"
  16  	"strconv"
  17  	"sync"
  18  	"sync/atomic"
  19  
  20  	"github.com/dgraph-io/badger/v4/y"
  21  	"github.com/dgraph-io/ristretto/v2/z"
  22  )
  23  
// oracle hands out read and commit timestamps to transactions and keeps the
// bookkeeping used for conflict detection (committedTxns) and for deciding
// which versions may be discarded (discardTs / readMark).
type oracle struct {
	isManaged       bool // Does not change value, so no locking required.
	detectConflicts bool // Determines if the txns should be checked for conflicts.

	sync.Mutex // For nextTxnTs and commits.
	// writeChLock lock is for ensuring that transactions go to the write
	// channel in the same order as their commit timestamps.
	writeChLock sync.Mutex
	// nextTxnTs is the commit timestamp that will be handed to the next
	// committing transaction in non-managed mode.
	nextTxnTs   uint64

	// Used to block NewTransaction, so all previous commits are visible to a new read.
	txnMark *y.WaterMark

	// Either of these is used to determine which versions can be permanently
	// discarded during compaction.
	discardTs uint64       // Used by ManagedDB.
	readMark  *y.WaterMark // Used by DB.

	// committedTxns contains all committed writes (contains fingerprints
	// of keys written and their latest commit counter).
	committedTxns []committedTxn
	// lastCleanupTs is the timestamp used by the most recent pruning of
	// committedTxns; commit timestamps are asserted to be at or above it.
	lastCleanupTs uint64

	// closer is used to stop watermarks.
	closer *z.Closer
}
  50  
// committedTxn records one committed transaction for conflict detection: the
// timestamp it committed at and the fingerprints of the keys it wrote.
type committedTxn struct {
	// ts is the commit timestamp of the transaction.
	ts uint64
	// ConflictKeys Keeps track of the entries written at timestamp ts.
	conflictKeys map[uint64]struct{}
}
  56  
  57  func newOracle(opt Options) *oracle {
  58  	orc := &oracle{
  59  		isManaged:       opt.managedTxns,
  60  		detectConflicts: opt.DetectConflicts,
  61  		// We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
  62  		//
  63  		// WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
  64  		// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
  65  		readMark: &y.WaterMark{Name: "badger.PendingReads"},
  66  		txnMark:  &y.WaterMark{Name: "badger.TxnTimestamp"},
  67  		closer:   z.NewCloser(2),
  68  	}
  69  	orc.readMark.Init(orc.closer)
  70  	orc.txnMark.Init(orc.closer)
  71  	return orc
  72  }
  73  
// Stop signals the oracle's closer (the one its watermarks were initialized
// with) and waits on it.
func (o *oracle) Stop() {
	o.closer.SignalAndWait()
}
  77  
  78  func (o *oracle) readTs() uint64 {
  79  	if o.isManaged {
  80  		panic("ReadTs should not be retrieved for managed DB")
  81  	}
  82  
  83  	var readTs uint64
  84  	o.Lock()
  85  	readTs = o.nextTxnTs - 1
  86  	o.readMark.Begin(readTs)
  87  	o.Unlock()
  88  
  89  	// Wait for all txns which have no conflicts, have been assigned a commit
  90  	// timestamp and are going through the write to value log and LSM tree
  91  	// process. Not waiting here could mean that some txns which have been
  92  	// committed would not be read.
  93  	y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
  94  	return readTs
  95  }
  96  
  97  func (o *oracle) nextTs() uint64 {
  98  	o.Lock()
  99  	defer o.Unlock()
 100  	return o.nextTxnTs
 101  }
 102  
 103  func (o *oracle) incrementNextTs() {
 104  	o.Lock()
 105  	defer o.Unlock()
 106  	o.nextTxnTs++
 107  }
 108  
 109  // Any deleted or invalid versions at or below ts would be discarded during
 110  // compaction to reclaim disk space in LSM tree and thence value log.
 111  func (o *oracle) setDiscardTs(ts uint64) {
 112  	o.Lock()
 113  	defer o.Unlock()
 114  	o.discardTs = ts
 115  	o.cleanupCommittedTransactions()
 116  }
 117  
 118  func (o *oracle) discardAtOrBelow() uint64 {
 119  	if o.isManaged {
 120  		o.Lock()
 121  		defer o.Unlock()
 122  		return o.discardTs
 123  	}
 124  	return o.readMark.DoneUntil()
 125  }
 126  
 127  // hasConflict must be called while having a lock.
 128  func (o *oracle) hasConflict(txn *Txn) bool {
 129  	if len(txn.reads) == 0 {
 130  		return false
 131  	}
 132  	for _, committedTxn := range o.committedTxns {
 133  		// If the committedTxn.ts is less than txn.readTs that implies that the
 134  		// committedTxn finished before the current transaction started.
 135  		// We don't need to check for conflict in that case.
 136  		// This change assumes linearizability. Lack of linearizability could
 137  		// cause the read ts of a new txn to be lower than the commit ts of
 138  		// a txn before it (@mrjn).
 139  		if committedTxn.ts <= txn.readTs {
 140  			continue
 141  		}
 142  
 143  		for _, ro := range txn.reads {
 144  			if _, has := committedTxn.conflictKeys[ro]; has {
 145  				return true
 146  			}
 147  		}
 148  	}
 149  
 150  	return false
 151  }
 152  
 153  func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
 154  	o.Lock()
 155  	defer o.Unlock()
 156  
 157  	if o.hasConflict(txn) {
 158  		return 0, true
 159  	}
 160  
 161  	var ts uint64
 162  	if !o.isManaged {
 163  		o.doneRead(txn)
 164  		o.cleanupCommittedTransactions()
 165  
 166  		// This is the general case, when user doesn't specify the read and commit ts.
 167  		ts = o.nextTxnTs
 168  		o.nextTxnTs++
 169  		o.txnMark.Begin(ts)
 170  
 171  	} else {
 172  		// If commitTs is set, use it instead.
 173  		ts = txn.commitTs
 174  	}
 175  
 176  	y.AssertTrue(ts >= o.lastCleanupTs)
 177  
 178  	if o.detectConflicts {
 179  		// We should ensure that txns are not added to o.committedTxns slice when
 180  		// conflict detection is disabled otherwise this slice would keep growing.
 181  		o.committedTxns = append(o.committedTxns, committedTxn{
 182  			ts:           ts,
 183  			conflictKeys: txn.conflictKeys,
 184  		})
 185  	}
 186  
 187  	return ts, false
 188  }
 189  
 190  func (o *oracle) doneRead(txn *Txn) {
 191  	if !txn.doneRead {
 192  		txn.doneRead = true
 193  		o.readMark.Done(txn.readTs)
 194  	}
 195  }
 196  
 197  func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
 198  	if !o.detectConflicts {
 199  		// When detectConflicts is set to false, we do not store any
 200  		// committedTxns and so there's nothing to clean up.
 201  		return
 202  	}
 203  	// Same logic as discardAtOrBelow but unlocked
 204  	var maxReadTs uint64
 205  	if o.isManaged {
 206  		maxReadTs = o.discardTs
 207  	} else {
 208  		maxReadTs = o.readMark.DoneUntil()
 209  	}
 210  
 211  	y.AssertTrue(maxReadTs >= o.lastCleanupTs)
 212  
 213  	// do not run clean up if the maxReadTs (read timestamp of the
 214  	// oldest transaction that is still in flight) has not increased
 215  	if maxReadTs == o.lastCleanupTs {
 216  		return
 217  	}
 218  	o.lastCleanupTs = maxReadTs
 219  
 220  	tmp := o.committedTxns[:0]
 221  	for _, txn := range o.committedTxns {
 222  		if txn.ts <= maxReadTs {
 223  			continue
 224  		}
 225  		tmp = append(tmp, txn)
 226  	}
 227  	o.committedTxns = tmp
 228  }
 229  
 230  func (o *oracle) doneCommit(cts uint64) {
 231  	if o.isManaged {
 232  		// No need to update anything.
 233  		return
 234  	}
 235  	o.txnMark.Done(cts)
 236  }
 237  
// Txn represents a Badger transaction.
//
// readTs is the timestamp the transaction reads at and commitTs is the commit
// timestamp used in managed mode. size and count are running totals checked
// against the batch limits on every write. discarded is set by Discard, and
// doneRead is set once the read timestamp has been marked done with the
// oracle.
type Txn struct {
	readTs   uint64
	commitTs uint64
	size     int64
	count    int64
	db       *DB

	reads []uint64 // contains fingerprints of keys read.
	// contains fingerprints of keys written. This is used for conflict detection.
	conflictKeys map[uint64]struct{}
	readsLock    sync.Mutex // guards the reads slice. See addReadKey.

	pendingWrites   map[string]*Entry // cache stores any writes done by txn.
	duplicateWrites []*Entry          // Used in managed mode to store duplicate entries.

	// numIterators is checked by Discard, which panics if it is above zero.
	numIterators atomic.Int32
	discarded    bool
	doneRead     bool
	update       bool // update is used to conditionally keep track of reads.
}
 259  
// pendingWritesIterator iterates over a sorted snapshot of a transaction's
// pending writes. entries holds the sorted writes, nextIdx is the current
// position, readTs is the version reported for every key and value, and
// reversed records whether entries is sorted in descending key order.
type pendingWritesIterator struct {
	entries  []*Entry
	nextIdx  int
	readTs   uint64
	reversed bool
}
 266  
// Next advances the iterator to the following entry. It does no bounds
// checking; use Valid to find out whether the new position holds an entry.
func (pi *pendingWritesIterator) Next() {
	pi.nextIdx++
}
 270  
// Rewind moves the iterator back to the first entry.
func (pi *pendingWritesIterator) Rewind() {
	pi.nextIdx = 0
}
 274  
 275  func (pi *pendingWritesIterator) Seek(key []byte) {
 276  	key = y.ParseKey(key)
 277  	pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool {
 278  		cmp := bytes.Compare(pi.entries[idx].Key, key)
 279  		if !pi.reversed {
 280  			return cmp >= 0
 281  		}
 282  		return cmp <= 0
 283  	})
 284  }
 285  
 286  func (pi *pendingWritesIterator) Key() []byte {
 287  	y.AssertTrue(pi.Valid())
 288  	entry := pi.entries[pi.nextIdx]
 289  	return y.KeyWithTs(entry.Key, pi.readTs)
 290  }
 291  
 292  func (pi *pendingWritesIterator) Value() y.ValueStruct {
 293  	y.AssertTrue(pi.Valid())
 294  	entry := pi.entries[pi.nextIdx]
 295  	return y.ValueStruct{
 296  		Value:     entry.Value,
 297  		Meta:      entry.meta,
 298  		UserMeta:  entry.UserMeta,
 299  		ExpiresAt: entry.ExpiresAt,
 300  		Version:   pi.readTs,
 301  	}
 302  }
 303  
// Valid reports whether the iterator is positioned on an entry, i.e. whether
// the current index is still below the number of entries.
func (pi *pendingWritesIterator) Valid() bool {
	return pi.nextIdx < len(pi.entries)
}
 307  
// Close does nothing and always returns nil.
func (pi *pendingWritesIterator) Close() error {
	return nil
}
 311  
 312  func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
 313  	if !txn.update || len(txn.pendingWrites) == 0 {
 314  		return nil
 315  	}
 316  	entries := make([]*Entry, 0, len(txn.pendingWrites))
 317  	for _, e := range txn.pendingWrites {
 318  		entries = append(entries, e)
 319  	}
 320  	// Number of pending writes per transaction shouldn't be too big in general.
 321  	sort.Slice(entries, func(i, j int) bool {
 322  		cmp := bytes.Compare(entries[i].Key, entries[j].Key)
 323  		if !reversed {
 324  			return cmp < 0
 325  		}
 326  		return cmp > 0
 327  	})
 328  	return &pendingWritesIterator{
 329  		readTs:   txn.readTs,
 330  		entries:  entries,
 331  		reversed: reversed,
 332  	}
 333  }
 334  
 335  func (txn *Txn) checkSize(e *Entry) error {
 336  	count := txn.count + 1
 337  	// Extra bytes for the version in key.
 338  	size := txn.size + e.estimateSizeAndSetThreshold(txn.db.valueThreshold()) + 10
 339  	if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
 340  		return ErrTxnTooBig
 341  	}
 342  	txn.count, txn.size = count, size
 343  	return nil
 344  }
 345  
 346  func exceedsSize(prefix string, max int64, key []byte) error {
 347  	return fmt.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
 348  		prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
 349  }
 350  
 351  func (txn *Txn) modify(e *Entry) error {
 352  	const maxKeySize = 65000
 353  
 354  	switch {
 355  	case !txn.update:
 356  		return ErrReadOnlyTxn
 357  	case txn.discarded:
 358  		return ErrDiscardedTxn
 359  	case len(e.Key) == 0:
 360  		return ErrEmptyKey
 361  	case bytes.HasPrefix(e.Key, badgerPrefix):
 362  		return ErrInvalidKey
 363  	case len(e.Key) > maxKeySize:
 364  		// Key length can't be more than uint16, as determined by table::header.  To
 365  		// keep things safe and allow badger move prefix and a timestamp suffix, let's
 366  		// cut it down to 65000, instead of using 65536.
 367  		return exceedsSize("Key", maxKeySize, e.Key)
 368  	case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
 369  		return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
 370  	case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold():
 371  		return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
 372  	}
 373  
 374  	if err := txn.db.isBanned(e.Key); err != nil {
 375  		return err
 376  	}
 377  
 378  	if err := txn.checkSize(e); err != nil {
 379  		return err
 380  	}
 381  
 382  	// The txn.conflictKeys is used for conflict detection. If conflict detection
 383  	// is disabled, we don't need to store key hashes in this map.
 384  	if txn.db.opt.DetectConflicts {
 385  		fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
 386  		txn.conflictKeys[fp] = struct{}{}
 387  	}
 388  	// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
 389  	// Add the entry to duplicateWrites only if both the entries have different versions. For
 390  	// same versions, we will overwrite the existing entry.
 391  	if oldEntry, ok := txn.pendingWrites[string(e.Key)]; ok && oldEntry.version != e.version {
 392  		txn.duplicateWrites = append(txn.duplicateWrites, oldEntry)
 393  	}
 394  	txn.pendingWrites[string(e.Key)] = e
 395  	return nil
 396  }
 397  
 398  // Set adds a key-value pair to the database.
 399  // It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
 400  //
 401  // The current transaction keeps a reference to the key and val byte slice
 402  // arguments. Users must not modify key and val until the end of the transaction.
 403  func (txn *Txn) Set(key, val []byte) error {
 404  	return txn.SetEntry(NewEntry(key, val))
 405  }
 406  
// SetEntry takes an Entry struct and adds the key-value pair in the struct,
// along with other metadata to the database. It returns whatever error the
// transaction's validation of the entry produces.
//
// The current transaction keeps a reference to the entry passed in argument.
// Users must not modify the entry until the end of the transaction.
func (txn *Txn) SetEntry(e *Entry) error {
	return txn.modify(e)
}
 415  
 416  // Delete deletes a key.
 417  //
 418  // This is done by adding a delete marker for the key at commit timestamp.  Any
 419  // reads happening before this timestamp would be unaffected. Any reads after
 420  // this commit would see the deletion.
 421  //
 422  // The current transaction keeps a reference to the key byte slice argument.
 423  // Users must not modify the key until the end of the transaction.
 424  func (txn *Txn) Delete(key []byte) error {
 425  	e := &Entry{
 426  		Key:  key,
 427  		meta: bitDelete,
 428  	}
 429  	return txn.modify(e)
 430  }
 431  
 432  // Get looks for key and returns corresponding Item.
 433  // If key is not found, ErrKeyNotFound is returned.
 434  func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
 435  	if len(key) == 0 {
 436  		return nil, ErrEmptyKey
 437  	} else if txn.discarded {
 438  		return nil, ErrDiscardedTxn
 439  	}
 440  
 441  	if err := txn.db.isBanned(key); err != nil {
 442  		return nil, err
 443  	}
 444  
 445  	item = new(Item)
 446  	if txn.update {
 447  		if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
 448  			if isDeletedOrExpired(e.meta, e.ExpiresAt) {
 449  				return nil, ErrKeyNotFound
 450  			}
 451  			// Fulfill from cache.
 452  			item.meta = e.meta
 453  			item.val = e.Value
 454  			item.userMeta = e.UserMeta
 455  			item.key = key
 456  			item.status = prefetched
 457  			item.version = txn.readTs
 458  			item.expiresAt = e.ExpiresAt
 459  			// We probably don't need to set db on item here.
 460  			return item, nil
 461  		}
 462  		// Only track reads if this is update txn. No need to track read if txn serviced it
 463  		// internally.
 464  		txn.addReadKey(key)
 465  	}
 466  
 467  	seek := y.KeyWithTs(key, txn.readTs)
 468  	vs, err := txn.db.get(seek)
 469  	if err != nil {
 470  		return nil, y.Wrapf(err, "DB::Get key: %q", key)
 471  	}
 472  	if vs.Value == nil && vs.Meta == 0 {
 473  		return nil, ErrKeyNotFound
 474  	}
 475  	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
 476  		return nil, ErrKeyNotFound
 477  	}
 478  
 479  	item.key = key
 480  	item.version = vs.Version
 481  	item.meta = vs.Meta
 482  	item.userMeta = vs.UserMeta
 483  	item.vptr = y.SafeCopy(item.vptr, vs.Value)
 484  	item.txn = txn
 485  	item.expiresAt = vs.ExpiresAt
 486  	return item, nil
 487  }
 488  
 489  func (txn *Txn) addReadKey(key []byte) {
 490  	if txn.update {
 491  		fp := z.MemHash(key)
 492  
 493  		// Because of the possibility of multiple iterators it is now possible
 494  		// for multiple threads within a read-write transaction to read keys at
 495  		// the same time. The reads slice is not currently thread-safe and
 496  		// needs to be locked whenever we mark a key as read.
 497  		txn.readsLock.Lock()
 498  		txn.reads = append(txn.reads, fp)
 499  		txn.readsLock.Unlock()
 500  	}
 501  }
 502  
 503  // Discard discards a created transaction. This method is very important and must be called. Commit
 504  // method calls this internally, however, calling this multiple times doesn't cause any issues. So,
 505  // this can safely be called via a defer right when transaction is created.
 506  //
 507  // NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
 508  func (txn *Txn) Discard() {
 509  	if txn.discarded { // Avoid a re-run.
 510  		return
 511  	}
 512  	if txn.numIterators.Load() > 0 {
 513  		panic("Unclosed iterator at time of Txn.Discard.")
 514  	}
 515  	txn.discarded = true
 516  	if !txn.db.orc.isManaged {
 517  		txn.db.orc.doneRead(txn)
 518  	}
 519  }
 520  
// commitAndSend obtains a commit timestamp from the oracle, stamps every
// pending and duplicate write with its version, and hands the resulting
// entries to the DB's write channel.
//
// It returns ErrConflict when the oracle reports a conflict, or the error
// returned by sendToWriteCh. On success it returns a function that waits for
// the write request to finish, marks the commit timestamp as done with the
// oracle, and returns the outcome of the write.
func (txn *Txn) commitAndSend() (func() error, error) {
	orc := txn.db.orc
	// Ensure that the order in which we get the commit timestamp is the same as
	// the order in which we push these updates to the write channel. So, we
	// acquire a writeChLock before getting a commit timestamp, and only release
	// it after pushing the entries to it.
	orc.writeChLock.Lock()
	defer orc.writeChLock.Unlock()

	commitTs, conflict := orc.newCommitTs(txn)
	if conflict {
		return nil, ErrConflict
	}

	// keepTogether stays true only while no entry carries an explicit
	// version; it decides whether transaction markers are added below.
	keepTogether := true
	setVersion := func(e *Entry) {
		if e.version == 0 {
			e.version = commitTs
		} else {
			keepTogether = false
		}
	}
	// All versions must be settled before any entry is processed, because
	// processEntry reads the final value of keepTogether.
	for _, e := range txn.pendingWrites {
		setVersion(e)
	}
	// The duplicateWrites slice will be non-empty only if there are duplicate
	// entries with different versions.
	for _, e := range txn.duplicateWrites {
		setVersion(e)
	}

	// The extra slot is for the transaction-end marker appended below.
	entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)

	processEntry := func(e *Entry) {
		// Suffix the keys with commit ts, so the key versions are sorted in
		// descending order of commit timestamp.
		e.Key = y.KeyWithTs(e.Key, e.version)
		// Add bitTxn only if these entries are part of a transaction. We
		// support SetEntryAt(..) in managed mode which means a single
		// transaction can have entries with different timestamps. If entries
		// in a single transaction have different timestamps, we don't add the
		// transaction markers.
		if keepTogether {
			e.meta |= bitTxn
		}
		entries = append(entries, e)
	}

	// The following debug information is what led to determining the cause of
	// bank txn violation bug, and it took a whole bunch of effort to narrow it
	// down to here. So, keep this around for at least a couple of months.
	// var b strings.Builder
	// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
	// 	txn.readTs, commitTs, txn.reads, txn.conflictKeys)
	for _, e := range txn.pendingWrites {
		processEntry(e)
	}
	for _, e := range txn.duplicateWrites {
		processEntry(e)
	}

	if keepTogether {
		// CommitTs should not be zero if we're inserting transaction markers.
		y.AssertTrue(commitTs != 0)
		// Terminating entry for the transaction, carrying the commit
		// timestamp as its value.
		e := &Entry{
			Key:   y.KeyWithTs(txnKey, commitTs),
			Value: []byte(strconv.FormatUint(commitTs, 10)),
			meta:  bitFinTxn,
		}
		entries = append(entries, e)
	}

	req, err := txn.db.sendToWriteCh(entries)
	if err != nil {
		// Nothing was queued, so the commit timestamp is finished right away.
		orc.doneCommit(commitTs)
		return nil, err
	}
	ret := func() error {
		err := req.Wait()
		// Wait before marking commitTs as done.
		// We can't defer doneCommit above, because it is being called from a
		// callback here.
		orc.doneCommit(commitTs)
		return err
	}
	return ret, nil
}
 608  
 609  func (txn *Txn) commitPrecheck() error {
 610  	if txn.discarded {
 611  		return errors.New("Trying to commit a discarded txn")
 612  	}
 613  	keepTogether := true
 614  	for _, e := range txn.pendingWrites {
 615  		if e.version != 0 {
 616  			keepTogether = false
 617  		}
 618  	}
 619  
 620  	// If keepTogether is True, it implies transaction markers will be added.
 621  	// In that case, commitTs should not be never be zero. This might happen if
 622  	// someone uses txn.Commit instead of txn.CommitAt in managed mode.  This
 623  	// should happen only in managed mode. In normal mode, keepTogether will
 624  	// always be true.
 625  	if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
 626  		return errors.New("CommitTs cannot be zero. Please use commitAt instead")
 627  	}
 628  	return nil
 629  }
 630  
 631  // Commit commits the transaction, following these steps:
 632  //
 633  // 1. If there are no writes, return immediately.
 634  //
 635  // 2. Check if read rows were updated since txn started. If so, return ErrConflict.
 636  //
 637  // 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
 638  //
 639  // 4. Batch up all writes, write them to value log and LSM tree.
 640  //
 641  // 5. If callback is provided, Badger will return immediately after checking
 642  // for conflicts. Writes to the database will happen in the background.  If
 643  // there is a conflict, an error will be returned and the callback will not
 644  // run. If there are no conflicts, the callback will be called in the
 645  // background upon successful completion of writes or any error during write.
 646  //
 647  // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
 648  // tree won't be updated, so there's no need for any rollback.
 649  func (txn *Txn) Commit() error {
 650  	// txn.conflictKeys can be zero if conflict detection is turned off. So we
 651  	// should check txn.pendingWrites.
 652  	if len(txn.pendingWrites) == 0 {
 653  		// Discard the transaction so that the read is marked done.
 654  		txn.Discard()
 655  		return nil
 656  	}
 657  	// Precheck before discarding txn.
 658  	if err := txn.commitPrecheck(); err != nil {
 659  		return err
 660  	}
 661  	defer txn.Discard()
 662  
 663  	txnCb, err := txn.commitAndSend()
 664  	if err != nil {
 665  		return err
 666  	}
 667  	// If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
 668  
 669  	// TODO: What if some of the txns successfully make it to value log, but others fail.
 670  	// Nothing gets updated to LSM, until a restart happens.
 671  	return txnCb()
 672  }
 673  
// txnCb bundles what runTxnCallback needs to finish a CommitWith call.
type txnCb struct {
	// commit, when set, is run to obtain the error passed to user.
	commit func() error
	// user is the caller-supplied callback; it must not be nil.
	user   func(error)
	// err, when non-nil, is passed to user directly and commit is not run.
	err    error
}
 679  
 680  func runTxnCallback(cb *txnCb) {
 681  	switch {
 682  	case cb == nil:
 683  		panic("txn callback is nil")
 684  	case cb.user == nil:
 685  		panic("Must have caught a nil callback for txn.CommitWith")
 686  	case cb.err != nil:
 687  		cb.user(cb.err)
 688  	case cb.commit != nil:
 689  		err := cb.commit()
 690  		cb.user(err)
 691  	default:
 692  		cb.user(nil)
 693  	}
 694  }
 695  
 696  // CommitWith acts like Commit, but takes a callback, which gets run via a
 697  // goroutine to avoid blocking this function. The callback is guaranteed to run,
 698  // so it is safe to increment sync.WaitGroup before calling CommitWith, and
 699  // decrementing it in the callback; to block until all callbacks are run.
 700  func (txn *Txn) CommitWith(cb func(error)) {
 701  	if cb == nil {
 702  		panic("Nil callback provided to CommitWith")
 703  	}
 704  
 705  	if len(txn.pendingWrites) == 0 {
 706  		// Do not run these callbacks from here, because the CommitWith and the
 707  		// callback might be acquiring the same locks. Instead run the callback
 708  		// from another goroutine.
 709  		go runTxnCallback(&txnCb{user: cb, err: nil})
 710  		// Discard the transaction so that the read is marked done.
 711  		txn.Discard()
 712  		return
 713  	}
 714  
 715  	// Precheck before discarding txn.
 716  	if err := txn.commitPrecheck(); err != nil {
 717  		cb(err)
 718  		return
 719  	}
 720  
 721  	defer txn.Discard()
 722  
 723  	commitCb, err := txn.commitAndSend()
 724  	if err != nil {
 725  		go runTxnCallback(&txnCb{user: cb, err: err})
 726  		return
 727  	}
 728  
 729  	go runTxnCallback(&txnCb{user: cb, commit: commitCb})
 730  }
 731  
// ReadTs returns the read timestamp of the transaction.
func (txn *Txn) ReadTs() uint64 {
	return txn.readTs
}
 736  
// NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
// providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
// the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
// another transaction.
//
// For read-only transactions, set update to false. In this mode, we don't track the rows read for
// any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
//
// Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
// should only be run serially. It doesn't matter if a transaction is created by one goroutine and
// passed down to another, as long as the Txn APIs are called serially.
//
// When you create a new transaction, it is absolutely essential to call
// Discard(). This should be done irrespective of what the update param is set
// to. Commit API internally runs Discard, but running it twice wouldn't cause
// any issues.
//
//	txn := db.NewTransaction(false)
//	defer txn.Discard()
//	// Call various APIs.
func (db *DB) NewTransaction(update bool) *Txn {
	return db.newTransaction(update, false)
}
 760  
 761  func (db *DB) newTransaction(update, isManaged bool) *Txn {
 762  	if db.opt.ReadOnly && update {
 763  		// DB is read-only, force read-only transaction.
 764  		update = false
 765  	}
 766  
 767  	txn := &Txn{
 768  		update: update,
 769  		db:     db,
 770  		count:  1,                       // One extra entry for BitFin.
 771  		size:   int64(len(txnKey) + 10), // Some buffer for the extra entry.
 772  	}
 773  	if update {
 774  		if db.opt.DetectConflicts {
 775  			txn.conflictKeys = make(map[uint64]struct{})
 776  		}
 777  		txn.pendingWrites = make(map[string]*Entry)
 778  	}
 779  	if !isManaged {
 780  		txn.readTs = db.orc.readTs()
 781  	}
 782  	return txn
 783  }
 784  
 785  // View executes a function creating and managing a read-only transaction for the user. Error
 786  // returned by the function is relayed by the View method.
 787  // If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
 788  func (db *DB) View(fn func(txn *Txn) error) error {
 789  	if db.IsClosed() {
 790  		return ErrDBClosed
 791  	}
 792  	var txn *Txn
 793  	if db.opt.managedTxns {
 794  		txn = db.NewTransactionAt(math.MaxUint64, false)
 795  	} else {
 796  		txn = db.NewTransaction(false)
 797  	}
 798  	defer txn.Discard()
 799  
 800  	return fn(txn)
 801  }
 802  
 803  // Update executes a function, creating and managing a read-write transaction
 804  // for the user. Error returned by the function is relayed by the Update method.
 805  // Update cannot be used with managed transactions.
 806  func (db *DB) Update(fn func(txn *Txn) error) error {
 807  	if db.IsClosed() {
 808  		return ErrDBClosed
 809  	}
 810  	if db.opt.managedTxns {
 811  		panic("Update can only be used with managedDB=false.")
 812  	}
 813  	txn := db.NewTransaction(true)
 814  	defer txn.Discard()
 815  
 816  	if err := fn(txn); err != nil {
 817  		return err
 818  	}
 819  
 820  	return txn.Commit()
 821  }
 822