txn.go raw
1 /*
2 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 package badger
7
8 import (
9 "bytes"
10 "context"
11 "encoding/hex"
12 "errors"
13 "fmt"
14 "math"
15 "sort"
16 "strconv"
17 "sync"
18 "sync/atomic"
19
20 "github.com/dgraph-io/badger/v4/y"
21 "github.com/dgraph-io/ristretto/v2/z"
22 )
23
// oracle hands out read and commit timestamps to transactions and keeps the
// list of recently committed transactions that commit-time conflict detection
// is checked against.
type oracle struct {
	isManaged       bool // Does not change value, so no locking required.
	detectConflicts bool // Determines if the txns should be checked for conflicts.

	sync.Mutex // For nextTxnTs and commits.
	// writeChLock lock is for ensuring that transactions go to the write
	// channel in the same order as their commit timestamps.
	writeChLock sync.Mutex
	// nextTxnTs is the commit timestamp the next non-managed commit will get;
	// new reads are served at nextTxnTs - 1.
	nextTxnTs uint64

	// Used to block NewTransaction, so all previous commits are visible to a new read.
	txnMark *y.WaterMark

	// Either of these is used to determine which versions can be permanently
	// discarded during compaction.
	discardTs uint64       // Used by ManagedDB.
	readMark  *y.WaterMark // Used by DB.

	// committedTxns contains all committed writes (contains fingerprints
	// of keys written and their latest commit counter).
	committedTxns []committedTxn
	// lastCleanupTs is the maxReadTs value seen by the most recent run of
	// cleanupCommittedTransactions that actually pruned committedTxns.
	lastCleanupTs uint64

	// closer is used to stop watermarks.
	closer *z.Closer
}
50
// committedTxn records one committed transaction for conflict detection: the
// timestamp it committed at and the fingerprints of the keys it wrote.
type committedTxn struct {
	// ts is the commit timestamp of the transaction.
	ts uint64
	// ConflictKeys Keeps track of the entries written at timestamp ts.
	conflictKeys map[uint64]struct{}
}
56
57 func newOracle(opt Options) *oracle {
58 orc := &oracle{
59 isManaged: opt.managedTxns,
60 detectConflicts: opt.DetectConflicts,
61 // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
62 //
63 // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
64 // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
65 readMark: &y.WaterMark{Name: "badger.PendingReads"},
66 txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
67 closer: z.NewCloser(2),
68 }
69 orc.readMark.Init(orc.closer)
70 orc.txnMark.Init(orc.closer)
71 return orc
72 }
73
// Stop signals the oracle's closer (the one both watermarks were initialised
// with) and waits on it via SignalAndWait.
func (o *oracle) Stop() {
	o.closer.SignalAndWait()
}
77
78 func (o *oracle) readTs() uint64 {
79 if o.isManaged {
80 panic("ReadTs should not be retrieved for managed DB")
81 }
82
83 var readTs uint64
84 o.Lock()
85 readTs = o.nextTxnTs - 1
86 o.readMark.Begin(readTs)
87 o.Unlock()
88
89 // Wait for all txns which have no conflicts, have been assigned a commit
90 // timestamp and are going through the write to value log and LSM tree
91 // process. Not waiting here could mean that some txns which have been
92 // committed would not be read.
93 y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
94 return readTs
95 }
96
97 func (o *oracle) nextTs() uint64 {
98 o.Lock()
99 defer o.Unlock()
100 return o.nextTxnTs
101 }
102
103 func (o *oracle) incrementNextTs() {
104 o.Lock()
105 defer o.Unlock()
106 o.nextTxnTs++
107 }
108
// setDiscardTs records ts as the discard timestamp and then prunes the list of
// committed transactions accordingly, all under the oracle lock.
//
// Any deleted or invalid versions at or below ts would be discarded during
// compaction to reclaim disk space in LSM tree and thence value log.
func (o *oracle) setDiscardTs(ts uint64) {
	o.Lock()
	// Deferred so the lock is released even if the cleanup below asserts.
	defer o.Unlock()
	o.discardTs = ts
	o.cleanupCommittedTransactions()
}
117
118 func (o *oracle) discardAtOrBelow() uint64 {
119 if o.isManaged {
120 o.Lock()
121 defer o.Unlock()
122 return o.discardTs
123 }
124 return o.readMark.DoneUntil()
125 }
126
127 // hasConflict must be called while having a lock.
128 func (o *oracle) hasConflict(txn *Txn) bool {
129 if len(txn.reads) == 0 {
130 return false
131 }
132 for _, committedTxn := range o.committedTxns {
133 // If the committedTxn.ts is less than txn.readTs that implies that the
134 // committedTxn finished before the current transaction started.
135 // We don't need to check for conflict in that case.
136 // This change assumes linearizability. Lack of linearizability could
137 // cause the read ts of a new txn to be lower than the commit ts of
138 // a txn before it (@mrjn).
139 if committedTxn.ts <= txn.readTs {
140 continue
141 }
142
143 for _, ro := range txn.reads {
144 if _, has := committedTxn.conflictKeys[ro]; has {
145 return true
146 }
147 }
148 }
149
150 return false
151 }
152
153 func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
154 o.Lock()
155 defer o.Unlock()
156
157 if o.hasConflict(txn) {
158 return 0, true
159 }
160
161 var ts uint64
162 if !o.isManaged {
163 o.doneRead(txn)
164 o.cleanupCommittedTransactions()
165
166 // This is the general case, when user doesn't specify the read and commit ts.
167 ts = o.nextTxnTs
168 o.nextTxnTs++
169 o.txnMark.Begin(ts)
170
171 } else {
172 // If commitTs is set, use it instead.
173 ts = txn.commitTs
174 }
175
176 y.AssertTrue(ts >= o.lastCleanupTs)
177
178 if o.detectConflicts {
179 // We should ensure that txns are not added to o.committedTxns slice when
180 // conflict detection is disabled otherwise this slice would keep growing.
181 o.committedTxns = append(o.committedTxns, committedTxn{
182 ts: ts,
183 conflictKeys: txn.conflictKeys,
184 })
185 }
186
187 return ts, false
188 }
189
190 func (o *oracle) doneRead(txn *Txn) {
191 if !txn.doneRead {
192 txn.doneRead = true
193 o.readMark.Done(txn.readTs)
194 }
195 }
196
197 func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
198 if !o.detectConflicts {
199 // When detectConflicts is set to false, we do not store any
200 // committedTxns and so there's nothing to clean up.
201 return
202 }
203 // Same logic as discardAtOrBelow but unlocked
204 var maxReadTs uint64
205 if o.isManaged {
206 maxReadTs = o.discardTs
207 } else {
208 maxReadTs = o.readMark.DoneUntil()
209 }
210
211 y.AssertTrue(maxReadTs >= o.lastCleanupTs)
212
213 // do not run clean up if the maxReadTs (read timestamp of the
214 // oldest transaction that is still in flight) has not increased
215 if maxReadTs == o.lastCleanupTs {
216 return
217 }
218 o.lastCleanupTs = maxReadTs
219
220 tmp := o.committedTxns[:0]
221 for _, txn := range o.committedTxns {
222 if txn.ts <= maxReadTs {
223 continue
224 }
225 tmp = append(tmp, txn)
226 }
227 o.committedTxns = tmp
228 }
229
230 func (o *oracle) doneCommit(cts uint64) {
231 if o.isManaged {
232 // No need to update anything.
233 return
234 }
235 o.txnMark.Done(cts)
236 }
237
// Txn represents a Badger transaction.
type Txn struct {
	readTs   uint64 // timestamp reads are served at.
	commitTs uint64 // commit timestamp; used by the oracle in managed mode.
	size     int64  // running size estimate of the pending writes, see checkSize.
	count    int64  // running count of the pending writes, see checkSize.
	db       *DB

	reads []uint64 // contains fingerprints of keys read.
	// contains fingerprints of keys written. This is used for conflict detection.
	conflictKeys map[uint64]struct{}
	readsLock    sync.Mutex // guards the reads slice. See addReadKey.

	pendingWrites   map[string]*Entry // cache stores any writes done by txn.
	duplicateWrites []*Entry          // Used in managed mode to store duplicate entries.

	numIterators atomic.Int32 // Discard panics while this is greater than zero.
	discarded    bool         // set by Discard; later operations return ErrDiscardedTxn.
	doneRead     bool         // set once readTs has been marked done on the oracle's readMark.
	update       bool         // update is used to conditionally keep track of reads.
}
259
// pendingWritesIterator iterates over a sorted snapshot of a transaction's
// pending writes, reporting every entry at the transaction's read timestamp.
type pendingWritesIterator struct {
	entries  []*Entry // entries sorted by key; descending when reversed is set.
	nextIdx  int      // index into entries of the current position.
	readTs   uint64   // timestamp attached to every key and value returned.
	reversed bool     // whether entries are ordered (and sought) in reverse.
}
266
// Next advances the iterator to the following entry.
func (pi *pendingWritesIterator) Next() {
	pi.nextIdx++
}
270
// Rewind moves the iterator back to the first entry.
func (pi *pendingWritesIterator) Rewind() {
	pi.nextIdx = 0
}
274
275 func (pi *pendingWritesIterator) Seek(key []byte) {
276 key = y.ParseKey(key)
277 pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool {
278 cmp := bytes.Compare(pi.entries[idx].Key, key)
279 if !pi.reversed {
280 return cmp >= 0
281 }
282 return cmp <= 0
283 })
284 }
285
286 func (pi *pendingWritesIterator) Key() []byte {
287 y.AssertTrue(pi.Valid())
288 entry := pi.entries[pi.nextIdx]
289 return y.KeyWithTs(entry.Key, pi.readTs)
290 }
291
292 func (pi *pendingWritesIterator) Value() y.ValueStruct {
293 y.AssertTrue(pi.Valid())
294 entry := pi.entries[pi.nextIdx]
295 return y.ValueStruct{
296 Value: entry.Value,
297 Meta: entry.meta,
298 UserMeta: entry.UserMeta,
299 ExpiresAt: entry.ExpiresAt,
300 Version: pi.readTs,
301 }
302 }
303
304 func (pi *pendingWritesIterator) Valid() bool {
305 return pi.nextIdx < len(pi.entries)
306 }
307
// Close is a no-op for this iterator and always returns nil.
func (pi *pendingWritesIterator) Close() error {
	return nil
}
311
312 func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
313 if !txn.update || len(txn.pendingWrites) == 0 {
314 return nil
315 }
316 entries := make([]*Entry, 0, len(txn.pendingWrites))
317 for _, e := range txn.pendingWrites {
318 entries = append(entries, e)
319 }
320 // Number of pending writes per transaction shouldn't be too big in general.
321 sort.Slice(entries, func(i, j int) bool {
322 cmp := bytes.Compare(entries[i].Key, entries[j].Key)
323 if !reversed {
324 return cmp < 0
325 }
326 return cmp > 0
327 })
328 return &pendingWritesIterator{
329 readTs: txn.readTs,
330 entries: entries,
331 reversed: reversed,
332 }
333 }
334
335 func (txn *Txn) checkSize(e *Entry) error {
336 count := txn.count + 1
337 // Extra bytes for the version in key.
338 size := txn.size + e.estimateSizeAndSetThreshold(txn.db.valueThreshold()) + 10
339 if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
340 return ErrTxnTooBig
341 }
342 txn.count, txn.size = count, size
343 return nil
344 }
345
// exceedsSize builds the error reported when a key or value is larger than
// allowed. prefix names the offending item (e.g. "Key" or "Value"), max is the
// limit that was exceeded, and key holds the offending bytes. The message ends
// with a hex dump of at most the first 1 KiB of key.
func exceedsSize(prefix string, max int64, key []byte) error {
	// Cap the dump at 1 KiB, but never slice beyond len(key). Slicing a
	// shorter input to 1<<10 panics when its capacity is below that, and
	// otherwise dumps bytes that are not part of key.
	const maxDump = 1 << 10
	dump := key
	if len(dump) > maxDump {
		dump = dump[:maxDump]
	}
	return fmt.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
		prefix, len(key), max, prefix, hex.Dump(dump))
}
350
351 func (txn *Txn) modify(e *Entry) error {
352 const maxKeySize = 65000
353
354 switch {
355 case !txn.update:
356 return ErrReadOnlyTxn
357 case txn.discarded:
358 return ErrDiscardedTxn
359 case len(e.Key) == 0:
360 return ErrEmptyKey
361 case bytes.HasPrefix(e.Key, badgerPrefix):
362 return ErrInvalidKey
363 case len(e.Key) > maxKeySize:
364 // Key length can't be more than uint16, as determined by table::header. To
365 // keep things safe and allow badger move prefix and a timestamp suffix, let's
366 // cut it down to 65000, instead of using 65536.
367 return exceedsSize("Key", maxKeySize, e.Key)
368 case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
369 return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
370 case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold():
371 return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
372 }
373
374 if err := txn.db.isBanned(e.Key); err != nil {
375 return err
376 }
377
378 if err := txn.checkSize(e); err != nil {
379 return err
380 }
381
382 // The txn.conflictKeys is used for conflict detection. If conflict detection
383 // is disabled, we don't need to store key hashes in this map.
384 if txn.db.opt.DetectConflicts {
385 fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
386 txn.conflictKeys[fp] = struct{}{}
387 }
388 // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
389 // Add the entry to duplicateWrites only if both the entries have different versions. For
390 // same versions, we will overwrite the existing entry.
391 if oldEntry, ok := txn.pendingWrites[string(e.Key)]; ok && oldEntry.version != e.version {
392 txn.duplicateWrites = append(txn.duplicateWrites, oldEntry)
393 }
394 txn.pendingWrites[string(e.Key)] = e
395 return nil
396 }
397
398 // Set adds a key-value pair to the database.
399 // It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
400 //
401 // The current transaction keeps a reference to the key and val byte slice
402 // arguments. Users must not modify key and val until the end of the transaction.
403 func (txn *Txn) Set(key, val []byte) error {
404 return txn.SetEntry(NewEntry(key, val))
405 }
406
// SetEntry takes an Entry struct and adds the key-value pair in the struct,
// along with other metadata to the database. It returns whatever error the
// transaction's validation of e produces, e.g. ErrReadOnlyTxn for a read-only
// transaction.
//
// The current transaction keeps a reference to the entry passed in argument.
// Users must not modify the entry until the end of the transaction.
func (txn *Txn) SetEntry(e *Entry) error {
	return txn.modify(e)
}
415
416 // Delete deletes a key.
417 //
418 // This is done by adding a delete marker for the key at commit timestamp. Any
419 // reads happening before this timestamp would be unaffected. Any reads after
420 // this commit would see the deletion.
421 //
422 // The current transaction keeps a reference to the key byte slice argument.
423 // Users must not modify the key until the end of the transaction.
424 func (txn *Txn) Delete(key []byte) error {
425 e := &Entry{
426 Key: key,
427 meta: bitDelete,
428 }
429 return txn.modify(e)
430 }
431
432 // Get looks for key and returns corresponding Item.
433 // If key is not found, ErrKeyNotFound is returned.
434 func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
435 if len(key) == 0 {
436 return nil, ErrEmptyKey
437 } else if txn.discarded {
438 return nil, ErrDiscardedTxn
439 }
440
441 if err := txn.db.isBanned(key); err != nil {
442 return nil, err
443 }
444
445 item = new(Item)
446 if txn.update {
447 if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
448 if isDeletedOrExpired(e.meta, e.ExpiresAt) {
449 return nil, ErrKeyNotFound
450 }
451 // Fulfill from cache.
452 item.meta = e.meta
453 item.val = e.Value
454 item.userMeta = e.UserMeta
455 item.key = key
456 item.status = prefetched
457 item.version = txn.readTs
458 item.expiresAt = e.ExpiresAt
459 // We probably don't need to set db on item here.
460 return item, nil
461 }
462 // Only track reads if this is update txn. No need to track read if txn serviced it
463 // internally.
464 txn.addReadKey(key)
465 }
466
467 seek := y.KeyWithTs(key, txn.readTs)
468 vs, err := txn.db.get(seek)
469 if err != nil {
470 return nil, y.Wrapf(err, "DB::Get key: %q", key)
471 }
472 if vs.Value == nil && vs.Meta == 0 {
473 return nil, ErrKeyNotFound
474 }
475 if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
476 return nil, ErrKeyNotFound
477 }
478
479 item.key = key
480 item.version = vs.Version
481 item.meta = vs.Meta
482 item.userMeta = vs.UserMeta
483 item.vptr = y.SafeCopy(item.vptr, vs.Value)
484 item.txn = txn
485 item.expiresAt = vs.ExpiresAt
486 return item, nil
487 }
488
489 func (txn *Txn) addReadKey(key []byte) {
490 if txn.update {
491 fp := z.MemHash(key)
492
493 // Because of the possibility of multiple iterators it is now possible
494 // for multiple threads within a read-write transaction to read keys at
495 // the same time. The reads slice is not currently thread-safe and
496 // needs to be locked whenever we mark a key as read.
497 txn.readsLock.Lock()
498 txn.reads = append(txn.reads, fp)
499 txn.readsLock.Unlock()
500 }
501 }
502
503 // Discard discards a created transaction. This method is very important and must be called. Commit
504 // method calls this internally, however, calling this multiple times doesn't cause any issues. So,
505 // this can safely be called via a defer right when transaction is created.
506 //
507 // NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
508 func (txn *Txn) Discard() {
509 if txn.discarded { // Avoid a re-run.
510 return
511 }
512 if txn.numIterators.Load() > 0 {
513 panic("Unclosed iterator at time of Txn.Discard.")
514 }
515 txn.discarded = true
516 if !txn.db.orc.isManaged {
517 txn.db.orc.doneRead(txn)
518 }
519 }
520
// commitAndSend obtains a commit timestamp for txn from the oracle, stamps the
// transaction's entries with their versions and pushes them to the DB write
// channel. On success it returns a function that waits for the write request
// to complete and then marks the commit timestamp as done. It returns
// ErrConflict when the oracle reports a conflict, or the error from
// sendToWriteCh when the entries could not be queued.
func (txn *Txn) commitAndSend() (func() error, error) {
	orc := txn.db.orc
	// Ensure that the order in which we get the commit timestamp is the same as
	// the order in which we push these updates to the write channel. So, we
	// acquire a writeChLock before getting a commit timestamp, and only release
	// it after pushing the entries to it.
	orc.writeChLock.Lock()
	defer orc.writeChLock.Unlock()

	commitTs, conflict := orc.newCommitTs(txn)
	if conflict {
		return nil, ErrConflict
	}

	// keepTogether stays true only if no entry carries an explicit version.
	keepTogether := true
	// setVersion gives unversioned entries the commit timestamp.
	setVersion := func(e *Entry) {
		if e.version == 0 {
			e.version = commitTs
		} else {
			keepTogether = false
		}
	}
	for _, e := range txn.pendingWrites {
		setVersion(e)
	}
	// The duplicateWrites slice will be non-empty only if there are duplicate
	// entries with different versions.
	for _, e := range txn.duplicateWrites {
		setVersion(e)
	}

	// The extra slot is for the transaction end marker appended below.
	entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)

	processEntry := func(e *Entry) {
		// Suffix the keys with commit ts, so the key versions are sorted in
		// descending order of commit timestamp.
		e.Key = y.KeyWithTs(e.Key, e.version)
		// Add bitTxn only if these entries are part of a transaction. We
		// support SetEntryAt(..) in managed mode which means a single
		// transaction can have entries with different timestamps. If entries
		// in a single transaction have different timestamps, we don't add the
		// transaction markers.
		if keepTogether {
			e.meta |= bitTxn
		}
		entries = append(entries, e)
	}

	// The following debug information is what led to determining the cause of
	// bank txn violation bug, and it took a whole bunch of effort to narrow it
	// down to here. So, keep this around for at least a couple of months.
	// var b strings.Builder
	// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
	// 	txn.readTs, commitTs, txn.reads, txn.conflictKeys)
	for _, e := range txn.pendingWrites {
		processEntry(e)
	}
	for _, e := range txn.duplicateWrites {
		processEntry(e)
	}

	if keepTogether {
		// CommitTs should not be zero if we're inserting transaction markers.
		y.AssertTrue(commitTs != 0)
		// Final entry marking the end of the transaction; its value is the
		// commit timestamp in decimal.
		e := &Entry{
			Key:   y.KeyWithTs(txnKey, commitTs),
			Value: []byte(strconv.FormatUint(commitTs, 10)),
			meta:  bitFinTxn,
		}
		entries = append(entries, e)
	}

	req, err := txn.db.sendToWriteCh(entries)
	if err != nil {
		// The commit timestamp was already begun on txnMark by newCommitTs;
		// mark it done so it is not left pending.
		orc.doneCommit(commitTs)
		return nil, err
	}
	ret := func() error {
		err := req.Wait()
		// Wait before marking commitTs as done.
		// We can't defer doneCommit above, because it is being called from a
		// callback here.
		orc.doneCommit(commitTs)
		return err
	}
	return ret, nil
}
608
609 func (txn *Txn) commitPrecheck() error {
610 if txn.discarded {
611 return errors.New("Trying to commit a discarded txn")
612 }
613 keepTogether := true
614 for _, e := range txn.pendingWrites {
615 if e.version != 0 {
616 keepTogether = false
617 }
618 }
619
620 // If keepTogether is True, it implies transaction markers will be added.
621 // In that case, commitTs should not be never be zero. This might happen if
622 // someone uses txn.Commit instead of txn.CommitAt in managed mode. This
623 // should happen only in managed mode. In normal mode, keepTogether will
624 // always be true.
625 if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
626 return errors.New("CommitTs cannot be zero. Please use commitAt instead")
627 }
628 return nil
629 }
630
631 // Commit commits the transaction, following these steps:
632 //
633 // 1. If there are no writes, return immediately.
634 //
635 // 2. Check if read rows were updated since txn started. If so, return ErrConflict.
636 //
637 // 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
638 //
639 // 4. Batch up all writes, write them to value log and LSM tree.
640 //
641 // 5. If callback is provided, Badger will return immediately after checking
642 // for conflicts. Writes to the database will happen in the background. If
643 // there is a conflict, an error will be returned and the callback will not
644 // run. If there are no conflicts, the callback will be called in the
645 // background upon successful completion of writes or any error during write.
646 //
647 // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
648 // tree won't be updated, so there's no need for any rollback.
649 func (txn *Txn) Commit() error {
650 // txn.conflictKeys can be zero if conflict detection is turned off. So we
651 // should check txn.pendingWrites.
652 if len(txn.pendingWrites) == 0 {
653 // Discard the transaction so that the read is marked done.
654 txn.Discard()
655 return nil
656 }
657 // Precheck before discarding txn.
658 if err := txn.commitPrecheck(); err != nil {
659 return err
660 }
661 defer txn.Discard()
662
663 txnCb, err := txn.commitAndSend()
664 if err != nil {
665 return err
666 }
667 // If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
668
669 // TODO: What if some of the txns successfully make it to value log, but others fail.
670 // Nothing gets updated to LSM, until a restart happens.
671 return txnCb()
672 }
673
// txnCb bundles what runTxnCallback needs to finish a commit started by
// CommitWith and report the outcome to the user.
type txnCb struct {
	// commit, when non-nil, is run to complete the commit; its result is
	// passed to user.
	commit func() error
	// user is the callback supplied by the caller; it must not be nil.
	user func(error)
	// err, when non-nil, is passed to user instead of running commit.
	err error
}
679
680 func runTxnCallback(cb *txnCb) {
681 switch {
682 case cb == nil:
683 panic("txn callback is nil")
684 case cb.user == nil:
685 panic("Must have caught a nil callback for txn.CommitWith")
686 case cb.err != nil:
687 cb.user(cb.err)
688 case cb.commit != nil:
689 err := cb.commit()
690 cb.user(err)
691 default:
692 cb.user(nil)
693 }
694 }
695
696 // CommitWith acts like Commit, but takes a callback, which gets run via a
697 // goroutine to avoid blocking this function. The callback is guaranteed to run,
698 // so it is safe to increment sync.WaitGroup before calling CommitWith, and
699 // decrementing it in the callback; to block until all callbacks are run.
700 func (txn *Txn) CommitWith(cb func(error)) {
701 if cb == nil {
702 panic("Nil callback provided to CommitWith")
703 }
704
705 if len(txn.pendingWrites) == 0 {
706 // Do not run these callbacks from here, because the CommitWith and the
707 // callback might be acquiring the same locks. Instead run the callback
708 // from another goroutine.
709 go runTxnCallback(&txnCb{user: cb, err: nil})
710 // Discard the transaction so that the read is marked done.
711 txn.Discard()
712 return
713 }
714
715 // Precheck before discarding txn.
716 if err := txn.commitPrecheck(); err != nil {
717 cb(err)
718 return
719 }
720
721 defer txn.Discard()
722
723 commitCb, err := txn.commitAndSend()
724 if err != nil {
725 go runTxnCallback(&txnCb{user: cb, err: err})
726 return
727 }
728
729 go runTxnCallback(&txnCb{user: cb, commit: commitCb})
730 }
731
// ReadTs returns the read timestamp of the transaction, i.e. the timestamp
// its reads are served at.
func (txn *Txn) ReadTs() uint64 {
	return txn.readTs
}
736
// NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
// providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
// the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
// another transaction.
//
// For read-only transactions, set update to false. In this mode, we don't track the rows read for
// any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
//
// Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
// should only be run serially. It doesn't matter if a transaction is created by one goroutine and
// passed down to other, as long as the Txn APIs are called serially.
//
// When you create a new transaction, it is absolutely essential to call
// Discard(). This should be done irrespective of what the update param is set
// to. Commit API internally runs Discard, but running it twice wouldn't cause
// any issues.
//
//	txn := db.NewTransaction(false)
//	defer txn.Discard()
//	// Call various APIs.
func (db *DB) NewTransaction(update bool) *Txn {
	// The second argument marks the transaction as non-managed, so it gets its
	// read timestamp from the oracle.
	return db.newTransaction(update, false)
}
760
761 func (db *DB) newTransaction(update, isManaged bool) *Txn {
762 if db.opt.ReadOnly && update {
763 // DB is read-only, force read-only transaction.
764 update = false
765 }
766
767 txn := &Txn{
768 update: update,
769 db: db,
770 count: 1, // One extra entry for BitFin.
771 size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
772 }
773 if update {
774 if db.opt.DetectConflicts {
775 txn.conflictKeys = make(map[uint64]struct{})
776 }
777 txn.pendingWrites = make(map[string]*Entry)
778 }
779 if !isManaged {
780 txn.readTs = db.orc.readTs()
781 }
782 return txn
783 }
784
785 // View executes a function creating and managing a read-only transaction for the user. Error
786 // returned by the function is relayed by the View method.
787 // If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
788 func (db *DB) View(fn func(txn *Txn) error) error {
789 if db.IsClosed() {
790 return ErrDBClosed
791 }
792 var txn *Txn
793 if db.opt.managedTxns {
794 txn = db.NewTransactionAt(math.MaxUint64, false)
795 } else {
796 txn = db.NewTransaction(false)
797 }
798 defer txn.Discard()
799
800 return fn(txn)
801 }
802
803 // Update executes a function, creating and managing a read-write transaction
804 // for the user. Error returned by the function is relayed by the Update method.
805 // Update cannot be used with managed transactions.
806 func (db *DB) Update(fn func(txn *Txn) error) error {
807 if db.IsClosed() {
808 return ErrDBClosed
809 }
810 if db.opt.managedTxns {
811 panic("Update can only be used with managedDB=false.")
812 }
813 txn := db.NewTransaction(true)
814 defer txn.Discard()
815
816 if err := fn(txn); err != nil {
817 return err
818 }
819
820 return txn.Commit()
821 }
822