connmanager_test.go raw
1 package connmgr
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "net"
8 "sync/atomic"
9 "testing"
10 "time"
11
12 "github.com/p9c/p9/pkg/qu"
13 )
14
15 func init() {
16
17 // Override the max retry duration when running tests.
18 maxRetryDuration = 2 * time.Millisecond
19 }
20
// mockAddr is a minimal fake network address that simply echoes back the
// network name and address string it was constructed with.
type mockAddr struct {
	net, address string
}

// Network reports the stored network name.
func (a mockAddr) Network() string {
	return a.net
}

// String reports the stored address string.
func (a mockAddr) String() string {
	return a.address
}
28
29 // mockConn mocks a network connection by implementing the net.Conn interface.
30 type mockConn struct {
31 io.Reader
32 io.Writer
33 io.Closer
34 // local network, address for the connection.
35 lnet, laddr string
36 // remote network, address for the connection.
37 rAddr net.Addr
38 }
39
40 // LocalAddr returns the local address for the connection.
41 func (c mockConn) LocalAddr() net.Addr {
42 return &mockAddr{c.lnet, c.laddr}
43 }
44
45 // RemoteAddr returns the remote address for the connection.
46 func (c mockConn) RemoteAddr() net.Addr {
47 return &mockAddr{c.rAddr.Network(), c.rAddr.String()}
48 }
49
50 // Close handles closing the connection.
51 func (c mockConn) Close() (e error) {
52 return nil
53 }
54 func (c mockConn) SetDeadline(t time.Time) error { return nil }
55 func (c mockConn) SetReadDeadline(t time.Time) error { return nil }
56 func (c mockConn) SetWriteDeadline(t time.Time) (e error) { return nil }
57
58 // mockDialer mocks the net.Dial interface by returning a mock connection to the given address.
59 func mockDialer(addr net.Addr) (net.Conn, error) {
60 r, w := io.Pipe()
61 c := &mockConn{rAddr: addr}
62 c.Reader = r
63 c.Writer = w
64 return c, nil
65 }
66
67 // TestNewConfig tests that new ConnManager config is validated as expected.
68 func TestNewConfig(t *testing.T) {
69 _, e := New(&Config{})
70 if e == nil {
71 t.Fatalf("New expected error: 'Dial can't be nil', got nil")
72 }
73 _, e = New(&Config{
74 Dial: mockDialer,
75 },
76 )
77 if e != nil {
78 t.Fatalf("New unexpected error: %v", e)
79 }
80 }
81
82 // TestStartStop tests that the connection manager starts and stops as expected.
83 func TestStartStop(t *testing.T) {
84 connected := make(chan *ConnReq)
85 disconnected := make(chan *ConnReq)
86 cmgr, e := New(&Config{
87 TargetOutbound: 1,
88 GetNewAddress: func() (net.Addr, error) {
89 return &net.TCPAddr{
90 IP: net.ParseIP("127.0.0.1"),
91 Port: 18555,
92 }, nil
93 },
94 Dial: mockDialer,
95 OnConnection: func(c *ConnReq, conn net.Conn) {
96 connected <- c
97 },
98 OnDisconnection: func(c *ConnReq) {
99 disconnected <- c
100 },
101 },
102 )
103 if e != nil {
104 t.Fatalf("New error: %v", e)
105 }
106 cmgr.Start()
107 gotConnReq := <-connected
108 cmgr.Stop()
109 // already stopped
110 cmgr.Stop()
111 // ignored
112 cr := &ConnReq{
113 Addr: &net.TCPAddr{
114 IP: net.ParseIP("127.0.0.1"),
115 Port: 18555,
116 },
117 Permanent: true,
118 }
119 cmgr.Connect(cr)
120 if cr.ID() != 0 {
121 t.Fatalf("start/stop: got id: %v, want: 0", cr.ID())
122 }
123 cmgr.Disconnect(gotConnReq.ID())
124 cmgr.Remove(gotConnReq.ID())
125 select {
126 case <-disconnected:
127 t.Fatalf("start/stop: unexpected disconnection")
128 case <-time.Tick(10 * time.Millisecond):
129 break
130 }
131 }
132
133 // TestConnectMode tests that the connection manager works in the connect mode. In connect mode, automatic connections
134 // are disabled, so we test that requests using Connect are handled and that no other connections are made.
135 func TestConnectMode(t *testing.T) {
136 connected := make(chan *ConnReq)
137 cmgr, e := New(&Config{
138 TargetOutbound: 2,
139 Dial: mockDialer,
140 OnConnection: func(c *ConnReq, conn net.Conn) {
141 connected <- c
142 },
143 },
144 )
145 if e != nil {
146 t.Fatalf("New error: %v", e)
147 }
148 cr := &ConnReq{
149 Addr: &net.TCPAddr{
150 IP: net.ParseIP("127.0.0.1"),
151 Port: 18555,
152 },
153 Permanent: true,
154 }
155 cmgr.Start()
156 cmgr.Connect(cr)
157 gotConnReq := <-connected
158 wantID := cr.ID()
159 gotID := gotConnReq.ID()
160 if gotID != wantID {
161 t.Fatalf("connect mode: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
162 }
163 gotState := cr.State()
164 wantState := ConnEstablished
165 if gotState != wantState {
166 t.Fatalf("connect mode: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
167 }
168 select {
169 case c := <-connected:
170 t.Fatalf("connect mode: got unexpected connection - %v", c.Addr)
171 case <-time.After(time.Millisecond):
172 break
173 }
174 cmgr.Stop()
175 }
176
177 // TestTargetOutbound tests the target number of outbound connections. We wait until all connections are established,
178 // then test they there are the only connections made.
179 func TestTargetOutbound(t *testing.T) {
180 targetOutbound := uint32(10)
181 connected := make(chan *ConnReq)
182 cmgr, e := New(&Config{
183 TargetOutbound: targetOutbound,
184 Dial: mockDialer,
185 GetNewAddress: func() (net.Addr, error) {
186 return &net.TCPAddr{
187 IP: net.ParseIP("127.0.0.1"),
188 Port: 18555,
189 }, nil
190 },
191 OnConnection: func(c *ConnReq, conn net.Conn) {
192 connected <- c
193 },
194 },
195 )
196 if e != nil {
197 t.Fatalf("New error: %v", e)
198 }
199 cmgr.Start()
200 for i := uint32(0); i < targetOutbound; i++ {
201 <-connected
202 }
203 select {
204 case c := <-connected:
205 t.Fatalf("target outbound: got unexpected connection - %v", c.Addr)
206 case <-time.After(time.Millisecond):
207 break
208 }
209 cmgr.Stop()
210 }
211
212 // TestRetryPermanent tests that permanent connection requests are retried. We make a permanent connection request using
213 // Connect, disconnect it using Disconnect and we wait for it to be connected back.
214 func TestRetryPermanent(t *testing.T) {
215 connected := make(chan *ConnReq)
216 disconnected := make(chan *ConnReq)
217 cmgr, e := New(&Config{
218 RetryDuration: time.Millisecond,
219 TargetOutbound: 1,
220 Dial: mockDialer,
221 OnConnection: func(c *ConnReq, conn net.Conn) {
222 connected <- c
223 },
224 OnDisconnection: func(c *ConnReq) {
225 disconnected <- c
226 },
227 },
228 )
229 if e != nil {
230 t.Fatalf("New error: %v", e)
231 }
232 cr := &ConnReq{
233 Addr: &net.TCPAddr{
234 IP: net.ParseIP("127.0.0.1"),
235 Port: 18555,
236 },
237 Permanent: true,
238 }
239 go cmgr.Connect(cr)
240 cmgr.Start()
241 gotConnReq := <-connected
242 wantID := cr.ID()
243 gotID := gotConnReq.ID()
244 if gotID != wantID {
245 t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
246 }
247 gotState := cr.State()
248 wantState := ConnEstablished
249 if gotState != wantState {
250 t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
251 }
252 cmgr.Disconnect(cr.ID())
253 gotConnReq = <-disconnected
254 wantID = cr.ID()
255 gotID = gotConnReq.ID()
256 if gotID != wantID {
257 t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
258 }
259 gotState = cr.State()
260 wantState = ConnPending
261 if gotState != wantState {
262 t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
263 }
264 gotConnReq = <-connected
265 wantID = cr.ID()
266 gotID = gotConnReq.ID()
267 if gotID != wantID {
268 t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
269 }
270 gotState = cr.State()
271 wantState = ConnEstablished
272 if gotState != wantState {
273 t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
274 }
275 cmgr.Remove(cr.ID())
276 gotConnReq = <-disconnected
277 wantID = cr.ID()
278 gotID = gotConnReq.ID()
279 if gotID != wantID {
280 t.Fatalf("retry: %v - want ID %v, got ID %v", cr.Addr, wantID, gotID)
281 }
282 gotState = cr.State()
283 wantState = ConnDisconnected
284 if gotState != wantState {
285 t.Fatalf("retry: %v - want state %v, got state %v", cr.Addr, wantState, gotState)
286 }
287 cmgr.Stop()
288 }
289
290 // TestMaxRetryDuration tests the maximum retry duration. We have a timed dialer which initially returns err but after
291 // RetryDuration hits maxRetryDuration returns a mock conn.
292 func TestMaxRetryDuration(t *testing.T) {
293 networkUp := qu.T()
294 time.AfterFunc(5*time.Millisecond, func() {
295 networkUp.Q()
296 },
297 )
298 timedDialer := func(addr net.Addr) (net.Conn, error) {
299 select {
300 case <-networkUp.Wait():
301 return mockDialer(addr)
302 default:
303 return nil, errors.New("network down")
304 }
305 }
306 connected := make(chan *ConnReq)
307 cmgr, e := New(&Config{
308 RetryDuration: time.Millisecond,
309 TargetOutbound: 1,
310 Dial: timedDialer,
311 OnConnection: func(c *ConnReq, conn net.Conn) {
312 connected <- c
313 },
314 },
315 )
316 if e != nil {
317 t.Fatalf("New error: %v", e)
318 }
319 cr := &ConnReq{
320 Addr: &net.TCPAddr{
321 IP: net.ParseIP("127.0.0.1"),
322 Port: 18555,
323 },
324 Permanent: true,
325 }
326 go cmgr.Connect(cr)
327 cmgr.Start()
328 // retry in 1ms
329 // retry in 2ms - max retry duration reached
330 // retry in 2ms - timedDialer returns mockDial
331 select {
332 case <-connected:
333 case <-time.Tick(100 * time.Millisecond):
334 t.Fatalf("max retry duration: connection timeout")
335 }
336 }
337
338 // TestNetworkFailure tests that the connection manager handles a network failure gracefully.
339 func TestNetworkFailure(t *testing.T) {
340 var dials uint32
341 errDialer := func(net net.Addr) (net.Conn, error) {
342 atomic.AddUint32(&dials, 1)
343 return nil, errors.New("network down")
344 }
345 cmgr, e := New(&Config{
346 TargetOutbound: 5,
347 RetryDuration: 5 * time.Millisecond,
348 Dial: errDialer,
349 GetNewAddress: func() (net.Addr, error) {
350 return &net.TCPAddr{
351 IP: net.ParseIP("127.0.0.1"),
352 Port: 18555,
353 }, nil
354 },
355 OnConnection: func(c *ConnReq, conn net.Conn) {
356 t.Fatalf("network failure: got unexpected connection - %v", c.Addr)
357 },
358 },
359 )
360 if e != nil {
361 t.Fatalf("New error: %v", e)
362 }
363 cmgr.Start()
364 time.AfterFunc(10*time.Millisecond, cmgr.Stop)
365 cmgr.Wait()
366 wantMaxDials := uint32(75)
367 if atomic.LoadUint32(&dials) > wantMaxDials {
368 t.Fatalf("network failure: unexpected number of dials - got %v, want < %v",
369 atomic.LoadUint32(&dials), wantMaxDials,
370 )
371 }
372 }
373
374 // TestStopFailed tests that failed connections are ignored after connmgr is stopped. We have a dailer which sets the
375 // stop flag on the conn manager and returns an err so that the handler assumes that the conn manager is stopped and
376 // ignores the failure.
377 func TestStopFailed(t *testing.T) {
378 done := qu.Ts(1)
379 waitDialer := func(addr net.Addr) (net.Conn, error) {
380 done <- struct{}{}
381 time.Sleep(time.Millisecond)
382 return nil, errors.New("network down")
383 }
384 cmgr, e := New(&Config{
385 Dial: waitDialer,
386 },
387 )
388 if e != nil {
389 t.Fatalf("New error: %v", e)
390 }
391 cmgr.Start()
392 go func() {
393 <-done
394 atomic.StoreInt32(&cmgr.stop, 1)
395 time.Sleep(2 * time.Millisecond)
396 atomic.StoreInt32(&cmgr.stop, 0)
397 cmgr.Stop()
398 }()
399 cr := &ConnReq{
400 Addr: &net.TCPAddr{
401 IP: net.ParseIP("127.0.0.1"),
402 Port: 18555,
403 },
404 Permanent: true,
405 }
406 go cmgr.Connect(cr)
407 cmgr.Wait()
408 }
409
410 // TestRemovePendingConnection tests that it's possible to cancel a pending connection, removing its internal state from
411 // the ConnMgr.
412 func TestRemovePendingConnection(t *testing.T) {
413 // Create a ConnMgr instance with an instance of a dialer that'll never succeed.
414 wait := qu.T()
415 indefiniteDialer := func(addr net.Addr) (net.Conn, error) {
416 <-wait
417 return nil, fmt.Errorf("error")
418 }
419 cmgr, e := New(&Config{
420 Dial: indefiniteDialer,
421 },
422 )
423 if e != nil {
424 t.Fatalf("New error: %v", e)
425 }
426 cmgr.Start()
427 // Establish a connection request to a random IP we've chosen.
428 cr := &ConnReq{
429 Addr: &net.TCPAddr{
430 IP: net.ParseIP("127.0.0.1"),
431 Port: 18555,
432 },
433 Permanent: true,
434 }
435 go cmgr.Connect(cr)
436 time.Sleep(10 * time.Millisecond)
437 if cr.State() != ConnPending {
438 t.Fatalf("pending request hasn't been registered, status: %v",
439 cr.State(),
440 )
441 }
442 // The request launched above will actually never be able to establish a connection. So we'll cancel it _before_
443 // it's able to be completed.
444 cmgr.Remove(cr.ID())
445 time.Sleep(10 * time.Millisecond)
446 // Now examine the status of the connection request, it should read a
447 // status of failed.
448 if cr.State() != ConnCanceled {
449 t.Fatalf("request wasn't canceled, status is: %v", cr.State())
450 }
451 wait.Q()
452 cmgr.Stop()
453 }
454
455 // TestCancelIgnoreDelayedConnection tests that a canceled connection request will not execute the on connection
456 // callback, even if an outstanding retry succeeds.
457 func TestCancelIgnoreDelayedConnection(t *testing.T) {
458 retryTimeout := 10 * time.Millisecond
459 // Setup a dialer that will continue to return an error until the connect chan is signaled, the dial attempt
460 // immediately after will succeed in returning a connection.
461 connect := qu.T()
462 failingDialer := func(addr net.Addr) (net.Conn, error) {
463 select {
464 case <-connect.Wait():
465 return mockDialer(addr)
466 default:
467 }
468 return nil, fmt.Errorf("error")
469 }
470 connected := make(chan *ConnReq)
471 cmgr, e := New(&Config{
472 Dial: failingDialer,
473 RetryDuration: retryTimeout,
474 OnConnection: func(c *ConnReq, conn net.Conn) {
475 connected <- c
476 },
477 },
478 )
479 if e != nil {
480 t.Fatalf("New error: %v", e)
481 }
482 cmgr.Start()
483 defer cmgr.Stop()
484 // Establish a connection request to a random IP we've chosen.
485 cr := &ConnReq{
486 Addr: &net.TCPAddr{
487 IP: net.ParseIP("127.0.0.1"),
488 Port: 18555,
489 },
490 }
491 cmgr.Connect(cr)
492 // Allow for the first retry timeout to elapse.
493 time.Sleep(2 * retryTimeout)
494 // Connection be marked as failed, even after reattempting to
495 // connect.
496 if cr.State() != ConnFailing {
497 t.Fatalf("failing request should have status failed, status: %v",
498 cr.State(),
499 )
500 }
501 // Remove the connection, and then immediately allow the next connection to succeed.
502 cmgr.Remove(cr.ID())
503 connect.Q()
504 // Allow the connection manager to process the removal.
505 time.Sleep(5 * time.Millisecond)
506 // Now examine the status of the connection request, it should read a status of canceled.
507 if cr.State() != ConnCanceled {
508 t.Fatalf("request wasn't canceled, status is: %v", cr.State())
509 }
510 // Finally, the connection manager should not signal the on-connection callback, since we explicitly canceled this
511 // request. We give a generous window to ensure the connection manager's lienar backoff is allowed to properly
512 // elapse.
513 select {
514 case <-connected:
515 t.Fatalf("on-connect should not be called for canceled req")
516 case <-time.After(5 * retryTimeout):
517 }
518 }
519
520 // mockListener implements the net.Receiver interface and is used to test code that deals with net.P2PListeners without
521 // having to actually make any real connections.
522 type mockListener struct {
523 localAddr string
524 provideConn chan net.Conn
525 }
526
527 // Accept returns a mock connection when it receives a signal via the Connect function. This is part of the net.Receiver
528 // interface.
529 func (m *mockListener) Accept() (net.Conn, error) {
530 for conn := range m.provideConn {
531 return conn, nil
532 }
533 return nil, errors.New("network connection closed")
534 }
535
536 // Close closes the mock listener which will cause any blocked Accept operations to be unblocked and return errors. This
537 // is part of the net.Receiver interface.
538 func (m *mockListener) Close() (e error) {
539 close(m.provideConn)
540 return nil
541 }
542
543 // Addr returns the address the mock listener was configured with. This is part of the net.Receiver interface.
544 func (m *mockListener) Addr() net.Addr {
545 return &mockAddr{"tcp", m.localAddr}
546 }
547
548 // Connect fakes a connection to the mock listener from the provided remote address. It will cause the Accept function
549 // to return a mock connection configured with the provided remote address and the local address for the mock listener.
550 func (m *mockListener) Connect(ip string, port int) {
551 m.provideConn <- &mockConn{
552 laddr: m.localAddr,
553 lnet: "tcp",
554 rAddr: &net.TCPAddr{
555 IP: net.ParseIP(ip),
556 Port: port,
557 },
558 }
559 }
560
561 // newMockListener returns a new mock listener for the provided local address and port. No ports are actually opened.
562 func newMockListener(localAddr string) *mockListener {
563 return &mockListener{
564 localAddr: localAddr,
565 provideConn: make(chan net.Conn),
566 }
567 }
568
569 // TestListeners ensures providing listeners to the connection manager along with an accept callback works properly.
570 func TestListeners(t *testing.T) {
571 // Setup a connection manager with a couple of mock listeners that notify a channel when they receive mock
572 // connections.
573 receivedConns := make(chan net.Conn)
574 listener1 := newMockListener("127.0.0.1:11047")
575 listener2 := newMockListener("127.0.0.1:9333")
576 listeners := []net.Listener{listener1, listener2}
577 cmgr, e := New(&Config{
578 Listeners: listeners,
579 OnAccept: func(conn net.Conn) {
580 receivedConns <- conn
581 },
582 Dial: mockDialer,
583 },
584 )
585 if e != nil {
586 t.Fatalf("New error: %v", e)
587 }
588 cmgr.Start()
589 // Fake a couple of mock connections to each of the listeners.
590 go func() {
591 for i, listener := range listeners {
592 l := listener.(*mockListener)
593 l.Connect("127.0.0.1", 10000+i*2)
594 l.Connect("127.0.0.1", 10000+i*2+1)
595 }
596 }()
597 // Tally the receive connections to ensure the expected number are received. Also, fail the test after a timeout so
598 // it will not hang forever should the test not work.
599 expectedNumConns := len(listeners) * 2
600 var numConns int
601 out:
602 for {
603 select {
604 case <-receivedConns:
605 numConns++
606 if numConns == expectedNumConns {
607 break out
608 }
609 case <-time.After(time.Millisecond * 50):
610 t.Fatalf("Timeout waiting for %d expected connections",
611 expectedNumConns,
612 )
613 }
614 }
615 cmgr.Stop()
616 cmgr.Wait()
617 }
618