rpc_harness.go raw

   1  package rpctest
   2  
   3  import (
   4  	"fmt"
   5  	"io/ioutil"
   6  	"net"
   7  	"os"
   8  	"path/filepath"
   9  	"strconv"
  10  	"sync"
  11  	"testing"
  12  	"time"
  13  
  14  	"github.com/p9c/p9/pkg/amt"
  15  	"github.com/p9c/p9/pkg/block"
  16  	"github.com/p9c/p9/pkg/btcaddr"
  17  	"github.com/p9c/p9/pkg/chaincfg"
  18  
  19  	"github.com/p9c/p9/pkg/qu"
  20  
  21  	"github.com/p9c/p9/pkg/chainhash"
  22  	"github.com/p9c/p9/pkg/rpcclient"
  23  	"github.com/p9c/p9/pkg/util"
  24  	"github.com/p9c/p9/pkg/wire"
  25  )
  26  
const (
	// These constants define the minimum and maximum p2p and rpc port numbers used by a test harness. The min port is
	// inclusive while the max port is exclusive. The rpc range starts exactly where the p2p range ends, so the two
	// ranges are adjacent and do not overlap.
	minPeerPort = 10000
	maxPeerPort = 35000
	minRPCPort  = maxPeerPort
	maxRPCPort  = 60000
	// BlockVersion is the default block version used when generating blocks.
	BlockVersion = 4
)
  37  
var (
	// numTestInstances counts the harnesses created so far by New. It is used to derive harness IDs, wallet indexes
	// and listening ports. NOTE: within this file it is only ever incremented (in New), never decremented on teardown.
	numTestInstances = 0
	// processID is the process ID of the current running process, it is used to calculate ports based upon it when
	// launching an rpc harness. The intent is to allow multiple processes to run in parallel without port collisions.
	// It should be noted however that there is still some small probability that there will be port collisions either
	// due to other processes running or simply due to the stars aligning on the process IDs.
	processID = os.Getpid()
	// testInstances is a private package-level map, keyed by each harness' temporary node directory, used to keep
	// track of all active test harnesses. This global can be used to perform various "joins", shut down several active
	// harnesses after a test, etc.
	testInstances = make(map[string]*Harness)
	// harnessStateMtx is used to protect concurrent access to the variables declared above.
	harnessStateMtx sync.RWMutex
)
  52  
// HarnessTestCase represents a test-case which utilizes an instance of the Harness to exercise functionality. It
// receives the harness under test together with the standard library testing handle.
type HarnessTestCase func(r *Harness, t *testing.T)
  55  
// Harness fully encapsulates an active pod process to provide a unified platform for creating rpc driven integration
// tests involving pod. The active pod node will typically be run in simnet mode in order to allow for easy generation
// of test blockchains. The active pod process is fully managed by Harness, which handles the necessary initialization,
// and teardown of the process along with any temporary directories created as a result. Multiple Harness instances may
// be run concurrently, in order to allow for testing complex scenarios involving multiple nodes. The harness also
// includes an in-memory wallet to streamline various classes of tests.
type Harness struct {
	// ActiveNet is the parameters of the blockchain the Harness belongs to.
	ActiveNet      *chaincfg.Params
	// Node is the RPC client connected to the managed pod process. It is nil until connectRPCClient succeeds.
	Node           *rpcclient.Client
	// node is the managed pod process created by newNode.
	node           *node
	// handlers are the notification callbacks handed to the RPC client; New wires the wallet's block callbacks in.
	handlers       *rpcclient.NotificationHandlers
	// wallet is the in-memory wallet that receives coinbase rewards and funds test transactions.
	wallet         *memWallet
	// testNodeDir is the temporary directory holding this harness' files; it is removed on teardown.
	testNodeDir    string
	// maxConnRetries bounds the number of RPC connection attempts made by connectRPCClient.
	maxConnRetries int
	// nodeNum is the value of numTestInstances at the time this harness was created.
	nodeNum        int
	// The embedded mutex serializes block generation and submission.
	sync.Mutex
}
  74  
  75  // New creates and initializes new instance of the rpc test harness. Optionally, websocket handlers and a specified
  76  // configuration may be passed. In the case that a nil config is passed, a default configuration will be used. NOTE:
  77  // This function is safe for concurrent access.
  78  func New(
  79  	activeNet *chaincfg.Params, handlers *rpcclient.NotificationHandlers,
  80  	extraArgs []string,
  81  ) (*Harness, error) {
  82  	harnessStateMtx.Lock()
  83  	defer harnessStateMtx.Unlock()
  84  	// Add a flag for the appropriate network type based on the provided chain chaincfg.
  85  	switch activeNet.Net {
  86  	case wire.MainNet:
  87  		// No extra flags since mainnet is the default
  88  	case wire.TestNet3:
  89  		extraArgs = append(extraArgs, "--testnet")
  90  	case wire.TestNet:
  91  		extraArgs = append(extraArgs, "--regtest")
  92  	case wire.SimNet:
  93  		extraArgs = append(extraArgs, "--simnet")
  94  	default:
  95  		return nil, fmt.Errorf(
  96  			"rpctest.New must be called with one " +
  97  				"of the supported chain networks",
  98  		)
  99  	}
 100  	testDir, e := baseDir()
 101  	if e != nil {
 102  		return nil, e
 103  	}
 104  	harnessID := strconv.Itoa(numTestInstances)
 105  	nodeTestData, e := ioutil.TempDir(testDir, "harness-"+harnessID)
 106  	if e != nil {
 107  		return nil, e
 108  	}
 109  	certFile := filepath.Join(nodeTestData, "rpc.cert")
 110  	keyFile := filepath.Join(nodeTestData, "rpc.key")
 111  	if e = genCertPair(certFile, keyFile); E.Chk(e) {
 112  		return nil, e
 113  	}
 114  	wallet, e := newMemWallet(activeNet, uint32(numTestInstances))
 115  	if e != nil {
 116  		return nil, e
 117  	}
 118  	miningAddr := fmt.Sprintf("--miningaddr=%s", wallet.coinbaseAddr)
 119  	extraArgs = append(extraArgs, miningAddr)
 120  	config, e := newConfig("rpctest", certFile, keyFile, extraArgs)
 121  	if e != nil {
 122  		return nil, e
 123  	}
 124  	// Generate p2p+rpc listening addresses.
 125  	config.listen, config.rpcListen = generateListeningAddresses()
 126  	// Create the testing node bounded to the simnet.
 127  	node, e := newNode(config, nodeTestData)
 128  	if e != nil {
 129  		return nil, e
 130  	}
 131  	nodeNum := numTestInstances
 132  	numTestInstances++
 133  	if handlers == nil {
 134  		handlers = &rpcclient.NotificationHandlers{}
 135  	}
 136  	// If a handler for the OnFilteredBlock{Connected, Disconnected} callback callback has already been set, then create
 137  	// a wrapper callback which executes both the currently registered callback and the mem wallet's callback.
 138  	if handlers.OnFilteredBlockConnected != nil {
 139  		obc := handlers.OnFilteredBlockConnected
 140  		handlers.OnFilteredBlockConnected = func(height int32, header *wire.BlockHeader, filteredTxns []*util.Tx) {
 141  			wallet.IngestBlock(height, header, filteredTxns)
 142  			obc(height, header, filteredTxns)
 143  		}
 144  	} else {
 145  		// Otherwise, we can claim the callback ourselves.
 146  		handlers.OnFilteredBlockConnected = wallet.IngestBlock
 147  	}
 148  	if handlers.OnFilteredBlockDisconnected != nil {
 149  		obd := handlers.OnFilteredBlockDisconnected
 150  		handlers.OnFilteredBlockDisconnected = func(height int32, header *wire.BlockHeader) {
 151  			wallet.UnwindBlock(height, header)
 152  			obd(height, header)
 153  		}
 154  	} else {
 155  		handlers.OnFilteredBlockDisconnected = wallet.UnwindBlock
 156  	}
 157  	h := &Harness{
 158  		handlers:       handlers,
 159  		node:           node,
 160  		maxConnRetries: 20,
 161  		testNodeDir:    nodeTestData,
 162  		ActiveNet:      activeNet,
 163  		nodeNum:        nodeNum,
 164  		wallet:         wallet,
 165  	}
 166  	// Track this newly created test instance within the package level global map of all active test instances.
 167  	testInstances[h.testNodeDir] = h
 168  	return h, nil
 169  }
 170  
 171  // SetUp initializes the rpc test state. Initialization includes: starting up a simnet node, creating a websockets
 172  // client and connecting to the started node, and finally: optionally generating and submitting a testchain with a
 173  // configurable number of mature coinbase outputs coinbase outputs. NOTE: This method and TearDown should always be
 174  // called from the same goroutine as they are not concurrent safe.
 175  func (h *Harness) SetUp(createTestChain bool, numMatureOutputs uint32) (e error) {
 176  	// Start the pod node itself. This spawns a new process which will be managed
 177  	if e = h.node.start(); E.Chk(e) {
 178  		return e
 179  	}
 180  	if e = h.connectRPCClient(); E.Chk(e) {
 181  		return e
 182  	}
 183  	h.wallet.Start()
 184  	// Filter transactions that pay to the coinbase associated with the wallet.
 185  	filterAddrs := []btcaddr.Address{h.wallet.coinbaseAddr}
 186  	if e = h.Node.LoadTxFilter(true, filterAddrs, nil); E.Chk(e) {
 187  		return e
 188  	}
 189  	// Ensure pod properly dispatches our registered call-back for each new block. Otherwise, the memWallet won't
 190  	// function properly.
 191  	if e = h.Node.NotifyBlocks(); E.Chk(e) {
 192  		return e
 193  	}
 194  	// Create a test chain with the desired number of mature coinbase outputs.
 195  	if createTestChain && numMatureOutputs != 0 {
 196  		numToGenerate := uint32(h.ActiveNet.CoinbaseMaturity) +
 197  			numMatureOutputs
 198  		_, e = h.Node.Generate(numToGenerate)
 199  		if e != nil {
 200  			return e
 201  		}
 202  	}
 203  	// Block until the wallet has fully synced up to the tip of the main chain.
 204  	_, height, e := h.Node.GetBestBlock()
 205  	if e != nil {
 206  		return e
 207  	}
 208  	ticker := time.NewTicker(time.Millisecond * 100)
 209  	for range ticker.C {
 210  		walletHeight := h.wallet.SyncedHeight()
 211  		if walletHeight == height {
 212  			break
 213  		}
 214  	}
 215  	ticker.Stop()
 216  	return nil
 217  }
 218  
 219  // tearDown stops the running rpc test instance. All created processes are killed, and temporary directories removed.
 220  // This function MUST be called with the harness state mutex held (for writes).
 221  func (h *Harness) tearDown() (e error) {
 222  	if h.Node != nil {
 223  		h.Node.Shutdown()
 224  	}
 225  	if e := h.node.shutdown(); E.Chk(e) {
 226  		return e
 227  	}
 228  	if e := os.RemoveAll(h.testNodeDir); E.Chk(e) {
 229  		return e
 230  	}
 231  	delete(testInstances, h.testNodeDir)
 232  	return nil
 233  }
 234  
 235  // TearDown stops the running rpc test instance. All created processes are killed, and temporary directories removed.
 236  // NOTE: This method and SetUp should always be called from the same goroutine as they are not concurrent safe.
 237  func (h *Harness) TearDown() (e error) {
 238  	harnessStateMtx.Lock()
 239  	defer harnessStateMtx.Unlock()
 240  	return h.tearDown()
 241  }
 242  
 243  // connectRPCClient attempts to establish an RPC connection to the created pod process belonging to this Harness
 244  // instance. If the initial connection attempt fails, this function will retry h. maxConnRetries times, backing off the
 245  // time between subsequent attempts. If after h.maxConnRetries attempts we're not able to establish a connection, this
 246  // function returns with an error.
 247  func (h *Harness) connectRPCClient() (e error) {
 248  	var client *rpcclient.Client
 249  	rpcConf := h.node.config.rpcConnConfig()
 250  	for i := 0; i < h.maxConnRetries; i++ {
 251  		if client, e = rpcclient.New(&rpcConf, h.handlers, qu.T()); E.Chk(e) {
 252  			time.Sleep(time.Duration(i) * 50 * time.Millisecond)
 253  			continue
 254  		}
 255  		break
 256  	}
 257  	if client == nil {
 258  		return fmt.Errorf("connection timeout")
 259  	}
 260  	h.Node = client
 261  	h.wallet.SetRPCClient(client)
 262  	return nil
 263  }
 264  
 265  // NewAddress returns a fresh address spendable by the Harness' internal wallet. This function is safe for concurrent
 266  // access.
 267  func (h *Harness) NewAddress() (btcaddr.Address, error) {
 268  	return h.wallet.NewAddress()
 269  }
 270  
 271  // ConfirmedBalance returns the confirmed balance of the Harness' internal wallet. This function is safe for concurrent
 272  // access.
 273  func (h *Harness) ConfirmedBalance() amt.Amount {
 274  	return h.wallet.ConfirmedBalance()
 275  }
 276  
 277  // SendOutputs creates signs and finally broadcasts a transaction spending the harness' available mature coinbase
 278  // outputs creating new outputs according to targetOutputs. This function is safe for concurrent access.
 279  func (h *Harness) SendOutputs(
 280  	targetOutputs []*wire.TxOut,
 281  	feeRate amt.Amount,
 282  ) (*chainhash.Hash, error) {
 283  	return h.wallet.SendOutputs(targetOutputs, feeRate)
 284  }
 285  
 286  // SendOutputsWithoutChange creates and sends a transaction that pays to the specified outputs while observing the
 287  // passed fee rate and ignoring a change output. The passed fee rate should be expressed in sat/b. This function is safe
 288  // for concurrent access.
 289  func (h *Harness) SendOutputsWithoutChange(
 290  	targetOutputs []*wire.TxOut,
 291  	feeRate amt.Amount,
 292  ) (*chainhash.Hash, error) {
 293  	return h.wallet.SendOutputsWithoutChange(targetOutputs, feeRate)
 294  }
 295  
 296  // CreateTransaction returns a fully signed transaction paying to the specified outputs while observing the desired fee
 297  // rate. The passed fee rate should be expressed in satoshis-per-byte. The transaction being created can optionally
 298  // include a change output indicated by the change boolean. Any unspent outputs selected as inputs for the crafted
 299  // transaction are marked as unspendable in order to avoid potential double-spends by future calls to this method. If
 300  // the created transaction is cancelled for any reason then the selected inputs MUST be freed via a call to
 301  // UnlockOutputs. Otherwise, the locked inputs won't be returned to the pool of spendable outputs. This function is safe
 302  // for concurrent access.
 303  func (h *Harness) CreateTransaction(
 304  	targetOutputs []*wire.TxOut,
 305  	feeRate amt.Amount, change bool,
 306  ) (*wire.MsgTx, error) {
 307  	return h.wallet.CreateTransaction(targetOutputs, feeRate, change)
 308  }
 309  
 310  // UnlockOutputs unlocks any outputs which were previously marked as unspendabe due to being selected to fund a
 311  // transaction via the CreateTransaction method. This function is safe for concurrent access.
 312  func (h *Harness) UnlockOutputs(inputs []*wire.TxIn) {
 313  	h.wallet.UnlockOutputs(inputs)
 314  }
 315  
 316  // RPCConfig returns the harnesses current rpc configuration. This allows other potential RPC clients created within
 317  // tests to connect to a given test harness instance.
 318  func (h *Harness) RPCConfig() rpcclient.ConnConfig {
 319  	return h.node.config.rpcConnConfig()
 320  }
 321  
 322  // P2PAddress returns the harness' P2P listening address. This allows potential peers ( such as SPV peers) created
 323  // within tests to connect to a given test harness instance.
 324  func (h *Harness) P2PAddress() string {
 325  	return h.node.config.listen
 326  }
 327  
 328  // GenerateAndSubmitBlock creates a block whose contents include the passed transactions and submits it to the running
 329  // simnet node. For generating blocks with only a coinbase tx, callers can simply pass nil instead of transactions to be
 330  // mined. Additionally, a custom block version can be set by the caller. A blockVersion of -1 indicates that the current
 331  // default block version should be used. An uninitialized time.Time should be used for the blockTime parameter if one
 332  // doesn't wish to set a custom time. This function is safe for concurrent access.
 333  func (h *Harness) GenerateAndSubmitBlock(
 334  	txns []*util.Tx, blockVersion uint32,
 335  	blockTime time.Time,
 336  ) (*block.Block, error) {
 337  	return h.GenerateAndSubmitBlockWithCustomCoinbaseOutputs(
 338  		txns,
 339  		blockVersion, blockTime, []wire.TxOut{},
 340  	)
 341  }
 342  
 343  // GenerateAndSubmitBlockWithCustomCoinbaseOutputs creates a block whose contents include the passed coinbase outputs
 344  // and transactions and submits it to the running simnet node. For generating blocks with only a coinbase tx, callers
 345  // can simply pass nil instead of transactions to be mined. Additionally, a custom block version can be set by the
 346  // caller. A blockVersion of -1 indicates that the current default block version should be used. An uninitialized
 347  // time.Time should be used for the blockTime parameter if one doesn't wish to set a custom time. The mineTo list of
 348  // outputs will be added to the coinbase; this is not checked for correctness until the block is submitted; thus, it is
 349  // the caller's responsibility to ensure that the outputs are correct. If the list is empty, the coinbase reward goes to
 350  // the wallet managed by the Harness. This function is safe for concurrent access.
 351  func (h *Harness) GenerateAndSubmitBlockWithCustomCoinbaseOutputs(
 352  	txns []*util.Tx, blockVersion uint32, blockTime time.Time,
 353  	mineTo []wire.TxOut,
 354  ) (*block.Block, error) {
 355  	h.Lock()
 356  	defer h.Unlock()
 357  	if blockVersion == ^uint32(0) {
 358  		blockVersion = BlockVersion
 359  	}
 360  	prevBlockHash, prevBlockHeight, e := h.Node.GetBestBlock()
 361  	if e != nil {
 362  		return nil, e
 363  	}
 364  	mBlock, e := h.Node.GetBlock(prevBlockHash)
 365  	if e != nil {
 366  		return nil, e
 367  	}
 368  	prevBlock := block.NewBlock(mBlock)
 369  	prevBlock.SetHeight(prevBlockHeight)
 370  	// Create a new block including the specified transactions
 371  	newBlock, e := CreateBlock(
 372  		prevBlock, txns, int32(blockVersion),
 373  		blockTime, h.wallet.coinbaseAddr, mineTo, h.ActiveNet,
 374  	)
 375  	if e != nil {
 376  		return nil, e
 377  	}
 378  	// Submit the block to the simnet node.
 379  	if e := h.Node.SubmitBlock(newBlock, nil); E.Chk(e) {
 380  		return nil, e
 381  	}
 382  	return newBlock, nil
 383  }
 384  
 385  // generateListeningAddresses returns two strings representing listening addresses designated for the current rpc test.
 386  // If there haven't been any test instances created, the default ports are used. Otherwise in order to support multiple
 387  // test nodes running at once the p2p and rpc port are incremented after each initialization.
 388  func generateListeningAddresses() (string, string) {
 389  	localhost := "127.0.0.1"
 390  	portString := func(minPort, maxPort int) string {
 391  		port := minPort + numTestInstances + ((20 * processID) %
 392  			(maxPort - minPort))
 393  		return strconv.Itoa(port)
 394  	}
 395  	p2p := net.JoinHostPort(localhost, portString(minPeerPort, maxPeerPort))
 396  	rpc := net.JoinHostPort(localhost, portString(minRPCPort, maxRPCPort))
 397  	return p2p, rpc
 398  }
 399  
 400  // baseDir is the directory path of the temp directory for all rpctest files.
 401  func baseDir() (string, error) {
 402  	dirPath := filepath.Join(os.TempDir(), "pod", "rpctest")
 403  	e := os.MkdirAll(dirPath, 0755)
 404  	return dirPath, e
 405  }
 406