_neutrino.go raw

   1  package chainclient
   2  
   3  import (
   4  	"errors"
   5  	"fmt"
   6  	"github.com/p9c/p9/pkg/btcaddr"
   7  	"github.com/p9c/p9/pkg/chaincfg"
   8  	"sync"
   9  	"time"
  10  	
  11  	"github.com/p9c/p9/pkg/qu"
  12  	
  13  	sac "github.com/p9c/p9/cmd/spv"
  14  	"github.com/p9c/p9/pkg/chainhash"
  15  	"github.com/p9c/p9/pkg/gcs"
  16  	"github.com/p9c/p9/pkg/gcs/builder"
  17  	"github.com/p9c/p9/pkg/rpcclient"
  18  	"github.com/p9c/p9/pkg/txscript"
  19  	"github.com/p9c/p9/pkg/util"
  20  	"github.com/p9c/p9/pkg/waddrmgr"
  21  	"github.com/p9c/p9/pkg/wire"
  22  	"github.com/p9c/p9/pkg/wtxmgr"
  23  )
  24  
// NeutrinoClient is an implementation of the btcwallet chainclient.Interface interface, backed by a sac.ChainService.
//
// Field overview:
//   - CS, chainParams: the chain service and network parameters handed to NewNeutrinoClient.
//   - rescan, rescanQuit, rescanErr: the single active rescan, the quit signal passed to it, and the error channel
//     returned by its Start method.
//   - enqueueNotification, dequeueNotification: the input and output sides of the queue serviced by
//     notificationHandler; consumers obtain the output side through Notifications.
//   - currentBlock: channel on which notificationHandler offers the latest block stamp; read by BlockStamp.
//   - startTime: the wallet birthday stored by SetStartTime and passed to newly created rescans.
//   - started: set by Start and cleared by Stop.
//   - scanning: set once Rescan or NotifyReceived has set up a rescan.
//   - isRescan, finished, lastProgressSent: control whether RescanProgress and RescanFinished notifications are
//     still to be sent for the current rescan.
//   - quit: created by Start and signalled by Stop.
//   - wg: waited on by WaitForShutdown; released when notificationHandler returns.
//   - clientMtx: taken by the methods that read or update the lifecycle and rescan fields above.
type NeutrinoClient struct {
	CS          *sac.ChainService
	chainParams *chaincfg.Params
	// We currently support one rescan/notification goroutine per client
	rescan              *sac.Rescan
	enqueueNotification chan interface{}
	dequeueNotification chan interface{}
	startTime           time.Time
	lastProgressSent    bool
	currentBlock        chan *waddrmgr.BlockStamp
	quit                qu.C
	rescanQuit          qu.C
	rescanErr           <-chan error
	wg                  sync.WaitGroup
	started             bool
	scanning            bool
	finished            bool
	isRescan            bool
	clientMtx           sync.Mutex
}
  46  
  47  // NewNeutrinoClient creates a new NeutrinoClient struct with a backing ChainService.
  48  func NewNeutrinoClient(
  49  	chainParams *chaincfg.Params,
  50  	chainService *sac.ChainService,
  51  ) *NeutrinoClient {
  52  	return &NeutrinoClient{
  53  		CS:          chainService,
  54  		chainParams: chainParams,
  55  	}
  56  }
  57  
  58  // BackEnd returns the name of the driver.
  59  func (s *NeutrinoClient) BackEnd() string {
  60  	return "neutrino"
  61  }
  62  
  63  // Start replicates the RPC client's Start method.
  64  func (s *NeutrinoClient) Start() (e error) {
  65  	s.CS.Start()
  66  	s.clientMtx.Lock()
  67  	defer s.clientMtx.Unlock()
  68  	if !s.started {
  69  		s.enqueueNotification = make(chan interface{})
  70  		s.dequeueNotification = make(chan interface{})
  71  		s.currentBlock = make(chan *waddrmgr.BlockStamp)
  72  		s.quit = qu.T()
  73  		s.started = true
  74  		s.wg.Add(1)
  75  		go func() {
  76  			select {
  77  			case s.enqueueNotification <- ClientConnected{}:
  78  			case <-s.quit.Wait():
  79  			}
  80  		}()
  81  		go s.notificationHandler()
  82  	}
  83  	return nil
  84  }
  85  
  86  // Stop replicates the RPC client's Stop method.
  87  func (s *NeutrinoClient) Stop() {
  88  	s.clientMtx.Lock()
  89  	defer s.clientMtx.Unlock()
  90  	if !s.started {
  91  		return
  92  	}
  93  	s.quit.Q()
  94  	s.started = false
  95  }
  96  
  97  // WaitForShutdown replicates the RPC client's WaitForShutdown method.
  98  func (s *NeutrinoClient) WaitForShutdown() {
  99  	s.wg.Wait()
 100  }
 101  
 102  // GetBlock replicates the RPC client's GetBlock command.
 103  func (s *NeutrinoClient) GetBlock(hash *chainhash.Hash) (*wire.Block, error) {
 104  	// TODO(roasbeef): add a block cache?
 105  	//  * which evication strategy? depends on use case
 106  	//  Should the block cache be INSIDE neutrino instead of in btcwallet?
 107  	block, e := s.CS.GetBlock(*hash)
 108  	if e != nil {
 109  		return nil, e
 110  	}
 111  	return block.WireBlock(), nil
 112  }
 113  
 114  // GetBlockHeight gets the height of a block by its hash. It serves as a replacement for the use of
 115  // GetBlockVerboseTxAsync for the wallet package since we can't actually return a FutureGetBlockVerboseResult because
 116  // the underlying type is private to rpcclient.
 117  func (s *NeutrinoClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
 118  	return s.CS.GetBlockHeight(hash)
 119  }
 120  
 121  // GetBestBlock replicates the RPC client's GetBestBlock command.
 122  func (s *NeutrinoClient) GetBestBlock() (h *chainhash.Hash, height int32, e error) {
 123  	var chainTip *waddrmgr.BlockStamp
 124  	chainTip, e = s.CS.BestBlock()
 125  	if e != nil {
 126  		return nil, 0, e
 127  	}
 128  	return &chainTip.Hash, chainTip.Height, nil
 129  }
 130  
 131  // BlockStamp returns the latest block notified by the client, or an error if the client has been shut down.
 132  func (s *NeutrinoClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
 133  	select {
 134  	case bs := <-s.currentBlock:
 135  		return bs, nil
 136  	case <-s.quit.Wait():
 137  		return nil, errors.New("disconnected")
 138  	}
 139  }
 140  
 141  // GetBlockHash returns the block hash for the given height, or an error if the client has been shut down or the hash at
 142  // the block height doesn't exist or is unknown.
 143  func (s *NeutrinoClient) GetBlockHash(height int64) (*chainhash.Hash, error) {
 144  	return s.CS.GetBlockHash(height)
 145  }
 146  
 147  // GetBlockHeader returns the block header for the given block hash, or an error if the client has been shut down or the
 148  // hash doesn't exist or is unknown.
 149  func (s *NeutrinoClient) GetBlockHeader(
 150  	blockHash *chainhash.Hash,
 151  ) (*wire.BlockHeader, error) {
 152  	return s.CS.GetBlockHeader(blockHash)
 153  }
 154  
 155  // SendRawTransaction replicates the RPC client's SendRawTransaction command.
 156  func (s *NeutrinoClient) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (
 157  	*chainhash.Hash, error,
 158  ) {
 159  	e := s.CS.SendTransaction(tx)
 160  	if e != nil {
 161  		return nil, e
 162  	}
 163  	hash := tx.TxHash()
 164  	return &hash, nil
 165  }
 166  
 167  // FilterBlocks scans the blocks contained in the FilterBlocksRequest for any addresses of interest. For each requested
 168  // block, the corresponding compact filter will first be checked for matches, skipping those that do not report
 169  // anything. If the filter returns a positive match, the full block will be fetched and filtered. This method returns a
 170  // FilterBlocksResponse for the first block containing a matching address. If no matches are found in the range of
 171  // blocks requested, the returned response will be nil.
 172  func (s *NeutrinoClient) FilterBlocks(
 173  	req *FilterBlocksRequest,
 174  ) (*FilterBlocksResponse, error) {
 175  	blockFilterer := NewBlockFilterer(s.chainParams, req)
 176  	// Construct the watchlist using the addresses and outpoints contained in the filter blocks request.
 177  	watchList, e := buildFilterBlocksWatchList(req)
 178  	if e != nil {
 179  		return nil, e
 180  	}
 181  	// Iterate over the requested blocks, fetching the compact filter for each one, and matching it against the
 182  	// watchlist generated above. If the filter returns a positive match, the full block is then requested and scanned
 183  	// for addresses using the block filterer.
 184  	for i, blk := range req.Blocks {
 185  		filter, e := s.pollCFilter(&blk.Hash)
 186  		if e != nil {
 187  			return nil, e
 188  		}
 189  		// Skip any empty filters.
 190  		if filter == nil || filter.N() == 0 {
 191  			continue
 192  		}
 193  		key := builder.DeriveKey(&blk.Hash)
 194  		matched, e := filter.MatchAny(key, watchList)
 195  		if e != nil {
 196  			return nil, e
 197  		} else if !matched {
 198  			continue
 199  		}
 200  		T.F(
 201  			"fetching block height=%d hash=%v",
 202  			blk.Height, blk.Hash,
 203  		)
 204  		// TODO(conner): can optimize bandwidth by only fetching
 205  		// stripped blocks
 206  		rawBlock, e := s.GetBlock(&blk.Hash)
 207  		if e != nil {
 208  			return nil, e
 209  		}
 210  		if !blockFilterer.FilterBlock(rawBlock) {
 211  			continue
 212  		}
 213  		// If any external or internal addresses were detected in this block, we return them to the caller so that the
 214  		// rescan windows can widened with subsequent addresses. The `BatchIndex` is returned so that the caller can
 215  		// compute the *next* block from which to begin again.
 216  		resp := &FilterBlocksResponse{
 217  			BatchIndex:         uint32(i),
 218  			BlockMeta:          blk,
 219  			FoundExternalAddrs: blockFilterer.FoundExternal,
 220  			FoundInternalAddrs: blockFilterer.FoundInternal,
 221  			FoundOutPoints:     blockFilterer.FoundOutPoints,
 222  			RelevantTxns:       blockFilterer.RelevantTxns,
 223  		}
 224  		return resp, nil
 225  	}
 226  	// No addresses were found for this range.
 227  	return nil, nil
 228  }
 229  
 230  // buildFilterBlocksWatchList constructs a watchlist used for matching against a cfilter from a FilterBlocksRequest. The
 231  // watchlist will be populated with all external addresses, internal addresses, and outpoints contained in the request.
 232  func buildFilterBlocksWatchList(req *FilterBlocksRequest) ([][]byte, error) {
 233  	// Construct a watch list containing the script addresses of all internal and external addresses that were
 234  	// requested, in addition to the set of outpoints currently being watched.
 235  	watchListSize := len(req.ExternalAddrs) +
 236  		len(req.InternalAddrs) +
 237  		len(req.WatchedOutPoints)
 238  	watchList := make([][]byte, 0, watchListSize)
 239  	for _, addr := range req.ExternalAddrs {
 240  		p2shAddr, e := txscript.PayToAddrScript(addr)
 241  		if e != nil {
 242  			return nil, e
 243  		}
 244  		watchList = append(watchList, p2shAddr)
 245  	}
 246  	for _, addr := range req.InternalAddrs {
 247  		p2shAddr, e := txscript.PayToAddrScript(addr)
 248  		if e != nil {
 249  			return nil, e
 250  		}
 251  		watchList = append(watchList, p2shAddr)
 252  	}
 253  	for _, addr := range req.WatchedOutPoints {
 254  		addr, e := txscript.PayToAddrScript(addr)
 255  		if e != nil {
 256  			return nil, e
 257  		}
 258  		watchList = append(watchList, addr)
 259  	}
 260  	return watchList, nil
 261  }
 262  
 263  // pollCFilter attempts to fetch a CFilter from the neutrino client. This is used to get around the fact that the filter
 264  // headers may lag behind the highest known block header.
 265  func (s *NeutrinoClient) pollCFilter(hash *chainhash.Hash) (filter *gcs.Filter, e error) {
 266  	var (
 267  		count int
 268  	)
 269  	const maxFilterRetries = 50
 270  	for count < maxFilterRetries {
 271  		if count > 0 {
 272  			time.Sleep(100 * time.Millisecond)
 273  		}
 274  		filter, e = s.CS.GetCFilter(*hash, wire.GCSFilterRegular)
 275  		if e != nil {
 276  			count++
 277  			continue
 278  		}
 279  		return filter, nil
 280  	}
 281  	return nil, e
 282  }
 283  
 284  // Rescan replicates the RPC client's Rescan command.
 285  func (s *NeutrinoClient) Rescan(
 286  	startHash *chainhash.Hash, addrs []btcaddr.Address,
 287  	outPoints map[wire.OutPoint]btcaddr.Address,
 288  ) (e error) {
 289  	s.clientMtx.Lock()
 290  	defer s.clientMtx.Unlock()
 291  	if !s.started {
 292  		return fmt.Errorf(
 293  			"can't do a rescan when the chain client " +
 294  				"is not started",
 295  		)
 296  	}
 297  	if s.scanning {
 298  		// Restart the rescan by killing the existing rescan.
 299  		s.rescanQuit.Q()
 300  		s.clientMtx.Unlock()
 301  		s.rescan.WaitForShutdown()
 302  		s.clientMtx.Lock()
 303  		s.rescan = nil
 304  		s.rescanErr = nil
 305  	}
 306  	s.rescanQuit = qu.T()
 307  	s.scanning = true
 308  	s.finished = false
 309  	s.lastProgressSent = false
 310  	s.isRescan = true
 311  	bestBlock, e := s.CS.BestBlock()
 312  	if e != nil {
 313  		return fmt.Errorf("can't get chain service's best block: %s", e)
 314  	}
 315  	header, e := s.CS.GetBlockHeader(&bestBlock.Hash)
 316  	if e != nil {
 317  		return fmt.Errorf(
 318  			"can't get block header for hash %v: %s",
 319  			bestBlock.Hash, e,
 320  		)
 321  	}
 322  	// If the wallet is already fully caught up, or the rescan has started with state that indicates a "fresh" wallet,
 323  	// we'll send a notification indicating the rescan has "finished".
 324  	if header.BlockHash() == *startHash {
 325  		s.finished = true
 326  		select {
 327  		case s.enqueueNotification <- &RescanFinished{
 328  			Hash:   startHash,
 329  			Height: bestBlock.Height,
 330  			Time:   header.Timestamp,
 331  		}:
 332  		case <-s.quit.Wait():
 333  			return nil
 334  		case <-s.rescanQuit.Wait():
 335  			return nil
 336  		}
 337  	}
 338  	var inputsToWatch []sac.InputWithScript
 339  	for op, addr := range outPoints {
 340  		addrScript, e := txscript.PayToAddrScript(addr)
 341  		if e != nil {
 342  		}
 343  		inputsToWatch = append(
 344  			inputsToWatch, sac.InputWithScript{
 345  				OutPoint: op,
 346  				PkScript: addrScript,
 347  			},
 348  		)
 349  	}
 350  	newRescan := s.CS.NewRescan(
 351  		sac.NotificationHandlers(
 352  			rpcclient.NotificationHandlers{
 353  				OnBlockConnected:         s.onBlockConnected,
 354  				OnFilteredBlockConnected: s.onFilteredBlockConnected,
 355  				OnBlockDisconnected:      s.onBlockDisconnected,
 356  			},
 357  		),
 358  		sac.StartBlock(&waddrmgr.BlockStamp{Hash: *startHash}),
 359  		sac.StartTime(s.startTime),
 360  		sac.QuitChan(s.rescanQuit),
 361  		sac.WatchAddrs(addrs...),
 362  		sac.WatchInputs(inputsToWatch...),
 363  	)
 364  	s.rescan = newRescan
 365  	s.rescanErr = s.rescan.Start()
 366  	return nil
 367  }
 368  
 369  // NotifyBlocks replicates the RPC client's NotifyBlocks command.
 370  func (s *NeutrinoClient) NotifyBlocks() (e error) {
 371  	s.clientMtx.Lock()
 372  	// If we're scanning, we're already notifying on blocks. Otherwise, start a rescan without watching any addresses.
 373  	if !s.scanning {
 374  		s.clientMtx.Unlock()
 375  		return s.NotifyReceived([]btcaddr.Address{})
 376  	}
 377  	s.clientMtx.Unlock()
 378  	return nil
 379  }
 380  
 381  // NotifyReceived replicates the RPC client's NotifyReceived command.
 382  func (s *NeutrinoClient) NotifyReceived(addrs []btcaddr.Address) (e error) {
 383  	s.clientMtx.Lock()
 384  	// If we have a rescan running, we just need to add the appropriate addresses to the watch list.
 385  	if s.scanning {
 386  		s.clientMtx.Unlock()
 387  		return s.rescan.Update(sac.AddAddrs(addrs...))
 388  	}
 389  	s.rescanQuit = qu.T()
 390  	s.scanning = true
 391  	// Don't need RescanFinished or RescanProgress notifications.
 392  	s.finished = true
 393  	s.lastProgressSent = true
 394  	// Rescan with just the specified addresses.
 395  	newRescan := s.CS.NewRescan(
 396  		sac.NotificationHandlers(
 397  			rpcclient.NotificationHandlers{
 398  				OnBlockConnected:         s.onBlockConnected,
 399  				OnFilteredBlockConnected: s.onFilteredBlockConnected,
 400  				OnBlockDisconnected:      s.onBlockDisconnected,
 401  			},
 402  		),
 403  		sac.StartTime(s.startTime),
 404  		sac.QuitChan(s.rescanQuit),
 405  		sac.WatchAddrs(addrs...),
 406  	)
 407  	s.rescan = newRescan
 408  	s.rescanErr = s.rescan.Start()
 409  	s.clientMtx.Unlock()
 410  	return nil
 411  }
 412  
 413  // Notifications replicates the RPC client's Notifications method.
 414  func (s *NeutrinoClient) Notifications() <-chan interface{} {
 415  	return s.dequeueNotification
 416  }
 417  
 418  // SetStartTime is a non-interface method to set the birthday of the wallet using this object. Since only a single
 419  // rescan at a time is currently supported, only one birthday needs to be set. This does not fully restart a running
 420  // rescan, so should not be used to update a rescan while it is running. TODO: When factoring out to multiple rescans
 421  // per Neutrino client, add a birthday per client.
 422  func (s *NeutrinoClient) SetStartTime(startTime time.Time) {
 423  	s.clientMtx.Lock()
 424  	defer s.clientMtx.Unlock()
 425  	s.startTime = startTime
 426  }
 427  
 428  // onFilteredBlockConnected sends appropriate notifications to the notification channel.
 429  func (s *NeutrinoClient) onFilteredBlockConnected(
 430  	height int32,
 431  	header *wire.BlockHeader, relevantTxs []*util.Tx,
 432  ) {
 433  	ntfn := FilteredBlockConnected{
 434  		Block: &wtxmgr.BlockMeta{
 435  			Block: wtxmgr.Block{
 436  				Hash:   header.BlockHash(),
 437  				Height: height,
 438  			},
 439  			Time: header.Timestamp,
 440  		},
 441  	}
 442  	for _, tx := range relevantTxs {
 443  		rec, e := wtxmgr.NewTxRecordFromMsgTx(
 444  			tx.MsgTx(),
 445  			header.Timestamp,
 446  		)
 447  		if e != nil {
 448  			E.Ln(
 449  				"cannot create transaction record for relevant tx:", e,
 450  			)
 451  			// TODO(aakselrod): Return?
 452  			continue
 453  		}
 454  		ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec)
 455  	}
 456  	select {
 457  	case s.enqueueNotification <- ntfn:
 458  	case <-s.quit.Wait():
 459  		return
 460  	case <-s.rescanQuit.Wait():
 461  		return
 462  	}
 463  	// Handle RescanFinished notification if required.
 464  	bs, e := s.CS.BestBlock()
 465  	if e != nil {
 466  		E.Ln("can't get chain service's best block:", e)
 467  		return
 468  	}
 469  	if bs.Hash == header.BlockHash() {
 470  		// Only send the RescanFinished notification once.
 471  		s.clientMtx.Lock()
 472  		if s.finished {
 473  			s.clientMtx.Unlock()
 474  			return
 475  		}
 476  		// Only send the RescanFinished notification once the underlying chain service sees itself as current.
 477  		current := s.CS.IsCurrent() && s.lastProgressSent
 478  		if current {
 479  			s.finished = true
 480  		}
 481  		s.clientMtx.Unlock()
 482  		if current {
 483  			select {
 484  			case s.enqueueNotification <- &RescanFinished{
 485  				Hash:   &bs.Hash,
 486  				Height: bs.Height,
 487  				Time:   header.Timestamp,
 488  			}:
 489  			case <-s.quit.Wait():
 490  				return
 491  			case <-s.rescanQuit.Wait():
 492  				return
 493  			}
 494  		}
 495  	}
 496  }
 497  
 498  // onBlockDisconnected sends appropriate notifications to the notification channel.
 499  func (s *NeutrinoClient) onBlockDisconnected(
 500  	hash *chainhash.Hash, height int32,
 501  	t time.Time,
 502  ) {
 503  	select {
 504  	case s.enqueueNotification <- BlockDisconnected{
 505  		Block: wtxmgr.Block{
 506  			Hash:   *hash,
 507  			Height: height,
 508  		},
 509  		Time: t,
 510  	}:
 511  	case <-s.quit.Wait():
 512  	case <-s.rescanQuit.Wait():
 513  	}
 514  }
 515  func (s *NeutrinoClient) onBlockConnected(
 516  	hash *chainhash.Hash, height int32,
 517  	time time.Time,
 518  ) {
 519  	// TODO: Move this closure out and parameterize it? Is it useful
 520  	// outside here?
 521  	sendRescanProgress := func() {
 522  		select {
 523  		case s.enqueueNotification <- &RescanProgress{
 524  			Hash:   hash,
 525  			Height: height,
 526  			Time:   time,
 527  		}:
 528  		case <-s.quit.Wait():
 529  		case <-s.rescanQuit.Wait():
 530  		}
 531  	}
 532  	// Only send BlockConnected notification if we're processing blocks before the birthday. Otherwise, we can just
 533  	// update using RescanProgress notifications.
 534  	if time.Before(s.startTime) {
 535  		// Send a RescanProgress notification every 10K blocks.
 536  		if height%10000 == 0 {
 537  			s.clientMtx.Lock()
 538  			shouldSend := s.isRescan && !s.finished
 539  			s.clientMtx.Unlock()
 540  			if shouldSend {
 541  				sendRescanProgress()
 542  			}
 543  		}
 544  	} else {
 545  		// Send a RescanProgress notification if we're just going over the boundary between pre-birthday and
 546  		// post-birthday blocks, and note that we've sent it.
 547  		s.clientMtx.Lock()
 548  		if !s.lastProgressSent {
 549  			shouldSend := s.isRescan && !s.finished
 550  			if shouldSend {
 551  				s.clientMtx.Unlock()
 552  				sendRescanProgress()
 553  				s.clientMtx.Lock()
 554  				s.lastProgressSent = true
 555  			}
 556  		}
 557  		s.clientMtx.Unlock()
 558  		select {
 559  		case s.enqueueNotification <- BlockConnected{
 560  			Block: wtxmgr.Block{
 561  				Hash:   *hash,
 562  				Height: height,
 563  			},
 564  			Time: time,
 565  		}:
 566  		case <-s.quit.Wait():
 567  		case <-s.rescanQuit.Wait():
 568  		}
 569  	}
 570  }
 571  
 572  // notificationHandler queues and dequeues notifications. There are currently no bounds on the queue, so the dequeue
 573  // channel should be read continually to avoid running out of memory.
 574  func (s *NeutrinoClient) notificationHandler() {
 575  	hash, height, e := s.GetBestBlock()
 576  	if e != nil {
 577  		E.F("failed to get best block from chain service:", e)
 578  		s.Stop()
 579  		s.wg.Done()
 580  		return
 581  	}
 582  	bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
 583  	// TODO: Rather than leaving this as an unbounded queue for all types of notifications, try dropping ones where a
 584  	//  later enqueued notification can fully invalidate one waiting to be processed. For example, blockconnected
 585  	//  notifications for greater block heights can remove the need to process earlier blockconnected notifications still
 586  	//  waiting here.
 587  	var notifications []interface{}
 588  	enqueue := s.enqueueNotification
 589  	var dequeue chan interface{}
 590  	var next interface{}
 591  out:
 592  	for {
 593  		s.clientMtx.Lock()
 594  		rescanErr := s.rescanErr
 595  		s.clientMtx.Unlock()
 596  		select {
 597  		case n, ok := <-enqueue:
 598  			if !ok {
 599  				// If no notifications are queued for handling, the queue is finished.
 600  				if len(notifications) == 0 {
 601  					break out
 602  				}
 603  				// nil channel so no more reads can occur.
 604  				enqueue = nil
 605  				continue
 606  			}
 607  			if len(notifications) == 0 {
 608  				next = n
 609  				dequeue = s.dequeueNotification
 610  			}
 611  			notifications = append(notifications, n)
 612  		case dequeue <- next:
 613  			if n, ok := next.(BlockConnected); ok {
 614  				bs = &waddrmgr.BlockStamp{
 615  					Height: n.Height,
 616  					Hash:   n.Hash,
 617  				}
 618  			}
 619  			notifications[0] = nil
 620  			notifications = notifications[1:]
 621  			if len(notifications) != 0 {
 622  				next = notifications[0]
 623  			} else {
 624  				// If no more notifications can be enqueued, the queue is finished.
 625  				if enqueue == nil {
 626  					break out
 627  				}
 628  				dequeue = nil
 629  			}
 630  		case e := <-rescanErr:
 631  			if e != nil {
 632  				E.Ln("neutrino rescan ended with error:", e)
 633  			}
 634  		case s.currentBlock <- bs:
 635  		case <-s.quit.Wait():
 636  			break out
 637  		}
 638  	}
 639  	s.Stop()
 640  	close(s.dequeueNotification)
 641  	s.wg.Done()
 642  }
 643