rescan.go raw

   1  package wallet
   2  
   3  import (
   4  	"github.com/p9c/p9/pkg/log"
   5  	"github.com/p9c/p9/pkg/btcaddr"
   6  	"github.com/p9c/p9/pkg/chainclient"
   7  	"github.com/p9c/p9/pkg/txscript"
   8  	"github.com/p9c/p9/pkg/waddrmgr"
   9  	"github.com/p9c/p9/pkg/wire"
  10  	"github.com/p9c/p9/pkg/wtxmgr"
  11  )
  12  
  13  // RescanProgressMsg reports the current progress made by a rescan for a set of
  14  // wallet addresses.
  15  type RescanProgressMsg struct {
  16  	Addresses    []btcaddr.Address
  17  	Notification *chainclient.RescanProgress
  18  }
  19  
  20  // RescanFinishedMsg reports the addresses that were rescanned when a
  21  // rescanfinished message was received rescanning a batch of addresses.
  22  type RescanFinishedMsg struct {
  23  	Addresses    []btcaddr.Address
  24  	Notification *chainclient.RescanFinished
  25  }
  26  
  27  // RescanJob is a job to be processed by the RescanManager. The job includes a
  28  // set of wallet addresses, a starting height to begin the rescan, and outpoints
  29  // spendable by the addresses thought to be unspent. After the rescan completes,
  30  // the error result of the rescan RPC is sent on the Err channel.
  31  type RescanJob struct {
  32  	InitialSync bool
  33  	Addrs       []btcaddr.Address
  34  	OutPoints   map[wire.OutPoint]btcaddr.Address
  35  	BlockStamp  waddrmgr.BlockStamp
  36  	err         chan error
  37  }
  38  
  39  // rescanBatch is a collection of one or more RescanJobs that were merged
  40  // together before a rescan is performed.
  41  type rescanBatch struct {
  42  	initialSync bool
  43  	addrs       []btcaddr.Address
  44  	outpoints   map[wire.OutPoint]btcaddr.Address
  45  	bs          waddrmgr.BlockStamp
  46  	errChans    []chan error
  47  }
  48  
  49  // SubmitRescan submits a RescanJob to the RescanManager. A channel is returned
  50  // with the final error of the rescan. The channel is buffered and does not need
  51  // to be read to prevent a deadlock.
  52  func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error {
  53  	errChan := make(chan error, 1)
  54  	job.err = errChan
  55  	w.rescanAddJob <- job
  56  	return errChan
  57  }
  58  
  59  // batch creates the rescanBatch for a single rescan job.
  60  func (job *RescanJob) batch() *rescanBatch {
  61  	return &rescanBatch{
  62  		initialSync: job.InitialSync,
  63  		addrs:       job.Addrs,
  64  		outpoints:   job.OutPoints,
  65  		bs:          job.BlockStamp,
  66  		errChans:    []chan error{job.err},
  67  	}
  68  }
  69  
  70  // merge merges the work from k into j, setting the starting height to the
  71  // minimum of the two jobs. This method does not check for duplicate addresses
  72  // or outpoints.
  73  func (b *rescanBatch) merge(job *RescanJob) {
  74  	if job.InitialSync {
  75  		b.initialSync = true
  76  	}
  77  	b.addrs = append(b.addrs, job.Addrs...)
  78  	for op, addr := range job.OutPoints {
  79  		b.outpoints[op] = addr
  80  	}
  81  	if job.BlockStamp.Height < b.bs.Height {
  82  		b.bs = job.BlockStamp
  83  	}
  84  	b.errChans = append(b.errChans, job.err)
  85  }
  86  
  87  // done iterates through all error channels, duplicating sending the error to
  88  // inform callers that the rescan finished (or could not complete due to an
  89  // error).
  90  func (b *rescanBatch) done(e error) {
  91  	for _, c := range b.errChans {
  92  		c <- e
  93  	}
  94  }
  95  
  96  // rescanBatchHandler handles incoming rescan request, serializing rescan
  97  // submissions, and possibly batching many waiting requests together so they can
  98  // be handled by a single rescan after the current one completes.
  99  func (w *Wallet) rescanBatchHandler() {
 100  	var curBatch, nextBatch *rescanBatch
 101  	quit := w.quitChan()
 102  out:
 103  	for {
 104  		select {
 105  		case job := <-w.rescanAddJob:
 106  			if curBatch == nil {
 107  				// Set current batch as this job and send request.
 108  				curBatch = job.batch()
 109  				w.rescanBatch <- curBatch
 110  			} else {
 111  				// Create next batch if it doesn't exist, or merge the job.
 112  				if nextBatch == nil {
 113  					nextBatch = job.batch()
 114  				} else {
 115  					nextBatch.merge(job)
 116  				}
 117  			}
 118  		case n := <-w.rescanNotifications:
 119  			switch n := n.(type) {
 120  			case *chainclient.RescanProgress:
 121  				if curBatch == nil {
 122  					W.Ln(
 123  						"received rescan progress notification but no rescan currently running",
 124  					)
 125  					continue
 126  				}
 127  				w.rescanProgress <- &RescanProgressMsg{
 128  					Addresses:    curBatch.addrs,
 129  					Notification: n,
 130  				}
 131  			case *chainclient.RescanFinished:
 132  				if curBatch == nil {
 133  					W.Ln(
 134  						"received rescan finished notification but no rescan currently running",
 135  					)
 136  					continue
 137  				}
 138  				w.rescanFinished <- &RescanFinishedMsg{
 139  					Addresses:    curBatch.addrs,
 140  					Notification: n,
 141  				}
 142  				curBatch, nextBatch = nextBatch, nil
 143  				if curBatch != nil {
 144  					w.rescanBatch <- curBatch
 145  				}
 146  			default:
 147  				// Unexpected message
 148  				panic(n)
 149  			}
 150  		case <-quit.Wait():
 151  			break out
 152  		}
 153  	}
 154  	w.wg.Done()
 155  }
 156  
 157  // rescanProgressHandler handles notifications for partially and fully completed
 158  // rescans by marking each rescanned address as partially or fully synced.
 159  func (w *Wallet) rescanProgressHandler() {
 160  	quit := w.quitChan()
 161  out:
 162  	for {
 163  		// These can't be processed out of order since both chans are unbuffured and are
 164  		// sent from same context (the batch handler).
 165  		select {
 166  		case msg := <-w.rescanProgress:
 167  			n := msg.Notification
 168  			I.F(
 169  				"rescanned through block %v (height %d)",
 170  				n.Hash, n.Height,
 171  			)
 172  		case msg := <-w.rescanFinished:
 173  			n := msg.Notification
 174  			addrs := msg.Addresses
 175  			noun := log.PickNoun(len(addrs), "address", "addresses")
 176  			I.F(
 177  				"finished rescan for %d %s (synced to block %s, height %d)",
 178  				len(addrs), noun, n.Hash, n.Height,
 179  			)
 180  			go w.resendUnminedTxs()
 181  		case <-quit.Wait():
 182  			break out
 183  		}
 184  	}
 185  	w.wg.Done()
 186  }
 187  
 188  // rescanRPCHandler reads batch jobs sent by rescanBatchHandler and sends the
 189  // RPC requests to perform a rescan. New jobs are not read until a rescan
 190  // finishes.
 191  func (w *Wallet) rescanRPCHandler() {
 192  	chainClient, e := w.requireChainClient()
 193  	if e != nil {
 194  		E.Ln("rescanRPCHandler called without an RPC client", e)
 195  		w.wg.Done()
 196  		return
 197  	}
 198  	quit := w.quitChan()
 199  out:
 200  	for {
 201  		select {
 202  		case batch := <-w.rescanBatch:
 203  			// Log the newly-started rescan.
 204  			numAddrs := len(batch.addrs)
 205  			noun := log.PickNoun(numAddrs, "address", "addresses")
 206  			I.F(
 207  				"started rescan from block %v (height %d) for %d %s",
 208  				batch.bs.Hash, batch.bs.Height, numAddrs, noun,
 209  			)
 210  			e := chainClient.Rescan(
 211  				&batch.bs.Hash, batch.addrs,
 212  				batch.outpoints,
 213  			)
 214  			if e != nil {
 215  				E.F(
 216  					"rescan for %d %s failed: %v", numAddrs, noun, e,
 217  				)
 218  			}
 219  			batch.done(e)
 220  		case <-quit.Wait():
 221  			break out
 222  		}
 223  	}
 224  	w.wg.Done()
 225  }
 226  
 227  // Rescan begins a rescan for all active addresses and unspent outputs of a
 228  // wallet. This is intended to be used to sync a wallet back up to the current
 229  // best block in the main chain, and is considered an initial sync rescan.
 230  func (w *Wallet) Rescan(addrs []btcaddr.Address, unspent []wtxmgr.Credit) (e error) {
 231  	return w.rescanWithTarget(addrs, unspent, nil)
 232  }
 233  
 234  // rescanWithTarget performs a rescan starting at the optional startStamp. If
 235  // none is provided, the rescan will begin from the manager's sync tip.
 236  func (w *Wallet) rescanWithTarget(
 237  	addrs []btcaddr.Address,
 238  	unspent []wtxmgr.Credit, startStamp *waddrmgr.BlockStamp,
 239  ) (e error) {
 240  	outpoints := make(map[wire.OutPoint]btcaddr.Address, len(unspent))
 241  	for _, output := range unspent {
 242  		var outputAddrs []btcaddr.Address
 243  		_, outputAddrs, _, e = txscript.ExtractPkScriptAddrs(
 244  			output.PkScript, w.chainParams,
 245  		)
 246  		if e != nil {
 247  			return e
 248  		}
 249  		outpoints[output.OutPoint] = outputAddrs[0]
 250  	}
 251  	// If a start block stamp was provided, we will use that as the initial starting
 252  	// point for the rescan.
 253  	if startStamp == nil {
 254  		startStamp = &waddrmgr.BlockStamp{}
 255  		*startStamp = w.Manager.SyncedTo()
 256  	}
 257  	job := &RescanJob{
 258  		InitialSync: true,
 259  		Addrs:       addrs,
 260  		OutPoints:   outpoints,
 261  		BlockStamp:  *startStamp,
 262  	}
 263  	// Submit merged job and block until rescan completes.
 264  	return <-w.SubmitRescan(job)
 265  }
 266