chainntfns.go raw

   1  package wallet
   2  
   3  import (
   4  	"bytes"
   5  	"github.com/p9c/p9/pkg/btcaddr"
   6  	"strings"
   7  	
   8  	"github.com/p9c/p9/pkg/chainclient"
   9  	"github.com/p9c/p9/pkg/txscript"
  10  	wm "github.com/p9c/p9/pkg/waddrmgr"
  11  	"github.com/p9c/p9/pkg/walletdb"
  12  	tm "github.com/p9c/p9/pkg/wtxmgr"
  13  )
  14  
  15  func (w *Wallet) handleChainNotifications() {
  16  	defer w.wg.Done()
  17  	if w == nil {
  18  		panic("w should not be nil")
  19  	}
  20  	chainClient, e := w.requireChainClient()
  21  	if e != nil {
  22  		E.Ln("handleChainNotifications called without RPC client", e)
  23  		return
  24  	}
  25  	sync := func(w *Wallet) {
  26  		if w.db != nil {
  27  			// At the moment there is no recourse if the rescan fails for some reason, however, the wallet will not be
  28  			// marked synced and many methods will error early since the wallet is known to be out of date.
  29  			e := w.syncWithChain()
  30  			if e != nil && !w.ShuttingDown() {
  31  				W.Ln("unable to synchronize wallet to chain:", e)
  32  			}
  33  		}
  34  	}
  35  	catchUpHashes := func(
  36  		w *Wallet, client chainclient.Interface,
  37  		height int32,
  38  	) (e error) {
  39  		// TODO(aakselrod): There's a race condition here, which happens when a reorg occurs between the rescanProgress
  40  		//  notification and the last GetBlockHash call. The solution when using pod is to make pod send blockconnected
  41  		//  notifications with each block the way Neutrino does, and get rid of the loop. The other alternative is to
  42  		//  check the final hash and, if it doesn't match the original hash returned by the notification, to roll back
  43  		//  and restart the rescan.
  44  		I.F(
  45  			"handleChainNotifications: catching up block hashes to height %d, this might take a while", height,
  46  		)
  47  		e = walletdb.Update(
  48  			w.db, func(tx walletdb.ReadWriteTx) (e error) {
  49  				ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
  50  				startBlock := w.Manager.SyncedTo()
  51  				for i := startBlock.Height + 1; i <= height; i++ {
  52  					hash, e := client.GetBlockHash(int64(i))
  53  					if e != nil {
  54  						return e
  55  					}
  56  					header, e := chainClient.GetBlockHeader(hash)
  57  					if e != nil {
  58  						return e
  59  					}
  60  					bs := wm.BlockStamp{
  61  						Height:    i,
  62  						Hash:      *hash,
  63  						Timestamp: header.Timestamp,
  64  					}
  65  					e = w.Manager.SetSyncedTo(ns, &bs)
  66  					if e != nil {
  67  						return e
  68  					}
  69  				}
  70  				return nil
  71  			},
  72  		)
  73  		if e != nil {
  74  			E.F(
  75  				"failed to update address manager sync state for height %d: %v",
  76  				height, e,
  77  			)
  78  		}
  79  		I.Ln("done catching up block hashes")
  80  		return e
  81  	}
  82  	for {
  83  		select {
  84  		case n, ok := <-chainClient.Notifications():
  85  			if !ok {
  86  				return
  87  			}
  88  			var notificationName string
  89  			var e error
  90  			switch n := n.(type) {
  91  			case chainclient.ClientConnected:
  92  				if w != nil {
  93  					go sync(w)
  94  				}
  95  			case chainclient.BlockConnected:
  96  				e = walletdb.Update(
  97  					w.db, func(tx walletdb.ReadWriteTx) (e error) {
  98  						return w.connectBlock(tx, tm.BlockMeta(n))
  99  					},
 100  				)
 101  				notificationName = "blockconnected"
 102  			case chainclient.BlockDisconnected:
 103  				e = walletdb.Update(
 104  					w.db, func(tx walletdb.ReadWriteTx) (e error) {
 105  						return w.disconnectBlock(tx, tm.BlockMeta(n))
 106  					},
 107  				)
 108  				notificationName = "blockdisconnected"
 109  			case chainclient.RelevantTx:
 110  				e = walletdb.Update(
 111  					w.db, func(tx walletdb.ReadWriteTx) (e error) {
 112  						return w.addRelevantTx(tx, n.TxRecord, n.Block)
 113  					},
 114  				)
 115  				notificationName = "recvtx/redeemingtx"
 116  			case chainclient.FilteredBlockConnected:
 117  				// Atomically update for the whole block.
 118  				if len(n.RelevantTxs) > 0 {
 119  					e = walletdb.Update(
 120  						w.db, func(
 121  							tx walletdb.ReadWriteTx,
 122  						) (e error) {
 123  							for _, rec := range n.RelevantTxs {
 124  								e = w.addRelevantTx(
 125  									tx, rec,
 126  									n.Block,
 127  								)
 128  								if e != nil {
 129  									return e
 130  								}
 131  							}
 132  							return nil
 133  						},
 134  					)
 135  				}
 136  				notificationName = "filteredblockconnected"
 137  			// The following require some database maintenance, but also need to be reported to the wallet's rescan
 138  			// goroutine.
 139  			case *chainclient.RescanProgress:
 140  				e = catchUpHashes(w, chainClient, n.Height)
 141  				notificationName = "rescanprogress"
 142  				select {
 143  				case w.rescanNotifications <- n:
 144  				case <-w.quitChan().Wait():
 145  					return
 146  				}
 147  			case *chainclient.RescanFinished:
 148  				e = catchUpHashes(w, chainClient, n.Height)
 149  				notificationName = "rescanprogress"
 150  				w.SetChainSynced(true)
 151  				select {
 152  				case w.rescanNotifications <- n:
 153  				case <-w.quitChan().Wait():
 154  					return
 155  				}
 156  			}
 157  			if e != nil {
 158  				// On out-of-sync blockconnected notifications, only send a debug message.
 159  				errStr := "failed to process consensus server " +
 160  					"notification (name: `%s`, detail: `%v`)"
 161  				if notificationName == "blockconnected" &&
 162  					strings.Contains(
 163  						e.Error(),
 164  						"couldn't get hash from database",
 165  					) {
 166  					D.F(errStr, notificationName, e)
 167  				} else {
 168  					E.F(errStr, notificationName, e)
 169  				}
 170  			}
 171  		case <-w.quit.Wait():
 172  			return
 173  		}
 174  	}
 175  }
 176  
 177  // connectBlock handles a chain server notification by marking a wallet that's currently in-sync with the chain server
 178  // as being synced up to the passed block.
 179  func (w *Wallet) connectBlock(dbtx walletdb.ReadWriteTx, b tm.BlockMeta) (e error) {
 180  	addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
 181  	bs := wm.BlockStamp{
 182  		Height:    b.Height,
 183  		Hash:      b.Hash,
 184  		Timestamp: b.Time,
 185  	}
 186  	e = w.Manager.SetSyncedTo(addrmgrNs, &bs)
 187  	if e != nil {
 188  		return e
 189  	}
 190  	// Notify interested clients of the connected block.
 191  	//
 192  	// TODO: move all notifications outside of the database transaction.
 193  	w.NtfnServer.notifyAttachedBlock(dbtx, &b)
 194  	return nil
 195  }
 196  
 197  // disconnectBlock handles a chain server reorganize by rolling back all block history from the reorged block for a
 198  // wallet in-sync with the chain server.
 199  func (w *Wallet) disconnectBlock(dbtx walletdb.ReadWriteTx, b tm.BlockMeta) (e error) {
 200  	addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
 201  	txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
 202  	if !w.ChainSynced() {
 203  		return nil
 204  	}
 205  	// Disconnect the removed block and all blocks after it if we know about the disconnected block. Otherwise, the
 206  	// block is in the future.
 207  	if b.Height <= w.Manager.SyncedTo().Height {
 208  		hash, e := w.Manager.BlockHash(addrmgrNs, b.Height)
 209  		if e != nil {
 210  			return e
 211  		}
 212  		if bytes.Equal(hash[:], b.Hash[:]) {
 213  			bs := wm.BlockStamp{
 214  				Height: b.Height - 1,
 215  			}
 216  			hash, e = w.Manager.BlockHash(addrmgrNs, bs.Height)
 217  			if e != nil {
 218  				return e
 219  			}
 220  			b.Hash = *hash
 221  			client := w.ChainClient()
 222  			header, e := client.GetBlockHeader(hash)
 223  			if e != nil {
 224  				return e
 225  			}
 226  			bs.Timestamp = header.Timestamp
 227  			e = w.Manager.SetSyncedTo(addrmgrNs, &bs)
 228  			if e != nil {
 229  				return e
 230  			}
 231  			e = w.TxStore.Rollback(txmgrNs, b.Height)
 232  			if e != nil {
 233  				return e
 234  			}
 235  		}
 236  	}
 237  	// Notify interested clients of the disconnected block.
 238  	w.NtfnServer.notifyDetachedBlock(&b.Hash)
 239  	return nil
 240  }
 241  func (w *Wallet) addRelevantTx(dbtx walletdb.ReadWriteTx, rec *tm.TxRecord, block *tm.BlockMeta) (e error) {
 242  	addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
 243  	txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
 244  	// At the moment all notified transactions are assumed to actually be relevant. This assumption will not hold true
 245  	// when SPV support is added, but until then, simply insert the transaction because there should either be one or
 246  	// more relevant inputs or outputs.
 247  	e = w.TxStore.InsertTx(txmgrNs, rec, block)
 248  	if e != nil {
 249  		return e
 250  	}
 251  	// Chk every output to determine whether it is controlled by a wallet key. If so, mark the output as a credit.
 252  	for i, output := range rec.MsgTx.TxOut {
 253  		var addrs []btcaddr.Address
 254  		_, addrs, _, e = txscript.ExtractPkScriptAddrs(
 255  			output.PkScript,
 256  			w.chainParams,
 257  		)
 258  		if e != nil {
 259  			// Non-standard outputs are skipped.
 260  			continue
 261  		}
 262  		for _, addr := range addrs {
 263  			ma, e := w.Manager.Address(addrmgrNs, addr)
 264  			if e == nil {
 265  				// TODO: Credits should be added with the account they belong to, so tm is able to track per-account
 266  				//  balances.
 267  				e = w.TxStore.AddCredit(
 268  					txmgrNs, rec, block, uint32(i),
 269  					ma.Internal(),
 270  				)
 271  				if e != nil {
 272  					return e
 273  				}
 274  				e = w.Manager.MarkUsed(addrmgrNs, addr)
 275  				if e != nil {
 276  					return e
 277  				}
 278  				T.Ln("marked address used:", addr)
 279  				continue
 280  			}
 281  			// Missing addresses are skipped. Other errors should be propagated.
 282  			if !wm.IsError(e, wm.ErrAddressNotFound) {
 283  				return e
 284  			}
 285  		}
 286  	}
 287  	// Send notification of mined or unmined transaction to any interested clients.
 288  	//
 289  	// TODO: Avoid the extra db hits.
 290  	if block == nil {
 291  		details, e := w.TxStore.UniqueTxDetails(txmgrNs, &rec.Hash, nil)
 292  		if e != nil {
 293  			E.Ln("cannot query transaction details for notification:", e)
 294  		}
 295  		// It's possible that the transaction was not found within the wallet's set of unconfirmed transactions due to
 296  		// it already being confirmed, so we'll avoid notifying it.
 297  		//
 298  		// TODO(wilmer): ideally we should find the culprit to why we're receiving an additional unconfirmed
 299  		//  chain.RelevantTx notification from the chain backend.
 300  		if details != nil {
 301  			w.NtfnServer.notifyUnminedTransaction(dbtx, details)
 302  		}
 303  	} else {
 304  		details, e := w.TxStore.UniqueTxDetails(txmgrNs, &rec.Hash, &block.Block)
 305  		if e != nil {
 306  			E.Ln("cannot query transaction details for notification:", e)
 307  		}
 308  		// We'll only notify the transaction if it was found within the wallet's set of confirmed transactions.
 309  		if details != nil {
 310  			w.NtfnServer.notifyMinedTransaction(dbtx, details, block)
 311  		}
 312  	}
 313  	return nil
 314  }
 315