1 package indexers
2 3 import (
4 "errors"
5 "fmt"
6 "github.com/p9c/p9/pkg/block"
7 8 "github.com/p9c/p9/pkg/qu"
9 10 "github.com/p9c/p9/pkg/blockchain"
11 "github.com/p9c/p9/pkg/chainhash"
12 "github.com/p9c/p9/pkg/database"
13 "github.com/p9c/p9/pkg/wire"
14 )
const (
	// txIndexName is the human-readable name for the index. It is what TxIndex.Name returns and what DropTxIndex
	// passes to dropIndex alongside txIndexKey.
	txIndexName = "transaction index"
)
var (
	// txIndexKey is the key of the transaction index and the name of the db bucket used to house it. Entries in
	// that bucket are keyed by transaction hash.
	txIndexKey = []byte("txbyhashidx")
	// idByHashIndexBucketName is the name of the db bucket used to house the block hash -> block id index (the key
	// is the block hash, the value is the serialized id).
	idByHashIndexBucketName = []byte("idbyhashidx")
	// hashByIDIndexBucketName is the name of the db bucket used to house the block id -> block hash index (the key
	// is the serialized id, the value is the block hash).
	hashByIDIndexBucketName = []byte("hashbyididx")
	// errNoBlockIDEntry is an error that indicates a requested entry does not exist in the block ID index. It is
	// returned by the block ID lookups in both directions.
	errNoBlockIDEntry = errors.New("no entry in the block ID index")
)
// The transaction index consists of an entry for every transaction in the main chain. In order to significantly
// optimize the space requirements, a separate index is maintained which provides an internal mapping between each
// block that has been indexed and a unique ID for use within the hash to location mappings. The ID is simply a
// sequentially incremented uint32.
40 //
41 // This is useful because it is only 4 bytes versus 32 bytes hashes and thus saves a ton of space in the index. There
42 // are three buckets used in total.
43 //
44 // The first bucket maps the hash of each transaction to the specific block location. The second bucket maps the hash of
45 // each block to the unique ID and the third maps that ID back to the block hash.
46 //
47 // NOTE: Although it is technically possible for multiple transactions to have the same hash as long as the previous
48 // transaction with the same hash is fully spent, this code only stores the most recent one because doing otherwise
49 // would add a non-trivial amount of space and overhead for something that will realistically never happen per the
50 // probability and even if it did, the old one must be fully spent and so the most likely transaction a caller would
51 // want for a given hash is the most recent one anyways.
52 //
53 // The serialized format for keys and values in the block hash to ID bucket is:
54 //
55 // <hash> = <ID>
56 // Field Type Size
57 // hash chainhash.Hash 32 bytes
58 // ID uint32 4 bytes
59 // -----
60 // Total: 36 bytes
61 //
62 // The serialized format for keys and values in the ID to block hash bucket is:
63 //
64 // <ID> = <hash>
65 // Field Type Size
66 // ID uint32 4 bytes
67 // hash chainhash.Hash 32 bytes
68 // -----
69 // Total: 36 bytes
70 // The serialized format for the keys and values in the tx index bucket is:
71 // <txhash> = <block id><start offset><tx length>
72 // Field Type Size
73 // txhash chainhash.Hash 32 bytes
74 // block id uint32 4 bytes
75 // start offset uint32 4 bytes
76 // tx length uint32 4 bytes
77 // -----
78 // Total: 44 bytes
79 //
80 // dbPutBlockIDIndexEntry uses an existing database transaction to update or add the index entries for the hash to id
81 // and id to hash mappings for the provided values.
82 func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash, id uint32) (e error) {
83 // Serialize the height for use in the index entries.
84 var serializedID [4]byte
85 byteOrder.PutUint32(serializedID[:], id)
86 // Add the block hash to ID mapping to the index.
87 meta := dbTx.Metadata()
88 hashIndex := meta.Bucket(idByHashIndexBucketName)
89 if e := hashIndex.Put(hash[:], serializedID[:]); E.Chk(e) {
90 return e
91 }
92 // Add the block ID to hash mapping to the index.
93 idIndex := meta.Bucket(hashByIDIndexBucketName)
94 return idIndex.Put(serializedID[:], hash[:])
95 }
96 97 // dbRemoveBlockIDIndexEntry uses an existing database transaction remove index entries from the hash to id and id to
98 // hash mappings for the provided hash.
99 func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash) (e error) {
100 // Remove the block hash to ID mapping.
101 meta := dbTx.Metadata()
102 hashIndex := meta.Bucket(idByHashIndexBucketName)
103 serializedID := hashIndex.Get(hash[:])
104 if serializedID == nil {
105 return nil
106 }
107 if e := hashIndex.Delete(hash[:]); E.Chk(e) {
108 return e
109 }
110 // Remove the block ID to hash mapping.
111 idIndex := meta.Bucket(hashByIDIndexBucketName)
112 return idIndex.Delete(serializedID)
113 }
114 115 // dbFetchBlockIDByHash uses an existing database transaction to retrieve the block id for the provided hash from the
116 // index.
117 func dbFetchBlockIDByHash(dbTx database.Tx, hash *chainhash.Hash) (uint32, error) {
118 hashIndex := dbTx.Metadata().Bucket(idByHashIndexBucketName)
119 serializedID := hashIndex.Get(hash[:])
120 if serializedID == nil {
121 return 0, errNoBlockIDEntry
122 }
123 return byteOrder.Uint32(serializedID), nil
124 }
125 126 // dbFetchBlockHashBySerializedID uses an existing database transaction to retrieve the hash for the provided serialized
127 // block id from the index.
128 func dbFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (*chainhash.Hash, error) {
129 idIndex := dbTx.Metadata().Bucket(hashByIDIndexBucketName)
130 hashBytes := idIndex.Get(serializedID)
131 if hashBytes == nil {
132 return nil, errNoBlockIDEntry
133 }
134 var hash chainhash.Hash
135 copy(hash[:], hashBytes)
136 return &hash, nil
137 }
138 139 // dbFetchBlockHashByID uses an existing database transaction to retrieve the hash for the provided block id from the
140 // index.
141 func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (*chainhash.Hash, error) {
142 var serializedID [4]byte
143 byteOrder.PutUint32(serializedID[:], id)
144 return dbFetchBlockHashBySerializedID(dbTx, serializedID[:])
145 }
146 147 // putTxIndexEntry serializes the provided values according to the format described about for a transaction index entry.
148 // The target byte slice must be at least large enough to handle the number of bytes defined by the txEntrySize constant
149 // or it will panic.
150 func putTxIndexEntry(target []byte, blockID uint32, txLoc wire.TxLoc) {
151 byteOrder.PutUint32(target, blockID)
152 byteOrder.PutUint32(target[4:], uint32(txLoc.TxStart))
153 byteOrder.PutUint32(target[8:], uint32(txLoc.TxLen))
154 }
155 156 // dbPutTxIndexEntry uses an existing database transaction to update the transaction index given the provided serialized
157 // data that is expected to have been serialized putTxIndexEntry.
158 func dbPutTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash, serializedData []byte) (e error) {
159 txIndex := dbTx.Metadata().Bucket(txIndexKey)
160 return txIndex.Put(txHash[:], serializedData)
161 }
162 163 // dbFetchTxIndexEntry uses an existing database transaction to fetch the block region for the provided transaction hash
164 // from the transaction index. When there is no entry for the provided hash, nil will be returned for the both the
165 // region and the error.
166 func dbFetchTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash) (*database.BlockRegion, error) {
167 // Load the record from the database and return now if it doesn't exist.
168 txIndex := dbTx.Metadata().Bucket(txIndexKey)
169 serializedData := txIndex.Get(txHash[:])
170 if len(serializedData) == 0 {
171 return nil, nil
172 }
173 // Ensure the serialized data has enough bytes to properly deserialize.
174 if len(serializedData) < 12 {
175 return nil, database.DBError{
176 ErrorCode: database.ErrCorruption,
177 Description: fmt.Sprintf(
178 "corrupt transaction index "+
179 "entry for %s", txHash,
180 ),
181 }
182 }
183 // Load the block hash associated with the block ID.
184 hash, e := dbFetchBlockHashBySerializedID(dbTx, serializedData[0:4])
185 if e != nil {
186 return nil, database.DBError{
187 ErrorCode: database.ErrCorruption,
188 Description: fmt.Sprintf(
189 "corrupt transaction index "+
190 "entry for %s: %v", txHash, e,
191 ),
192 }
193 }
194 // Deserialize the final entry.
195 region := database.BlockRegion{Hash: &chainhash.Hash{}}
196 copy(region.Hash[:], hash[:])
197 region.Offset = byteOrder.Uint32(serializedData[4:8])
198 region.Len = byteOrder.Uint32(serializedData[8:12])
199 return ®ion, nil
200 }
201 202 // dbAddTxIndexEntries uses an existing database transaction to add a transaction index entry for every transaction in
203 // the passed block.
204 func dbAddTxIndexEntries(dbTx database.Tx, block *block.Block, blockID uint32) (e error) {
205 // The offset and length of the transactions within the serialized block.
206 txLocs, e := block.TxLoc()
207 if e != nil {
208 return e
209 }
210 // As an optimization, allocate a single slice big enough to hold all of the serialized transaction index entries
211 // for the block and serialize them directly into the slice. Then, pass the appropriate subslice to the database to
212 // be written. This approach significantly cuts down on the number of required allocations.
213 offset := 0
214 serializedValues := make([]byte, len(block.Transactions())*txEntrySize)
215 for i, tx := range block.Transactions() {
216 putTxIndexEntry(serializedValues[offset:], blockID, txLocs[i])
217 endOffset := offset + txEntrySize
218 e := dbPutTxIndexEntry(
219 dbTx, tx.Hash(),
220 serializedValues[offset:endOffset:endOffset],
221 )
222 if e != nil {
223 return e
224 }
225 offset += txEntrySize
226 }
227 return nil
228 }
229 230 // dbRemoveTxIndexEntry uses an existing database transaction to remove the most recent transaction index entry for the
231 // given hash.
232 func dbRemoveTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash) (e error) {
233 txIndex := dbTx.Metadata().Bucket(txIndexKey)
234 serializedData := txIndex.Get(txHash[:])
235 if len(serializedData) == 0 {
236 return fmt.Errorf(
237 "can't remove non-existent transaction %s "+
238 "from the transaction index", txHash,
239 )
240 }
241 return txIndex.Delete(txHash[:])
242 }
243 244 // dbRemoveTxIndexEntries uses an existing database transaction to remove the latest transaction entry for every
245 // transaction in the passed block.
246 func dbRemoveTxIndexEntries(dbTx database.Tx, block *block.Block) (e error) {
247 for _, tx := range block.Transactions() {
248 e := dbRemoveTxIndexEntry(dbTx, tx.Hash())
249 if e != nil {
250 return e
251 }
252 }
253 return nil
254 }
// TxIndex implements a transaction by hash index. That is to say, it supports querying all transactions by their hash.
type TxIndex struct {
	// db is the database the index is read from in Init and TxBlockRegion.
	db database.DB
	// curBlockID is the highest internal block ID currently in use. It is recovered from the database by Init,
	// incremented by ConnectBlock and decremented by DisconnectBlock.
	curBlockID uint32
}

// Ensure the TxIndex type implements the Indexer interface.
var _ Indexer = (*TxIndex)(nil)
264 265 // Init initializes the hash-based transaction index. In particular, it finds the highest used block ID and stores it
266 // for later use when connecting or disconnecting blocks. This is part of the Indexer interface.
267 func (idx *TxIndex) Init() (e error) {
268 // Find the latest known block id field for the internal block id index and initialize it. This is done because it's
269 // a lot more efficient to do a single search at initialize time than it is to write another value to the database
270 // on every update.
271 e = idx.db.View(
272 func(dbTx database.Tx) (e error) {
273 // Scan forward in large gaps to find a block id that doesn't exist yet to serve as an upper bound for the
274 // binary search below.
275 var highestKnown, nextUnknown uint32
276 testBlockID := uint32(1)
277 increment := uint32(100000)
278 for {
279 _, e := dbFetchBlockHashByID(dbTx, testBlockID)
280 if e != nil {
281 // F.Ln(err)
282 nextUnknown = testBlockID
283 break
284 }
285 highestKnown = testBlockID
286 testBlockID += increment
287 }
288 T.F("forward scan (highest known %d, next unknown %d)", highestKnown, nextUnknown)
289 // No used block IDs due to new database.
290 if nextUnknown == 1 {
291 return nil
292 }
293 // Use a binary search to find the final highest used block id. This will take at most ceil(log_2(increment))
294 // attempts.
295 for {
296 testBlockID = (highestKnown + nextUnknown) / 2
297 _, e := dbFetchBlockHashByID(dbTx, testBlockID)
298 if e != nil {
299 // F.Ln(err)
300 nextUnknown = testBlockID
301 } else {
302 highestKnown = testBlockID
303 }
304 T.F("binary scan (highest known %d, next unknown %d)", highestKnown, nextUnknown)
305 if highestKnown+1 == nextUnknown {
306 break
307 }
308 }
309 idx.curBlockID = highestKnown
310 return nil
311 },
312 )
313 if e != nil {
314 return e
315 }
316 T.Ln("current internal block ID:", idx.curBlockID)
317 return nil
318 }
319 320 // Key returns the database key to use for the index as a byte slice. This is part of the Indexer interface.
321 func (idx *TxIndex) Key() []byte {
322 return txIndexKey
323 }
324 325 // Name returns the human-readable name of the index. This is part of the Indexer interface.
326 func (idx *TxIndex) Name() string {
327 return txIndexName
328 }
329 330 // Create is invoked when the indexer manager determines the index needs to be created for the first time. It creates
331 // the buckets for the hash-based transaction index and the internal block ID indexes. This is part of the Indexer
332 // interface.
333 func (idx *TxIndex) Create(dbTx database.Tx) (e error) {
334 meta := dbTx.Metadata()
335 if _, e = meta.CreateBucket(idByHashIndexBucketName); E.Chk(e) {
336 return e
337 }
338 if _, e = meta.CreateBucket(hashByIDIndexBucketName); E.Chk(e) {
339 return e
340 }
341 _, e = meta.CreateBucket(txIndexKey)
342 return e
343 }
344 345 // ConnectBlock is invoked by the index manager when a new block has been connected to the main chain. This indexer adds
346 // a hash-to-transaction mapping for every transaction in the passed block. This is part of the Indexer interface.
347 func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *block.Block, stxos []blockchain.SpentTxOut) (e error) {
348 // Increment the internal block ID to use for the block being connected and add all of the transactions in the block
349 // to the index.
350 newBlockID := idx.curBlockID + 1
351 if e = dbAddTxIndexEntries(dbTx, block, newBlockID); E.Chk(e) {
352 return e
353 }
354 // Add the new block ID index entry for the block being connected and update the current internal block ID
355 // accordingly.
356 e = dbPutBlockIDIndexEntry(dbTx, block.Hash(), newBlockID)
357 if e != nil {
358 return e
359 }
360 idx.curBlockID = newBlockID
361 return nil
362 }
363 364 // DisconnectBlock is invoked by the index manager when a block has been disconnected from the main chain.
365 //
366 // This indexer removes the hash-to-transaction mapping for every transaction in the block.
367 //
368 // This is part of the Indexer interface.
369 func (idx *TxIndex) DisconnectBlock(
370 dbTx database.Tx, block *block.Block,
371 stxos []blockchain.SpentTxOut,
372 ) (e error) {
373 // Remove all of the transactions in the block from the index.
374 if e = dbRemoveTxIndexEntries(dbTx, block); E.Chk(e) {
375 return e
376 }
377 // Remove the block ID index entry for the block being disconnected and decrement the current internal block ID to
378 // account for it.
379 if e = dbRemoveBlockIDIndexEntry(dbTx, block.Hash()); E.Chk(e) {
380 return e
381 }
382 idx.curBlockID--
383 return nil
384 }
385 386 // TxBlockRegion returns the block region for the provided transaction hash from the transaction index.
387 //
388 // The block region can in turn be used to load the raw transaction bytes.
389 //
390 // When there is no entry for the provided hash, nil will be returned for the both the entry and the error.
391 //
392 // This function is safe for concurrent access.
393 func (idx *TxIndex) TxBlockRegion(hash *chainhash.Hash) (region *database.BlockRegion, e error) {
394 e = idx.db.View(
395 func(dbTx database.Tx) (e error) {
396 region, e = dbFetchTxIndexEntry(dbTx, hash)
397 return e
398 },
399 )
400 return region, e
401 }
402 403 // NewTxIndex returns a new instance of an indexer that is used to create a mapping of the hashes of all transactions in
404 // the blockchain to the respective block, location within the block, and size of the transaction.
405 //
406 // It implements the Indexer interface which plugs into the IndexManager that in turn is used by the blockchain package.
407 //
408 // This allows the index to be seamlessly maintained along with the chain.
409 func NewTxIndex(db database.DB) *TxIndex {
410 return &TxIndex{db: db}
411 }
412 413 // dropBlockIDIndex drops the internal block id index.
414 func dropBlockIDIndex(db database.DB) (e error) {
415 return db.Update(
416 func(dbTx database.Tx) (e error) {
417 meta := dbTx.Metadata()
418 e = meta.DeleteBucket(idByHashIndexBucketName)
419 if e != nil {
420 return e
421 }
422 return meta.DeleteBucket(hashByIDIndexBucketName)
423 },
424 )
425 }
426 427 // DropTxIndex drops the transaction index from the provided database if it exists. Since the address index relies on
428 // it, the address index will also be dropped when it exists.
429 func DropTxIndex(db database.DB, interrupt qu.C) (e error) {
430 e = dropIndex(db, addrIndexKey, addrIndexName, interrupt)
431 if e != nil {
432 return e
433 }
434 return dropIndex(db, txIndexKey, txIndexName, interrupt)
435 }
436