1 package indexers
2 3 import (
4 "errors"
5 "fmt"
6 "github.com/p9c/p9/pkg/block"
7 "github.com/p9c/p9/pkg/btcaddr"
8 "github.com/p9c/p9/pkg/chaincfg"
9 "sync"
10 11 "github.com/p9c/p9/pkg/qu"
12 13 "github.com/p9c/p9/pkg/blockchain"
14 "github.com/p9c/p9/pkg/chainhash"
15 "github.com/p9c/p9/pkg/database"
16 "github.com/p9c/p9/pkg/txscript"
17 "github.com/p9c/p9/pkg/util"
18 "github.com/p9c/p9/pkg/wire"
19 )
const (
	// addrIndexName is the human-readable name for the index.
	addrIndexName = "address index"
	// level0MaxEntries is the maximum number of transactions that are stored in level 0 of an address index entry.
	// Subsequent levels store 2^n * level0MaxEntries entries, or in words, double the maximum of the previous level.
	level0MaxEntries = 8
	// addrKeySize is the number of bytes an address key consumes in the index. It consists of 1 byte address type + 20
	// bytes hash160.
	addrKeySize = 1 + 20
	// levelKeySize is the number of bytes a level key in the address index consumes. It consists of the address key + 1
	// byte for the level.
	levelKeySize = addrKeySize + 1
	// levelOffset is the offset in the level key which identifies the level.
	levelOffset = levelKeySize - 1
	// addrKeyTypePubKeyHash is the address type in an address key which represents both a pay-to-pubkey-hash and a
	// pay-to-pubkey address. This is done because both are identical for the purposes of the address index.
	addrKeyTypePubKeyHash = 0
	// addrKeyTypeScriptHash is the address type in an address key which represents a pay-to-script-hash address. This
	// is necessary because the hash of a pubkey address might be the same as that of a script hash.
	addrKeyTypeScriptHash = 1

	// Witness address types are not indexed at the moment (the corresponding cases in addrToKey are commented out).
	// These are the key type values reserved for them should support be re-enabled:
	//
	// // addrKeyTypeWitnessPubKeyHash is the address type in an address key which represents a
	// // pay-to-witness-pubkey-hash address. This is required as the 20-byte data push of a p2wkh witness program
	// // may be the same data push used by a p2pkh address.
	// addrKeyTypeWitnessPubKeyHash = 2
	// // addrKeyTypeWitnessScriptHash is the address type in an address key which represents a
	// // pay-to-witness-script-hash address. This is required, as p2wsh are distinct from p2sh addresses since they
	// // use a new script template, as well as a 32-byte data push.
	// addrKeyTypeWitnessScriptHash = 3

	// txEntrySize is the size of a transaction entry. It consists of 4 bytes block id + 4 bytes offset + 4 bytes
	// length.
	txEntrySize = 4 + 4 + 4
)
// addrIndexKey is the key of the address index and also the name of the metadata bucket that holds it.
var addrIndexKey = []byte("txbyaddridx")

// errUnsupportedAddressType is returned by addrToKey when it is handed an address type the index cannot store.
var errUnsupportedAddressType = errors.New("address type is not supported by the address index")
64 65 // The address index maps addresses referenced in the blockchain to a list of all the transactions involving that
66 // address. Transactions are stored according to their order of appearance in the blockchain. That is to say first by
67 // block height and then by offset inside the block. It is also important to note that this implementation requires the
68 // transaction index since it is needed in order to catch up old blocks due to the fact the spent outputs will already
69 // be pruned from the utxo set.
70 //
71 // The approach used to store the index is similar to a log-structured merge tree (LSM tree) and is thus similar to how
72 // leveldb works internally.// Every address consists of one or more entries identified by a level starting from 0 where
73 // each level holds a maximum number of entries such that each subsequent level holds double the maximum of the previous
74 // one. In equation form, the number of entries each level holds is 2^n * firstLevelMaxSize. New transactions are
75 // appended to level 0 until it becomes full at which point the entire level 0 entry is appended to the level 1 entry
76 // and level 0 is cleared. This process continues until level 1 becomes full at which point it will be appended to level
77 // 2 and cleared and so on.
78 //
79 // The result of this is the lower levels contain newer transactions and the transactions within each level are ordered
80 // from oldest to newest. The intent of this approach is to provide a balance between space efficiency and indexing
81 // cost. Storing one entry per transaction would have the lowest indexing cost, but would waste a lot of space because
82 // the same address hash would be duplicated for every transaction key. On the other hand, storing a single entry with
83 // all transactions would be the most space efficient, but would cause indexing cost to grow quadratically with the
84 // number of transactions involving the same address. The approach used here provides logarithmic insertion and
85 // retrieval.
86 //
87 // The serialized key format is:
88 // <addr type><addr hash><level>
89 // Field Type Size
90 // addr type uint8 1 byte
91 // addr hash hash160 20 bytes
92 // level uint8 1 byte
93 // -----
94 // Total: 22 bytes
95 // The serialized value format is:
96 // [<block id><start offset><tx length>,...]
97 // Field Type Size
98 // block id uint32 4 bytes
99 // start offset uint32 4 bytes
100 // tx length uint32 4 bytes
101 // -----
102 // Total: 12 bytes per indexed tx
103 // fetchBlockHashFunc defines a callback function to use in order to convert a serialized block ID to an associated
104 // block hash.
105 type fetchBlockHashFunc func(serializedID []byte) (*chainhash.Hash, error)
106 107 // serializeAddrIndexEntry serializes the provided block id and transaction location according to the format described
108 // in detail above.
109 func serializeAddrIndexEntry(blockID uint32, txLoc wire.TxLoc) []byte {
110 // Serialize the entry.
111 serialized := make([]byte, 12)
112 byteOrder.PutUint32(serialized, blockID)
113 byteOrder.PutUint32(serialized[4:], uint32(txLoc.TxStart))
114 byteOrder.PutUint32(serialized[8:], uint32(txLoc.TxLen))
115 return serialized
116 }
117 118 // deserializeAddrIndexEntry decodes the passed serialized byte slice into the provided region struct according to the
119 // format described in detail above and uses the passed block hash fetching function in order to conver the block ID to
120 // the associated block hash.
121 func deserializeAddrIndexEntry(
122 serialized []byte,
123 region *database.BlockRegion,
124 fetchBlockHash fetchBlockHashFunc,
125 ) (e error) {
126 // Ensure there are enough bytes to decode.
127 if len(serialized) < txEntrySize {
128 return errDeserialize("unexpected end of data")
129 }
130 hash, e := fetchBlockHash(serialized[0:4])
131 if e != nil {
132 return e
133 }
134 region.Hash = hash
135 region.Offset = byteOrder.Uint32(serialized[4:8])
136 region.Len = byteOrder.Uint32(serialized[8:12])
137 return nil
138 }
139 140 // keyForLevel returns the key for a specific address and level in the address index entry.
141 func keyForLevel(addrKey [addrKeySize]byte, level uint8) [levelKeySize]byte {
142 var key [levelKeySize]byte
143 copy(key[:], addrKey[:])
144 key[levelOffset] = level
145 return key
146 }
147 148 // dbPutAddrIndexEntry updates the address index to include the provided entry according to the level-based scheme
149 // described in detail above.
150 func dbPutAddrIndexEntry(bucket internalBucket, addrKey [addrKeySize]byte, blockID uint32, txLoc wire.TxLoc) (e error) {
151 // Start with level 0 and its initial max number of entries.
152 curLevel := uint8(0)
153 maxLevelBytes := level0MaxEntries * txEntrySize
154 // Simply append the new entry to level 0 and return now when it will fit. This is the most common path.
155 newData := serializeAddrIndexEntry(blockID, txLoc)
156 level0Key := keyForLevel(addrKey, 0)
157 level0Data := bucket.Get(level0Key[:])
158 if len(level0Data)+len(newData) <= maxLevelBytes {
159 mergedData := newData
160 if len(level0Data) > 0 {
161 mergedData = make([]byte, len(level0Data)+len(newData))
162 copy(mergedData, level0Data)
163 copy(mergedData[len(level0Data):], newData)
164 }
165 return bucket.Put(level0Key[:], mergedData)
166 }
167 // At this point, level 0 is full, so merge each level into higher levels as many times as needed to free up level
168 // 0.
169 prevLevelData := level0Data
170 for {
171 // Each new level holds twice as much as the previous one.
172 curLevel++
173 maxLevelBytes *= 2
174 // Move to the next level as long as the current level is full.
175 curLevelKey := keyForLevel(addrKey, curLevel)
176 curLevelData := bucket.Get(curLevelKey[:])
177 if len(curLevelData) == maxLevelBytes {
178 prevLevelData = curLevelData
179 continue
180 }
181 // The current level has room for the data in the previous one, so merge the data from previous level into it.
182 mergedData := prevLevelData
183 if len(curLevelData) > 0 {
184 mergedData = make(
185 []byte, len(curLevelData)+
186 len(prevLevelData),
187 )
188 copy(mergedData, curLevelData)
189 copy(mergedData[len(curLevelData):], prevLevelData)
190 }
191 e := bucket.Put(curLevelKey[:], mergedData)
192 if e != nil {
193 return e
194 }
195 // Move all of the levels before the previous one up a level.
196 for mergeLevel := curLevel - 1; mergeLevel > 0; mergeLevel-- {
197 mergeLevelKey := keyForLevel(addrKey, mergeLevel)
198 prevLevelKey := keyForLevel(addrKey, mergeLevel-1)
199 prevData := bucket.Get(prevLevelKey[:])
200 e := bucket.Put(mergeLevelKey[:], prevData)
201 if e != nil {
202 return e
203 }
204 }
205 break
206 }
207 // Finally, insert the new entry into level 0 now that it is empty.
208 return bucket.Put(level0Key[:], newData)
209 }
210 211 // dbFetchAddrIndexEntries returns block regions for transactions referenced by the given address key and the number of
212 // entries skipped since it could have been less in the case where there are less total entries than the requested
213 // number of entries to skip.
214 func dbFetchAddrIndexEntries(
215 bucket internalBucket, addrKey [addrKeySize]byte, numToSkip, numRequested uint32,
216 reverse bool, fetchBlockHash fetchBlockHashFunc,
217 ) ([]database.BlockRegion, uint32, error) {
218 // When the reverse flag is not set, all levels need to be fetched because numToSkip and numRequested are counted
219 // from the oldest transactions (highest level) and thus the total count is needed. However, when the reverse flag
220 // is set, only enough records to satisfy the requested amount are needed.
221 var level uint8
222 var serialized []byte
223 for !reverse || len(serialized) < int(numToSkip+numRequested)*txEntrySize {
224 curLevelKey := keyForLevel(addrKey, level)
225 levelData := bucket.Get(curLevelKey[:])
226 if levelData == nil {
227 // Stop when there are no more levels.
228 break
229 }
230 // Higher levels contain older transactions, so prepend them.
231 prepended := make([]byte, len(serialized)+len(levelData))
232 copy(prepended, levelData)
233 copy(prepended[len(levelData):], serialized)
234 serialized = prepended
235 level++
236 }
237 // When the requested number of entries to skip is larger than the number available, skip them all and return now
238 // with the actual number skipped.
239 numEntries := uint32(len(serialized) / txEntrySize)
240 if numToSkip >= numEntries {
241 return nil, numEntries, nil
242 }
243 // Nothing more to do when there are no requested entries.
244 if numRequested == 0 {
245 return nil, numToSkip, nil
246 }
247 // Limit the number to load based on the number of available entries, the number to skip, and the number requested.
248 numToLoad := numEntries - numToSkip
249 if numToLoad > numRequested {
250 numToLoad = numRequested
251 }
252 // Start the offset after all skipped entries and load the calculated number.
253 results := make([]database.BlockRegion, numToLoad)
254 for i := uint32(0); i < numToLoad; i++ {
255 // Calculate the read offset according to the reverse flag.
256 var offset uint32
257 if reverse {
258 offset = (numEntries - numToSkip - i - 1) * txEntrySize
259 } else {
260 offset = (numToSkip + i) * txEntrySize
261 }
262 // Deserialize and populate the result.
263 e := deserializeAddrIndexEntry(
264 serialized[offset:],
265 &results[i], fetchBlockHash,
266 )
267 if e != nil {
268 // Ensure any deserialization errors are returned as database corruption errors.
269 if isDeserializeErr(e) {
270 e = database.DBError{
271 ErrorCode: database.ErrCorruption,
272 Description: fmt.Sprintf(
273 "failed to "+
274 "deserialized address index "+
275 "for key %x: %v", addrKey, e,
276 ),
277 }
278 }
279 return nil, 0, e
280 }
281 }
282 return results, numToSkip, nil
283 }
284 285 // minEntriesToReachLevel returns the minimum number of entries that are required to reach the given address index level.
286 func minEntriesToReachLevel(level uint8) int {
287 maxEntriesForLevel := level0MaxEntries
288 minRequired := 1
289 for l := uint8(1); l <= level; l++ {
290 minRequired += maxEntriesForLevel
291 maxEntriesForLevel *= 2
292 }
293 return minRequired
294 }
295 296 // maxEntriesForLevel returns the maximum number of entries allowed for the given address index level.
297 func maxEntriesForLevel(level uint8) int {
298 numEntries := level0MaxEntries
299 for l := level; l > 0; l-- {
300 numEntries *= 2
301 }
302 return numEntries
303 }
// dbRemoveAddrIndexEntries removes the specified number of entries from the address index for the provided key. An
// assertion error will be returned if the count exceeds the total number of entries in the index.
//
// Entries are removed starting at level 0 and moving to higher levels, and within a level they are trimmed from the end
// of the level's data. Lower levels and later positions hold newer transactions, so this removes the most recently
// added entries first. The remaining entries are then backfilled so the level rules still hold: levels after level 0
// are either empty, half full, or completely full, with no gaps.
//
// No database writes happen until the final layout is known. All changes are accumulated in pendingUpdates and flushed
// by applyPending, so nothing has been written when the assertion error is returned. A count of zero or less is a
// no-op.
func dbRemoveAddrIndexEntries(bucket internalBucket, addrKey [addrKeySize]byte, count int) (e error) {
	// Nothing to do if no entries are being deleted.
	if count <= 0 {
		return nil
	}
	// Make use of a local map to track pending updates and define a closure to apply it to the database. This is done
	// in order to reduce the number of database reads and because there is more than one exit path that needs to apply
	// the updates.
	pendingUpdates := make(map[uint8][]byte)
	// applyPending writes every pending level to the bucket. Levels whose pending data is empty are deleted instead of
	// being stored as empty values. Map iteration order is unspecified, so the order of the writes is too.
	applyPending := func() (e error) {
		for level, data := range pendingUpdates {
			curLevelKey := keyForLevel(addrKey, level)
			if len(data) == 0 {
				e := bucket.Delete(curLevelKey[:])
				if e != nil {
					return e
				}
				continue
			}
			e := bucket.Put(curLevelKey[:], data)
			if e != nil {
				return e
			}
		}
		return nil
	}
	// Loop forwards through the levels while removing entries until the specified number has been removed. This will
	// potentially result in entirely empty lower levels which will be backfilled below.
	var highestLoadedLevel uint8
	numRemaining := count
	for level := uint8(0); numRemaining > 0; level++ {
		// Load the data for the level from the database.
		curLevelKey := keyForLevel(addrKey, level)
		curLevelData := bucket.Get(curLevelKey[:])
		// Running out of levels while entries remain to be removed means the caller asked for more removals than
		// there are entries. Bail out before anything has been written.
		if len(curLevelData) == 0 && numRemaining > 0 {
			return AssertError(
				fmt.Sprintf(
					"dbRemoveAddrIndexEntries "+
						"not enough entries for address key %x to "+
						"delete %d entries", addrKey, count,
				),
			)
		}
		pendingUpdates[level] = curLevelData
		highestLoadedLevel = level
		// Delete the entire level as needed.
		numEntries := len(curLevelData) / txEntrySize
		if numRemaining >= numEntries {
			pendingUpdates[level] = nil
			numRemaining -= numEntries
			continue
		}
		// Remove remaining entries to delete from the level. Entries within a level run oldest to newest, so keeping
		// the prefix drops the newest ones.
		offsetEnd := len(curLevelData) - (numRemaining * txEntrySize)
		pendingUpdates[level] = curLevelData[:offsetEnd]
		break
	}
	// When all elements in level 0 were not removed there is nothing left to do other than updating the database.
	if len(pendingUpdates[0]) != 0 {
		return applyPending()
	}
	// At this point there are one or more empty levels before the current level which need to be backfilled and the
	// current level might have had some entries deleted from it as well. Since all levels after level 0 are required
	// to either be empty, half full, or completely full, the current level must be adjusted accordingly by backfilling
	// each previous level in a way which satisfies the requirements. Any entries that are left are assigned to level 0
	// after the loop as they are guaranteed to fit by the logic in the loop. In other words, this effectively squashes
	// all remaining entries in the current level into the lowest possible levels while following the level rules. Note
	// that the level after the current level might also have entries and gaps are not allowed, so this also keeps track
	// of the lowest empty level so the code below knows how far to backfill in case it is required.
	lowestEmptyLevel := uint8(255)
	curLevelData := pendingUpdates[highestLoadedLevel]
	curLevelMaxEntries := maxEntriesForLevel(highestLoadedLevel)
	for level := highestLoadedLevel; level > 0; level-- {
		// When there are not enough entries left in the current level for the number that would be required to reach
		// it, clear the current level which effectively moves them all up to the previous level on the next
		// iteration. Otherwise, there are sufficient entries, so update the current level to contain as many entries
		// as possible while still leaving enough remaining entries required to reach the level.
		numEntries := len(curLevelData) / txEntrySize
		prevLevelMaxEntries := curLevelMaxEntries / 2
		minPrevRequired := minEntriesToReachLevel(level - 1)
		if numEntries < prevLevelMaxEntries+minPrevRequired {
			lowestEmptyLevel = level
			pendingUpdates[level] = nil
		} else {
			// This level can only be completely full or half full, so choose the appropriate offset to ensure enough
			// entries remain to reach the level. The oldest entries (the prefix) stay at this level; the newer
			// remainder is carried down to the lower levels.
			var offset int
			if numEntries-curLevelMaxEntries >= minPrevRequired {
				offset = curLevelMaxEntries * txEntrySize
			} else {
				offset = prevLevelMaxEntries * txEntrySize
			}
			pendingUpdates[level] = curLevelData[:offset]
			curLevelData = curLevelData[offset:]
		}
		curLevelMaxEntries = prevLevelMaxEntries
	}
	// Whatever is left after the loop is assigned to level 0.
	pendingUpdates[0] = curLevelData
	if len(curLevelData) == 0 {
		lowestEmptyLevel = 0
	}
	// When the highest loaded level is empty, it's possible the level after it still has data and thus that data needs
	// to be backfilled as well.
	for len(pendingUpdates[highestLoadedLevel]) == 0 {
		// When the next level is empty too, there is no data left to continue backfilling, so there is nothing left to
		// do. Otherwise, populate the pending updates map with the newly loaded data and update the highest loaded
		// level accordingly.
		level := highestLoadedLevel + 1
		curLevelKey := keyForLevel(addrKey, level)
		levelData := bucket.Get(curLevelKey[:])
		if len(levelData) == 0 {
			break
		}
		pendingUpdates[level] = levelData
		highestLoadedLevel = level
		// At this point the highest level is not empty, but it might be half full. When that is the case, move it down
		// a level to simplify the code below which backfills all lower levels that are still empty. This also means
		// the current level will be empty, so the loop will perform another iteration to potentially backfill this
		// level with data from the next one.
		curLevelMaxEntries := maxEntriesForLevel(level)
		if len(levelData)/txEntrySize != curLevelMaxEntries {
			pendingUpdates[level] = nil
			pendingUpdates[level-1] = levelData
			level--
			curLevelMaxEntries /= 2
		}
		// Backfill all lower levels that are still empty by iteratively halving the data until the lowest empty level
		// is filled.
		for level > lowestEmptyLevel {
			offset := (curLevelMaxEntries / 2) * txEntrySize
			pendingUpdates[level] = levelData[:offset]
			levelData = levelData[offset:]
			pendingUpdates[level-1] = levelData
			level--
			curLevelMaxEntries /= 2
		}
		// The lowest possible empty level is now the highest loaded level.
		lowestEmptyLevel = highestLoadedLevel
	}
	// Apply the pending updates.
	return applyPending()
}
449 450 // addrToKey converts known address types to an addrindex key. An error is returned for unsupported types.
451 func addrToKey(addr btcaddr.Address) ([addrKeySize]byte, error) {
452 switch addr := addr.(type) {
453 case *btcaddr.PubKeyHash:
454 var result [addrKeySize]byte
455 result[0] = addrKeyTypePubKeyHash
456 copy(result[1:], addr.Hash160()[:])
457 return result, nil
458 case *btcaddr.ScriptHash:
459 var result [addrKeySize]byte
460 result[0] = addrKeyTypeScriptHash
461 copy(result[1:], addr.Hash160()[:])
462 return result, nil
463 case *btcaddr.PubKey:
464 var result [addrKeySize]byte
465 result[0] = addrKeyTypePubKeyHash
466 copy(result[1:], addr.PubKeyHash().Hash160()[:])
467 return result, nil
468 // case *util.AddressWitnessScriptHash:
469 // var result [addrKeySize]byte
470 // result[0] = addrKeyTypeWitnessScriptHash
471 // // P2WSH outputs utilize a 32-byte data push created by hashing the script with sha256 instead of hash160. In
472 // // order to keep all address entries within the database uniform and compact, we use a hash160 here to reduce
473 // // the size of the salient data push to 20-bytes.
474 // copy(result[1:], btcaddr.Hash160(addr.ScriptAddress()))
475 // return result, nil
476 // case *util.AddressWitnessPubKeyHash:
477 // var result [addrKeySize]byte
478 // result[0] = addrKeyTypeWitnessPubKeyHash
479 // copy(result[1:], addr.Hash160()[:])
480 // return result, nil
481 }
482 return [addrKeySize]byte{}, errUnsupportedAddressType
483 }
// AddrIndex implements a transaction by address index. That is to say, it supports querying all transactions that
// reference a given address because they are either crediting or debiting the address. The returned transactions are
// ordered according to their order of appearance in the blockchain. In other words, first by block height and then by
// offset inside the block. In addition, support is provided for a memory-only index of unconfirmed transactions such as
// those which are kept in the memory pool before inclusion in a block.
type AddrIndex struct {
	// The following fields are set when the instance is created and can't be changed afterwards, so there is no need to
	// protect them with a separate mutex.
	//
	// db is the database read by TxRegionsForAddress; chainParams is passed to txscript.ExtractPkScriptAddrs when
	// extracting addresses from public key scripts.
	db          database.DB
	chainParams *chaincfg.Params
	// The following fields are used to quickly link transactions and addresses that have not been included into a block
	// yet when an address index is being maintained. They are protected by the unconfirmedLock field. The txnsByAddr
	// field is used to keep an index of all transactions which either create an output to a given address or spend from
	// a previous output to it keyed by the address. The addrsByTx field is essentially the reverse and is used to keep
	// an index of all addresses which a given transaction involves. This allows fairly efficient updates when
	// transactions are removed once they are included into a block.
	unconfirmedLock sync.RWMutex
	txnsByAddr      map[[addrKeySize]byte]map[chainhash.Hash]*util.Tx
	addrsByTx       map[chainhash.Hash]map[[addrKeySize]byte]struct{}
}
505 506 // Ensure the AddrIndex type implements the Indexer interface.
507 var _ Indexer = (*AddrIndex)(nil)
508 509 // Ensure the AddrIndex type implements the NeedsInputser interface.
510 var _ NeedsInputser = (*AddrIndex)(nil)
511 512 // NeedsInputs signals that the index requires the referenced inputs in order to properly create the index. This
513 // implements the NeedsInputser interface.
514 func (idx *AddrIndex) NeedsInputs() bool {
515 return true
516 }
517 518 // Init is only provided to satisfy the Indexer interface as there is nothing to initialize for this index. This is part
519 // of the Indexer interface.
520 func (idx *AddrIndex) Init() (e error) {
521 // Nothing to do.
522 return nil
523 }
524 525 // Key returns the database key to use for the index as a byte slice. This is part of the Indexer interface.
526 func (idx *AddrIndex) Key() []byte {
527 return addrIndexKey
528 }
529 530 // Name returns the human-readable name of the index. This is part of the Indexer interface.
531 func (idx *AddrIndex) Name() string {
532 return addrIndexName
533 }
534 535 // Create is invoked when the indexer manager determines the index needs to be created for the first time. It creates
536 // the bucket for the address index. This is part of the Indexer interface.
537 func (idx *AddrIndex) Create(dbTx database.Tx) (e error) {
538 _, e = dbTx.Metadata().CreateBucket(addrIndexKey)
539 return e
540 }
// writeIndexData represents the address index data to be written for one block. It maps each address key to the ordered
// list of positions (indexes into the block's transaction list) of the transactions that involve the address. The list
// follows the order the transactions appear in the block, so the entries can be stored in that order.
type writeIndexData map[[addrKeySize]byte][]int
546 547 // indexPkScript extracts all standard addresses from the passed public key script and maps each of them to the
548 // associated transaction using the passed map.
549 func (idx *AddrIndex) indexPkScript(data writeIndexData, pkScript []byte, txIdx int) {
550 // Nothing to index if the script is non-standard or otherwise doesn't contain any addresses.
551 var addrs []btcaddr.Address
552 var e error
553 _, addrs, _, e = txscript.ExtractPkScriptAddrs(
554 pkScript,
555 idx.chainParams,
556 )
557 if e != nil || len(addrs) == 0 {
558 return
559 }
560 for _, addr := range addrs {
561 var addrKey [21]byte
562 addrKey, e = addrToKey(addr)
563 if e != nil {
564 // Ignore unsupported address types.
565 continue
566 }
567 // Avoid inserting the transaction more than once. Since the transactions are indexed serially any duplicates
568 // will be indexed in a row, so checking the most recent entry for the address is enough to detect duplicates.
569 indexedTxns := data[addrKey]
570 numTxns := len(indexedTxns)
571 if numTxns > 0 && indexedTxns[numTxns-1] == txIdx {
572 continue
573 }
574 indexedTxns = append(indexedTxns, txIdx)
575 data[addrKey] = indexedTxns
576 }
577 }
578 579 // indexBlock extract all of the standard addresses from all of the transactions in the passed block and maps each of
580 // them to the associated transaction using the passed map.
581 func (idx *AddrIndex) indexBlock(
582 data writeIndexData, block *block.Block,
583 stxos []blockchain.SpentTxOut,
584 ) {
585 stxoIndex := 0
586 for txIdx, tx := range block.Transactions() {
587 // Coinbases do not reference any inputs. Since the block is required to have already gone through full
588 // validation, it has already been proven on the first transaction in the block is a coinbase.
589 if txIdx != 0 {
590 for range tx.MsgTx().TxIn {
591 // We'll access the slice of all the transactions spent in this block properly ordered to fetch the
592 // previous input script.
593 pkScript := stxos[stxoIndex].PkScript
594 idx.indexPkScript(data, pkScript, txIdx)
595 // With an input indexed, we'll advance the stxo coutner.
596 stxoIndex++
597 }
598 }
599 for _, txOut := range tx.MsgTx().TxOut {
600 idx.indexPkScript(data, txOut.PkScript, txIdx)
601 }
602 }
603 }
604 605 // ConnectBlock is invoked by the index manager when a new block has been connected to the main chain. This indexer adds
606 // a mapping for each address the transactions in the block involve. This is part of the Indexer interface.
607 func (idx *AddrIndex) ConnectBlock(
608 dbTx database.Tx, block *block.Block,
609 stxos []blockchain.SpentTxOut,
610 ) (e error) {
611 // The offset and length of the transactions within the serialized block.
612 txLocs, e := block.TxLoc()
613 if e != nil {
614 return e
615 }
616 // Get the internal block ID associated with the block.
617 blockID, e := dbFetchBlockIDByHash(dbTx, block.Hash())
618 if e != nil {
619 return e
620 }
621 // Build all of the address to transaction mappings in a local map.
622 addrsToTxns := make(writeIndexData)
623 idx.indexBlock(addrsToTxns, block, stxos)
624 // Add all of the index entries for each address.
625 addrIdxBucket := dbTx.Metadata().Bucket(addrIndexKey)
626 for addrKey, txIdxs := range addrsToTxns {
627 for _, txIdx := range txIdxs {
628 e := dbPutAddrIndexEntry(
629 addrIdxBucket, addrKey,
630 blockID, txLocs[txIdx],
631 )
632 if e != nil {
633 return e
634 }
635 }
636 }
637 return nil
638 }
639 640 // DisconnectBlock is invoked by the index manager when a block has been disconnected from the main chain. This indexer
641 // removes the address mappings each transaction in the block involve. This is part of the Indexer interface.
642 func (idx *AddrIndex) DisconnectBlock(
643 dbTx database.Tx, block *block.Block,
644 stxos []blockchain.SpentTxOut,
645 ) (e error) {
646 // Build all of the address to transaction mappings in a local map.
647 addrsToTxns := make(writeIndexData)
648 idx.indexBlock(addrsToTxns, block, stxos)
649 // Remove all of the index entries for each address.
650 bucket := dbTx.Metadata().Bucket(addrIndexKey)
651 for addrKey, txIdxs := range addrsToTxns {
652 e := dbRemoveAddrIndexEntries(bucket, addrKey, len(txIdxs))
653 if e != nil {
654 return e
655 }
656 }
657 return nil
658 }
659 660 // TxRegionsForAddress returns a slice of block regions which identify each transaction that involves the passed address
661 // according to the specified number to skip, number requested, and whether or not the results should be reversed. It
662 // also returns the number actually skipped since it could be less in the case where there are not enough entries. NOTE:
663 // These results only include transactions confirmed in blocks. See the UnconfirmedTxnsForAddress method for obtaining
664 // unconfirmed transactions that involve a given address. This function is safe for concurrent access.
665 func (idx *AddrIndex) TxRegionsForAddress(
666 dbTx database.Tx, addr btcaddr.Address, numToSkip, numRequested uint32,
667 reverse bool,
668 ) ([]database.BlockRegion, uint32, error) {
669 addrKey, e := addrToKey(addr)
670 if e != nil {
671 return nil, 0, e
672 }
673 var regions []database.BlockRegion
674 var skipped uint32
675 e = idx.db.View(
676 func(dbTx database.Tx) (e error) {
677 // Create closure to lookup the block hash given the ID using the database transaction.
678 fetchBlockHash := func(id []byte) (*chainhash.Hash, error) {
679 // Deserialize and populate the result.
680 return dbFetchBlockHashBySerializedID(dbTx, id)
681 }
682 addrIdxBucket := dbTx.Metadata().Bucket(addrIndexKey)
683 regions, skipped, e = dbFetchAddrIndexEntries(
684 addrIdxBucket,
685 addrKey, numToSkip, numRequested, reverse,
686 fetchBlockHash,
687 )
688 return e
689 },
690 )
691 return regions, skipped, e
692 }
693 694 // indexUnconfirmedAddresses modifies the unconfirmed (memory-only) address index to include mappings for the addresses
695 // encoded by the passed public key script to the transaction. This function is safe for concurrent access.
696 func (idx *AddrIndex) indexUnconfirmedAddresses(pkScript []byte, tx *util.Tx) {
697 // The error is ignored here since the only reason it can fail is if the script fails to parse and it was already
698 // validated before being admitted to the mempool.
699 _, addresses, _, _ := txscript.ExtractPkScriptAddrs(
700 pkScript,
701 idx.chainParams,
702 )
703 for _, addr := range addresses {
704 // Ignore unsupported address types.
705 addrKey, e := addrToKey(addr)
706 if e != nil {
707 continue
708 }
709 // Add a mapping from the address to the transaction.
710 idx.unconfirmedLock.Lock()
711 addrIndexEntry := idx.txnsByAddr[addrKey]
712 if addrIndexEntry == nil {
713 addrIndexEntry = make(map[chainhash.Hash]*util.Tx)
714 idx.txnsByAddr[addrKey] = addrIndexEntry
715 }
716 addrIndexEntry[*tx.Hash()] = tx
717 // Add a mapping from the transaction to the address.
718 addrsByTxEntry := idx.addrsByTx[*tx.Hash()]
719 if addrsByTxEntry == nil {
720 addrsByTxEntry = make(map[[addrKeySize]byte]struct{})
721 idx.addrsByTx[*tx.Hash()] = addrsByTxEntry
722 }
723 addrsByTxEntry[addrKey] = struct{}{}
724 idx.unconfirmedLock.Unlock()
725 }
726 }
727 728 // AddUnconfirmedTx adds all addresses related to the transaction to the unconfirmed (memory-only) address index. NOTE:
729 // This transaction MUST have already been validated by the memory pool before calling this function with it and have
730 // all of the inputs available in the provided utxo view. Failure to do so could result in some or all addresses not
731 // being indexed. This function is safe for concurrent access.
732 func (idx *AddrIndex) AddUnconfirmedTx(tx *util.Tx, utxoView *blockchain.UtxoViewpoint) {
733 // Index addresses of all referenced previous transaction outputs. The existence checks are elided since this is
734 // only called after the transaction has already been validated and thus all inputs are already known to exist.
735 for _, txIn := range tx.MsgTx().TxIn {
736 entry := utxoView.LookupEntry(txIn.PreviousOutPoint)
737 if entry == nil {
738 // Ignore missing entries. This should never happen in practice since the function comments specifically
739 // call out all inputs must be available.
740 continue
741 }
742 idx.indexUnconfirmedAddresses(entry.PkScript(), tx)
743 }
744 // Index addresses of all created outputs.
745 for _, txOut := range tx.MsgTx().TxOut {
746 idx.indexUnconfirmedAddresses(txOut.PkScript, tx)
747 }
748 }
749 750 // RemoveUnconfirmedTx removes the passed transaction from the unconfirmed (memory-only) address index. This function is
751 // safe for concurrent access.
752 func (idx *AddrIndex) RemoveUnconfirmedTx(hash *chainhash.Hash) {
753 idx.unconfirmedLock.Lock()
754 defer idx.unconfirmedLock.Unlock()
755 // Remove all address references to the transaction from the address index and remove the entry for the address
756 // altogether if it no longer references any transactions.
757 for addrKey := range idx.addrsByTx[*hash] {
758 delete(idx.txnsByAddr[addrKey], *hash)
759 if len(idx.txnsByAddr[addrKey]) == 0 {
760 delete(idx.txnsByAddr, addrKey)
761 }
762 }
763 // Remove the entry from the transaction to address lookup map as well.
764 delete(idx.addrsByTx, *hash)
765 }
766 767 // UnconfirmedTxnsForAddress returns all transactions currently in the unconfirmed (memory-only) address index that
768 // involve the passed address. Unsupported address types are ignored and will result in no results. This function is
769 // safe for concurrent access.
770 func (idx *AddrIndex) UnconfirmedTxnsForAddress(addr btcaddr.Address) []*util.Tx {
771 // Ignore unsupported address types.
772 addrKey, e := addrToKey(addr)
773 if e != nil {
774 return nil
775 }
776 // Protect concurrent access.
777 idx.unconfirmedLock.RLock()
778 defer idx.unconfirmedLock.RUnlock()
779 // Return a new slice with the results if there are any. This ensures safe concurrency.
780 if txns, exists := idx.txnsByAddr[addrKey]; exists {
781 addressTxns := make([]*util.Tx, 0, len(txns))
782 for _, tx := range txns {
783 addressTxns = append(addressTxns, tx)
784 }
785 return addressTxns
786 }
787 return nil
788 }
789 790 // NewAddrIndex returns a new instance of an indexer that is used to create a mapping of all addresses in the blockchain
791 // to the respective transactions that involve them. It implements the Indexer interface which plugs into the
792 // IndexManager that in turn is used by the blockchain package. This allows the index to be seamlessly maintained along
793 // with the chain.
794 func NewAddrIndex(db database.DB, chainParams *chaincfg.Params) *AddrIndex {
795 return &AddrIndex{
796 db: db,
797 chainParams: chainParams,
798 txnsByAddr: make(map[[addrKeySize]byte]map[chainhash.Hash]*util.Tx),
799 addrsByTx: make(map[chainhash.Hash]map[[addrKeySize]byte]struct{}),
800 }
801 }
802 803 // DropAddrIndex drops the address index from the provided database if it exists.
804 func DropAddrIndex(db database.DB, interrupt qu.C) (e error) {
805 return dropIndex(db, addrIndexKey, addrIndexName, interrupt)
806 }
807