connmanager_test.go raw

   1  package connmgr
   2  
   3  import (
   4  	"errors"
   5  	"fmt"
   6  	"io"
   7  	"net"
   8  	"sync/atomic"
   9  	"testing"
  10  	"time"
  11  	
  12  	"github.com/p9c/p9/pkg/qu"
  13  )
  14  
  15  func init() {
  16  	
  17  	// Override the max retry duration when running tests.
  18  	maxRetryDuration = 2 * time.Millisecond
  19  }
  20  
  21  // mockAddr mocks a network address
  22  type mockAddr struct {
  23  	net, address string
  24  }
  25  
  26  func (m mockAddr) Network() string { return m.net }
  27  func (m mockAddr) String() string  { return m.address }
  28  
  29  // mockConn mocks a network connection by implementing the net.Conn interface.
  30  type mockConn struct {
  31  	io.Reader
  32  	io.Writer
  33  	io.Closer
  34  	// local network, address for the connection.
  35  	lnet, laddr string
  36  	// remote network, address for the connection.
  37  	rAddr net.Addr
  38  }
  39  
  40  // LocalAddr returns the local address for the connection.
  41  func (c mockConn) LocalAddr() net.Addr {
  42  	return &mockAddr{c.lnet, c.laddr}
  43  }
  44  
  45  // RemoteAddr returns the remote address for the connection.
  46  func (c mockConn) RemoteAddr() net.Addr {
  47  	return &mockAddr{c.rAddr.Network(), c.rAddr.String()}
  48  }
  49  
  50  // Close handles closing the connection.
  51  func (c mockConn) Close() (e error) {
  52  	return nil
  53  }
  54  func (c mockConn) SetDeadline(t time.Time) error          { return nil }
  55  func (c mockConn) SetReadDeadline(t time.Time) error      { return nil }
  56  func (c mockConn) SetWriteDeadline(t time.Time) (e error) { return nil }
  57  
  58  // mockDialer mocks the net.Dial interface by returning a mock connection to the given address.
  59  func mockDialer(addr net.Addr) (net.Conn, error) {
  60  	r, w := io.Pipe()
  61  	c := &mockConn{rAddr: addr}
  62  	c.Reader = r
  63  	c.Writer = w
  64  	return c, nil
  65  }
  66  
  67  // TestNewConfig tests that new ConnManager config is validated as expected.
  68  func TestNewConfig(t *testing.T) {
  69  	_, e := New(&Config{})
  70  	if e == nil {
  71  		t.Fatalf("New expected error: 'Dial can't be nil', got nil")
  72  	}
  73  	_, e = New(&Config{
  74  		Dial: mockDialer,
  75  	},
  76  	)
  77  	if e != nil {
  78  		t.Fatalf("New unexpected error: %v", e)
  79  	}
  80  }
  81  
  82  // TestStartStop tests that the connection manager starts and stops as expected.
  83  func TestStartStop(t *testing.T) {
  84  	connected := make(chan *ConnReq)
  85  	disconnected := make(chan *ConnReq)
  86  	cmgr, e := New(&Config{
  87  		TargetOutbound: 1,
  88  		GetNewAddress: func() (net.Addr, error) {
  89  			return &net.TCPAddr{
  90  				IP:   net.ParseIP("127.0.0.1"),
  91  				Port: 18555,
  92  			}, nil
  93  		},
  94  		Dial: mockDialer,
  95  		OnConnection: func(c *ConnReq, conn net.Conn) {
  96  			connected <- c
  97  		},
  98  		OnDisconnection: func(c *ConnReq) {
  99  			disconnected <- c
 100  		},
 101  	},
 102  	)
 103  	if e != nil {
 104  		t.Fatalf("New error: %v", e)
 105  	}
 106  	cmgr.Start()
 107  	gotConnReq := <-connected
 108  	cmgr.Stop()
 109  	// already stopped
 110  	cmgr.Stop()
 111  	// ignored
 112  	cr := &ConnReq{
 113  		Addr: &net.TCPAddr{
 114  			IP:   net.ParseIP("127.0.0.1"),
 115  			Port: 18555,
 116  		},
 117  		Permanent: true,
 118  	}
 119  	cmgr.Connect(cr)
 120  	if cr.ID() != 0 {
 121  		t.Fatalf("start/stop: got id: %v, want: 0", cr.ID())
 122  	}
 123  	cmgr.Disconnect(gotConnReq.ID())
 124  	cmgr.Remove(gotConnReq.ID())
 125  	select {
 126  	case <-disconnected:
 127  		t.Fatalf("start/stop: unexpected disconnection")
 128  	case <-time.Tick(10 * time.Millisecond):
 129  		break
 130  	}
 131  }
 132  
 133  // TestConnectMode tests that the connection manager works in the connect mode. In connect mode, automatic connections
 134  // are disabled, so we test that requests using Connect are handled and that no other connections are made.
 135  func TestConnectMode(t *testing.T) {
 136  	connected := make(chan *ConnReq)
 137  	cmgr, e := New(&Config{
 138  		TargetOutbound: 2,
 139  		Dial:           mockDialer,
 140  		OnConnection: func(c *ConnReq, conn net.Conn) {
 141  			connected <- c
 142  		},
 143  	},
 144  	)
 145  	if e != nil {
 146  		t.Fatalf("New error: %v", e)
 147  	}
 148  	cr := &ConnReq{
 149  		Addr: &net.TCPAddr{
 150  			IP:   net.ParseIP("127.0.0.1"),
 151  			Port: 18555,
 152  		},
 153  		Permanent: true,
 154  	}
 155  	cmgr.Start()
 156  	cmgr.Connect(cr)
 157  	gotConnReq := <-connected
 158  	wantID := cr.ID()
 159  	gotID := gotConnReq.ID()
 160  	if gotID != wantID {
 161  		t.Fatalf("connect mode: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
 162  	}
 163  	gotState := cr.State()
 164  	wantState := ConnEstablished
 165  	if gotState != wantState {
 166  		t.Fatalf("connect mode: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
 167  	}
 168  	select {
 169  	case c := <-connected:
 170  		t.Fatalf("connect mode: got unexpected connection - %v", c.Addr)
 171  	case <-time.After(time.Millisecond):
 172  		break
 173  	}
 174  	cmgr.Stop()
 175  }
 176  
 177  // TestTargetOutbound tests the target number of outbound connections. We wait until all connections are established,
 178  // then test they there are the only connections made.
 179  func TestTargetOutbound(t *testing.T) {
 180  	targetOutbound := uint32(10)
 181  	connected := make(chan *ConnReq)
 182  	cmgr, e := New(&Config{
 183  		TargetOutbound: targetOutbound,
 184  		Dial:           mockDialer,
 185  		GetNewAddress: func() (net.Addr, error) {
 186  			return &net.TCPAddr{
 187  				IP:   net.ParseIP("127.0.0.1"),
 188  				Port: 18555,
 189  			}, nil
 190  		},
 191  		OnConnection: func(c *ConnReq, conn net.Conn) {
 192  			connected <- c
 193  		},
 194  	},
 195  	)
 196  	if e != nil {
 197  		t.Fatalf("New error: %v", e)
 198  	}
 199  	cmgr.Start()
 200  	for i := uint32(0); i < targetOutbound; i++ {
 201  		<-connected
 202  	}
 203  	select {
 204  	case c := <-connected:
 205  		t.Fatalf("target outbound: got unexpected connection - %v", c.Addr)
 206  	case <-time.After(time.Millisecond):
 207  		break
 208  	}
 209  	cmgr.Stop()
 210  }
 211  
 212  // TestRetryPermanent tests that permanent connection requests are retried. We make a permanent connection request using
 213  // Connect, disconnect it using Disconnect and we wait for it to be connected back.
 214  func TestRetryPermanent(t *testing.T) {
 215  	connected := make(chan *ConnReq)
 216  	disconnected := make(chan *ConnReq)
 217  	cmgr, e := New(&Config{
 218  		RetryDuration:  time.Millisecond,
 219  		TargetOutbound: 1,
 220  		Dial:           mockDialer,
 221  		OnConnection: func(c *ConnReq, conn net.Conn) {
 222  			connected <- c
 223  		},
 224  		OnDisconnection: func(c *ConnReq) {
 225  			disconnected <- c
 226  		},
 227  	},
 228  	)
 229  	if e != nil {
 230  		t.Fatalf("New error: %v", e)
 231  	}
 232  	cr := &ConnReq{
 233  		Addr: &net.TCPAddr{
 234  			IP:   net.ParseIP("127.0.0.1"),
 235  			Port: 18555,
 236  		},
 237  		Permanent: true,
 238  	}
 239  	go cmgr.Connect(cr)
 240  	cmgr.Start()
 241  	gotConnReq := <-connected
 242  	wantID := cr.ID()
 243  	gotID := gotConnReq.ID()
 244  	if gotID != wantID {
 245  		t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
 246  	}
 247  	gotState := cr.State()
 248  	wantState := ConnEstablished
 249  	if gotState != wantState {
 250  		t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
 251  	}
 252  	cmgr.Disconnect(cr.ID())
 253  	gotConnReq = <-disconnected
 254  	wantID = cr.ID()
 255  	gotID = gotConnReq.ID()
 256  	if gotID != wantID {
 257  		t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
 258  	}
 259  	gotState = cr.State()
 260  	wantState = ConnPending
 261  	if gotState != wantState {
 262  		t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
 263  	}
 264  	gotConnReq = <-connected
 265  	wantID = cr.ID()
 266  	gotID = gotConnReq.ID()
 267  	if gotID != wantID {
 268  		t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
 269  	}
 270  	gotState = cr.State()
 271  	wantState = ConnEstablished
 272  	if gotState != wantState {
 273  		t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
 274  	}
 275  	cmgr.Remove(cr.ID())
 276  	gotConnReq = <-disconnected
 277  	wantID = cr.ID()
 278  	gotID = gotConnReq.ID()
 279  	if gotID != wantID {
 280  		t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
 281  	}
 282  	gotState = cr.State()
 283  	wantState = ConnDisconnected
 284  	if gotState != wantState {
 285  		t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
 286  	}
 287  	cmgr.Stop()
 288  }
 289  
 290  // TestMaxRetryDuration tests the maximum retry duration. We have a timed dialer which initially returns err but after
 291  // RetryDuration hits maxRetryDuration returns a mock conn.
 292  func TestMaxRetryDuration(t *testing.T) {
 293  	networkUp := qu.T()
 294  	time.AfterFunc(5*time.Millisecond, func() {
 295  		networkUp.Q()
 296  	},
 297  	)
 298  	timedDialer := func(addr net.Addr) (net.Conn, error) {
 299  		select {
 300  		case <-networkUp.Wait():
 301  			return mockDialer(addr)
 302  		default:
 303  			return nil, errors.New("network down")
 304  		}
 305  	}
 306  	connected := make(chan *ConnReq)
 307  	cmgr, e := New(&Config{
 308  		RetryDuration:  time.Millisecond,
 309  		TargetOutbound: 1,
 310  		Dial:           timedDialer,
 311  		OnConnection: func(c *ConnReq, conn net.Conn) {
 312  			connected <- c
 313  		},
 314  	},
 315  	)
 316  	if e != nil {
 317  		t.Fatalf("New error: %v", e)
 318  	}
 319  	cr := &ConnReq{
 320  		Addr: &net.TCPAddr{
 321  			IP:   net.ParseIP("127.0.0.1"),
 322  			Port: 18555,
 323  		},
 324  		Permanent: true,
 325  	}
 326  	go cmgr.Connect(cr)
 327  	cmgr.Start()
 328  	// retry in 1ms
 329  	// retry in 2ms - max retry duration reached
 330  	// retry in 2ms - timedDialer returns mockDial
 331  	select {
 332  	case <-connected:
 333  	case <-time.Tick(100 * time.Millisecond):
 334  		t.Fatalf("max retry duration: connection timeout")
 335  	}
 336  }
 337  
 338  // TestNetworkFailure tests that the connection manager handles a network failure gracefully.
 339  func TestNetworkFailure(t *testing.T) {
 340  	var dials uint32
 341  	errDialer := func(net net.Addr) (net.Conn, error) {
 342  		atomic.AddUint32(&dials, 1)
 343  		return nil, errors.New("network down")
 344  	}
 345  	cmgr, e := New(&Config{
 346  		TargetOutbound: 5,
 347  		RetryDuration:  5 * time.Millisecond,
 348  		Dial:           errDialer,
 349  		GetNewAddress: func() (net.Addr, error) {
 350  			return &net.TCPAddr{
 351  				IP:   net.ParseIP("127.0.0.1"),
 352  				Port: 18555,
 353  			}, nil
 354  		},
 355  		OnConnection: func(c *ConnReq, conn net.Conn) {
 356  			t.Fatalf("network failure: got unexpected connection - %v", c.Addr)
 357  		},
 358  	},
 359  	)
 360  	if e != nil {
 361  		t.Fatalf("New error: %v", e)
 362  	}
 363  	cmgr.Start()
 364  	time.AfterFunc(10*time.Millisecond, cmgr.Stop)
 365  	cmgr.Wait()
 366  	wantMaxDials := uint32(75)
 367  	if atomic.LoadUint32(&dials) > wantMaxDials {
 368  		t.Fatalf("network failure: unexpected number of dials - got %v, want < %v",
 369  			atomic.LoadUint32(&dials), wantMaxDials,
 370  		)
 371  	}
 372  }
 373  
 374  // TestStopFailed tests that failed connections are ignored after connmgr is stopped. We have a dailer which sets the
 375  // stop flag on the conn manager and returns an err so that the handler assumes that the conn manager is stopped and
 376  // ignores the failure.
 377  func TestStopFailed(t *testing.T) {
 378  	done := qu.Ts(1)
 379  	waitDialer := func(addr net.Addr) (net.Conn, error) {
 380  		done <- struct{}{}
 381  		time.Sleep(time.Millisecond)
 382  		return nil, errors.New("network down")
 383  	}
 384  	cmgr, e := New(&Config{
 385  		Dial: waitDialer,
 386  	},
 387  	)
 388  	if e != nil {
 389  		t.Fatalf("New error: %v", e)
 390  	}
 391  	cmgr.Start()
 392  	go func() {
 393  		<-done
 394  		atomic.StoreInt32(&cmgr.stop, 1)
 395  		time.Sleep(2 * time.Millisecond)
 396  		atomic.StoreInt32(&cmgr.stop, 0)
 397  		cmgr.Stop()
 398  	}()
 399  	cr := &ConnReq{
 400  		Addr: &net.TCPAddr{
 401  			IP:   net.ParseIP("127.0.0.1"),
 402  			Port: 18555,
 403  		},
 404  		Permanent: true,
 405  	}
 406  	go cmgr.Connect(cr)
 407  	cmgr.Wait()
 408  }
 409  
 410  // TestRemovePendingConnection tests that it's possible to cancel a pending connection, removing its internal state from
 411  // the ConnMgr.
 412  func TestRemovePendingConnection(t *testing.T) {
 413  	// Create a ConnMgr instance with an instance of a dialer that'll never succeed.
 414  	wait := qu.T()
 415  	indefiniteDialer := func(addr net.Addr) (net.Conn, error) {
 416  		<-wait
 417  		return nil, fmt.Errorf("error")
 418  	}
 419  	cmgr, e := New(&Config{
 420  		Dial: indefiniteDialer,
 421  	},
 422  	)
 423  	if e != nil {
 424  		t.Fatalf("New error: %v", e)
 425  	}
 426  	cmgr.Start()
 427  	// Establish a connection request to a random IP we've chosen.
 428  	cr := &ConnReq{
 429  		Addr: &net.TCPAddr{
 430  			IP:   net.ParseIP("127.0.0.1"),
 431  			Port: 18555,
 432  		},
 433  		Permanent: true,
 434  	}
 435  	go cmgr.Connect(cr)
 436  	time.Sleep(10 * time.Millisecond)
 437  	if cr.State() != ConnPending {
 438  		t.Fatalf("pending request hasn't been registered, status: %v",
 439  			cr.State(),
 440  		)
 441  	}
 442  	// The request launched above will actually never be able to establish a connection. So we'll cancel it _before_
 443  	// it's able to be completed.
 444  	cmgr.Remove(cr.ID())
 445  	time.Sleep(10 * time.Millisecond)
 446  	// Now examine the status of the connection request, it should read a
 447  	// status of failed.
 448  	if cr.State() != ConnCanceled {
 449  		t.Fatalf("request wasn't canceled, status is: %v", cr.State())
 450  	}
 451  	wait.Q()
 452  	cmgr.Stop()
 453  }
 454  
 455  // TestCancelIgnoreDelayedConnection tests that a canceled connection request will not execute the on connection
 456  // callback, even if an outstanding retry succeeds.
 457  func TestCancelIgnoreDelayedConnection(t *testing.T) {
 458  	retryTimeout := 10 * time.Millisecond
 459  	// Setup a dialer that will continue to return an error until the connect chan is signaled, the dial attempt
 460  	// immediately after will succeed in returning a connection.
 461  	connect := qu.T()
 462  	failingDialer := func(addr net.Addr) (net.Conn, error) {
 463  		select {
 464  		case <-connect.Wait():
 465  			return mockDialer(addr)
 466  		default:
 467  		}
 468  		return nil, fmt.Errorf("error")
 469  	}
 470  	connected := make(chan *ConnReq)
 471  	cmgr, e := New(&Config{
 472  		Dial:          failingDialer,
 473  		RetryDuration: retryTimeout,
 474  		OnConnection: func(c *ConnReq, conn net.Conn) {
 475  			connected <- c
 476  		},
 477  	},
 478  	)
 479  	if e != nil {
 480  		t.Fatalf("New error: %v", e)
 481  	}
 482  	cmgr.Start()
 483  	defer cmgr.Stop()
 484  	// Establish a connection request to a random IP we've chosen.
 485  	cr := &ConnReq{
 486  		Addr: &net.TCPAddr{
 487  			IP:   net.ParseIP("127.0.0.1"),
 488  			Port: 18555,
 489  		},
 490  	}
 491  	cmgr.Connect(cr)
 492  	// Allow for the first retry timeout to elapse.
 493  	time.Sleep(2 * retryTimeout)
 494  	// Connection be marked as failed, even after reattempting to
 495  	// connect.
 496  	if cr.State() != ConnFailing {
 497  		t.Fatalf("failing request should have status failed, status: %v",
 498  			cr.State(),
 499  		)
 500  	}
 501  	// Remove the connection, and then immediately allow the next connection to succeed.
 502  	cmgr.Remove(cr.ID())
 503  	connect.Q()
 504  	// Allow the connection manager to process the removal.
 505  	time.Sleep(5 * time.Millisecond)
 506  	// Now examine the status of the connection request, it should read a status of canceled.
 507  	if cr.State() != ConnCanceled {
 508  		t.Fatalf("request wasn't canceled, status is: %v", cr.State())
 509  	}
 510  	// Finally, the connection manager should not signal the on-connection callback, since we explicitly canceled this
 511  	// request. We give a generous window to ensure the connection manager's lienar backoff is allowed to properly
 512  	// elapse.
 513  	select {
 514  	case <-connected:
 515  		t.Fatalf("on-connect should not be called for canceled req")
 516  	case <-time.After(5 * retryTimeout):
 517  	}
 518  }
 519  
 520  // mockListener implements the net.Receiver interface and is used to test code that deals with net.P2PListeners without
 521  // having to actually make any real connections.
 522  type mockListener struct {
 523  	localAddr   string
 524  	provideConn chan net.Conn
 525  }
 526  
 527  // Accept returns a mock connection when it receives a signal via the Connect function. This is part of the net.Receiver
 528  // interface.
 529  func (m *mockListener) Accept() (net.Conn, error) {
 530  	for conn := range m.provideConn {
 531  		return conn, nil
 532  	}
 533  	return nil, errors.New("network connection closed")
 534  }
 535  
 536  // Close closes the mock listener which will cause any blocked Accept operations to be unblocked and return errors. This
 537  // is part of the net.Receiver interface.
 538  func (m *mockListener) Close() (e error) {
 539  	close(m.provideConn)
 540  	return nil
 541  }
 542  
 543  // Addr returns the address the mock listener was configured with. This is part of the net.Receiver interface.
 544  func (m *mockListener) Addr() net.Addr {
 545  	return &mockAddr{"tcp", m.localAddr}
 546  }
 547  
 548  // Connect fakes a connection to the mock listener from the provided remote address. It will cause the Accept function
 549  // to return a mock connection configured with the provided remote address and the local address for the mock listener.
 550  func (m *mockListener) Connect(ip string, port int) {
 551  	m.provideConn <- &mockConn{
 552  		laddr: m.localAddr,
 553  		lnet:  "tcp",
 554  		rAddr: &net.TCPAddr{
 555  			IP:   net.ParseIP(ip),
 556  			Port: port,
 557  		},
 558  	}
 559  }
 560  
 561  // newMockListener returns a new mock listener for the provided local address and port. No ports are actually opened.
 562  func newMockListener(localAddr string) *mockListener {
 563  	return &mockListener{
 564  		localAddr:   localAddr,
 565  		provideConn: make(chan net.Conn),
 566  	}
 567  }
 568  
 569  // TestListeners ensures providing listeners to the connection manager along with an accept callback works properly.
 570  func TestListeners(t *testing.T) {
 571  	// Setup a connection manager with a couple of mock listeners that notify a channel when they receive mock
 572  	// connections.
 573  	receivedConns := make(chan net.Conn)
 574  	listener1 := newMockListener("127.0.0.1:11047")
 575  	listener2 := newMockListener("127.0.0.1:9333")
 576  	listeners := []net.Listener{listener1, listener2}
 577  	cmgr, e := New(&Config{
 578  		Listeners: listeners,
 579  		OnAccept: func(conn net.Conn) {
 580  			receivedConns <- conn
 581  		},
 582  		Dial: mockDialer,
 583  	},
 584  	)
 585  	if e != nil {
 586  		t.Fatalf("New error: %v", e)
 587  	}
 588  	cmgr.Start()
 589  	// Fake a couple of mock connections to each of the listeners.
 590  	go func() {
 591  		for i, listener := range listeners {
 592  			l := listener.(*mockListener)
 593  			l.Connect("127.0.0.1", 10000+i*2)
 594  			l.Connect("127.0.0.1", 10000+i*2+1)
 595  		}
 596  	}()
 597  	// Tally the receive connections to ensure the expected number are received. Also, fail the test after a timeout so
 598  	// it will not hang forever should the test not work.
 599  	expectedNumConns := len(listeners) * 2
 600  	var numConns int
 601  out:
 602  	for {
 603  		select {
 604  		case <-receivedConns:
 605  			numConns++
 606  			if numConns == expectedNumConns {
 607  				break out
 608  			}
 609  		case <-time.After(time.Millisecond * 50):
 610  			t.Fatalf("Timeout waiting for %d expected connections",
 611  				expectedNumConns,
 612  			)
 613  		}
 614  	}
 615  	cmgr.Stop()
 616  	cmgr.Wait()
 617  }
 618