cfindex.go raw

   1  package index
   2  
   3  import (
   4  	"errors"
   5  	"github.com/p9c/p9/pkg/block"
   6  	"github.com/p9c/p9/pkg/chaincfg"
   7  	
   8  	"github.com/p9c/p9/pkg/qu"
   9  	
  10  	"github.com/p9c/p9/pkg/blockchain"
  11  	"github.com/p9c/p9/pkg/chainhash"
  12  	"github.com/p9c/p9/pkg/database"
  13  	"github.com/p9c/p9/pkg/gcs"
  14  	"github.com/p9c/p9/pkg/gcs/builder"
  15  	"github.com/p9c/p9/pkg/wire"
  16  )
  17  
  18  const (
  19  	// cfIndexName is the human-readable name for the index.
  20  	cfIndexName = "committed filter index"
  21  )
  22  
  23  // Committed filters come in one flavor currently: basic. They are generated and dropped in pairs, and both are indexed
  24  // by a block's hash. Besides holding different content, they also live in different buckets.
  25  var (
  26  	// cfIndexParentBucketKey is the name of the parent bucket used to house the index. The rest of the buckets live
  27  	// below this bucket.
  28  	cfIndexParentBucketKey = []byte("cfindexparentbucket")
  29  	// cfIndexKeys is an array of db bucket names used to house indexes of block hashes to cfilters.
  30  	cfIndexKeys = [][]byte{
  31  		[]byte("cf0byhashidx"),
  32  	}
  33  	// cfHeaderKeys is an array of db bucket names used to house indexes of block hashes to cf headers.
  34  	cfHeaderKeys = [][]byte{
  35  		[]byte("cf0headerbyhashidx"),
  36  	}
  37  	// cfHashKeys is an array of db bucket names used to house indexes of block hashes to cf hashes.
  38  	cfHashKeys = [][]byte{
  39  		[]byte("cf0hashbyhashidx"),
  40  	}
  41  	maxFilterType = uint8(len(cfHeaderKeys) - 1)
  42  	// zeroHash is the chainhash.Hash value of all zero bytes, defined here for convenience.
  43  	zeroHash chainhash.Hash
  44  )
  45  
  46  // dbFetchFilterIdxEntry retrieves a data blob from the filter index database. An entry's absence is not considered an
  47  // error.
  48  func dbFetchFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
  49  	idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
  50  	return idx.Get(h[:]), nil
  51  }
  52  
  53  // dbStoreFilterIdxEntry stores a data blob in the filter index database.
  54  func dbStoreFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) (e error) {
  55  	idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
  56  	return idx.Put(h[:], f)
  57  }
  58  
  59  // dbDeleteFilterIdxEntry deletes a data blob from the filter index database.
  60  func dbDeleteFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash) (e error) {
  61  	idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
  62  	return idx.Delete(h[:])
  63  }
  64  
  65  // CFIndex implements a committed filter (cf) by hash index.
  66  type CFIndex struct {
  67  	db          database.DB
  68  	chainParams *chaincfg.Params
  69  }
  70  
  71  // Ensure the CfIndex type implements the Indexer interface.
  72  var _ Indexer = (*CFIndex)(nil)
  73  
  74  // Ensure the CfIndex type implements the NeedsInputser interface.
  75  var _ NeedsInputser = (*CFIndex)(nil)
  76  
  77  // NeedsInputs signals that the index requires the referenced inputs in order to properly create the index. This
  78  // implements the NeedsInputser interface.
  79  func (idx *CFIndex) NeedsInputs() bool {
  80  	return true
  81  }
  82  
  83  // Init initializes the hash-based cf index. This is part of the Indexer interface.
  84  func (idx *CFIndex) Init() (e error) {
  85  	return nil // Nothing to do.
  86  }
  87  
  88  // Key returns the database key to use for the index as a byte slice. This is part of the Indexer interface.
  89  func (idx *CFIndex) Key() []byte {
  90  	return cfIndexParentBucketKey
  91  }
  92  
  93  // Name returns the human-readable name of the index. This is part of the Indexer interface.
  94  func (idx *CFIndex) Name() string {
  95  	return cfIndexName
  96  }
  97  
  98  // Create is invoked when the indexer manager determines the index needs to be created for the first time. It creates
  99  // buckets for the two hash-based cf indexes (regular only currently).
 100  func (idx *CFIndex) Create(dbTx database.Tx) (e error) {
 101  	meta := dbTx.Metadata()
 102  	cfIndexParentBucket, e := meta.CreateBucket(cfIndexParentBucketKey)
 103  	if e != nil {
 104  		return e
 105  	}
 106  	for _, bucketName := range cfIndexKeys {
 107  		_, e = cfIndexParentBucket.CreateBucket(bucketName)
 108  		if e != nil {
 109  			return e
 110  		}
 111  	}
 112  	for _, bucketName := range cfHeaderKeys {
 113  		_, e = cfIndexParentBucket.CreateBucket(bucketName)
 114  		if e != nil {
 115  			return e
 116  		}
 117  	}
 118  	for _, bucketName := range cfHashKeys {
 119  		_, e = cfIndexParentBucket.CreateBucket(bucketName)
 120  		if e != nil {
 121  			return e
 122  		}
 123  	}
 124  	return nil
 125  }
 126  
 127  // storeFilter stores a given filter, and performs the steps needed to generate the filter's header.
 128  func storeFilter(
 129  	dbTx database.Tx, block *block.Block, f *gcs.Filter,
 130  	filterType wire.FilterType,
 131  ) (e error) {
 132  	if uint8(filterType) > maxFilterType {
 133  		return errors.New("unsupported filter type")
 134  	}
 135  	// Figure out which buckets to use.
 136  	fkey := cfIndexKeys[filterType]
 137  	hkey := cfHeaderKeys[filterType]
 138  	hashkey := cfHashKeys[filterType]
 139  	// Start by storing the filter.
 140  	h := block.Hash()
 141  	filterBytes, e := f.NBytes()
 142  	if e != nil {
 143  		return e
 144  	}
 145  	e = dbStoreFilterIdxEntry(dbTx, fkey, h, filterBytes)
 146  	if e != nil {
 147  		return e
 148  	}
 149  	// Next store the filter hash.
 150  	filterHash, e := builder.GetFilterHash(f)
 151  	if e != nil {
 152  		return e
 153  	}
 154  	e = dbStoreFilterIdxEntry(dbTx, hashkey, h, filterHash[:])
 155  	if e != nil {
 156  		return e
 157  	}
 158  	// Then fetch the previous block's filter header.
 159  	var prevHeader *chainhash.Hash
 160  	ph := &block.WireBlock().Header.PrevBlock
 161  	if ph.IsEqual(&zeroHash) {
 162  		prevHeader = &zeroHash
 163  	} else {
 164  		var pfh []byte
 165  		pfh, e = dbFetchFilterIdxEntry(dbTx, hkey, ph)
 166  		if e != nil {
 167  			return e
 168  		}
 169  		// Construct the new block's filter header, and store it.
 170  		prevHeader, e = chainhash.NewHash(pfh)
 171  		if e != nil {
 172  			return e
 173  		}
 174  	}
 175  	fh, e := builder.MakeHeaderForFilter(f, *prevHeader)
 176  	if e != nil {
 177  		return e
 178  	}
 179  	return dbStoreFilterIdxEntry(dbTx, hkey, h, fh[:])
 180  }
 181  
 182  // ConnectBlock is invoked by the index manager when a new block has been connected to the main chain. This indexer adds
 183  // a hash-to-cf mapping for every passed block. This is part of the Indexer interface.
 184  func (idx *CFIndex) ConnectBlock(
 185  	dbTx database.Tx, block *block.Block,
 186  	stxos []blockchain.SpentTxOut,
 187  ) (e error) {
 188  	prevScripts := make([][]byte, len(stxos))
 189  	for i, stxo := range stxos {
 190  		prevScripts[i] = stxo.PkScript
 191  	}
 192  	f, e := builder.BuildBasicFilter(block.WireBlock(), prevScripts)
 193  	if e != nil {
 194  		return e
 195  	}
 196  	return storeFilter(dbTx, block, f, wire.GCSFilterRegular)
 197  }
 198  
 199  // DisconnectBlock is invoked by the index manager when a block has been disconnected from the main chain. This indexer
 200  // removes the hash-to-cf mapping for every passed block. This is part of the Indexer interface.
 201  func (idx *CFIndex) DisconnectBlock(
 202  	dbTx database.Tx, block *block.Block,
 203  	_ []blockchain.SpentTxOut,
 204  ) (e error) {
 205  	for _, key := range cfIndexKeys {
 206  		e := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
 207  		if e != nil {
 208  			return e
 209  		}
 210  	}
 211  	for _, key := range cfHeaderKeys {
 212  		e := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
 213  		if e != nil {
 214  			return e
 215  		}
 216  	}
 217  	for _, key := range cfHashKeys {
 218  		e := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
 219  		if e != nil {
 220  			return e
 221  		}
 222  	}
 223  	return nil
 224  }
 225  
 226  // entryByBlockHash fetches a filter index entry of a particular type (eg. filter, filter header, etc) for a filter type
 227  // and block hash.
 228  func (idx *CFIndex) entryByBlockHash(
 229  	filterTypeKeys [][]byte,
 230  	filterType wire.FilterType, h *chainhash.Hash,
 231  ) (entry []byte, e error) {
 232  	if uint8(filterType) > maxFilterType {
 233  		return nil, errors.New("unsupported filter type")
 234  	}
 235  	key := filterTypeKeys[filterType]
 236  	e = idx.db.View(
 237  		func(dbTx database.Tx) (e error) {
 238  			entry, e = dbFetchFilterIdxEntry(dbTx, key, h)
 239  			return e
 240  		},
 241  	)
 242  	return entry, e
 243  }
 244  
 245  // entriesByBlockHashes batch fetches a filter index entry of a particular type (eg. filter, filter header, etc) for a
 246  // filter type and slice of block hashes.
 247  func (idx *CFIndex) entriesByBlockHashes(
 248  	filterTypeKeys [][]byte,
 249  	filterType wire.FilterType, blockHashes []*chainhash.Hash,
 250  ) (entries [][]byte, e error) {
 251  	if uint8(filterType) > maxFilterType {
 252  		return nil, errors.New("unsupported filter type")
 253  	}
 254  	key := filterTypeKeys[filterType]
 255  	entries = make([][]byte, 0, len(blockHashes))
 256  	e = idx.db.View(
 257  		func(dbTx database.Tx) (e error) {
 258  			for _, blockHash := range blockHashes {
 259  				entry, e := dbFetchFilterIdxEntry(dbTx, key, blockHash)
 260  				if e != nil {
 261  					return e
 262  				}
 263  				entries = append(entries, entry)
 264  			}
 265  			return nil
 266  		},
 267  	)
 268  	return entries, e
 269  }
 270  
 271  // FilterByBlockHash returns the serialized contents of a block's basic or committed filter.
 272  func (idx *CFIndex) FilterByBlockHash(
 273  	h *chainhash.Hash,
 274  	filterType wire.FilterType,
 275  ) ([]byte, error) {
 276  	return idx.entryByBlockHash(cfIndexKeys, filterType, h)
 277  }
 278  
 279  // FiltersByBlockHashes returns the serialized contents of a block's basic or committed filter for a set of blocks by
 280  // hash.
 281  func (idx *CFIndex) FiltersByBlockHashes(
 282  	blockHashes []*chainhash.Hash,
 283  	filterType wire.FilterType,
 284  ) ([][]byte, error) {
 285  	return idx.entriesByBlockHashes(cfIndexKeys, filterType, blockHashes)
 286  }
 287  
 288  // FilterHeaderByBlockHash returns the serialized contents of a block's basic committed filter header.
 289  func (idx *CFIndex) FilterHeaderByBlockHash(
 290  	h *chainhash.Hash,
 291  	filterType wire.FilterType,
 292  ) ([]byte, error) {
 293  	return idx.entryByBlockHash(cfHeaderKeys, filterType, h)
 294  }
 295  
 296  // FilterHeadersByBlockHashes returns the serialized contents of a block's basic committed filter header for a set of
 297  // blocks by hash.
 298  func (idx *CFIndex) FilterHeadersByBlockHashes(
 299  	blockHashes []*chainhash.Hash,
 300  	filterType wire.FilterType,
 301  ) ([][]byte, error) {
 302  	return idx.entriesByBlockHashes(cfHeaderKeys, filterType, blockHashes)
 303  }
 304  
 305  // FilterHashByBlockHash returns the serialized contents of a block's basic committed filter hash.
 306  func (idx *CFIndex) FilterHashByBlockHash(
 307  	h *chainhash.Hash,
 308  	filterType wire.FilterType,
 309  ) ([]byte, error) {
 310  	return idx.entryByBlockHash(cfHashKeys, filterType, h)
 311  }
 312  
 313  // FilterHashesByBlockHashes returns the serialized contents of a block's basic committed filter hash for a set of
 314  // blocks by hash.
 315  func (idx *CFIndex) FilterHashesByBlockHashes(
 316  	blockHashes []*chainhash.Hash,
 317  	filterType wire.FilterType,
 318  ) ([][]byte, error) {
 319  	return idx.entriesByBlockHashes(cfHashKeys, filterType, blockHashes)
 320  }
 321  
 322  // NewCfIndex returns a new instance of an indexer that is used to create a mapping of the hashes of all blocks in the
 323  // blockchain to their respective committed filters. It implements the Indexer interface which plugs into the
 324  // IndexManager that in turn is used by the blockchain package. This allows the index to be seamlessly maintained along
 325  // with the chain.
 326  func NewCfIndex(db database.DB, chainParams *chaincfg.Params) *CFIndex {
 327  	return &CFIndex{db: db, chainParams: chainParams}
 328  }
 329  
 330  // DropCfIndex drops the CF index from the provided database if exists.
 331  func DropCfIndex(db database.DB, interrupt qu.C) (e error) {
 332  	return dropIndex(db, cfIndexParentBucketKey, cfIndexName, interrupt)
 333  }
 334