1 package indexers
2 3 import (
4 "errors"
5 "github.com/p9c/p9/pkg/block"
6 "github.com/p9c/p9/pkg/chaincfg"
7 8 "github.com/p9c/p9/pkg/qu"
9 10 "github.com/p9c/p9/pkg/blockchain"
11 "github.com/p9c/p9/pkg/chainhash"
12 "github.com/p9c/p9/pkg/database"
13 "github.com/p9c/p9/pkg/gcs"
14 "github.com/p9c/p9/pkg/gcs/builder"
15 "github.com/p9c/p9/pkg/wire"
16 )
// cfIndexName is the human-readable name for the index.
const cfIndexName = "committed filter index"
22 23 // Committed filters come in one flavor currently: basic. They are generated and dropped in pairs, and both are indexed
24 // by a block's hash. Besides holding different content, they also live in different buckets.
25 var (
26 // cfIndexParentBucketKey is the name of the parent bucket used to house the index. The rest of the buckets live
27 // below this bucket.
28 cfIndexParentBucketKey = []byte("cfindexparentbucket")
29 // cfIndexKeys is an array of db bucket names used to house indexes of block hashes to cfilters.
30 cfIndexKeys = [][]byte{
31 []byte("cf0byhashidx"),
32 }
33 // cfHeaderKeys is an array of db bucket names used to house indexes of block hashes to cf headers.
34 cfHeaderKeys = [][]byte{
35 []byte("cf0headerbyhashidx"),
36 }
37 // cfHashKeys is an array of db bucket names used to house indexes of block hashes to cf hashes.
38 cfHashKeys = [][]byte{
39 []byte("cf0hashbyhashidx"),
40 }
41 maxFilterType = uint8(len(cfHeaderKeys) - 1)
42 // zeroHash is the chainhash.Hash value of all zero bytes, defined here for convenience.
43 zeroHash chainhash.Hash
44 )
45 46 // dbFetchFilterIdxEntry retrieves a data blob from the filter index database. An entry's absence is not considered an
47 // error.
48 func dbFetchFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
49 idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
50 return idx.Get(h[:]), nil
51 }
52 53 // dbStoreFilterIdxEntry stores a data blob in the filter index database.
54 func dbStoreFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) (e error) {
55 idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
56 return idx.Put(h[:], f)
57 }
58 59 // dbDeleteFilterIdxEntry deletes a data blob from the filter index database.
60 func dbDeleteFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash) (e error) {
61 idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
62 return idx.Delete(h[:])
63 }
64 65 // CFIndex implements a committed filter (cf) by hash index.
66 type CFIndex struct {
67 db database.DB
68 chainParams *chaincfg.Params
69 }
70 71 // Ensure the CfIndex type implements the Indexer interface.
72 var _ Indexer = (*CFIndex)(nil)
73 74 // Ensure the CfIndex type implements the NeedsInputser interface.
75 var _ NeedsInputser = (*CFIndex)(nil)
76 77 // NeedsInputs signals that the index requires the referenced inputs in order to properly create the index. This
78 // implements the NeedsInputser interface.
79 func (idx *CFIndex) NeedsInputs() bool {
80 return true
81 }
82 83 // Init initializes the hash-based cf index. This is part of the Indexer interface.
84 func (idx *CFIndex) Init() (e error) {
85 return nil // Nothing to do.
86 }
87 88 // Key returns the database key to use for the index as a byte slice. This is part of the Indexer interface.
89 func (idx *CFIndex) Key() []byte {
90 return cfIndexParentBucketKey
91 }
92 93 // Name returns the human-readable name of the index. This is part of the Indexer interface.
94 func (idx *CFIndex) Name() string {
95 return cfIndexName
96 }
97 98 // Create is invoked when the indexer manager determines the index needs to be created for the first time. It creates
99 // buckets for the two hash-based cf indexes (regular only currently).
100 func (idx *CFIndex) Create(dbTx database.Tx) (e error) {
101 meta := dbTx.Metadata()
102 cfIndexParentBucket, e := meta.CreateBucket(cfIndexParentBucketKey)
103 if e != nil {
104 return e
105 }
106 for _, bucketName := range cfIndexKeys {
107 _, e = cfIndexParentBucket.CreateBucket(bucketName)
108 if e != nil {
109 return e
110 }
111 }
112 for _, bucketName := range cfHeaderKeys {
113 _, e = cfIndexParentBucket.CreateBucket(bucketName)
114 if e != nil {
115 return e
116 }
117 }
118 for _, bucketName := range cfHashKeys {
119 _, e = cfIndexParentBucket.CreateBucket(bucketName)
120 if e != nil {
121 return e
122 }
123 }
124 return nil
125 }
126 127 // storeFilter stores a given filter, and performs the steps needed to generate the filter's header.
128 func storeFilter(
129 dbTx database.Tx, block *block.Block, f *gcs.Filter,
130 filterType wire.FilterType,
131 ) (e error) {
132 if uint8(filterType) > maxFilterType {
133 return errors.New("unsupported filter type")
134 }
135 // Figure out which buckets to use.
136 fkey := cfIndexKeys[filterType]
137 hkey := cfHeaderKeys[filterType]
138 hashkey := cfHashKeys[filterType]
139 // Start by storing the filter.
140 h := block.Hash()
141 filterBytes, e := f.NBytes()
142 if e != nil {
143 return e
144 }
145 e = dbStoreFilterIdxEntry(dbTx, fkey, h, filterBytes)
146 if e != nil {
147 return e
148 }
149 // Next store the filter hash.
150 filterHash, e := builder.GetFilterHash(f)
151 if e != nil {
152 return e
153 }
154 e = dbStoreFilterIdxEntry(dbTx, hashkey, h, filterHash[:])
155 if e != nil {
156 return e
157 }
158 // Then fetch the previous block's filter header.
159 var prevHeader *chainhash.Hash
160 ph := &block.WireBlock().Header.PrevBlock
161 if ph.IsEqual(&zeroHash) {
162 prevHeader = &zeroHash
163 } else {
164 var pfh []byte
165 pfh, e = dbFetchFilterIdxEntry(dbTx, hkey, ph)
166 if e != nil {
167 return e
168 }
169 // Construct the new block's filter header, and store it.
170 prevHeader, e = chainhash.NewHash(pfh)
171 if e != nil {
172 return e
173 }
174 }
175 fh, e := builder.MakeHeaderForFilter(f, *prevHeader)
176 if e != nil {
177 return e
178 }
179 return dbStoreFilterIdxEntry(dbTx, hkey, h, fh[:])
180 }
181 182 // ConnectBlock is invoked by the index manager when a new block has been connected to the main chain. This indexer adds
183 // a hash-to-cf mapping for every passed block. This is part of the Indexer interface.
184 func (idx *CFIndex) ConnectBlock(
185 dbTx database.Tx, block *block.Block,
186 stxos []blockchain.SpentTxOut,
187 ) (e error) {
188 prevScripts := make([][]byte, len(stxos))
189 for i, stxo := range stxos {
190 prevScripts[i] = stxo.PkScript
191 }
192 f, e := builder.BuildBasicFilter(block.WireBlock(), prevScripts)
193 if e != nil {
194 return e
195 }
196 return storeFilter(dbTx, block, f, wire.GCSFilterRegular)
197 }
198 199 // DisconnectBlock is invoked by the index manager when a block has been disconnected from the main chain. This indexer
200 // removes the hash-to-cf mapping for every passed block. This is part of the Indexer interface.
201 func (idx *CFIndex) DisconnectBlock(
202 dbTx database.Tx, block *block.Block,
203 _ []blockchain.SpentTxOut,
204 ) (e error) {
205 for _, key := range cfIndexKeys {
206 e := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
207 if e != nil {
208 return e
209 }
210 }
211 for _, key := range cfHeaderKeys {
212 e := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
213 if e != nil {
214 return e
215 }
216 }
217 for _, key := range cfHashKeys {
218 e := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
219 if e != nil {
220 return e
221 }
222 }
223 return nil
224 }
225 226 // entryByBlockHash fetches a filter index entry of a particular type (eg. filter, filter header, etc) for a filter type
227 // and block hash.
228 func (idx *CFIndex) entryByBlockHash(
229 filterTypeKeys [][]byte,
230 filterType wire.FilterType, h *chainhash.Hash,
231 ) (entry []byte, e error) {
232 if uint8(filterType) > maxFilterType {
233 return nil, errors.New("unsupported filter type")
234 }
235 key := filterTypeKeys[filterType]
236 e = idx.db.View(
237 func(dbTx database.Tx) (e error) {
238 entry, e = dbFetchFilterIdxEntry(dbTx, key, h)
239 return e
240 },
241 )
242 return entry, e
243 }
244 245 // entriesByBlockHashes batch fetches a filter index entry of a particular type (eg. filter, filter header, etc) for a
246 // filter type and slice of block hashes.
247 func (idx *CFIndex) entriesByBlockHashes(
248 filterTypeKeys [][]byte,
249 filterType wire.FilterType, blockHashes []*chainhash.Hash,
250 ) (entries [][]byte, e error) {
251 if uint8(filterType) > maxFilterType {
252 return nil, errors.New("unsupported filter type")
253 }
254 key := filterTypeKeys[filterType]
255 entries = make([][]byte, 0, len(blockHashes))
256 e = idx.db.View(
257 func(dbTx database.Tx) (e error) {
258 for _, blockHash := range blockHashes {
259 entry, e := dbFetchFilterIdxEntry(dbTx, key, blockHash)
260 if e != nil {
261 return e
262 }
263 entries = append(entries, entry)
264 }
265 return nil
266 },
267 )
268 return entries, e
269 }
270 271 // FilterByBlockHash returns the serialized contents of a block's basic or committed filter.
272 func (idx *CFIndex) FilterByBlockHash(
273 h *chainhash.Hash,
274 filterType wire.FilterType,
275 ) ([]byte, error) {
276 return idx.entryByBlockHash(cfIndexKeys, filterType, h)
277 }
278 279 // FiltersByBlockHashes returns the serialized contents of a block's basic or committed filter for a set of blocks by
280 // hash.
281 func (idx *CFIndex) FiltersByBlockHashes(
282 blockHashes []*chainhash.Hash,
283 filterType wire.FilterType,
284 ) ([][]byte, error) {
285 return idx.entriesByBlockHashes(cfIndexKeys, filterType, blockHashes)
286 }
287 288 // FilterHeaderByBlockHash returns the serialized contents of a block's basic committed filter header.
289 func (idx *CFIndex) FilterHeaderByBlockHash(
290 h *chainhash.Hash,
291 filterType wire.FilterType,
292 ) ([]byte, error) {
293 return idx.entryByBlockHash(cfHeaderKeys, filterType, h)
294 }
295 296 // FilterHeadersByBlockHashes returns the serialized contents of a block's basic committed filter header for a set of
297 // blocks by hash.
298 func (idx *CFIndex) FilterHeadersByBlockHashes(
299 blockHashes []*chainhash.Hash,
300 filterType wire.FilterType,
301 ) ([][]byte, error) {
302 return idx.entriesByBlockHashes(cfHeaderKeys, filterType, blockHashes)
303 }
304 305 // FilterHashByBlockHash returns the serialized contents of a block's basic committed filter hash.
306 func (idx *CFIndex) FilterHashByBlockHash(
307 h *chainhash.Hash,
308 filterType wire.FilterType,
309 ) ([]byte, error) {
310 return idx.entryByBlockHash(cfHashKeys, filterType, h)
311 }
312 313 // FilterHashesByBlockHashes returns the serialized contents of a block's basic committed filter hash for a set of
314 // blocks by hash.
315 func (idx *CFIndex) FilterHashesByBlockHashes(
316 blockHashes []*chainhash.Hash,
317 filterType wire.FilterType,
318 ) ([][]byte, error) {
319 return idx.entriesByBlockHashes(cfHashKeys, filterType, blockHashes)
320 }
321 322 // NewCfIndex returns a new instance of an indexer that is used to create a mapping of the hashes of all blocks in the
323 // blockchain to their respective committed filters. It implements the Indexer interface which plugs into the
324 // IndexManager that in turn is used by the blockchain package. This allows the index to be seamlessly maintained along
325 // with the chain.
326 func NewCfIndex(db database.DB, chainParams *chaincfg.Params) *CFIndex {
327 return &CFIndex{db: db, chainParams: chainParams}
328 }
329 330 // DropCfIndex drops the CF index from the provided database if exists.
331 func DropCfIndex(db database.DB, interrupt qu.C) (e error) {
332 return dropIndex(db, cfIndexParentBucketKey, cfIndexName, interrupt)
333 }
334