1 package addrmgr
2 3 import (
4 "container/list"
5 crand "crypto/rand" // for seeding
6 "encoding/base32"
7 "encoding/binary"
8 "encoding/json"
9 "fmt"
10 "io"
11 "math/rand"
12 "net"
13 "os"
14 "path/filepath"
15 "strconv"
16 "strings"
17 "sync"
18 "sync/atomic"
19 "time"
20 21 "github.com/p9c/p9/pkg/qu"
22 23 "github.com/p9c/p9/pkg/chainhash"
24 "github.com/p9c/p9/pkg/wire"
25 )
26 27 // AddrManager provides a concurrency safe address manager for caching potential peers on the bitcoin network.
28 type AddrManager struct {
29 mtx sync.Mutex
30 PeersFile string
31 lookupFunc func(string) ([]net.IP, error)
32 rand *rand.Rand
33 key [32]byte
34 addrIndex map[string]*KnownAddress // address key to ka for all addrs.
35 addrNew [newBucketCount]map[string]*KnownAddress
36 addrTried [triedBucketCount]*list.List
37 started int32
38 shutdown int32
39 wg sync.WaitGroup
40 quit qu.C
41 nTried int
42 nNew int
43 lamtx sync.Mutex
44 localAddresses map[string]*localAddress
45 }
// serializedKnownAddress is the on-disk (JSON) form of a single known address. The reference count and the tried flag
// are not stored; they are rebuilt from the bucket lists when the file is read back.
type serializedKnownAddress struct {
	// Addr is the address key of the peer.
	Addr string
	// Src is the address key of the peer the address was learned from.
	Src string
	// Attempts is the stored connection attempt counter.
	Attempts int
	// TimeStamp is the address timestamp as Unix seconds.
	TimeStamp int64
	// LastAttempt is the time of the last attempt as Unix seconds.
	LastAttempt int64
	// LastSuccess is the time of the last success as Unix seconds.
	LastSuccess int64
}
55 type serializedAddrManager struct {
56 Version int
57 Key [32]byte
58 Addresses []*serializedKnownAddress
59 NewBuckets [newBucketCount][]string // string is NetAddressKey
60 TriedBuckets [triedBucketCount][]string
61 }
62 type localAddress struct {
63 na *wire.NetAddress
64 score AddressPriority
65 }
// AddressPriority ranks the ways a local address can become known; a larger value is preferred.
type AddressPriority int

// Local address priorities, from lowest to highest.
const (
	// InterfacePrio marks an address found on a local interface.
	InterfacePrio AddressPriority = iota
	// BoundPrio marks an address we were explicitly bound to.
	BoundPrio
	// UpnpPrio marks an address obtained from UPnP.
	UpnpPrio
	// HTTPPrio marks an address obtained from an external HTTP service.
	HTTPPrio
	// ManualPrio marks an address supplied by --externalip.
	ManualPrio
)
// Tuning parameters of the address manager.
const (
	// needAddressThreshold: below this many known addresses the manager reports that it needs more.
	needAddressThreshold = 1000
	// dumpAddressInterval: how often the address cache is written to disk.
	dumpAddressInterval = 5 * time.Minute
	// triedBucketSize: maximum number of addresses in one tried bucket.
	triedBucketSize = 256
	// triedBucketCount: number of buckets the tried addresses are split over.
	triedBucketCount = 64
	// newBucketSize: maximum number of addresses in one new bucket.
	newBucketSize = 64
	// newBucketCount: number of buckets the new addresses are spread over.
	newBucketCount = 1024
	// triedBucketsPerGroup: number of tried buckets one address group is spread over.
	triedBucketsPerGroup = 8
	// newBucketsPerGroup: number of new buckets one source address group is spread over.
	newBucketsPerGroup = 64
	// newBucketsPerAddress: number of new buckets a frequently seen address may end up in.
	newBucketsPerAddress = 8
	// numMissingDays: days without an announcement after which an address is assumed to have vanished.
	numMissingDays = 30
	// numRetries: attempts without a single success after which an address is assumed to be bad.
	numRetries = 3
	// maxFailures: failures accepted without a success before an address is considered bad.
	maxFailures = 10
	// minBadDays: days since the last success before an address is considered for eviction.
	minBadDays = 7
	// getAddrMax: the most addresses returned by AddressCache, i.e. sent in response to a getaddr.
	getAddrMax = 2500
	// getAddrPercent: percentage of all known addresses shared by one call to AddressCache.
	getAddrPercent = 23
	// serialisationVersion: current version of the on-disk format.
	serialisationVersion = 1
)
120 121 // updateAddress is a helper function to either update an address already known to the address manager, or to add the
122 // address if not already known.
123 func (a *AddrManager) updateAddress(netAddr, srcAddr *wire.NetAddress) {
124 // Filter out non-routable addresses. Note that non-routable also includes invalid and local addresses.
125 if !IsRoutable(netAddr) {
126 return
127 }
128 addr := NetAddressKey(netAddr)
129 ka := a.find(netAddr)
130 if ka != nil {
131 // TODO: only update addresses periodically.
132 //
133 // Update the last seen time and services. note that to prevent causing excess garbage on getaddr messages the
134 // netaddresses in addrmanager are *immutable*, if we need to change them then we replace the pointer with a new
135 // copy so that we don't have to copy every na for getaddr.
136 if netAddr.Timestamp.After(ka.na.Timestamp) ||
137 (ka.na.Services&netAddr.Services) !=
138 netAddr.Services {
139 naCopy := *ka.na
140 naCopy.Timestamp = netAddr.Timestamp
141 naCopy.AddService(netAddr.Services)
142 ka.na = &naCopy
143 }
144 // If already in tried, we have nothing to do here.
145 if ka.tried {
146 return
147 }
148 // Already at our max?
149 if ka.refs == newBucketsPerAddress {
150 return
151 }
152 // The more entries we have, the less likely we are to add more. likelihood is 2N.
153 factor := int32(2 * ka.refs)
154 if a.rand.Int31n(factor) != 0 {
155 return
156 }
157 } else {
158 // Make a copy of the net address to avoid races since it is updated elsewhere in the addrmanager code and would
159 // otherwise change the actual netaddress on the peer.
160 netAddrCopy := *netAddr
161 ka = &KnownAddress{na: &netAddrCopy, srcAddr: srcAddr}
162 a.addrIndex[addr] = ka
163 a.nNew++
164 // XXX time penalty?
165 }
166 bucket := a.getNewBucket(netAddr, srcAddr)
167 // Already exists?
168 if _, ok := a.addrNew[bucket][addr]; ok {
169 return
170 }
171 // Enforce max addresses.
172 if len(a.addrNew[bucket]) > newBucketSize {
173 a.expireNew(bucket)
174 }
175 // Add to new bucket.
176 ka.refs++
177 a.addrNew[bucket][addr] = ka
178 // T.F("added new address %s for a total of %d addresses", addr,
179 // a.nTried+a.nNew,
180 // )
181 }
182 183 // expireNew makes space in the new buckets by expiring the really bad entries. If no bad entries are available we look
184 // at a few and remove the oldest.
185 func (a *AddrManager) expireNew(bucket int) {
186 // First see if there are any entries that are so bad we can just throw them away. otherwise we throw away the
187 // oldest entry in the cache. Bitcoind here chooses four random and just throws the oldest of those away, but we
188 // keep track of oldest in the initial traversal and use that information instead.
189 var oldest *KnownAddress
190 for k, v := range a.addrNew[bucket] {
191 if v.isBad() {
192 T.F("expiring bad address %v", k)
193 delete(a.addrNew[bucket], k)
194 v.refs--
195 if v.refs == 0 {
196 a.nNew--
197 delete(a.addrIndex, k)
198 }
199 continue
200 }
201 if oldest == nil {
202 oldest = v
203 } else if !v.na.Timestamp.After(oldest.na.Timestamp) {
204 oldest = v
205 }
206 }
207 if oldest != nil {
208 key := NetAddressKey(oldest.na)
209 T.F("expiring oldest address %v", key)
210 delete(a.addrNew[bucket], key)
211 oldest.refs--
212 if oldest.refs == 0 {
213 a.nNew--
214 delete(a.addrIndex, key)
215 }
216 }
217 }
218 219 // pickTried selects an address from the tried bucket to be evicted. We just choose the eldest.
220 //
221 // Bitcoind selects 4 random entries and throws away the older of them.
222 func (a *AddrManager) pickTried(bucket int) *list.Element {
223 var oldest *KnownAddress
224 var oldestElem *list.Element
225 for e := a.addrTried[bucket].Front(); e != nil; e = e.Next() {
226 ka := e.Value.(*KnownAddress)
227 if oldest == nil || oldest.na.Timestamp.After(ka.na.Timestamp) {
228 oldestElem = e
229 oldest = ka
230 }
231 }
232 return oldestElem
233 }
234 func (a *AddrManager) getNewBucket(netAddr, srcAddr *wire.NetAddress) int {
235 // bitcoind:
236 // doublesha256(key + sourcegroup + int64(doublesha256(key + group +
237 // sourcegroup))%bucket_per_source_group) % num_new_buckets
238 data1 := []byte{}
239 data1 = append(data1, a.key[:]...)
240 data1 = append(data1, []byte(GroupKey(netAddr))...)
241 data1 = append(data1, []byte(GroupKey(srcAddr))...)
242 hash1 := chainhash.DoubleHashB(data1)
243 hash64 := binary.LittleEndian.Uint64(hash1)
244 hash64 %= newBucketsPerGroup
245 var hashbuf [8]byte
246 binary.LittleEndian.PutUint64(hashbuf[:], hash64)
247 data2 := []byte{}
248 data2 = append(data2, a.key[:]...)
249 data2 = append(data2, GroupKey(srcAddr)...)
250 data2 = append(data2, hashbuf[:]...)
251 hash2 := chainhash.DoubleHashB(data2)
252 return int(binary.LittleEndian.Uint64(hash2) % newBucketCount)
253 }
254 func (a *AddrManager) getTriedBucket(netAddr *wire.NetAddress) int {
255 // bitcoind hashes this as:
256 // doublesha256(key + group + truncate_to_64bits(doublesha256(key)) %
257 // buckets_per_group) % num_buckets
258 data1 := []byte{}
259 data1 = append(data1, a.key[:]...)
260 data1 = append(data1, []byte(NetAddressKey(netAddr))...)
261 hash1 := chainhash.DoubleHashB(data1)
262 hash64 := binary.LittleEndian.Uint64(hash1)
263 hash64 %= triedBucketsPerGroup
264 var hashbuf [8]byte
265 binary.LittleEndian.PutUint64(hashbuf[:], hash64)
266 data2 := []byte{}
267 data2 = append(data2, a.key[:]...)
268 data2 = append(data2, GroupKey(netAddr)...)
269 data2 = append(data2, hashbuf[:]...)
270 hash2 := chainhash.DoubleHashB(data2)
271 return int(binary.LittleEndian.Uint64(hash2) % triedBucketCount)
272 }
273 274 // addressHandler is the main handler for the address manager.
275 //
276 // It must be run as a goroutine.
277 func (a *AddrManager) addressHandler() {
278 T.Ln("starting address handler")
279 dumpAddressTicker := time.NewTicker(dumpAddressInterval)
280 defer dumpAddressTicker.Stop()
281 out:
282 for {
283 select {
284 case <-dumpAddressTicker.C:
285 T.Ln("saving peers data")
286 a.savePeers()
287 case <-a.quit.Wait():
288 break out
289 }
290 }
291 a.savePeers()
292 a.wg.Done()
293 T.Ln("address handler done")
294 }
295 296 // savePeers saves all the known addresses to a file so they can be read back in at next run.
297 func (a *AddrManager) savePeers() {
298 a.mtx.Lock()
299 defer a.mtx.Unlock()
300 // First we make a serialisable datastructure so we can encode it to json.
301 sam := new(serializedAddrManager)
302 sam.Version = serialisationVersion
303 copy(sam.Key[:], a.key[:])
304 sam.Addresses = make([]*serializedKnownAddress, len(a.addrIndex))
305 i := 0
306 for k, v := range a.addrIndex {
307 ska := new(serializedKnownAddress)
308 ska.Addr = k
309 ska.TimeStamp = v.na.Timestamp.Unix()
310 ska.Src = NetAddressKey(v.srcAddr)
311 ska.Attempts = v.attempts
312 ska.LastAttempt = v.lastattempt.Unix()
313 ska.LastSuccess = v.lastsuccess.Unix()
314 // Tried and refs are implicit in the rest of the structure and will be worked out from context on
315 // deserialisation.
316 sam.Addresses[i] = ska
317 i++
318 }
319 for i := range a.addrNew {
320 sam.NewBuckets[i] = make([]string, len(a.addrNew[i]))
321 j := 0
322 for k := range a.addrNew[i] {
323 sam.NewBuckets[i][j] = k
324 j++
325 }
326 }
327 for i := range a.addrTried {
328 sam.TriedBuckets[i] = make([]string, a.addrTried[i].Len())
329 j := 0
330 for e := a.addrTried[i].Front(); e != nil; e = e.Next() {
331 ka := e.Value.(*KnownAddress)
332 sam.TriedBuckets[i][j] = NetAddressKey(ka.na)
333 j++
334 }
335 }
336 w, e := os.Create(a.PeersFile)
337 if e != nil {
338 E.F("error opening file %s: %v", a.PeersFile, e)
339 return
340 }
341 enc := json.NewEncoder(w)
342 defer func() {
343 if e := w.Close(); E.Chk(e) {
344 }
345 }()
346 if e := enc.Encode(&sam); E.Chk(e) {
347 E.F("failed to encode file %s: %v", a.PeersFile, e)
348 return
349 }
350 }
351 352 // loadPeers loads the known address from the saved file. If empty, missing, or malformed file, just don't load anything
353 // and start fresh
354 func (a *AddrManager) loadPeers() {
355 T.Ln("loading peers")
356 357 a.mtx.Lock()
358 defer a.mtx.Unlock()
359 e := a.deserializePeers(a.PeersFile)
360 if e != nil {
361 E.F("failed to parse file %s: %v", a.PeersFile, e)
362 // if it is invalid we nuke the old one unconditionally.
363 e = os.Remove(a.PeersFile)
364 if e != nil {
365 W.F("failed to remove corrupt peers file %s: %v", a.PeersFile, e)
366 }
367 a.reset()
368 return
369 }
370 // Tracec(func() string {
371 // return fmt.Sprintf(
372 // "loaded %d addresses from file '%s'",
373 // a.numAddresses(), a.PeersFile,
374 // )
375 // })
376 }
377 func (a *AddrManager) deserializePeers(filePath string) (e error) {
378 _, e = os.Stat(filePath)
379 if os.IsNotExist(e) {
380 return nil
381 }
382 r, e := os.Open(filePath)
383 if e != nil {
384 E.Ln(e)
385 return fmt.Errorf("%s error opening file: %v", filePath, e)
386 }
387 defer func() {
388 if e = r.Close(); E.Chk(e) {
389 }
390 }()
391 var sam serializedAddrManager
392 dec := json.NewDecoder(r)
393 e = dec.Decode(&sam)
394 if e != nil {
395 E.Ln(e)
396 return fmt.Errorf("error reading %s: %v", filePath, e)
397 }
398 if sam.Version != serialisationVersion {
399 return fmt.Errorf(
400 "unknown version %v in serialized addrmanager",
401 sam.Version,
402 )
403 }
404 copy(a.key[:], sam.Key[:])
405 for _, v := range sam.Addresses {
406 ka := new(KnownAddress)
407 ka.na, e = a.DeserializeNetAddress(v.Addr)
408 if e != nil {
409 E.Ln(e)
410 return fmt.Errorf("failed to deserialize netaddress "+
411 "%s: %v", v.Addr, e,
412 )
413 }
414 ka.srcAddr, e = a.DeserializeNetAddress(v.Src)
415 if e != nil {
416 E.Ln(e)
417 return fmt.Errorf("failed to deserialize netaddress "+
418 "%s: %v", v.Src, e,
419 )
420 }
421 ka.attempts = v.Attempts
422 ka.lastattempt = time.Unix(v.LastAttempt, 0)
423 ka.lastsuccess = time.Unix(v.LastSuccess, 0)
424 a.addrIndex[NetAddressKey(ka.na)] = ka
425 }
426 for i := range sam.NewBuckets {
427 for _, val := range sam.NewBuckets[i] {
428 ka, ok := a.addrIndex[val]
429 if !ok {
430 return fmt.Errorf("newbucket contains %s but "+
431 "none in address list", val,
432 )
433 }
434 if ka.refs == 0 {
435 a.nNew++
436 }
437 ka.refs++
438 a.addrNew[i][val] = ka
439 }
440 }
441 for i := range sam.TriedBuckets {
442 for _, val := range sam.TriedBuckets[i] {
443 ka, ok := a.addrIndex[val]
444 if !ok {
445 return fmt.Errorf(
446 "newbucket contains %s but none in address list",
447 val,
448 )
449 }
450 ka.tried = true
451 a.nTried++
452 a.addrTried[i].PushBack(ka)
453 }
454 }
455 // Sanity checking.
456 for k, v := range a.addrIndex {
457 if v.refs == 0 && !v.tried {
458 return fmt.Errorf("address %s after serialisationwith no references", k)
459 }
460 if v.refs > 0 && v.tried {
461 return fmt.Errorf("address %s after serialisation which is both new and tried", k)
462 }
463 }
464 return nil
465 }
466 467 // DeserializeNetAddress converts a given address string to a *wire.NetAddress
468 func (a *AddrManager) DeserializeNetAddress(addr string) (*wire.NetAddress, error) {
469 host, portStr, e := net.SplitHostPort(addr)
470 if e != nil {
471 E.Ln(e)
472 return nil, e
473 }
474 port, e := strconv.ParseUint(portStr, 10, 16)
475 if e != nil {
476 E.Ln(e)
477 return nil, e
478 }
479 return a.HostToNetAddress(host, uint16(port), wire.SFNodeNetwork)
480 }
481 482 // Start begins the core address handler which manages a pool of known
483 // addresses, timeouts, and interval based writes.
484 func (a *AddrManager) Start() {
485 // Already started?
486 if atomic.AddInt32(&a.started, 1) != 1 {
487 return
488 }
489 // Load peers we already know about from file.
490 T.Ln("loading peers data")
491 a.loadPeers()
492 // Start the address ticker to save addresses periodically.
493 a.wg.Add(1)
494 go a.addressHandler()
495 }
496 497 // Stop gracefully shuts down the address manager by stopping the main handler.
498 func (a *AddrManager) Stop() (e error) {
499 if atomic.AddInt32(&a.shutdown, 1) != 1 {
500 D.Ln("address manager is already in the process of shutting down")
501 return nil
502 }
503 // D.Ln("address manager shutting down"}
504 a.quit.Q()
505 a.wg.Wait()
506 return nil
507 }
508 509 // AddAddresses adds new addresses to the address manager.
510 //
511 // It enforces a max number of addresses and silently ignores duplicate addresses.
512 //
513 // It is safe for concurrent access.
514 func (a *AddrManager) AddAddresses(addrs []*wire.NetAddress, srcAddr *wire.NetAddress) {
515 a.mtx.Lock()
516 defer a.mtx.Unlock()
517 for _, na := range addrs {
518 a.updateAddress(na, srcAddr)
519 }
520 }
521 522 // AddAddress adds a new address to the address manager.
523 //
524 // It enforces a max number of addresses and silently ignores duplicate addresses.
525 //
526 // It is safe for concurrent access.
527 func (a *AddrManager) AddAddress(addr, srcAddr *wire.NetAddress) {
528 a.mtx.Lock()
529 defer a.mtx.Unlock()
530 a.updateAddress(addr, srcAddr)
531 }
532 533 // AddAddressByIP adds an address where we are given an ip:port and not a wire.NetAddress.
534 func (a *AddrManager) AddAddressByIP(addrIP string) (e error) {
535 // Split IP and port
536 addr, portStr, e := net.SplitHostPort(addrIP)
537 if e != nil {
538 E.Ln(e)
539 return e
540 }
541 // Put it in wire.Netaddress
542 ip := net.ParseIP(addr)
543 if ip == nil {
544 return fmt.Errorf("invalid ip address %s", addr)
545 }
546 port, e := strconv.ParseUint(portStr, 10, 0)
547 if e != nil {
548 E.Ln(e)
549 return fmt.Errorf("invalid port %s: %v", portStr, e)
550 }
551 na := wire.NewNetAddressIPPort(ip, uint16(port), 0)
552 a.AddAddress(na, na) // XXX use correct src address
553 return nil
554 }
555 556 // NumAddresses returns the number of addresses known to the address manager.
557 func (a *AddrManager) numAddresses() int {
558 return a.nTried + a.nNew
559 }
560 561 // NumAddresses returns the number of addresses known to the address manager.
562 func (a *AddrManager) NumAddresses() int {
563 a.mtx.Lock()
564 defer a.mtx.Unlock()
565 return a.numAddresses()
566 }
567 568 // NeedMoreAddresses returns whether or not the address manager needs more addresses.
569 func (a *AddrManager) NeedMoreAddresses() bool {
570 a.mtx.Lock()
571 defer a.mtx.Unlock()
572 return a.numAddresses() < needAddressThreshold
573 }
574 575 // AddressCache returns the current address cache. It must be treated as read-only (but since it is a copy now, this is
576 // not as dangerous).
577 func (a *AddrManager) AddressCache() []*wire.NetAddress {
578 a.mtx.Lock()
579 defer a.mtx.Unlock()
580 addrIndexLen := len(a.addrIndex)
581 if addrIndexLen == 0 {
582 return nil
583 }
584 allAddr := make([]*wire.NetAddress, 0, addrIndexLen)
585 // Iteration order is undefined here, but we randomise it anyway.
586 for _, v := range a.addrIndex {
587 allAddr = append(allAddr, v.na)
588 }
589 numAddresses := addrIndexLen * getAddrPercent / 100
590 if numAddresses > getAddrMax {
591 numAddresses = getAddrMax
592 }
593 // Fisher-Yates shuffle the array. We only need to do the first `numAddresses' since we are throwing the rest.
594 for i := 0; i < numAddresses; i++ {
595 // pick a number between current index and the end
596 j := rand.Intn(addrIndexLen-i) + i
597 allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
598 }
599 // slice off the limit we are willing to share.
600 return allAddr[0:numAddresses]
601 }
602 603 // reset resets the address manager by reinitialising the random source and allocating fresh empty bucket storage.
604 func (a *AddrManager) reset() {
605 a.addrIndex = make(map[string]*KnownAddress)
606 // fill key with bytes from a good random source.
607 _, e := io.ReadFull(crand.Reader, a.key[:])
608 if e != nil {
609 E.Ln(e)
610 }
611 for i := range a.addrNew {
612 a.addrNew[i] = make(map[string]*KnownAddress)
613 }
614 for i := range a.addrTried {
615 a.addrTried[i] = list.New()
616 }
617 }
618 619 // HostToNetAddress returns a netaddress given a host address.
620 //
621 // If the address is a Tor .onion address this will be taken care of.
622 //
623 // Else if the host is not an IP address it will be resolved ( via Tor if required).
624 func (a *AddrManager) HostToNetAddress(host string, port uint16, services wire.ServiceFlag) (*wire.NetAddress, error) {
625 // Tor address is 16 char base32 + ".onion"
626 var ip net.IP
627 if len(host) == 22 && host[16:] == ".onion" {
628 // go base32 encoding uses capitals (as does the rfc but Tor and bitcoind tend to user lowercase, so we switch
629 // case here.
630 data, e := base32.StdEncoding.DecodeString(
631 strings.ToUpper(host[:16]),
632 )
633 if e != nil {
634 E.Ln(e)
635 return nil, e
636 }
637 prefix := []byte{0xfd, 0x87, 0xd8, 0x7e, 0xeb, 0x43}
638 ip = append(prefix, data...)
639 } else if ip = net.ParseIP(host); ip == nil {
640 ips, e := a.lookupFunc(host)
641 if e != nil {
642 E.Ln(e)
643 return nil, e
644 }
645 if len(ips) == 0 {
646 return nil, fmt.Errorf("no addresses found for %s", host)
647 }
648 ip = ips[0]
649 }
650 return wire.NewNetAddressIPPort(ip, port, services), nil
651 }
652 653 // ipString returns a string for the ip from the provided NetAddress. If the ip is in the range used for Tor addresses
654 // then it will be transformed into the relevant .onion address.
655 func ipString(na *wire.NetAddress) string {
656 if IsOnionCatTor(na) {
657 // We know now that na.IP is long enough.
658 s := base32.StdEncoding.EncodeToString(na.IP[6:])
659 return strings.ToLower(s) + ".onion"
660 }
661 return na.IP.String()
662 }
663 664 // NetAddressKey returns a string key in the form of ip:port for IPv4 addresses or [ip]:port for IPv6 addresses.
665 func NetAddressKey(na *wire.NetAddress) string {
666 port := strconv.FormatUint(uint64(na.Port), 10)
667 return net.JoinHostPort(ipString(na), port)
668 }
669 670 // GetAddress returns a single address that should be routable. It picks a random one from the possible addresses with
671 // preference given to ones that have not been used recently and should not pick 'close' addresses consecutively.
672 func (a *AddrManager) GetAddress() *KnownAddress {
673 // Protect concurrent access.
674 a.mtx.Lock()
675 defer a.mtx.Unlock()
676 if a.numAddresses() == 0 {
677 return nil
678 }
679 // Use a 50% chance for choosing between tried and new table entries.
680 if a.nTried > 0 && (a.nNew == 0 || a.rand.Intn(2) == 0) {
681 // Tried entry.
682 large := 1 << 30
683 factor := 1.0
684 for {
685 // pick a random bucket.
686 bucket := a.rand.Intn(len(a.addrTried))
687 if a.addrTried[bucket].Len() == 0 {
688 continue
689 }
690 // Pick a random entry in the list
691 e := a.addrTried[bucket].Front()
692 for i :=
693 a.rand.Int63n(int64(a.addrTried[bucket].Len())); i > 0; i-- {
694 e = e.Next()
695 }
696 ka := e.Value.(*KnownAddress)
697 randval := a.rand.Intn(large)
698 if float64(randval) < (factor * ka.chance() * float64(large)) {
699 T.C(func() string {
700 return fmt.Sprintf("selected %v from tried bucket", NetAddressKey(ka.na))
701 },
702 )
703 return ka
704 }
705 factor *= 1.2
706 }
707 } else {
708 // new node.
709 // TODO: use a closure/function to avoid repeating this.
710 large := 1 << 30
711 factor := 1.0
712 for {
713 // Pick a random bucket.
714 bucket := a.rand.Intn(len(a.addrNew))
715 if len(a.addrNew[bucket]) == 0 {
716 continue
717 }
718 // Then, a random entry in it.
719 var ka *KnownAddress
720 nth := a.rand.Intn(len(a.addrNew[bucket]))
721 for _, value := range a.addrNew[bucket] {
722 if nth == 0 {
723 ka = value
724 }
725 nth--
726 }
727 randval := a.rand.Intn(large)
728 if float64(randval) < (factor * ka.chance() * float64(large)) {
729 T.C(func() string {
730 return fmt.Sprintf("Selected %v from new bucket",
731 NetAddressKey(ka.na),
732 )
733 },
734 )
735 return ka
736 }
737 factor *= 1.2
738 }
739 }
740 }
741 func (a *AddrManager) find(addr *wire.NetAddress) *KnownAddress {
742 return a.addrIndex[NetAddressKey(addr)]
743 }
744 745 // Attempt increases the given address' attempt counter and updates the last attempt time.
746 func (a *AddrManager) Attempt(addr *wire.NetAddress) {
747 a.mtx.Lock()
748 defer a.mtx.Unlock()
749 // find address. Surely address will be in tried by now?
750 ka := a.find(addr)
751 if ka == nil {
752 return
753 }
754 // set last tried time to now
755 ka.attempts++
756 ka.lastattempt = time.Now()
757 }
758 759 // Connected Marks the given address as currently connected and working at the current time. The address must already be
760 // known to AddrManager else it will be ignored.
761 func (a *AddrManager) Connected(addr *wire.NetAddress) {
762 a.mtx.Lock()
763 defer a.mtx.Unlock()
764 ka := a.find(addr)
765 if ka == nil {
766 return
767 }
768 // Update the time as long as it has been 20 minutes since last we did so.
769 now := time.Now()
770 if now.After(ka.na.Timestamp.Add(time.Minute * 20)) {
771 // ka.na is immutable, so replace it.
772 naCopy := *ka.na
773 naCopy.Timestamp = time.Now()
774 ka.na = &naCopy
775 }
776 }
777 778 // Good marks the given address as good. To be called after a successful connection and version exchange. If the address
779 // is unknown to the address manager it will be ignored.
780 func (a *AddrManager) Good(addr *wire.NetAddress) {
781 a.mtx.Lock()
782 defer a.mtx.Unlock()
783 ka := a.find(addr)
784 if ka == nil {
785 return
786 }
787 // ka.Timestamp is not updated here to avoid leaking information about currently connected peers.
788 now := time.Now()
789 ka.lastsuccess = now
790 ka.lastattempt = now
791 ka.attempts = 0
792 // move to tried set, optionally evicting other addresses if needed.
793 if ka.tried {
794 return
795 }
796 // ok, need to move it to tried. remove from all new buckets. record one of the buckets in question and call it the
797 // `first'
798 addrKey := NetAddressKey(addr)
799 oldBucket := -1
800 for i := range a.addrNew {
801 // we check for existence so we can record the first one
802 if _, ok := a.addrNew[i][addrKey]; ok {
803 delete(a.addrNew[i], addrKey)
804 ka.refs--
805 if oldBucket == -1 {
806 oldBucket = i
807 }
808 }
809 }
810 a.nNew--
811 if oldBucket == -1 {
812 // What? wasn't in a bucket after all.... Panic?
813 return
814 }
815 bucket := a.getTriedBucket(ka.na)
816 // Room in this tried bucket?
817 if a.addrTried[bucket].Len() < triedBucketSize {
818 ka.tried = true
819 a.addrTried[bucket].PushBack(ka)
820 a.nTried++
821 return
822 }
823 // No room, we have to evict something else.
824 entry := a.pickTried(bucket)
825 rmka := entry.Value.(*KnownAddress)
826 // First bucket it would have been put in.
827 newBucket := a.getNewBucket(rmka.na, rmka.srcAddr)
828 // If no room in the original bucket, we put it in a bucket we just freed up a space in.
829 if len(a.addrNew[newBucket]) >= newBucketSize {
830 newBucket = oldBucket
831 }
832 // replace with ka in list.
833 ka.tried = true
834 entry.Value = ka
835 rmka.tried = false
836 rmka.refs++
837 // We don't touch a.nTried here since the number of tried stays the same but we decemented new above, raise it again
838 // since we're putting something back.
839 a.nNew++
840 rmkey := NetAddressKey(rmka.na)
841 T.F("replacing %s with %s in tried", rmkey, addrKey)
842 843 // We made sure there is space here just above.
844 a.addrNew[newBucket][rmkey] = rmka
845 }
846 847 // SetServices sets the services for the giiven address to the provided value.
848 func (a *AddrManager) SetServices(addr *wire.NetAddress, services wire.ServiceFlag) {
849 a.mtx.Lock()
850 defer a.mtx.Unlock()
851 ka := a.find(addr)
852 if ka == nil {
853 return
854 }
855 // Update the services if needed.
856 if ka.na.Services != services {
857 // ka.na is immutable, so replace it.
858 naCopy := *ka.na
859 naCopy.Services = services
860 ka.na = &naCopy
861 }
862 }
863 864 // AddLocalAddress adds na to the list of known local addresses to advertise with the given priority.
865 func (a *AddrManager) AddLocalAddress(na *wire.NetAddress, priority AddressPriority) (e error) {
866 if !IsRoutable(na) {
867 return fmt.Errorf("address %s is not routable", na.IP)
868 }
869 a.lamtx.Lock()
870 defer a.lamtx.Unlock()
871 key := NetAddressKey(na)
872 la, ok := a.localAddresses[key]
873 if !ok || la.score < priority {
874 if ok {
875 la.score = priority + 1
876 } else {
877 a.localAddresses[key] = &localAddress{
878 na: na,
879 score: priority,
880 }
881 }
882 }
883 return nil
884 }
885 886 // getReachabilityFrom returns the relative reachability of the provided local address to the provided remote address.
887 func getReachabilityFrom(localAddr, remoteAddr *wire.NetAddress) int {
888 const (
889 Unreachable = 0
890 Default = iota
891 Teredo
892 Ipv6Weak
893 Ipv4
894 Ipv6Strong
895 Private
896 )
897 if !IsRoutable(remoteAddr) {
898 return Unreachable
899 }
900 if IsOnionCatTor(remoteAddr) {
901 if IsOnionCatTor(localAddr) {
902 return Private
903 }
904 if IsRoutable(localAddr) && IsIPv4(localAddr) {
905 return Ipv4
906 }
907 return Default
908 }
909 if IsRFC4380(remoteAddr) {
910 if !IsRoutable(localAddr) {
911 return Default
912 }
913 if IsRFC4380(localAddr) {
914 return Teredo
915 }
916 if IsIPv4(localAddr) {
917 return Ipv4
918 }
919 return Ipv6Weak
920 }
921 if IsIPv4(remoteAddr) {
922 if IsRoutable(localAddr) && IsIPv4(localAddr) {
923 return Ipv4
924 }
925 return Unreachable
926 }
927 /* ipv6 */
928 var tunnelled bool
929 // Is our v6 is tunnelled?
930 if IsRFC3964(localAddr) || IsRFC6052(localAddr) || IsRFC6145(localAddr) {
931 tunnelled = true
932 }
933 if !IsRoutable(localAddr) {
934 return Default
935 }
936 if IsRFC4380(localAddr) {
937 return Teredo
938 }
939 if IsIPv4(localAddr) {
940 return Ipv4
941 }
942 if tunnelled {
943 // only prioritise ipv6 if we aren't tunnelling it.
944 return Ipv6Weak
945 }
946 return Ipv6Strong
947 }
948 949 // GetBestLocalAddress returns the most appropriate local address to use for the given remote address.
950 func (a *AddrManager) GetBestLocalAddress(remoteAddr *wire.NetAddress) *wire.NetAddress {
951 a.lamtx.Lock()
952 defer a.lamtx.Unlock()
953 bestreach := 0
954 var bestscore AddressPriority
955 var bestAddress *wire.NetAddress
956 for _, la := range a.localAddresses {
957 reach := getReachabilityFrom(la.na, remoteAddr)
958 if reach > bestreach ||
959 (reach == bestreach && la.score > bestscore) {
960 bestreach = reach
961 bestscore = la.score
962 bestAddress = la.na
963 }
964 }
965 if bestAddress != nil {
966 T.F("suggesting address %s:%d for %s:%d", bestAddress.IP,
967 bestAddress.Port, remoteAddr.IP, remoteAddr.Port,
968 )
969 } else {
970 T.F("no worthy address for %s:%d", remoteAddr.IP,
971 remoteAddr.Port,
972 )
973 // Send something unroutable if nothing suitable.
974 var ip net.IP
975 if !IsIPv4(remoteAddr) && !IsOnionCatTor(remoteAddr) {
976 ip = net.IPv6zero
977 } else {
978 ip = net.IPv4zero
979 }
980 services := wire.SFNodeNetwork | /*wire.SFNodeWitness |*/ wire.SFNodeBloom
981 bestAddress = wire.NewNetAddressIPPort(ip, 0, services)
982 }
983 return bestAddress
984 }
985 986 // New returns a new bitcoin address manager. Use Start to begin processing asynchronous address updates.
987 func New(dataDir string, lookupFunc func(string) ([]net.IP, error)) *AddrManager {
988 am := AddrManager{
989 PeersFile: filepath.Join(dataDir, "peers.json"),
990 lookupFunc: lookupFunc,
991 rand: rand.New(rand.NewSource(time.Now().UnixNano())),
992 quit: qu.T(),
993 localAddresses: make(map[string]*localAddress),
994 }
995 am.reset()
996 return &am
997 }
998