peer_test.go raw

   1  package peer_test
   2  
   3  import (
   4  	"errors"
   5  	"github.com/p9c/p9/pkg/chaincfg"
   6  	"io"
   7  	"net"
   8  	"strconv"
   9  	"testing"
  10  	"time"
  11  	
  12  	"github.com/p9c/p9/pkg/qu"
  13  	
  14  	"github.com/btcsuite/go-socks/socks"
  15  	
  16  	"github.com/p9c/p9/pkg/chainhash"
  17  	"github.com/p9c/p9/pkg/peer"
  18  	"github.com/p9c/p9/pkg/wire"
  19  )
  20  
  21  // conn mocks a network connection by implementing the net.Conn interface. It is used to test peer connection without
  22  // actually opening a network connection.
  23  type conn struct {
  24  	io.Reader
  25  	io.Writer
  26  	io.Closer
  27  	// local network, address for the connection.
  28  	lnet, laddr string
  29  	// remote network, address for the connection.
  30  	rnet, raddr string
  31  	// mocks socks proxy if true
  32  	proxy bool
  33  }
  34  
  35  // LocalAddr returns the local address for the connection.
  36  func (c conn) LocalAddr() net.Addr {
  37  	return &addr{c.lnet, c.laddr}
  38  }
  39  
  40  // Remote returns the remote address for the connection.
  41  func (c conn) RemoteAddr() net.Addr {
  42  	if !c.proxy {
  43  		return &addr{c.rnet, c.raddr}
  44  	}
  45  	host, strPort, _ := net.SplitHostPort(c.raddr)
  46  	port, _ := strconv.Atoi(strPort)
  47  	return &socks.ProxiedAddr{
  48  		Net:  c.rnet,
  49  		Host: host,
  50  		Port: port,
  51  	}
  52  }
  53  
  54  // Close handles closing the connection.
  55  func (c conn) Close() (e error) {
  56  	if c.Closer == nil {
  57  		return nil
  58  	}
  59  	return c.Closer.Close()
  60  }
  61  func (c conn) SetDeadline(t time.Time) error          { return nil }
  62  func (c conn) SetReadDeadline(t time.Time) error      { return nil }
  63  func (c conn) SetWriteDeadline(t time.Time) (e error) { return nil }
  64  
  65  // addr mocks a network address
  66  type addr struct {
  67  	net, address string
  68  }
  69  
  70  func (m addr) Network() string { return m.net }
  71  func (m addr) String() string  { return m.address }
  72  
  73  // pipe turns two mock connections into a full-duplex connection similar to net.Pipe to allow pipe's with (fake)
  74  // addresses.
  75  func pipe(c1, c2 *conn) (*conn, *conn) {
  76  	r1, w1 := io.Pipe()
  77  	r2, w2 := io.Pipe()
  78  	c1.Writer = w1
  79  	c1.Closer = w1
  80  	c2.Reader = r1
  81  	c1.Reader = r2
  82  	c2.Writer = w2
  83  	c2.Closer = w2
  84  	return c1, c2
  85  }
  86  
  87  // peerStats holds the expected peer stats used for testing peer.
  88  type peerStats struct {
  89  	wantUserAgent       string
  90  	wantServices        wire.ServiceFlag
  91  	wantProtocolVersion uint32
  92  	wantConnected       bool
  93  	wantVersionKnown    bool
  94  	wantVerAckReceived  bool
  95  	wantLastBlock       int32
  96  	wantStartingHeight  int32
  97  	wantLastPingTime    time.Time
  98  	wantLastPingNonce   uint64
  99  	wantLastPingMicros  int64
 100  	wantTimeOffset      int64
 101  	wantBytesSent       uint64
 102  	wantBytesReceived   uint64
 103  	wantWitnessEnabled  bool
 104  }
 105  
 106  // testPeer tests the given peer's flags and stats
 107  func testPeer(t *testing.T, p *peer.Peer, s peerStats) {
 108  	if p.UserAgent() != s.wantUserAgent {
 109  		t.Errorf("testPeer: wrong UserAgent - got %v, want %v", p.UserAgent(), s.wantUserAgent)
 110  		return
 111  	}
 112  	if p.Services() != s.wantServices {
 113  		t.Errorf("testPeer: wrong Services - got %v, want %v", p.Services(), s.wantServices)
 114  		return
 115  	}
 116  	if !p.LastPingTime().Equal(s.wantLastPingTime) {
 117  		t.Errorf("testPeer: wrong LastPingTime - got %v, want %v", p.LastPingTime(), s.wantLastPingTime)
 118  		return
 119  	}
 120  	if p.LastPingNonce() != s.wantLastPingNonce {
 121  		t.Errorf("testPeer: wrong LastPingNonce - got %v, want %v", p.LastPingNonce(), s.wantLastPingNonce)
 122  		return
 123  	}
 124  	if p.LastPingMicros() != s.wantLastPingMicros {
 125  		t.Errorf("testPeer: wrong LastPingMicros - got %v, want %v", p.LastPingMicros(), s.wantLastPingMicros)
 126  		return
 127  	}
 128  	if p.VerAckReceived() != s.wantVerAckReceived {
 129  		t.Errorf("testPeer: wrong VerAckReceived - got %v, want %v", p.VerAckReceived(), s.wantVerAckReceived)
 130  		return
 131  	}
 132  	if p.VersionKnown() != s.wantVersionKnown {
 133  		t.Errorf("testPeer: wrong VersionKnown - got %v, want %v", p.VersionKnown(), s.wantVersionKnown)
 134  		return
 135  	}
 136  	if p.ProtocolVersion() != s.wantProtocolVersion {
 137  		t.Errorf("testPeer: wrong ProtocolVersion - got %v, want %v", p.ProtocolVersion(), s.wantProtocolVersion)
 138  		return
 139  	}
 140  	if p.LastBlock() != s.wantLastBlock {
 141  		t.Errorf("testPeer: wrong LastBlock - got %v, want %v", p.LastBlock(), s.wantLastBlock)
 142  		return
 143  	}
 144  	// Allow for a deviation of 1s, as the second may tick when the message is in transit and the protocol doesn't support any further precision.
 145  	if p.TimeOffset() != s.wantTimeOffset && p.TimeOffset() != s.wantTimeOffset-1 {
 146  		t.Errorf(
 147  			"testPeer: wrong TimeOffset - got %v, want %v or %v", p.TimeOffset(),
 148  			s.wantTimeOffset, s.wantTimeOffset-1,
 149  		)
 150  		return
 151  	}
 152  	if p.BytesSent() != s.wantBytesSent {
 153  		t.Errorf("testPeer: wrong BytesSent - got %v, want %v", p.BytesSent(), s.wantBytesSent)
 154  		return
 155  	}
 156  	if p.BytesReceived() != s.wantBytesReceived {
 157  		t.Errorf("testPeer: wrong BytesReceived - got %v, want %v", p.BytesReceived(), s.wantBytesReceived)
 158  		return
 159  	}
 160  	if p.StartingHeight() != s.wantStartingHeight {
 161  		t.Errorf("testPeer: wrong StartingHeight - got %v, want %v", p.StartingHeight(), s.wantStartingHeight)
 162  		return
 163  	}
 164  	if p.Connected() != s.wantConnected {
 165  		t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected)
 166  		return
 167  	}
 168  	if p.IsWitnessEnabled() != s.wantWitnessEnabled {
 169  		t.Errorf(
 170  			"testPeer: wrong WitnessEnabled - got %v, want %v",
 171  			p.IsWitnessEnabled(), s.wantWitnessEnabled,
 172  		)
 173  		return
 174  	}
 175  	stats := p.StatsSnapshot()
 176  	if p.ID() != stats.ID {
 177  		t.Errorf("testPeer: wrong ID - got %v, want %v", p.ID(), stats.ID)
 178  		return
 179  	}
 180  	if p.Addr() != stats.Addr {
 181  		t.Errorf("testPeer: wrong Addr - got %v, want %v", p.Addr(), stats.Addr)
 182  		return
 183  	}
 184  	if p.LastSend() != stats.LastSend {
 185  		t.Errorf("testPeer: wrong LastSend - got %v, want %v", p.LastSend(), stats.LastSend)
 186  		return
 187  	}
 188  	if p.LastRecv() != stats.LastRecv {
 189  		t.Errorf("testPeer: wrong LastRecv - got %v, want %v", p.LastRecv(), stats.LastRecv)
 190  		return
 191  	}
 192  }
 193  
 194  // TestPeerConnection tests connection between inbound and outbound peers.
 195  func TestPeerConnection(t *testing.T) {
 196  	verack := qu.T()
 197  	peer1Cfg := &peer.Config{
 198  		Listeners: peer.MessageListeners{
 199  			OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
 200  				verack <- struct{}{}
 201  			},
 202  			OnWrite: func(
 203  				p *peer.Peer, bytesWritten int, msg wire.Message,
 204  				e error,
 205  			) {
 206  				if _, ok := msg.(*wire.MsgVerAck); ok {
 207  					verack <- struct{}{}
 208  				}
 209  			},
 210  		},
 211  		UserAgentName:     "peer",
 212  		UserAgentVersion:  "1.0",
 213  		UserAgentComments: []string{"comment"},
 214  		ChainParams:       &chaincfg.MainNetParams,
 215  		ProtocolVersion:   wire.RejectVersion, // Configure with older version
 216  		Services:          0,
 217  		TrickleInterval:   time.Second * 10,
 218  	}
 219  	peer2Cfg := &peer.Config{
 220  		Listeners:         peer1Cfg.Listeners,
 221  		UserAgentName:     "peer",
 222  		UserAgentVersion:  "1.0",
 223  		UserAgentComments: []string{"comment"},
 224  		ChainParams:       &chaincfg.MainNetParams,
 225  		Services:          wire.SFNodeNetwork, // | wire.SFNodeWitness,
 226  		TrickleInterval:   time.Second * 10,
 227  	}
 228  	wantStats1 := peerStats{
 229  		wantUserAgent:       wire.DefaultUserAgent + "peer:1.0(comment)/",
 230  		wantServices:        0,
 231  		wantProtocolVersion: wire.RejectVersion,
 232  		wantConnected:       true,
 233  		wantVersionKnown:    true,
 234  		wantVerAckReceived:  true,
 235  		wantLastPingTime:    time.Time{},
 236  		wantLastPingNonce:   uint64(0),
 237  		wantLastPingMicros:  int64(0),
 238  		wantTimeOffset:      int64(0),
 239  		wantBytesSent:       167, // 143 version + 24 verack
 240  		wantBytesReceived:   167,
 241  		wantWitnessEnabled:  false,
 242  	}
 243  	wantStats2 := peerStats{
 244  		wantUserAgent:       wire.DefaultUserAgent + "peer:1.0(comment)/",
 245  		wantServices:        wire.SFNodeNetwork, // | wire.SFNodeWitness,
 246  		wantProtocolVersion: wire.RejectVersion,
 247  		wantConnected:       true,
 248  		wantVersionKnown:    true,
 249  		wantVerAckReceived:  true,
 250  		wantLastPingTime:    time.Time{},
 251  		wantLastPingNonce:   uint64(0),
 252  		wantLastPingMicros:  int64(0),
 253  		wantTimeOffset:      int64(0),
 254  		wantBytesSent:       167, // 143 version + 24 verack
 255  		wantBytesReceived:   167,
 256  		wantWitnessEnabled:  true,
 257  	}
 258  	tests := []struct {
 259  		name  string
 260  		setup func() (*peer.Peer, *peer.Peer, error)
 261  	}{
 262  		{
 263  			"basic handshake",
 264  			func() (*peer.Peer, *peer.Peer, error) {
 265  				inConn, outConn := pipe(
 266  					&conn{raddr: "10.0.0.1:11047"},
 267  					&conn{raddr: "10.0.0.2:11047"},
 268  				)
 269  				inPeer := peer.NewInboundPeer(peer1Cfg)
 270  				inPeer.AssociateConnection(inConn)
 271  				outPeer, e := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:11047")
 272  				if e != nil {
 273  					return nil, nil, e
 274  				}
 275  				outPeer.AssociateConnection(outConn)
 276  				for i := 0; i < 4; i++ {
 277  					select {
 278  					case <-verack.Wait():
 279  					case <-time.After(time.Second):
 280  						return nil, nil, errors.New("verack timeout")
 281  					}
 282  				}
 283  				return inPeer, outPeer, nil
 284  			},
 285  		},
 286  		{
 287  			"socks proxy",
 288  			func() (*peer.Peer, *peer.Peer, error) {
 289  				inConn, outConn := pipe(
 290  					&conn{raddr: "10.0.0.1:11047", proxy: true},
 291  					&conn{raddr: "10.0.0.2:11047"},
 292  				)
 293  				inPeer := peer.NewInboundPeer(peer1Cfg)
 294  				inPeer.AssociateConnection(inConn)
 295  				outPeer, e := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:11047")
 296  				if e != nil {
 297  					return nil, nil, e
 298  				}
 299  				outPeer.AssociateConnection(outConn)
 300  				for i := 0; i < 4; i++ {
 301  					select {
 302  					case <-verack.Wait():
 303  					case <-time.After(time.Second):
 304  						return nil, nil, errors.New("verack timeout")
 305  					}
 306  				}
 307  				return inPeer, outPeer, nil
 308  			},
 309  		},
 310  	}
 311  	t.Logf("Running %d tests", len(tests))
 312  	for i, test := range tests {
 313  		inPeer, outPeer, e := test.setup()
 314  		if e != nil {
 315  			t.Errorf("TestPeerConnection setup #%d: unexpected err %v", i, e)
 316  			return
 317  		}
 318  		testPeer(t, inPeer, wantStats2)
 319  		testPeer(t, outPeer, wantStats1)
 320  		inPeer.Disconnect()
 321  		outPeer.Disconnect()
 322  		inPeer.WaitForDisconnect()
 323  		outPeer.WaitForDisconnect()
 324  	}
 325  }
 326  
 327  // TestPeerListeners tests that the peer listeners are called as expected.
 328  func TestPeerListeners(t *testing.T) {
 329  	verack := qu.Ts(1)
 330  	ok := make(chan wire.Message, 20)
 331  	peerCfg := &peer.Config{
 332  		Listeners: peer.MessageListeners{
 333  			OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) {
 334  				ok <- msg
 335  			},
 336  			OnAddr: func(p *peer.Peer, msg *wire.MsgAddr) {
 337  				ok <- msg
 338  			},
 339  			OnPing: func(p *peer.Peer, msg *wire.MsgPing) {
 340  				ok <- msg
 341  			},
 342  			OnPong: func(p *peer.Peer, msg *wire.MsgPong) {
 343  				ok <- msg
 344  			},
 345  			OnAlert: func(p *peer.Peer, msg *wire.MsgAlert) {
 346  				ok <- msg
 347  			},
 348  			OnMemPool: func(p *peer.Peer, msg *wire.MsgMemPool) {
 349  				ok <- msg
 350  			},
 351  			OnTx: func(p *peer.Peer, msg *wire.MsgTx) {
 352  				ok <- msg
 353  			},
 354  			OnBlock: func(p *peer.Peer, msg *wire.Block, buf []byte) {
 355  				ok <- msg
 356  			},
 357  			OnInv: func(p *peer.Peer, msg *wire.MsgInv) {
 358  				ok <- msg
 359  			},
 360  			OnHeaders: func(p *peer.Peer, msg *wire.MsgHeaders) {
 361  				ok <- msg
 362  			},
 363  			OnNotFound: func(p *peer.Peer, msg *wire.MsgNotFound) {
 364  				ok <- msg
 365  			},
 366  			OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) {
 367  				ok <- msg
 368  			},
 369  			OnGetBlocks: func(p *peer.Peer, msg *wire.MsgGetBlocks) {
 370  				ok <- msg
 371  			},
 372  			OnGetHeaders: func(p *peer.Peer, msg *wire.MsgGetHeaders) {
 373  				ok <- msg
 374  			},
 375  			OnGetCFilters: func(p *peer.Peer, msg *wire.MsgGetCFilters) {
 376  				ok <- msg
 377  			},
 378  			OnGetCFHeaders: func(p *peer.Peer, msg *wire.MsgGetCFHeaders) {
 379  				ok <- msg
 380  			},
 381  			OnGetCFCheckpt: func(p *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 382  				ok <- msg
 383  			},
 384  			OnCFilter: func(p *peer.Peer, msg *wire.MsgCFilter) {
 385  				ok <- msg
 386  			},
 387  			OnCFHeaders: func(p *peer.Peer, msg *wire.MsgCFHeaders) {
 388  				ok <- msg
 389  			},
 390  			OnFeeFilter: func(p *peer.Peer, msg *wire.MsgFeeFilter) {
 391  				ok <- msg
 392  			},
 393  			OnFilterAdd: func(p *peer.Peer, msg *wire.MsgFilterAdd) {
 394  				ok <- msg
 395  			},
 396  			OnFilterClear: func(p *peer.Peer, msg *wire.MsgFilterClear) {
 397  				ok <- msg
 398  			},
 399  			OnFilterLoad: func(p *peer.Peer, msg *wire.MsgFilterLoad) {
 400  				ok <- msg
 401  			},
 402  			OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
 403  				ok <- msg
 404  			},
 405  			OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
 406  				ok <- msg
 407  				return nil
 408  			},
 409  			OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
 410  				verack <- struct{}{}
 411  			},
 412  			OnReject: func(p *peer.Peer, msg *wire.MsgReject) {
 413  				ok <- msg
 414  			},
 415  			OnSendHeaders: func(p *peer.Peer, msg *wire.MsgSendHeaders) {
 416  				ok <- msg
 417  			},
 418  		},
 419  		UserAgentName:     "peer",
 420  		UserAgentVersion:  "1.0",
 421  		UserAgentComments: []string{"comment"},
 422  		ChainParams:       &chaincfg.MainNetParams,
 423  		Services:          wire.SFNodeBloom,
 424  		TrickleInterval:   time.Second * 10,
 425  	}
 426  	inConn, outConn := pipe(
 427  		&conn{raddr: "10.0.0.1:11047"},
 428  		&conn{raddr: "10.0.0.2:11047"},
 429  	)
 430  	inPeer := peer.NewInboundPeer(peerCfg)
 431  	inPeer.AssociateConnection(inConn)
 432  	peerCfg.Listeners = peer.MessageListeners{
 433  		OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
 434  			verack <- struct{}{}
 435  		},
 436  	}
 437  	outPeer, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
 438  	if e != nil {
 439  		t.Errorf("NewOutboundPeer: unexpected err %v\n", e)
 440  		return
 441  	}
 442  	outPeer.AssociateConnection(outConn)
 443  	for i := 0; i < 2; i++ {
 444  		select {
 445  		case <-verack.Wait():
 446  		case <-time.After(time.Second * 1):
 447  			t.Errorf("TestPeerListeners: verack timeout\n")
 448  			return
 449  		}
 450  	}
 451  	tests := []struct {
 452  		listener string
 453  		msg      wire.Message
 454  	}{
 455  		{
 456  			"OnGetAddr",
 457  			wire.NewMsgGetAddr(),
 458  		},
 459  		{
 460  			"OnAddr",
 461  			wire.NewMsgAddr(),
 462  		},
 463  		{
 464  			"OnPing",
 465  			wire.NewMsgPing(42),
 466  		},
 467  		{
 468  			"OnPong",
 469  			wire.NewMsgPong(42),
 470  		},
 471  		{
 472  			"OnAlert",
 473  			wire.NewMsgAlert([]byte("payload"), []byte("signature")),
 474  		},
 475  		{
 476  			"OnMemPool",
 477  			wire.NewMsgMemPool(),
 478  		},
 479  		{
 480  			"OnTx",
 481  			wire.NewMsgTx(wire.TxVersion),
 482  		},
 483  		{
 484  			"OnBlock",
 485  			wire.NewMsgBlock(
 486  				wire.NewBlockHeader(
 487  					1,
 488  					&chainhash.Hash{}, &chainhash.Hash{}, 1, 1,
 489  				),
 490  			),
 491  		},
 492  		{
 493  			"OnInv",
 494  			wire.NewMsgInv(),
 495  		},
 496  		{
 497  			"OnHeaders",
 498  			wire.NewMsgHeaders(),
 499  		},
 500  		{
 501  			"OnNotFound",
 502  			wire.NewMsgNotFound(),
 503  		},
 504  		{
 505  			"OnGetData",
 506  			wire.NewMsgGetData(),
 507  		},
 508  		{
 509  			"OnGetBlocks",
 510  			wire.NewMsgGetBlocks(&chainhash.Hash{}),
 511  		},
 512  		{
 513  			"OnGetHeaders",
 514  			wire.NewMsgGetHeaders(),
 515  		},
 516  		{
 517  			"OnGetCFilters",
 518  			wire.NewMsgGetCFilters(wire.GCSFilterRegular, 0, &chainhash.Hash{}),
 519  		},
 520  		{
 521  			"OnGetCFHeaders",
 522  			wire.NewMsgGetCFHeaders(wire.GCSFilterRegular, 0, &chainhash.Hash{}),
 523  		},
 524  		{
 525  			"OnGetCFCheckpt",
 526  			wire.NewMsgGetCFCheckpt(wire.GCSFilterRegular, &chainhash.Hash{}),
 527  		},
 528  		{
 529  			"OnCFilter",
 530  			wire.NewMsgCFilter(
 531  				wire.GCSFilterRegular, &chainhash.Hash{},
 532  				[]byte("payload"),
 533  			),
 534  		},
 535  		{
 536  			"OnCFHeaders",
 537  			wire.NewMsgCFHeaders(),
 538  		},
 539  		{
 540  			"OnFeeFilter",
 541  			wire.NewMsgFeeFilter(15000),
 542  		},
 543  		{
 544  			"OnFilterAdd",
 545  			wire.NewMsgFilterAdd([]byte{0x01}),
 546  		},
 547  		{
 548  			"OnFilterClear",
 549  			wire.NewMsgFilterClear(),
 550  		},
 551  		{
 552  			"OnFilterLoad",
 553  			wire.NewMsgFilterLoad([]byte{0x01}, 10, 0, wire.BloomUpdateNone),
 554  		},
 555  		{
 556  			"OnMerkleBlock",
 557  			wire.NewMsgMerkleBlock(
 558  				wire.NewBlockHeader(
 559  					1,
 560  					&chainhash.Hash{}, &chainhash.Hash{}, 1, 1,
 561  				),
 562  			),
 563  		},
 564  		// only one verack message is allowed
 565  		{
 566  			"OnReject",
 567  			wire.NewMsgReject("block", wire.RejectDuplicate, "dupe block"),
 568  		},
 569  		{
 570  			"OnSendHeaders",
 571  			wire.NewMsgSendHeaders(),
 572  		},
 573  	}
 574  	t.Logf("Running %d tests", len(tests))
 575  	for _, test := range tests {
 576  		// Queue the test message
 577  		outPeer.QueueMessage(test.msg, nil)
 578  		select {
 579  		case <-ok:
 580  		case <-time.After(time.Second * 1):
 581  			t.Errorf("TestPeerListeners: %s timeout", test.listener)
 582  			return
 583  		}
 584  	}
 585  	inPeer.Disconnect()
 586  	outPeer.Disconnect()
 587  }
 588  
 589  // TestOutboundPeer tests that the outbound peer works as expected.
 590  func TestOutboundPeer(t *testing.T) {
 591  	peerCfg := &peer.Config{
 592  		NewestBlock: func() (*chainhash.Hash, int32, error) {
 593  			return nil, 0, errors.New("newest block not found")
 594  		},
 595  		UserAgentName:     "peer",
 596  		UserAgentVersion:  "1.0",
 597  		UserAgentComments: []string{"comment"},
 598  		ChainParams:       &chaincfg.MainNetParams,
 599  		Services:          0,
 600  		TrickleInterval:   time.Second * 10,
 601  	}
 602  	r, w := io.Pipe()
 603  	c := &conn{raddr: "10.0.0.1:11047", Writer: w, Reader: r}
 604  	p, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
 605  	if e != nil {
 606  		t.Errorf("NewOutboundPeer: unexpected err - %v\n", e)
 607  		return
 608  	}
 609  	// Test trying to connect twice.
 610  	p.AssociateConnection(c)
 611  	p.AssociateConnection(c)
 612  	disconnected := qu.T()
 613  	go func() {
 614  		p.WaitForDisconnect()
 615  		disconnected <- struct{}{}
 616  	}()
 617  	select {
 618  	case <-disconnected.Wait():
 619  		disconnected.Q()
 620  	case <-time.After(time.Second):
 621  		t.Fatal("Peer did not automatically disconnect.")
 622  	}
 623  	if p.Connected() {
 624  		t.Fatalf("Should not be connected as NewestBlock produces error.")
 625  	}
 626  	// Test Queue Inv
 627  	fakeBlockHash := &chainhash.Hash{0: 0x00, 1: 0x01}
 628  	fakeInv := wire.NewInvVect(wire.InvTypeBlock, fakeBlockHash)
 629  	// Should be noops as the peer could not connect.
 630  	p.QueueInventory(fakeInv)
 631  	p.AddKnownInventory(fakeInv)
 632  	p.QueueInventory(fakeInv)
 633  	fakeMsg := wire.NewMsgVerAck()
 634  	p.QueueMessage(fakeMsg, nil)
 635  	done := qu.T()
 636  	p.QueueMessage(fakeMsg, done)
 637  	<-done
 638  	p.Disconnect()
 639  	// Test NewestBlock
 640  	var newestBlock = func() (*chainhash.Hash, int32, error) {
 641  		hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
 642  		var hash *chainhash.Hash
 643  		hash, e = chainhash.NewHashFromStr(hashStr)
 644  		if e != nil {
 645  			return nil, 0, e
 646  		}
 647  		return hash, 234439, nil
 648  	}
 649  	peerCfg.NewestBlock = newestBlock
 650  	r1, w1 := io.Pipe()
 651  	c1 := &conn{raddr: "10.0.0.1:11047", Writer: w1, Reader: r1}
 652  	p1, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
 653  	if e != nil {
 654  		t.Errorf("NewOutboundPeer: unexpected err - %v\n", e)
 655  		return
 656  	}
 657  	p1.AssociateConnection(c1)
 658  	// Test update latest block
 659  	latestBlockHash, e := chainhash.NewHashFromStr("1a63f9cdff1752e6375c8c76e543a71d239e1a2e5c6db1aa679")
 660  	if e != nil {
 661  		t.Errorf("NewHashFromStr: unexpected err %v\n", e)
 662  		return
 663  	}
 664  	p1.UpdateLastAnnouncedBlock(latestBlockHash)
 665  	p1.UpdateLastBlockHeight(234440)
 666  	if p1.LastAnnouncedBlock() != latestBlockHash {
 667  		t.Errorf(
 668  			"LastAnnouncedBlock: wrong block - got %v, want %v",
 669  			p1.LastAnnouncedBlock(), latestBlockHash,
 670  		)
 671  		return
 672  	}
 673  	// Test Queue Inv after connection
 674  	p1.QueueInventory(fakeInv)
 675  	p1.Disconnect()
 676  	// Test regression
 677  	peerCfg.ChainParams = &chaincfg.RegressionTestParams
 678  	peerCfg.Services = wire.SFNodeBloom
 679  	r2, w2 := io.Pipe()
 680  	c2 := &conn{raddr: "10.0.0.1:11047", Writer: w2, Reader: r2}
 681  	p2, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
 682  	if e != nil {
 683  		t.Errorf("NewOutboundPeer: unexpected err - %v\n", e)
 684  		return
 685  	}
 686  	p2.AssociateConnection(c2)
 687  	// Test PushXXX
 688  	var addrs []*wire.NetAddress
 689  	for i := 0; i < 5; i++ {
 690  		na := wire.NetAddress{}
 691  		addrs = append(addrs, &na)
 692  	}
 693  	if _, e = p2.PushAddrMsg(addrs); e != nil {
 694  		t.Errorf("PushAddrMsg: unexpected err %v\n", e)
 695  		return
 696  	}
 697  	if e := p2.PushGetBlocksMsg(nil, &chainhash.Hash{}); e != nil {
 698  		t.Errorf("PushGetBlocksMsg: unexpected err %v\n", e)
 699  		return
 700  	}
 701  	if e := p2.PushGetHeadersMsg(nil, &chainhash.Hash{}); e != nil {
 702  		t.Errorf("PushGetHeadersMsg: unexpected err %v\n", e)
 703  		return
 704  	}
 705  	p2.PushRejectMsg("block", wire.RejectMalformed, "malformed", nil, false)
 706  	p2.PushRejectMsg("block", wire.RejectInvalid, "invalid", nil, false)
 707  	// Test Queue Messages
 708  	p2.QueueMessage(wire.NewMsgGetAddr(), nil)
 709  	p2.QueueMessage(wire.NewMsgPing(1), nil)
 710  	p2.QueueMessage(wire.NewMsgMemPool(), nil)
 711  	p2.QueueMessage(wire.NewMsgGetData(), nil)
 712  	p2.QueueMessage(wire.NewMsgGetHeaders(), nil)
 713  	p2.QueueMessage(wire.NewMsgFeeFilter(20000), nil)
 714  	p2.Disconnect()
 715  }
 716  
 717  // // Tests that the node disconnects from peers with an unsupported protocol version.
 718  // func TestUnsupportedVersionPeer(// 	t *testing.T) {
 719  // 	peerCfg := &peer.Config{
 720  // 		UserAgentName:     "peer",
 721  // 		UserAgentVersion:  "1.0",
 722  // 		UserAgentComments: []string{"comment"},
 723  // 		ChainParams:       &chaincfg.MainNetParams,
 724  // 		Services:          0,
 725  // 		TrickleInterval:   time.Second * 10,
 726  // 	}
 727  // 	localNA := wire.NewNetAddressIPPort(
 728  // 		net.ParseIP("10.0.0.1"),
 729  // 		uint16(11047),
 730  // 		wire.SFNodeNetwork,
 731  // 	)
 732  // 	remoteNA := wire.NewNetAddressIPPort(
 733  // 		net.ParseIP("10.0.0.2"),
 734  // 		uint16(11047),
 735  // 		wire.SFNodeNetwork,
 736  // 	)
 737  // 	localConn, remoteConn := pipe(
 738  // 		&conn{laddr: "10.0.0.1:11047", raddr: "10.0.0.2:11047"},
 739  // 		&conn{laddr: "10.0.0.2:11047", raddr: "10.0.0.1:11047"},
 740  // 	)
 741  // 	p, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
 742  // 	if e != nil  {
 743  // 		t.Fatalf("NewOutboundPeer: unexpected err - %v\n", e)
 744  // 	}
 745  // 	p.AssociateConnection(localConn)
 746  // 	// Read outbound messages to peer into a channel
 747  // 	outboundMessages := make(chan wire.Message)
 748  // 	go func() {
 749  // 		for {
 750  // 			_, msg, _, e = wire.ReadMessageN(
 751  // 				remoteConn,
 752  // 				p.ProtocolVersion(),
 753  // 				peerCfg.ChainParams.Net,
 754  // 			)
 755  // 			if e ==  io.EOF {
 756  // 				close(outboundMessages)
 757  // 				return
 758  // 			}
 759  // 			if e != nil  {
 760  // 				t.Errorf("Error reading message from local node: %v\n", e)
 761  // 				return
 762  // 			}
 763  // 			outboundMessages <- msg
 764  // 		}
 765  // 	}()
 766  // 	// Read version message sent to remote peer
 767  // 	select {
 768  // 	case msg := <-outboundMessages:
 769  // 		if _, ok := msg.(*wire.MsgVersion); !ok {
 770  // 			t.Fatalf("Expected version message, got [%s]", msg.Command())
 771  // 		}
 772  // 	case <-time.After(time.Second):
 773  // 		t.Fatal("Peer did not send version message")
 774  // 	}
 775  // 	// Remote peer writes version message advertising invalid protocol version 1
 776  // 	invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, 0)
 777  // 	invalidVersionMsg.ProtocolVersion = 1
 778  // 	_, e = wire.WriteMessageN(
 779  // 		remoteConn.Writer,
 780  // 		invalidVersionMsg,
 781  // 		uint32(invalidVersionMsg.ProtocolVersion),
 782  // 		peerCfg.ChainParams.Net,
 783  // 	)
 784  // 	if e != nil  {
 785  // 		t.Fatalf("wire.WriteMessageN: unexpected err - %v\n", e)
 786  // 	}
 787  // 	// Expect peer to disconnect automatically
 788  // 	disconnected := qu.T()
 789  // 	go func() {
 790  // 		p.WaitForDisconnect()
 791  // 		disconnected <- struct{}{}
 792  // 	}()
 793  // 	select {
 794  // 	case <-disconnected:
 795  // 		close(disconnected)
 796  // 	case <-time.After(time.Second):
 797  // 		t.Fatal("Peer did not automatically disconnect")
 798  // 	}
 799  // 	// Expect no further outbound messages from peer
 800  // 	select {
 801  // 	case msg, chanOpen := <-outboundMessages:
 802  // 		if chanOpen {
 803  // 			t.Fatalf("Expected no further messages, received [%s]", msg.Command())
 804  // 		}
 805  // 	case <-time.After(time.Second):
 806  // 		t.Fatal("Timeout waiting for remote reader to close")
 807  // 	}
 808  // }
 809  
 810  // TestDuplicateVersionMsg ensures that receiving a version message after one has already been received results in the
 811  // peer being disconnected.
 812  func TestDuplicateVersionMsg(t *testing.T) {
 813  	// Create a pair of peers that are connected to each other using a fake connection.
 814  	verack := qu.T()
 815  	peerCfg := &peer.Config{
 816  		Listeners: peer.MessageListeners{
 817  			OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
 818  				verack <- struct{}{}
 819  			},
 820  		},
 821  		UserAgentName:    "peer",
 822  		UserAgentVersion: "1.0",
 823  		ChainParams:      &chaincfg.MainNetParams,
 824  		Services:         0,
 825  	}
 826  	inConn, outConn := pipe(
 827  		&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
 828  		&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
 829  	)
 830  	outPeer, e := peer.NewOutboundPeer(peerCfg, inConn.laddr)
 831  	if e != nil {
 832  		t.Fatalf("NewOutboundPeer: unexpected err: %v\n", e)
 833  	}
 834  	outPeer.AssociateConnection(outConn)
 835  	inPeer := peer.NewInboundPeer(peerCfg)
 836  	inPeer.AssociateConnection(inConn)
 837  	// Wait for the veracks from the initial protocol version negotiation.
 838  	for i := 0; i < 2; i++ {
 839  		select {
 840  		case <-verack.Wait():
 841  		case <-time.After(time.Second):
 842  			t.Fatal("verack timeout")
 843  		}
 844  	}
 845  	// Queue a duplicate version message from the outbound peer and wait until it is sent.
 846  	done := qu.T()
 847  	outPeer.QueueMessage(&wire.MsgVersion{}, done)
 848  	select {
 849  	case <-done.Wait():
 850  	case <-time.After(time.Second):
 851  		t.Fatal("send duplicate version timeout")
 852  	}
 853  	// Ensure the peer that is the recipient of the duplicate version closes the connection.
 854  	disconnected := qu.Ts(1)
 855  	go func() {
 856  		inPeer.WaitForDisconnect()
 857  		disconnected <- struct{}{}
 858  	}()
 859  	select {
 860  	case <-disconnected.Wait():
 861  	case <-time.After(time.Second):
 862  		t.Fatal("peer did not disconnect")
 863  	}
 864  }
 865  func init() {
 866  	
 867  	// Allow self connection when running the tests.
 868  	peer.TstAllowSelfConns()
 869  }
 870