1 package wallet
2 3 import (
4 "bytes"
5 "github.com/p9c/p9/pkg/btcaddr"
6 "strings"
7 8 "github.com/p9c/p9/pkg/chainclient"
9 "github.com/p9c/p9/pkg/txscript"
10 wm "github.com/p9c/p9/pkg/waddrmgr"
11 "github.com/p9c/p9/pkg/walletdb"
12 tm "github.com/p9c/p9/pkg/wtxmgr"
13 )
14 15 func (w *Wallet) handleChainNotifications() {
16 defer w.wg.Done()
17 if w == nil {
18 panic("w should not be nil")
19 }
20 chainClient, e := w.requireChainClient()
21 if e != nil {
22 E.Ln("handleChainNotifications called without RPC client", e)
23 return
24 }
25 sync := func(w *Wallet) {
26 if w.db != nil {
27 // At the moment there is no recourse if the rescan fails for some reason, however, the wallet will not be
28 // marked synced and many methods will error early since the wallet is known to be out of date.
29 e := w.syncWithChain()
30 if e != nil && !w.ShuttingDown() {
31 W.Ln("unable to synchronize wallet to chain:", e)
32 }
33 }
34 }
35 catchUpHashes := func(
36 w *Wallet, client chainclient.Interface,
37 height int32,
38 ) (e error) {
39 // TODO(aakselrod): There's a race condition here, which happens when a reorg occurs between the rescanProgress
40 // notification and the last GetBlockHash call. The solution when using pod is to make pod send blockconnected
41 // notifications with each block the way Neutrino does, and get rid of the loop. The other alternative is to
42 // check the final hash and, if it doesn't match the original hash returned by the notification, to roll back
43 // and restart the rescan.
44 I.F(
45 "handleChainNotifications: catching up block hashes to height %d, this might take a while", height,
46 )
47 e = walletdb.Update(
48 w.db, func(tx walletdb.ReadWriteTx) (e error) {
49 ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
50 startBlock := w.Manager.SyncedTo()
51 for i := startBlock.Height + 1; i <= height; i++ {
52 hash, e := client.GetBlockHash(int64(i))
53 if e != nil {
54 return e
55 }
56 header, e := chainClient.GetBlockHeader(hash)
57 if e != nil {
58 return e
59 }
60 bs := wm.BlockStamp{
61 Height: i,
62 Hash: *hash,
63 Timestamp: header.Timestamp,
64 }
65 e = w.Manager.SetSyncedTo(ns, &bs)
66 if e != nil {
67 return e
68 }
69 }
70 return nil
71 },
72 )
73 if e != nil {
74 E.F(
75 "failed to update address manager sync state for height %d: %v",
76 height, e,
77 )
78 }
79 I.Ln("done catching up block hashes")
80 return e
81 }
82 for {
83 select {
84 case n, ok := <-chainClient.Notifications():
85 if !ok {
86 return
87 }
88 var notificationName string
89 var e error
90 switch n := n.(type) {
91 case chainclient.ClientConnected:
92 if w != nil {
93 go sync(w)
94 }
95 case chainclient.BlockConnected:
96 e = walletdb.Update(
97 w.db, func(tx walletdb.ReadWriteTx) (e error) {
98 return w.connectBlock(tx, tm.BlockMeta(n))
99 },
100 )
101 notificationName = "blockconnected"
102 case chainclient.BlockDisconnected:
103 e = walletdb.Update(
104 w.db, func(tx walletdb.ReadWriteTx) (e error) {
105 return w.disconnectBlock(tx, tm.BlockMeta(n))
106 },
107 )
108 notificationName = "blockdisconnected"
109 case chainclient.RelevantTx:
110 e = walletdb.Update(
111 w.db, func(tx walletdb.ReadWriteTx) (e error) {
112 return w.addRelevantTx(tx, n.TxRecord, n.Block)
113 },
114 )
115 notificationName = "recvtx/redeemingtx"
116 case chainclient.FilteredBlockConnected:
117 // Atomically update for the whole block.
118 if len(n.RelevantTxs) > 0 {
119 e = walletdb.Update(
120 w.db, func(
121 tx walletdb.ReadWriteTx,
122 ) (e error) {
123 for _, rec := range n.RelevantTxs {
124 e = w.addRelevantTx(
125 tx, rec,
126 n.Block,
127 )
128 if e != nil {
129 return e
130 }
131 }
132 return nil
133 },
134 )
135 }
136 notificationName = "filteredblockconnected"
137 // The following require some database maintenance, but also need to be reported to the wallet's rescan
138 // goroutine.
139 case *chainclient.RescanProgress:
140 e = catchUpHashes(w, chainClient, n.Height)
141 notificationName = "rescanprogress"
142 select {
143 case w.rescanNotifications <- n:
144 case <-w.quitChan().Wait():
145 return
146 }
147 case *chainclient.RescanFinished:
148 e = catchUpHashes(w, chainClient, n.Height)
149 notificationName = "rescanprogress"
150 w.SetChainSynced(true)
151 select {
152 case w.rescanNotifications <- n:
153 case <-w.quitChan().Wait():
154 return
155 }
156 }
157 if e != nil {
158 // On out-of-sync blockconnected notifications, only send a debug message.
159 errStr := "failed to process consensus server " +
160 "notification (name: `%s`, detail: `%v`)"
161 if notificationName == "blockconnected" &&
162 strings.Contains(
163 e.Error(),
164 "couldn't get hash from database",
165 ) {
166 D.F(errStr, notificationName, e)
167 } else {
168 E.F(errStr, notificationName, e)
169 }
170 }
171 case <-w.quit.Wait():
172 return
173 }
174 }
175 }
176 177 // connectBlock handles a chain server notification by marking a wallet that's currently in-sync with the chain server
178 // as being synced up to the passed block.
179 func (w *Wallet) connectBlock(dbtx walletdb.ReadWriteTx, b tm.BlockMeta) (e error) {
180 addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
181 bs := wm.BlockStamp{
182 Height: b.Height,
183 Hash: b.Hash,
184 Timestamp: b.Time,
185 }
186 e = w.Manager.SetSyncedTo(addrmgrNs, &bs)
187 if e != nil {
188 return e
189 }
190 // Notify interested clients of the connected block.
191 //
192 // TODO: move all notifications outside of the database transaction.
193 w.NtfnServer.notifyAttachedBlock(dbtx, &b)
194 return nil
195 }
196 197 // disconnectBlock handles a chain server reorganize by rolling back all block history from the reorged block for a
198 // wallet in-sync with the chain server.
199 func (w *Wallet) disconnectBlock(dbtx walletdb.ReadWriteTx, b tm.BlockMeta) (e error) {
200 addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
201 txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
202 if !w.ChainSynced() {
203 return nil
204 }
205 // Disconnect the removed block and all blocks after it if we know about the disconnected block. Otherwise, the
206 // block is in the future.
207 if b.Height <= w.Manager.SyncedTo().Height {
208 hash, e := w.Manager.BlockHash(addrmgrNs, b.Height)
209 if e != nil {
210 return e
211 }
212 if bytes.Equal(hash[:], b.Hash[:]) {
213 bs := wm.BlockStamp{
214 Height: b.Height - 1,
215 }
216 hash, e = w.Manager.BlockHash(addrmgrNs, bs.Height)
217 if e != nil {
218 return e
219 }
220 b.Hash = *hash
221 client := w.ChainClient()
222 header, e := client.GetBlockHeader(hash)
223 if e != nil {
224 return e
225 }
226 bs.Timestamp = header.Timestamp
227 e = w.Manager.SetSyncedTo(addrmgrNs, &bs)
228 if e != nil {
229 return e
230 }
231 e = w.TxStore.Rollback(txmgrNs, b.Height)
232 if e != nil {
233 return e
234 }
235 }
236 }
237 // Notify interested clients of the disconnected block.
238 w.NtfnServer.notifyDetachedBlock(&b.Hash)
239 return nil
240 }
241 func (w *Wallet) addRelevantTx(dbtx walletdb.ReadWriteTx, rec *tm.TxRecord, block *tm.BlockMeta) (e error) {
242 addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
243 txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
244 // At the moment all notified transactions are assumed to actually be relevant. This assumption will not hold true
245 // when SPV support is added, but until then, simply insert the transaction because there should either be one or
246 // more relevant inputs or outputs.
247 e = w.TxStore.InsertTx(txmgrNs, rec, block)
248 if e != nil {
249 return e
250 }
251 // Chk every output to determine whether it is controlled by a wallet key. If so, mark the output as a credit.
252 for i, output := range rec.MsgTx.TxOut {
253 var addrs []btcaddr.Address
254 _, addrs, _, e = txscript.ExtractPkScriptAddrs(
255 output.PkScript,
256 w.chainParams,
257 )
258 if e != nil {
259 // Non-standard outputs are skipped.
260 continue
261 }
262 for _, addr := range addrs {
263 ma, e := w.Manager.Address(addrmgrNs, addr)
264 if e == nil {
265 // TODO: Credits should be added with the account they belong to, so tm is able to track per-account
266 // balances.
267 e = w.TxStore.AddCredit(
268 txmgrNs, rec, block, uint32(i),
269 ma.Internal(),
270 )
271 if e != nil {
272 return e
273 }
274 e = w.Manager.MarkUsed(addrmgrNs, addr)
275 if e != nil {
276 return e
277 }
278 T.Ln("marked address used:", addr)
279 continue
280 }
281 // Missing addresses are skipped. Other errors should be propagated.
282 if !wm.IsError(e, wm.ErrAddressNotFound) {
283 return e
284 }
285 }
286 }
287 // Send notification of mined or unmined transaction to any interested clients.
288 //
289 // TODO: Avoid the extra db hits.
290 if block == nil {
291 details, e := w.TxStore.UniqueTxDetails(txmgrNs, &rec.Hash, nil)
292 if e != nil {
293 E.Ln("cannot query transaction details for notification:", e)
294 }
295 // It's possible that the transaction was not found within the wallet's set of unconfirmed transactions due to
296 // it already being confirmed, so we'll avoid notifying it.
297 //
298 // TODO(wilmer): ideally we should find the culprit to why we're receiving an additional unconfirmed
299 // chain.RelevantTx notification from the chain backend.
300 if details != nil {
301 w.NtfnServer.notifyUnminedTransaction(dbtx, details)
302 }
303 } else {
304 details, e := w.TxStore.UniqueTxDetails(txmgrNs, &rec.Hash, &block.Block)
305 if e != nil {
306 E.Ln("cannot query transaction details for notification:", e)
307 }
308 // We'll only notify the transaction if it was found within the wallet's set of confirmed transactions.
309 if details != nil {
310 w.NtfnServer.notifyMinedTransaction(dbtx, details, block)
311 }
312 }
313 return nil
314 }
315