peer_test.go raw
1 package peer_test
2
3 import (
4 "errors"
5 "github.com/p9c/p9/pkg/chaincfg"
6 "io"
7 "net"
8 "strconv"
9 "testing"
10 "time"
11
12 "github.com/p9c/p9/pkg/qu"
13
14 "github.com/btcsuite/go-socks/socks"
15
16 "github.com/p9c/p9/pkg/chainhash"
17 "github.com/p9c/p9/pkg/peer"
18 "github.com/p9c/p9/pkg/wire"
19 )
20
21 // conn mocks a network connection by implementing the net.Conn interface. It is used to test peer connection without
22 // actually opening a network connection.
23 type conn struct {
24 io.Reader
25 io.Writer
26 io.Closer
27 // local network, address for the connection.
28 lnet, laddr string
29 // remote network, address for the connection.
30 rnet, raddr string
31 // mocks socks proxy if true
32 proxy bool
33 }
34
35 // LocalAddr returns the local address for the connection.
36 func (c conn) LocalAddr() net.Addr {
37 return &addr{c.lnet, c.laddr}
38 }
39
40 // Remote returns the remote address for the connection.
41 func (c conn) RemoteAddr() net.Addr {
42 if !c.proxy {
43 return &addr{c.rnet, c.raddr}
44 }
45 host, strPort, _ := net.SplitHostPort(c.raddr)
46 port, _ := strconv.Atoi(strPort)
47 return &socks.ProxiedAddr{
48 Net: c.rnet,
49 Host: host,
50 Port: port,
51 }
52 }
53
54 // Close handles closing the connection.
55 func (c conn) Close() (e error) {
56 if c.Closer == nil {
57 return nil
58 }
59 return c.Closer.Close()
60 }
61 func (c conn) SetDeadline(t time.Time) error { return nil }
62 func (c conn) SetReadDeadline(t time.Time) error { return nil }
63 func (c conn) SetWriteDeadline(t time.Time) (e error) { return nil }
64
65 // addr mocks a network address
66 type addr struct {
67 net, address string
68 }
69
70 func (m addr) Network() string { return m.net }
71 func (m addr) String() string { return m.address }
72
73 // pipe turns two mock connections into a full-duplex connection similar to net.Pipe to allow pipe's with (fake)
74 // addresses.
75 func pipe(c1, c2 *conn) (*conn, *conn) {
76 r1, w1 := io.Pipe()
77 r2, w2 := io.Pipe()
78 c1.Writer = w1
79 c1.Closer = w1
80 c2.Reader = r1
81 c1.Reader = r2
82 c2.Writer = w2
83 c2.Closer = w2
84 return c1, c2
85 }
86
87 // peerStats holds the expected peer stats used for testing peer.
88 type peerStats struct {
89 wantUserAgent string
90 wantServices wire.ServiceFlag
91 wantProtocolVersion uint32
92 wantConnected bool
93 wantVersionKnown bool
94 wantVerAckReceived bool
95 wantLastBlock int32
96 wantStartingHeight int32
97 wantLastPingTime time.Time
98 wantLastPingNonce uint64
99 wantLastPingMicros int64
100 wantTimeOffset int64
101 wantBytesSent uint64
102 wantBytesReceived uint64
103 wantWitnessEnabled bool
104 }
105
106 // testPeer tests the given peer's flags and stats
107 func testPeer(t *testing.T, p *peer.Peer, s peerStats) {
108 if p.UserAgent() != s.wantUserAgent {
109 t.Errorf("testPeer: wrong UserAgent - got %v, want %v", p.UserAgent(), s.wantUserAgent)
110 return
111 }
112 if p.Services() != s.wantServices {
113 t.Errorf("testPeer: wrong Services - got %v, want %v", p.Services(), s.wantServices)
114 return
115 }
116 if !p.LastPingTime().Equal(s.wantLastPingTime) {
117 t.Errorf("testPeer: wrong LastPingTime - got %v, want %v", p.LastPingTime(), s.wantLastPingTime)
118 return
119 }
120 if p.LastPingNonce() != s.wantLastPingNonce {
121 t.Errorf("testPeer: wrong LastPingNonce - got %v, want %v", p.LastPingNonce(), s.wantLastPingNonce)
122 return
123 }
124 if p.LastPingMicros() != s.wantLastPingMicros {
125 t.Errorf("testPeer: wrong LastPingMicros - got %v, want %v", p.LastPingMicros(), s.wantLastPingMicros)
126 return
127 }
128 if p.VerAckReceived() != s.wantVerAckReceived {
129 t.Errorf("testPeer: wrong VerAckReceived - got %v, want %v", p.VerAckReceived(), s.wantVerAckReceived)
130 return
131 }
132 if p.VersionKnown() != s.wantVersionKnown {
133 t.Errorf("testPeer: wrong VersionKnown - got %v, want %v", p.VersionKnown(), s.wantVersionKnown)
134 return
135 }
136 if p.ProtocolVersion() != s.wantProtocolVersion {
137 t.Errorf("testPeer: wrong ProtocolVersion - got %v, want %v", p.ProtocolVersion(), s.wantProtocolVersion)
138 return
139 }
140 if p.LastBlock() != s.wantLastBlock {
141 t.Errorf("testPeer: wrong LastBlock - got %v, want %v", p.LastBlock(), s.wantLastBlock)
142 return
143 }
144 // Allow for a deviation of 1s, as the second may tick when the message is in transit and the protocol doesn't support any further precision.
145 if p.TimeOffset() != s.wantTimeOffset && p.TimeOffset() != s.wantTimeOffset-1 {
146 t.Errorf(
147 "testPeer: wrong TimeOffset - got %v, want %v or %v", p.TimeOffset(),
148 s.wantTimeOffset, s.wantTimeOffset-1,
149 )
150 return
151 }
152 if p.BytesSent() != s.wantBytesSent {
153 t.Errorf("testPeer: wrong BytesSent - got %v, want %v", p.BytesSent(), s.wantBytesSent)
154 return
155 }
156 if p.BytesReceived() != s.wantBytesReceived {
157 t.Errorf("testPeer: wrong BytesReceived - got %v, want %v", p.BytesReceived(), s.wantBytesReceived)
158 return
159 }
160 if p.StartingHeight() != s.wantStartingHeight {
161 t.Errorf("testPeer: wrong StartingHeight - got %v, want %v", p.StartingHeight(), s.wantStartingHeight)
162 return
163 }
164 if p.Connected() != s.wantConnected {
165 t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected)
166 return
167 }
168 if p.IsWitnessEnabled() != s.wantWitnessEnabled {
169 t.Errorf(
170 "testPeer: wrong WitnessEnabled - got %v, want %v",
171 p.IsWitnessEnabled(), s.wantWitnessEnabled,
172 )
173 return
174 }
175 stats := p.StatsSnapshot()
176 if p.ID() != stats.ID {
177 t.Errorf("testPeer: wrong ID - got %v, want %v", p.ID(), stats.ID)
178 return
179 }
180 if p.Addr() != stats.Addr {
181 t.Errorf("testPeer: wrong Addr - got %v, want %v", p.Addr(), stats.Addr)
182 return
183 }
184 if p.LastSend() != stats.LastSend {
185 t.Errorf("testPeer: wrong LastSend - got %v, want %v", p.LastSend(), stats.LastSend)
186 return
187 }
188 if p.LastRecv() != stats.LastRecv {
189 t.Errorf("testPeer: wrong LastRecv - got %v, want %v", p.LastRecv(), stats.LastRecv)
190 return
191 }
192 }
193
194 // TestPeerConnection tests connection between inbound and outbound peers.
195 func TestPeerConnection(t *testing.T) {
196 verack := qu.T()
197 peer1Cfg := &peer.Config{
198 Listeners: peer.MessageListeners{
199 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
200 verack <- struct{}{}
201 },
202 OnWrite: func(
203 p *peer.Peer, bytesWritten int, msg wire.Message,
204 e error,
205 ) {
206 if _, ok := msg.(*wire.MsgVerAck); ok {
207 verack <- struct{}{}
208 }
209 },
210 },
211 UserAgentName: "peer",
212 UserAgentVersion: "1.0",
213 UserAgentComments: []string{"comment"},
214 ChainParams: &chaincfg.MainNetParams,
215 ProtocolVersion: wire.RejectVersion, // Configure with older version
216 Services: 0,
217 TrickleInterval: time.Second * 10,
218 }
219 peer2Cfg := &peer.Config{
220 Listeners: peer1Cfg.Listeners,
221 UserAgentName: "peer",
222 UserAgentVersion: "1.0",
223 UserAgentComments: []string{"comment"},
224 ChainParams: &chaincfg.MainNetParams,
225 Services: wire.SFNodeNetwork, // | wire.SFNodeWitness,
226 TrickleInterval: time.Second * 10,
227 }
228 wantStats1 := peerStats{
229 wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/",
230 wantServices: 0,
231 wantProtocolVersion: wire.RejectVersion,
232 wantConnected: true,
233 wantVersionKnown: true,
234 wantVerAckReceived: true,
235 wantLastPingTime: time.Time{},
236 wantLastPingNonce: uint64(0),
237 wantLastPingMicros: int64(0),
238 wantTimeOffset: int64(0),
239 wantBytesSent: 167, // 143 version + 24 verack
240 wantBytesReceived: 167,
241 wantWitnessEnabled: false,
242 }
243 wantStats2 := peerStats{
244 wantUserAgent: wire.DefaultUserAgent + "peer:1.0(comment)/",
245 wantServices: wire.SFNodeNetwork, // | wire.SFNodeWitness,
246 wantProtocolVersion: wire.RejectVersion,
247 wantConnected: true,
248 wantVersionKnown: true,
249 wantVerAckReceived: true,
250 wantLastPingTime: time.Time{},
251 wantLastPingNonce: uint64(0),
252 wantLastPingMicros: int64(0),
253 wantTimeOffset: int64(0),
254 wantBytesSent: 167, // 143 version + 24 verack
255 wantBytesReceived: 167,
256 wantWitnessEnabled: true,
257 }
258 tests := []struct {
259 name string
260 setup func() (*peer.Peer, *peer.Peer, error)
261 }{
262 {
263 "basic handshake",
264 func() (*peer.Peer, *peer.Peer, error) {
265 inConn, outConn := pipe(
266 &conn{raddr: "10.0.0.1:11047"},
267 &conn{raddr: "10.0.0.2:11047"},
268 )
269 inPeer := peer.NewInboundPeer(peer1Cfg)
270 inPeer.AssociateConnection(inConn)
271 outPeer, e := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:11047")
272 if e != nil {
273 return nil, nil, e
274 }
275 outPeer.AssociateConnection(outConn)
276 for i := 0; i < 4; i++ {
277 select {
278 case <-verack.Wait():
279 case <-time.After(time.Second):
280 return nil, nil, errors.New("verack timeout")
281 }
282 }
283 return inPeer, outPeer, nil
284 },
285 },
286 {
287 "socks proxy",
288 func() (*peer.Peer, *peer.Peer, error) {
289 inConn, outConn := pipe(
290 &conn{raddr: "10.0.0.1:11047", proxy: true},
291 &conn{raddr: "10.0.0.2:11047"},
292 )
293 inPeer := peer.NewInboundPeer(peer1Cfg)
294 inPeer.AssociateConnection(inConn)
295 outPeer, e := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:11047")
296 if e != nil {
297 return nil, nil, e
298 }
299 outPeer.AssociateConnection(outConn)
300 for i := 0; i < 4; i++ {
301 select {
302 case <-verack.Wait():
303 case <-time.After(time.Second):
304 return nil, nil, errors.New("verack timeout")
305 }
306 }
307 return inPeer, outPeer, nil
308 },
309 },
310 }
311 t.Logf("Running %d tests", len(tests))
312 for i, test := range tests {
313 inPeer, outPeer, e := test.setup()
314 if e != nil {
315 t.Errorf("TestPeerConnection setup #%d: unexpected err %v", i, e)
316 return
317 }
318 testPeer(t, inPeer, wantStats2)
319 testPeer(t, outPeer, wantStats1)
320 inPeer.Disconnect()
321 outPeer.Disconnect()
322 inPeer.WaitForDisconnect()
323 outPeer.WaitForDisconnect()
324 }
325 }
326
327 // TestPeerListeners tests that the peer listeners are called as expected.
328 func TestPeerListeners(t *testing.T) {
329 verack := qu.Ts(1)
330 ok := make(chan wire.Message, 20)
331 peerCfg := &peer.Config{
332 Listeners: peer.MessageListeners{
333 OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) {
334 ok <- msg
335 },
336 OnAddr: func(p *peer.Peer, msg *wire.MsgAddr) {
337 ok <- msg
338 },
339 OnPing: func(p *peer.Peer, msg *wire.MsgPing) {
340 ok <- msg
341 },
342 OnPong: func(p *peer.Peer, msg *wire.MsgPong) {
343 ok <- msg
344 },
345 OnAlert: func(p *peer.Peer, msg *wire.MsgAlert) {
346 ok <- msg
347 },
348 OnMemPool: func(p *peer.Peer, msg *wire.MsgMemPool) {
349 ok <- msg
350 },
351 OnTx: func(p *peer.Peer, msg *wire.MsgTx) {
352 ok <- msg
353 },
354 OnBlock: func(p *peer.Peer, msg *wire.Block, buf []byte) {
355 ok <- msg
356 },
357 OnInv: func(p *peer.Peer, msg *wire.MsgInv) {
358 ok <- msg
359 },
360 OnHeaders: func(p *peer.Peer, msg *wire.MsgHeaders) {
361 ok <- msg
362 },
363 OnNotFound: func(p *peer.Peer, msg *wire.MsgNotFound) {
364 ok <- msg
365 },
366 OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) {
367 ok <- msg
368 },
369 OnGetBlocks: func(p *peer.Peer, msg *wire.MsgGetBlocks) {
370 ok <- msg
371 },
372 OnGetHeaders: func(p *peer.Peer, msg *wire.MsgGetHeaders) {
373 ok <- msg
374 },
375 OnGetCFilters: func(p *peer.Peer, msg *wire.MsgGetCFilters) {
376 ok <- msg
377 },
378 OnGetCFHeaders: func(p *peer.Peer, msg *wire.MsgGetCFHeaders) {
379 ok <- msg
380 },
381 OnGetCFCheckpt: func(p *peer.Peer, msg *wire.MsgGetCFCheckpt) {
382 ok <- msg
383 },
384 OnCFilter: func(p *peer.Peer, msg *wire.MsgCFilter) {
385 ok <- msg
386 },
387 OnCFHeaders: func(p *peer.Peer, msg *wire.MsgCFHeaders) {
388 ok <- msg
389 },
390 OnFeeFilter: func(p *peer.Peer, msg *wire.MsgFeeFilter) {
391 ok <- msg
392 },
393 OnFilterAdd: func(p *peer.Peer, msg *wire.MsgFilterAdd) {
394 ok <- msg
395 },
396 OnFilterClear: func(p *peer.Peer, msg *wire.MsgFilterClear) {
397 ok <- msg
398 },
399 OnFilterLoad: func(p *peer.Peer, msg *wire.MsgFilterLoad) {
400 ok <- msg
401 },
402 OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
403 ok <- msg
404 },
405 OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
406 ok <- msg
407 return nil
408 },
409 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
410 verack <- struct{}{}
411 },
412 OnReject: func(p *peer.Peer, msg *wire.MsgReject) {
413 ok <- msg
414 },
415 OnSendHeaders: func(p *peer.Peer, msg *wire.MsgSendHeaders) {
416 ok <- msg
417 },
418 },
419 UserAgentName: "peer",
420 UserAgentVersion: "1.0",
421 UserAgentComments: []string{"comment"},
422 ChainParams: &chaincfg.MainNetParams,
423 Services: wire.SFNodeBloom,
424 TrickleInterval: time.Second * 10,
425 }
426 inConn, outConn := pipe(
427 &conn{raddr: "10.0.0.1:11047"},
428 &conn{raddr: "10.0.0.2:11047"},
429 )
430 inPeer := peer.NewInboundPeer(peerCfg)
431 inPeer.AssociateConnection(inConn)
432 peerCfg.Listeners = peer.MessageListeners{
433 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
434 verack <- struct{}{}
435 },
436 }
437 outPeer, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
438 if e != nil {
439 t.Errorf("NewOutboundPeer: unexpected err %v\n", e)
440 return
441 }
442 outPeer.AssociateConnection(outConn)
443 for i := 0; i < 2; i++ {
444 select {
445 case <-verack.Wait():
446 case <-time.After(time.Second * 1):
447 t.Errorf("TestPeerListeners: verack timeout\n")
448 return
449 }
450 }
451 tests := []struct {
452 listener string
453 msg wire.Message
454 }{
455 {
456 "OnGetAddr",
457 wire.NewMsgGetAddr(),
458 },
459 {
460 "OnAddr",
461 wire.NewMsgAddr(),
462 },
463 {
464 "OnPing",
465 wire.NewMsgPing(42),
466 },
467 {
468 "OnPong",
469 wire.NewMsgPong(42),
470 },
471 {
472 "OnAlert",
473 wire.NewMsgAlert([]byte("payload"), []byte("signature")),
474 },
475 {
476 "OnMemPool",
477 wire.NewMsgMemPool(),
478 },
479 {
480 "OnTx",
481 wire.NewMsgTx(wire.TxVersion),
482 },
483 {
484 "OnBlock",
485 wire.NewMsgBlock(
486 wire.NewBlockHeader(
487 1,
488 &chainhash.Hash{}, &chainhash.Hash{}, 1, 1,
489 ),
490 ),
491 },
492 {
493 "OnInv",
494 wire.NewMsgInv(),
495 },
496 {
497 "OnHeaders",
498 wire.NewMsgHeaders(),
499 },
500 {
501 "OnNotFound",
502 wire.NewMsgNotFound(),
503 },
504 {
505 "OnGetData",
506 wire.NewMsgGetData(),
507 },
508 {
509 "OnGetBlocks",
510 wire.NewMsgGetBlocks(&chainhash.Hash{}),
511 },
512 {
513 "OnGetHeaders",
514 wire.NewMsgGetHeaders(),
515 },
516 {
517 "OnGetCFilters",
518 wire.NewMsgGetCFilters(wire.GCSFilterRegular, 0, &chainhash.Hash{}),
519 },
520 {
521 "OnGetCFHeaders",
522 wire.NewMsgGetCFHeaders(wire.GCSFilterRegular, 0, &chainhash.Hash{}),
523 },
524 {
525 "OnGetCFCheckpt",
526 wire.NewMsgGetCFCheckpt(wire.GCSFilterRegular, &chainhash.Hash{}),
527 },
528 {
529 "OnCFilter",
530 wire.NewMsgCFilter(
531 wire.GCSFilterRegular, &chainhash.Hash{},
532 []byte("payload"),
533 ),
534 },
535 {
536 "OnCFHeaders",
537 wire.NewMsgCFHeaders(),
538 },
539 {
540 "OnFeeFilter",
541 wire.NewMsgFeeFilter(15000),
542 },
543 {
544 "OnFilterAdd",
545 wire.NewMsgFilterAdd([]byte{0x01}),
546 },
547 {
548 "OnFilterClear",
549 wire.NewMsgFilterClear(),
550 },
551 {
552 "OnFilterLoad",
553 wire.NewMsgFilterLoad([]byte{0x01}, 10, 0, wire.BloomUpdateNone),
554 },
555 {
556 "OnMerkleBlock",
557 wire.NewMsgMerkleBlock(
558 wire.NewBlockHeader(
559 1,
560 &chainhash.Hash{}, &chainhash.Hash{}, 1, 1,
561 ),
562 ),
563 },
564 // only one verack message is allowed
565 {
566 "OnReject",
567 wire.NewMsgReject("block", wire.RejectDuplicate, "dupe block"),
568 },
569 {
570 "OnSendHeaders",
571 wire.NewMsgSendHeaders(),
572 },
573 }
574 t.Logf("Running %d tests", len(tests))
575 for _, test := range tests {
576 // Queue the test message
577 outPeer.QueueMessage(test.msg, nil)
578 select {
579 case <-ok:
580 case <-time.After(time.Second * 1):
581 t.Errorf("TestPeerListeners: %s timeout", test.listener)
582 return
583 }
584 }
585 inPeer.Disconnect()
586 outPeer.Disconnect()
587 }
588
589 // TestOutboundPeer tests that the outbound peer works as expected.
590 func TestOutboundPeer(t *testing.T) {
591 peerCfg := &peer.Config{
592 NewestBlock: func() (*chainhash.Hash, int32, error) {
593 return nil, 0, errors.New("newest block not found")
594 },
595 UserAgentName: "peer",
596 UserAgentVersion: "1.0",
597 UserAgentComments: []string{"comment"},
598 ChainParams: &chaincfg.MainNetParams,
599 Services: 0,
600 TrickleInterval: time.Second * 10,
601 }
602 r, w := io.Pipe()
603 c := &conn{raddr: "10.0.0.1:11047", Writer: w, Reader: r}
604 p, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
605 if e != nil {
606 t.Errorf("NewOutboundPeer: unexpected err - %v\n", e)
607 return
608 }
609 // Test trying to connect twice.
610 p.AssociateConnection(c)
611 p.AssociateConnection(c)
612 disconnected := qu.T()
613 go func() {
614 p.WaitForDisconnect()
615 disconnected <- struct{}{}
616 }()
617 select {
618 case <-disconnected.Wait():
619 disconnected.Q()
620 case <-time.After(time.Second):
621 t.Fatal("Peer did not automatically disconnect.")
622 }
623 if p.Connected() {
624 t.Fatalf("Should not be connected as NewestBlock produces error.")
625 }
626 // Test Queue Inv
627 fakeBlockHash := &chainhash.Hash{0: 0x00, 1: 0x01}
628 fakeInv := wire.NewInvVect(wire.InvTypeBlock, fakeBlockHash)
629 // Should be noops as the peer could not connect.
630 p.QueueInventory(fakeInv)
631 p.AddKnownInventory(fakeInv)
632 p.QueueInventory(fakeInv)
633 fakeMsg := wire.NewMsgVerAck()
634 p.QueueMessage(fakeMsg, nil)
635 done := qu.T()
636 p.QueueMessage(fakeMsg, done)
637 <-done
638 p.Disconnect()
639 // Test NewestBlock
640 var newestBlock = func() (*chainhash.Hash, int32, error) {
641 hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
642 var hash *chainhash.Hash
643 hash, e = chainhash.NewHashFromStr(hashStr)
644 if e != nil {
645 return nil, 0, e
646 }
647 return hash, 234439, nil
648 }
649 peerCfg.NewestBlock = newestBlock
650 r1, w1 := io.Pipe()
651 c1 := &conn{raddr: "10.0.0.1:11047", Writer: w1, Reader: r1}
652 p1, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
653 if e != nil {
654 t.Errorf("NewOutboundPeer: unexpected err - %v\n", e)
655 return
656 }
657 p1.AssociateConnection(c1)
658 // Test update latest block
659 latestBlockHash, e := chainhash.NewHashFromStr("1a63f9cdff1752e6375c8c76e543a71d239e1a2e5c6db1aa679")
660 if e != nil {
661 t.Errorf("NewHashFromStr: unexpected err %v\n", e)
662 return
663 }
664 p1.UpdateLastAnnouncedBlock(latestBlockHash)
665 p1.UpdateLastBlockHeight(234440)
666 if p1.LastAnnouncedBlock() != latestBlockHash {
667 t.Errorf(
668 "LastAnnouncedBlock: wrong block - got %v, want %v",
669 p1.LastAnnouncedBlock(), latestBlockHash,
670 )
671 return
672 }
673 // Test Queue Inv after connection
674 p1.QueueInventory(fakeInv)
675 p1.Disconnect()
676 // Test regression
677 peerCfg.ChainParams = &chaincfg.RegressionTestParams
678 peerCfg.Services = wire.SFNodeBloom
679 r2, w2 := io.Pipe()
680 c2 := &conn{raddr: "10.0.0.1:11047", Writer: w2, Reader: r2}
681 p2, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
682 if e != nil {
683 t.Errorf("NewOutboundPeer: unexpected err - %v\n", e)
684 return
685 }
686 p2.AssociateConnection(c2)
687 // Test PushXXX
688 var addrs []*wire.NetAddress
689 for i := 0; i < 5; i++ {
690 na := wire.NetAddress{}
691 addrs = append(addrs, &na)
692 }
693 if _, e = p2.PushAddrMsg(addrs); e != nil {
694 t.Errorf("PushAddrMsg: unexpected err %v\n", e)
695 return
696 }
697 if e := p2.PushGetBlocksMsg(nil, &chainhash.Hash{}); e != nil {
698 t.Errorf("PushGetBlocksMsg: unexpected err %v\n", e)
699 return
700 }
701 if e := p2.PushGetHeadersMsg(nil, &chainhash.Hash{}); e != nil {
702 t.Errorf("PushGetHeadersMsg: unexpected err %v\n", e)
703 return
704 }
705 p2.PushRejectMsg("block", wire.RejectMalformed, "malformed", nil, false)
706 p2.PushRejectMsg("block", wire.RejectInvalid, "invalid", nil, false)
707 // Test Queue Messages
708 p2.QueueMessage(wire.NewMsgGetAddr(), nil)
709 p2.QueueMessage(wire.NewMsgPing(1), nil)
710 p2.QueueMessage(wire.NewMsgMemPool(), nil)
711 p2.QueueMessage(wire.NewMsgGetData(), nil)
712 p2.QueueMessage(wire.NewMsgGetHeaders(), nil)
713 p2.QueueMessage(wire.NewMsgFeeFilter(20000), nil)
714 p2.Disconnect()
715 }
716
717 // // Tests that the node disconnects from peers with an unsupported protocol version.
718 // func TestUnsupportedVersionPeer(// t *testing.T) {
719 // peerCfg := &peer.Config{
720 // UserAgentName: "peer",
721 // UserAgentVersion: "1.0",
722 // UserAgentComments: []string{"comment"},
723 // ChainParams: &chaincfg.MainNetParams,
724 // Services: 0,
725 // TrickleInterval: time.Second * 10,
726 // }
727 // localNA := wire.NewNetAddressIPPort(
728 // net.ParseIP("10.0.0.1"),
729 // uint16(11047),
730 // wire.SFNodeNetwork,
731 // )
732 // remoteNA := wire.NewNetAddressIPPort(
733 // net.ParseIP("10.0.0.2"),
734 // uint16(11047),
735 // wire.SFNodeNetwork,
736 // )
737 // localConn, remoteConn := pipe(
738 // &conn{laddr: "10.0.0.1:11047", raddr: "10.0.0.2:11047"},
739 // &conn{laddr: "10.0.0.2:11047", raddr: "10.0.0.1:11047"},
740 // )
741 // p, e := peer.NewOutboundPeer(peerCfg, "10.0.0.1:11047")
742 // if e != nil {
743 // t.Fatalf("NewOutboundPeer: unexpected err - %v\n", e)
744 // }
745 // p.AssociateConnection(localConn)
746 // // Read outbound messages to peer into a channel
747 // outboundMessages := make(chan wire.Message)
748 // go func() {
749 // for {
750 // _, msg, _, e = wire.ReadMessageN(
751 // remoteConn,
752 // p.ProtocolVersion(),
753 // peerCfg.ChainParams.Net,
754 // )
755 // if e == io.EOF {
756 // close(outboundMessages)
757 // return
758 // }
759 // if e != nil {
760 // t.Errorf("Error reading message from local node: %v\n", e)
761 // return
762 // }
763 // outboundMessages <- msg
764 // }
765 // }()
766 // // Read version message sent to remote peer
767 // select {
768 // case msg := <-outboundMessages:
769 // if _, ok := msg.(*wire.MsgVersion); !ok {
770 // t.Fatalf("Expected version message, got [%s]", msg.Command())
771 // }
772 // case <-time.After(time.Second):
773 // t.Fatal("Peer did not send version message")
774 // }
775 // // Remote peer writes version message advertising invalid protocol version 1
776 // invalidVersionMsg := wire.NewMsgVersion(remoteNA, localNA, 0, 0)
777 // invalidVersionMsg.ProtocolVersion = 1
778 // _, e = wire.WriteMessageN(
779 // remoteConn.Writer,
780 // invalidVersionMsg,
781 // uint32(invalidVersionMsg.ProtocolVersion),
782 // peerCfg.ChainParams.Net,
783 // )
784 // if e != nil {
785 // t.Fatalf("wire.WriteMessageN: unexpected err - %v\n", e)
786 // }
787 // // Expect peer to disconnect automatically
788 // disconnected := qu.T()
789 // go func() {
790 // p.WaitForDisconnect()
791 // disconnected <- struct{}{}
792 // }()
793 // select {
794 // case <-disconnected:
795 // close(disconnected)
796 // case <-time.After(time.Second):
797 // t.Fatal("Peer did not automatically disconnect")
798 // }
799 // // Expect no further outbound messages from peer
800 // select {
801 // case msg, chanOpen := <-outboundMessages:
802 // if chanOpen {
803 // t.Fatalf("Expected no further messages, received [%s]", msg.Command())
804 // }
805 // case <-time.After(time.Second):
806 // t.Fatal("Timeout waiting for remote reader to close")
807 // }
808 // }
809
810 // TestDuplicateVersionMsg ensures that receiving a version message after one has already been received results in the
811 // peer being disconnected.
812 func TestDuplicateVersionMsg(t *testing.T) {
813 // Create a pair of peers that are connected to each other using a fake connection.
814 verack := qu.T()
815 peerCfg := &peer.Config{
816 Listeners: peer.MessageListeners{
817 OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
818 verack <- struct{}{}
819 },
820 },
821 UserAgentName: "peer",
822 UserAgentVersion: "1.0",
823 ChainParams: &chaincfg.MainNetParams,
824 Services: 0,
825 }
826 inConn, outConn := pipe(
827 &conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
828 &conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
829 )
830 outPeer, e := peer.NewOutboundPeer(peerCfg, inConn.laddr)
831 if e != nil {
832 t.Fatalf("NewOutboundPeer: unexpected err: %v\n", e)
833 }
834 outPeer.AssociateConnection(outConn)
835 inPeer := peer.NewInboundPeer(peerCfg)
836 inPeer.AssociateConnection(inConn)
837 // Wait for the veracks from the initial protocol version negotiation.
838 for i := 0; i < 2; i++ {
839 select {
840 case <-verack.Wait():
841 case <-time.After(time.Second):
842 t.Fatal("verack timeout")
843 }
844 }
845 // Queue a duplicate version message from the outbound peer and wait until it is sent.
846 done := qu.T()
847 outPeer.QueueMessage(&wire.MsgVersion{}, done)
848 select {
849 case <-done.Wait():
850 case <-time.After(time.Second):
851 t.Fatal("send duplicate version timeout")
852 }
853 // Ensure the peer that is the recipient of the duplicate version closes the connection.
854 disconnected := qu.Ts(1)
855 go func() {
856 inPeer.WaitForDisconnect()
857 disconnected <- struct{}{}
858 }()
859 select {
860 case <-disconnected.Wait():
861 case <-time.After(time.Second):
862 t.Fatal("peer did not disconnect")
863 }
864 }
865 func init() {
866
867 // Allow self connection when running the tests.
868 peer.TstAllowSelfConns()
869 }
870