memtable.go raw

   1  /*
   2   * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
   3   * SPDX-License-Identifier: Apache-2.0
   4   */
   5  
   6  package badger
   7  
   8  import (
   9  	"bufio"
  10  	"bytes"
  11  	"crypto/aes"
  12  	cryptorand "crypto/rand"
  13  	"encoding/binary"
  14  	"fmt"
  15  	"hash/crc32"
  16  	"io"
  17  	"os"
  18  	"path/filepath"
  19  	"sort"
  20  	"strconv"
  21  	"strings"
  22  	"sync"
  23  	"sync/atomic"
  24  
  25  	"github.com/dgraph-io/badger/v4/pb"
  26  	"github.com/dgraph-io/badger/v4/skl"
  27  	"github.com/dgraph-io/badger/v4/y"
  28  	"github.com/dgraph-io/ristretto/v2/z"
  29  )
  30  
  31  // memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
  32  // both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
  33  // its pre-crash form.
  34  type memTable struct {
  35  	// TODO: Give skiplist z.Calloc'd []byte.
  36  	sl         *skl.Skiplist
  37  	wal        *logFile
  38  	maxVersion uint64
  39  	opt        Options
  40  	buf        *bytes.Buffer
  41  }
  42  
  43  func (db *DB) openMemTables(opt Options) error {
  44  	// We don't need to open any tables in in-memory mode.
  45  	if db.opt.InMemory {
  46  		return nil
  47  	}
  48  	files, err := os.ReadDir(db.opt.Dir)
  49  	if err != nil {
  50  		return errFile(err, db.opt.Dir, "Unable to open mem dir.")
  51  	}
  52  
  53  	var fids []int
  54  	for _, file := range files {
  55  		if !strings.HasSuffix(file.Name(), memFileExt) {
  56  			continue
  57  		}
  58  		fsz := len(file.Name())
  59  		fid, err := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
  60  		if err != nil {
  61  			return errFile(err, file.Name(), "Unable to parse log id.")
  62  		}
  63  		fids = append(fids, int(fid))
  64  	}
  65  
  66  	// Sort in ascending order.
  67  	sort.Slice(fids, func(i, j int) bool {
  68  		return fids[i] < fids[j]
  69  	})
  70  	for _, fid := range fids {
  71  		flags := os.O_RDWR
  72  		if db.opt.ReadOnly {
  73  			flags = os.O_RDONLY
  74  		}
  75  		mt, err := db.openMemTable(fid, flags)
  76  		if err != nil {
  77  			return y.Wrapf(err, "while opening fid: %d", fid)
  78  		}
  79  		// If this memtable is empty we don't need to add it. This is a
  80  		// memtable that was completely truncated.
  81  		if mt.sl.Empty() {
  82  			mt.DecrRef()
  83  			continue
  84  		}
  85  		// These should no longer be written to. So, make them part of the imm.
  86  		db.imm = append(db.imm, mt)
  87  	}
  88  	if len(fids) != 0 {
  89  		db.nextMemFid = fids[len(fids)-1]
  90  	}
  91  	db.nextMemFid++
  92  	return nil
  93  }
  94  
// memFileExt is the file-name suffix used for memtable WAL files.
const memFileExt string = ".mem"
  96  
  97  func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
  98  	filepath := db.mtFilePath(fid)
  99  	s := skl.NewSkiplist(arenaSize(db.opt))
 100  	mt := &memTable{
 101  		sl:  s,
 102  		opt: db.opt,
 103  		buf: &bytes.Buffer{},
 104  	}
 105  	// We don't need to create the wal for the skiplist in in-memory mode so return the mt.
 106  	if db.opt.InMemory {
 107  		return mt, z.NewFile
 108  	}
 109  
 110  	mt.wal = &logFile{
 111  		fid:      uint32(fid),
 112  		path:     filepath,
 113  		registry: db.registry,
 114  		writeAt:  vlogHeaderSize,
 115  		opt:      db.opt,
 116  	}
 117  	lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
 118  	if lerr != z.NewFile && lerr != nil {
 119  		return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
 120  	}
 121  
 122  	// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
 123  	// when it gets flushed to L0.
 124  	s.OnClose = func() {
 125  		if err := mt.wal.Delete(); err != nil {
 126  			db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
 127  		}
 128  	}
 129  
 130  	if lerr == z.NewFile {
 131  		return mt, lerr
 132  	}
 133  	err := mt.UpdateSkipList()
 134  	return mt, y.Wrapf(err, "while updating skiplist")
 135  }
 136  
 137  func (db *DB) newMemTable() (*memTable, error) {
 138  	mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
 139  	if err == z.NewFile {
 140  		db.nextMemFid++
 141  		return mt, nil
 142  	}
 143  
 144  	if err != nil {
 145  		db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
 146  		return nil, y.Wrapf(err, "newMemTable")
 147  	}
 148  	return nil, fmt.Errorf("File %s already exists", mt.wal.Fd.Name())
 149  }
 150  
 151  func (db *DB) mtFilePath(fid int) string {
 152  	return filepath.Join(db.opt.Dir, fmt.Sprintf("%05d%s", fid, memFileExt))
 153  }
 154  
 155  func (mt *memTable) SyncWAL() error {
 156  	return mt.wal.Sync()
 157  }
 158  
 159  func (mt *memTable) isFull() bool {
 160  	if mt.sl.MemSize() >= mt.opt.MemTableSize {
 161  		return true
 162  	}
 163  	if mt.opt.InMemory {
 164  		// InMemory mode doesn't have any WAL.
 165  		return false
 166  	}
 167  	return int64(mt.wal.writeAt) >= mt.opt.MemTableSize
 168  }
 169  
 170  func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
 171  	entry := &Entry{
 172  		Key:       key,
 173  		Value:     value.Value,
 174  		UserMeta:  value.UserMeta,
 175  		meta:      value.Meta,
 176  		ExpiresAt: value.ExpiresAt,
 177  	}
 178  
 179  	// wal is nil only when badger in running in in-memory mode and we don't need the wal.
 180  	if mt.wal != nil {
 181  		// If WAL exceeds opt.ValueLogFileSize, we'll force flush the memTable. See logic in
 182  		// ensureRoomForWrite.
 183  		if err := mt.wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
 184  			return y.Wrapf(err, "cannot write entry to WAL file")
 185  		}
 186  	}
 187  	// We insert the finish marker in the WAL but not in the memtable.
 188  	if entry.meta&bitFinTxn > 0 {
 189  		return nil
 190  	}
 191  
 192  	// Write to skiplist and update maxVersion encountered.
 193  	mt.sl.Put(key, value)
 194  	if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
 195  		mt.maxVersion = ts
 196  	}
 197  	y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold))
 198  	return nil
 199  }
 200  
 201  func (mt *memTable) UpdateSkipList() error {
 202  	if mt.wal == nil || mt.sl == nil {
 203  		return nil
 204  	}
 205  	endOff, err := mt.wal.iterate(true, 0, mt.replayFunction(mt.opt))
 206  	if err != nil {
 207  		return y.Wrapf(err, "while iterating wal: %s", mt.wal.Fd.Name())
 208  	}
 209  	if endOff < mt.wal.size.Load() && mt.opt.ReadOnly {
 210  		return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", endOff, mt.wal.size.Load())
 211  	}
 212  	return mt.wal.Truncate(int64(endOff))
 213  }
 214  
 215  // IncrRef increases the refcount
 216  func (mt *memTable) IncrRef() {
 217  	mt.sl.IncrRef()
 218  }
 219  
 220  // DecrRef decrements the refcount, deallocating the Skiplist when done using it
 221  func (mt *memTable) DecrRef() {
 222  	mt.sl.DecrRef()
 223  }
 224  
 225  func (mt *memTable) replayFunction(opt Options) func(Entry, valuePointer) error {
 226  	first := true
 227  	return func(e Entry, _ valuePointer) error { // Function for replaying.
 228  		if first {
 229  			opt.Debugf("First key=%q\n", e.Key)
 230  		}
 231  		first = false
 232  		if ts := y.ParseTs(e.Key); ts > mt.maxVersion {
 233  			mt.maxVersion = ts
 234  		}
 235  		v := y.ValueStruct{
 236  			Value:     e.Value,
 237  			Meta:      e.meta,
 238  			UserMeta:  e.UserMeta,
 239  			ExpiresAt: e.ExpiresAt,
 240  		}
 241  		// This is already encoded correctly. Value would be either a vptr, or a full value
 242  		// depending upon how big the original value was. Skiplist makes a copy of the key and
 243  		// value.
 244  		mt.sl.Put(e.Key, v)
 245  		return nil
 246  	}
 247  }
 248  
 249  type logFile struct {
 250  	*z.MmapFile
 251  	path string
 252  	// This is a lock on the log file. It guards the fd’s value, the file’s
 253  	// existence and the file’s memory map.
 254  	//
 255  	// Use shared ownership when reading/writing the file or memory map, use
 256  	// exclusive ownership to open/close the descriptor, unmap or remove the file.
 257  	lock     sync.RWMutex
 258  	fid      uint32
 259  	size     atomic.Uint32
 260  	dataKey  *pb.DataKey
 261  	baseIV   []byte
 262  	registry *KeyRegistry
 263  	writeAt  uint32
 264  	opt      Options
 265  }
 266  
 267  func (lf *logFile) Truncate(end int64) error {
 268  	if fi, err := lf.Fd.Stat(); err != nil {
 269  		return fmt.Errorf("while file.stat on file: %s, error: %v\n", lf.Fd.Name(), err)
 270  	} else if fi.Size() == end {
 271  		return nil
 272  	}
 273  	y.AssertTrue(!lf.opt.ReadOnly)
 274  	lf.size.Store(uint32(end))
 275  	return lf.MmapFile.Truncate(end)
 276  }
 277  
 278  // encodeEntry will encode entry to the buf
 279  // layout of entry
 280  // +--------+-----+-------+-------+
 281  // | header | key | value | crc32 |
 282  // +--------+-----+-------+-------+
 283  func (lf *logFile) encodeEntry(buf *bytes.Buffer, e *Entry, offset uint32) (int, error) {
 284  	h := header{
 285  		klen:      uint32(len(e.Key)),
 286  		vlen:      uint32(len(e.Value)),
 287  		expiresAt: e.ExpiresAt,
 288  		meta:      e.meta,
 289  		userMeta:  e.UserMeta,
 290  	}
 291  
 292  	hash := crc32.New(y.CastagnoliCrcTable)
 293  	writer := io.MultiWriter(buf, hash)
 294  
 295  	// encode header.
 296  	var headerEnc [maxHeaderSize]byte
 297  	sz := h.Encode(headerEnc[:])
 298  	y.Check2(writer.Write(headerEnc[:sz]))
 299  	// we'll encrypt only key and value.
 300  	if lf.encryptionEnabled() {
 301  		// TODO: no need to allocate the bytes. we can calculate the encrypted buf one by one
 302  		// since we're using ctr mode of AES encryption. Ordering won't changed. Need some
 303  		// refactoring in XORBlock which will work like stream cipher.
 304  		eBuf := make([]byte, 0, len(e.Key)+len(e.Value))
 305  		eBuf = append(eBuf, e.Key...)
 306  		eBuf = append(eBuf, e.Value...)
 307  		if err := y.XORBlockStream(
 308  			writer, eBuf, lf.dataKey.Data, lf.generateIV(offset)); err != nil {
 309  			return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
 310  		}
 311  	} else {
 312  		// Encryption is disabled so writing directly to the buffer.
 313  		y.Check2(writer.Write(e.Key))
 314  		y.Check2(writer.Write(e.Value))
 315  	}
 316  	// write crc32 hash.
 317  	var crcBuf [crc32.Size]byte
 318  	binary.BigEndian.PutUint32(crcBuf[:], hash.Sum32())
 319  	y.Check2(buf.Write(crcBuf[:]))
 320  	// return encoded length.
 321  	return len(headerEnc[:sz]) + len(e.Key) + len(e.Value) + len(crcBuf), nil
 322  }
 323  
 324  func (lf *logFile) writeEntry(buf *bytes.Buffer, e *Entry, opt Options) error {
 325  	buf.Reset()
 326  	plen, err := lf.encodeEntry(buf, e, lf.writeAt)
 327  	if err != nil {
 328  		return err
 329  	}
 330  	y.AssertTrue(plen == copy(lf.Data[lf.writeAt:], buf.Bytes()))
 331  	lf.writeAt += uint32(plen)
 332  
 333  	lf.zeroNextEntry()
 334  	return nil
 335  }
 336  
 337  func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
 338  	var h header
 339  	hlen := h.Decode(buf)
 340  	kv := buf[hlen:]
 341  	if lf.encryptionEnabled() {
 342  		var err error
 343  		// No need to worry about mmap. because, XORBlock allocates a byte array to do the
 344  		// xor. So, the given slice is not being mutated.
 345  		if kv, err = lf.decryptKV(kv, offset); err != nil {
 346  			return nil, err
 347  		}
 348  	}
 349  	e := &Entry{
 350  		meta:      h.meta,
 351  		UserMeta:  h.userMeta,
 352  		ExpiresAt: h.expiresAt,
 353  		offset:    offset,
 354  		Key:       kv[:h.klen],
 355  		Value:     kv[h.klen : h.klen+h.vlen],
 356  	}
 357  	return e, nil
 358  }
 359  
 360  func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
 361  	return y.XORBlockAllocate(buf, lf.dataKey.Data, lf.generateIV(offset))
 362  }
 363  
 364  // KeyID returns datakey's ID.
 365  func (lf *logFile) keyID() uint64 {
 366  	if lf.dataKey == nil {
 367  		// If there is no datakey, then we'll return 0. Which means no encryption.
 368  		return 0
 369  	}
 370  	return lf.dataKey.KeyId
 371  }
 372  
 373  func (lf *logFile) encryptionEnabled() bool {
 374  	return lf.dataKey != nil
 375  }
 376  
 377  // Acquire lock on mmap/file if you are calling this
 378  func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
 379  	offset := p.Offset
 380  	// Do not convert size to uint32, because the lf.Data can be of size
 381  	// 4GB, which overflows the uint32 during conversion to make the size 0,
 382  	// causing the read to fail with ErrEOF. See issue #585.
 383  	size := int64(len(lf.Data))
 384  	valsz := p.Len
 385  	lfsz := lf.size.Load()
 386  	if int64(offset) >= size || int64(offset+valsz) > size ||
 387  		// Ensure that the read is within the file's actual size. It might be possible that
 388  		// the offset+valsz length is beyond the file's actual size. This could happen when
 389  		// dropAll and iterations are running simultaneously.
 390  		int64(offset+valsz) > int64(lfsz) {
 391  		err = y.ErrEOF
 392  	} else {
 393  		buf = lf.Data[offset : offset+valsz]
 394  	}
 395  	return buf, err
 396  }
 397  
 398  // generateIV will generate IV by appending given offset with the base IV.
 399  func (lf *logFile) generateIV(offset uint32) []byte {
 400  	iv := make([]byte, aes.BlockSize)
 401  	// baseIV is of 12 bytes.
 402  	y.AssertTrue(12 == copy(iv[:12], lf.baseIV))
 403  	// remaining 4 bytes is obtained from offset.
 404  	binary.BigEndian.PutUint32(iv[12:], offset)
 405  	return iv
 406  }
 407  
 408  func (lf *logFile) doneWriting(offset uint32) error {
 409  	if lf.opt.SyncWrites {
 410  		if err := lf.Sync(); err != nil {
 411  			return y.Wrapf(err, "Unable to sync value log: %q", lf.path)
 412  		}
 413  	}
 414  
 415  	// Before we were acquiring a lock here on lf.lock, because we were invalidating the file
 416  	// descriptor due to reopening it as read-only. Now, we don't invalidate the fd, but unmap it,
 417  	// truncate it and remap it. That creates a window where we have segfaults because the mmap is
 418  	// no longer valid, while someone might be reading it. Therefore, we need a lock here again.
 419  	lf.lock.Lock()
 420  	defer lf.lock.Unlock()
 421  
 422  	if err := lf.Truncate(int64(offset)); err != nil {
 423  		return y.Wrapf(err, "Unable to truncate file: %q", lf.path)
 424  	}
 425  
 426  	// Previously we used to close the file after it was written and reopen it in read-only mode.
 427  	// We no longer open files in read-only mode. We keep all vlog files open in read-write mode.
 428  	return nil
 429  }
 430  
 431  // iterate iterates over log file. It doesn't not allocate new memory for every kv pair.
 432  // Therefore, the kv pair is only valid for the duration of fn call.
 433  func (lf *logFile) iterate(readOnly bool, offset uint32, fn logEntry) (uint32, error) {
 434  	if offset == 0 {
 435  		// If offset is set to zero, let's advance past the encryption key header.
 436  		offset = vlogHeaderSize
 437  	}
 438  
 439  	// For now, read directly from file, because it allows
 440  	reader := bufio.NewReader(lf.NewReader(int(offset)))
 441  	read := &safeRead{
 442  		k:            make([]byte, 10),
 443  		v:            make([]byte, 10),
 444  		recordOffset: offset,
 445  		lf:           lf,
 446  	}
 447  
 448  	var lastCommit uint64
 449  	var validEndOffset uint32 = offset
 450  
 451  	var entries []*Entry
 452  	var vptrs []valuePointer
 453  
 454  loop:
 455  	for {
 456  		e, err := read.Entry(reader)
 457  		switch {
 458  		// We have not reached the end of the file but the entry we read is
 459  		// zero. This happens because we have truncated the file and
 460  		// zero'ed it out.
 461  		case err == io.EOF:
 462  			break loop
 463  		case err == io.ErrUnexpectedEOF || err == errTruncate:
 464  			break loop
 465  		case err != nil:
 466  			return 0, err
 467  		case e == nil:
 468  			continue
 469  		case e.isZero():
 470  			break loop
 471  		}
 472  
 473  		var vp valuePointer
 474  		vp.Len = uint32(e.hlen + len(e.Key) + len(e.Value) + crc32.Size)
 475  		read.recordOffset += vp.Len
 476  
 477  		vp.Offset = e.offset
 478  		vp.Fid = lf.fid
 479  
 480  		switch {
 481  		case e.meta&bitTxn > 0:
 482  			txnTs := y.ParseTs(e.Key)
 483  			if lastCommit == 0 {
 484  				lastCommit = txnTs
 485  			}
 486  			if lastCommit != txnTs {
 487  				break loop
 488  			}
 489  			entries = append(entries, e)
 490  			vptrs = append(vptrs, vp)
 491  
 492  		case e.meta&bitFinTxn > 0:
 493  			txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
 494  			if err != nil || lastCommit != txnTs {
 495  				break loop
 496  			}
 497  			// Got the end of txn. Now we can store them.
 498  			lastCommit = 0
 499  			validEndOffset = read.recordOffset
 500  
 501  			for i, e := range entries {
 502  				vp := vptrs[i]
 503  				if err := fn(*e, vp); err != nil {
 504  					if err == errStop {
 505  						break
 506  					}
 507  					return 0, errFile(err, lf.path, "Iteration function")
 508  				}
 509  			}
 510  			entries = entries[:0]
 511  			vptrs = vptrs[:0]
 512  
 513  		default:
 514  			if lastCommit != 0 {
 515  				// This is most likely an entry which was moved as part of GC.
 516  				// We shouldn't get this entry in the middle of a transaction.
 517  				break loop
 518  			}
 519  			validEndOffset = read.recordOffset
 520  
 521  			if err := fn(*e, vp); err != nil {
 522  				if err == errStop {
 523  					break
 524  				}
 525  				return 0, errFile(err, lf.path, "Iteration function")
 526  			}
 527  		}
 528  	}
 529  	return validEndOffset, nil
 530  }
 531  
 532  // Zero out the next entry to deal with any crashes.
 533  func (lf *logFile) zeroNextEntry() {
 534  	z.ZeroOut(lf.Data, int(lf.writeAt), int(lf.writeAt+maxHeaderSize))
 535  }
 536  
 537  func (lf *logFile) open(path string, flags int, fsize int64) error {
 538  	mf, ferr := z.OpenMmapFile(path, flags, int(fsize))
 539  	lf.MmapFile = mf
 540  
 541  	if ferr == z.NewFile {
 542  		if err := lf.bootstrap(); err != nil {
 543  			os.Remove(path)
 544  			return err
 545  		}
 546  		lf.size.Store(vlogHeaderSize)
 547  
 548  	} else if ferr != nil {
 549  		return y.Wrapf(ferr, "while opening file: %s", path)
 550  	}
 551  	lf.size.Store(uint32(len(lf.Data)))
 552  
 553  	if lf.size.Load() < vlogHeaderSize {
 554  		// Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize
 555  		// then it must have been corrupted. But no need to handle here. log replayer will truncate
 556  		// and bootstrap the logfile. So ignoring here.
 557  		return nil
 558  	}
 559  
 560  	// Copy over the encryption registry data.
 561  	buf := make([]byte, vlogHeaderSize)
 562  
 563  	y.AssertTruef(vlogHeaderSize == copy(buf, lf.Data),
 564  		"Unable to copy from %s, size %d", path, lf.size.Load())
 565  	keyID := binary.BigEndian.Uint64(buf[:8])
 566  	// retrieve datakey.
 567  	if dk, err := lf.registry.DataKey(keyID); err != nil {
 568  		return y.Wrapf(err, "While opening vlog file %d", lf.fid)
 569  	} else {
 570  		lf.dataKey = dk
 571  	}
 572  	lf.baseIV = buf[8:]
 573  	y.AssertTrue(len(lf.baseIV) == 12)
 574  
 575  	// Preserved ferr so we can return if this was a new file.
 576  	return ferr
 577  }
 578  
 579  // bootstrap will initialize the log file with key id and baseIV.
 580  // The below figure shows the layout of log file.
 581  // +----------------+------------------+------------------+
 582  // | keyID(8 bytes) |  baseIV(12 bytes)|	 entry...     |
 583  // +----------------+------------------+------------------+
 584  func (lf *logFile) bootstrap() error {
 585  	var err error
 586  
 587  	// generate data key for the log file.
 588  	var dk *pb.DataKey
 589  	if dk, err = lf.registry.LatestDataKey(); err != nil {
 590  		return y.Wrapf(err, "Error while retrieving datakey in logFile.bootstarp")
 591  	}
 592  	lf.dataKey = dk
 593  
 594  	// We'll always preserve vlogHeaderSize for key id and baseIV.
 595  	buf := make([]byte, vlogHeaderSize)
 596  
 597  	// write key id to the buf.
 598  	// key id will be zero if the logfile is in plain text.
 599  	binary.BigEndian.PutUint64(buf[:8], lf.keyID())
 600  	// generate base IV. It'll be used with offset of the vptr to encrypt the entry.
 601  	if _, err := cryptorand.Read(buf[8:]); err != nil {
 602  		return y.Wrapf(err, "Error while creating base IV, while creating logfile")
 603  	}
 604  
 605  	// Initialize base IV.
 606  	lf.baseIV = buf[8:]
 607  	y.AssertTrue(len(lf.baseIV) == 12)
 608  
 609  	// Copy over to the logFile.
 610  	y.AssertTrue(vlogHeaderSize == copy(lf.Data[0:], buf))
 611  
 612  	// Zero out the next entry.
 613  	lf.zeroNextEntry()
 614  	return nil
 615  }
 616