1 package wallet
import (
	"fmt"

	"github.com/p9c/p9/pkg/btcaddr"
	"github.com/p9c/p9/pkg/chainclient"
	"github.com/p9c/p9/pkg/log"
	"github.com/p9c/p9/pkg/txscript"
	"github.com/p9c/p9/pkg/waddrmgr"
	"github.com/p9c/p9/pkg/wire"
	"github.com/p9c/p9/pkg/wtxmgr"
)
12 13 // RescanProgressMsg reports the current progress made by a rescan for a set of
14 // wallet addresses.
15 type RescanProgressMsg struct {
16 Addresses []btcaddr.Address
17 Notification *chainclient.RescanProgress
18 }
19 20 // RescanFinishedMsg reports the addresses that were rescanned when a
21 // rescanfinished message was received rescanning a batch of addresses.
22 type RescanFinishedMsg struct {
23 Addresses []btcaddr.Address
24 Notification *chainclient.RescanFinished
25 }
26 27 // RescanJob is a job to be processed by the RescanManager. The job includes a
28 // set of wallet addresses, a starting height to begin the rescan, and outpoints
29 // spendable by the addresses thought to be unspent. After the rescan completes,
30 // the error result of the rescan RPC is sent on the Err channel.
31 type RescanJob struct {
32 InitialSync bool
33 Addrs []btcaddr.Address
34 OutPoints map[wire.OutPoint]btcaddr.Address
35 BlockStamp waddrmgr.BlockStamp
36 err chan error
37 }
38 39 // rescanBatch is a collection of one or more RescanJobs that were merged
40 // together before a rescan is performed.
41 type rescanBatch struct {
42 initialSync bool
43 addrs []btcaddr.Address
44 outpoints map[wire.OutPoint]btcaddr.Address
45 bs waddrmgr.BlockStamp
46 errChans []chan error
47 }
48 49 // SubmitRescan submits a RescanJob to the RescanManager. A channel is returned
50 // with the final error of the rescan. The channel is buffered and does not need
51 // to be read to prevent a deadlock.
52 func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error {
53 errChan := make(chan error, 1)
54 job.err = errChan
55 w.rescanAddJob <- job
56 return errChan
57 }
58 59 // batch creates the rescanBatch for a single rescan job.
60 func (job *RescanJob) batch() *rescanBatch {
61 return &rescanBatch{
62 initialSync: job.InitialSync,
63 addrs: job.Addrs,
64 outpoints: job.OutPoints,
65 bs: job.BlockStamp,
66 errChans: []chan error{job.err},
67 }
68 }
69 70 // merge merges the work from k into j, setting the starting height to the
71 // minimum of the two jobs. This method does not check for duplicate addresses
72 // or outpoints.
73 func (b *rescanBatch) merge(job *RescanJob) {
74 if job.InitialSync {
75 b.initialSync = true
76 }
77 b.addrs = append(b.addrs, job.Addrs...)
78 for op, addr := range job.OutPoints {
79 b.outpoints[op] = addr
80 }
81 if job.BlockStamp.Height < b.bs.Height {
82 b.bs = job.BlockStamp
83 }
84 b.errChans = append(b.errChans, job.err)
85 }
86 87 // done iterates through all error channels, duplicating sending the error to
88 // inform callers that the rescan finished (or could not complete due to an
89 // error).
90 func (b *rescanBatch) done(e error) {
91 for _, c := range b.errChans {
92 c <- e
93 }
94 }
95 96 // rescanBatchHandler handles incoming rescan request, serializing rescan
97 // submissions, and possibly batching many waiting requests together so they can
98 // be handled by a single rescan after the current one completes.
99 func (w *Wallet) rescanBatchHandler() {
100 var curBatch, nextBatch *rescanBatch
101 quit := w.quitChan()
102 out:
103 for {
104 select {
105 case job := <-w.rescanAddJob:
106 if curBatch == nil {
107 // Set current batch as this job and send request.
108 curBatch = job.batch()
109 w.rescanBatch <- curBatch
110 } else {
111 // Create next batch if it doesn't exist, or merge the job.
112 if nextBatch == nil {
113 nextBatch = job.batch()
114 } else {
115 nextBatch.merge(job)
116 }
117 }
118 case n := <-w.rescanNotifications:
119 switch n := n.(type) {
120 case *chainclient.RescanProgress:
121 if curBatch == nil {
122 W.Ln(
123 "received rescan progress notification but no rescan currently running",
124 )
125 continue
126 }
127 w.rescanProgress <- &RescanProgressMsg{
128 Addresses: curBatch.addrs,
129 Notification: n,
130 }
131 case *chainclient.RescanFinished:
132 if curBatch == nil {
133 W.Ln(
134 "received rescan finished notification but no rescan currently running",
135 )
136 continue
137 }
138 w.rescanFinished <- &RescanFinishedMsg{
139 Addresses: curBatch.addrs,
140 Notification: n,
141 }
142 curBatch, nextBatch = nextBatch, nil
143 if curBatch != nil {
144 w.rescanBatch <- curBatch
145 }
146 default:
147 // Unexpected message
148 panic(n)
149 }
150 case <-quit.Wait():
151 break out
152 }
153 }
154 w.wg.Done()
155 }
156 157 // rescanProgressHandler handles notifications for partially and fully completed
158 // rescans by marking each rescanned address as partially or fully synced.
159 func (w *Wallet) rescanProgressHandler() {
160 quit := w.quitChan()
161 out:
162 for {
163 // These can't be processed out of order since both chans are unbuffured and are
164 // sent from same context (the batch handler).
165 select {
166 case msg := <-w.rescanProgress:
167 n := msg.Notification
168 I.F(
169 "rescanned through block %v (height %d)",
170 n.Hash, n.Height,
171 )
172 case msg := <-w.rescanFinished:
173 n := msg.Notification
174 addrs := msg.Addresses
175 noun := log.PickNoun(len(addrs), "address", "addresses")
176 I.F(
177 "finished rescan for %d %s (synced to block %s, height %d)",
178 len(addrs), noun, n.Hash, n.Height,
179 )
180 go w.resendUnminedTxs()
181 case <-quit.Wait():
182 break out
183 }
184 }
185 w.wg.Done()
186 }
187 188 // rescanRPCHandler reads batch jobs sent by rescanBatchHandler and sends the
189 // RPC requests to perform a rescan. New jobs are not read until a rescan
190 // finishes.
191 func (w *Wallet) rescanRPCHandler() {
192 chainClient, e := w.requireChainClient()
193 if e != nil {
194 E.Ln("rescanRPCHandler called without an RPC client", e)
195 w.wg.Done()
196 return
197 }
198 quit := w.quitChan()
199 out:
200 for {
201 select {
202 case batch := <-w.rescanBatch:
203 // Log the newly-started rescan.
204 numAddrs := len(batch.addrs)
205 noun := log.PickNoun(numAddrs, "address", "addresses")
206 I.F(
207 "started rescan from block %v (height %d) for %d %s",
208 batch.bs.Hash, batch.bs.Height, numAddrs, noun,
209 )
210 e := chainClient.Rescan(
211 &batch.bs.Hash, batch.addrs,
212 batch.outpoints,
213 )
214 if e != nil {
215 E.F(
216 "rescan for %d %s failed: %v", numAddrs, noun, e,
217 )
218 }
219 batch.done(e)
220 case <-quit.Wait():
221 break out
222 }
223 }
224 w.wg.Done()
225 }
226 227 // Rescan begins a rescan for all active addresses and unspent outputs of a
228 // wallet. This is intended to be used to sync a wallet back up to the current
229 // best block in the main chain, and is considered an initial sync rescan.
230 func (w *Wallet) Rescan(addrs []btcaddr.Address, unspent []wtxmgr.Credit) (e error) {
231 return w.rescanWithTarget(addrs, unspent, nil)
232 }
233 234 // rescanWithTarget performs a rescan starting at the optional startStamp. If
235 // none is provided, the rescan will begin from the manager's sync tip.
236 func (w *Wallet) rescanWithTarget(
237 addrs []btcaddr.Address,
238 unspent []wtxmgr.Credit, startStamp *waddrmgr.BlockStamp,
239 ) (e error) {
240 outpoints := make(map[wire.OutPoint]btcaddr.Address, len(unspent))
241 for _, output := range unspent {
242 var outputAddrs []btcaddr.Address
243 _, outputAddrs, _, e = txscript.ExtractPkScriptAddrs(
244 output.PkScript, w.chainParams,
245 )
246 if e != nil {
247 return e
248 }
249 outpoints[output.OutPoint] = outputAddrs[0]
250 }
251 // If a start block stamp was provided, we will use that as the initial starting
252 // point for the rescan.
253 if startStamp == nil {
254 startStamp = &waddrmgr.BlockStamp{}
255 *startStamp = w.Manager.SyncedTo()
256 }
257 job := &RescanJob{
258 InitialSync: true,
259 Addrs: addrs,
260 OutPoints: outpoints,
261 BlockStamp: *startStamp,
262 }
263 // Submit merged job and block until rescan completes.
264 return <-w.SubmitRescan(job)
265 }
266