connmanager.go raw

   1  package connmgr
   2  
   3  import (
   4  	"errors"
   5  	"fmt"
   6  	"github.com/p9c/p9/pkg/log"
   7  	"net"
   8  	"sync"
   9  	"sync/atomic"
  10  	"time"
  11  	
  12  	"github.com/p9c/p9/pkg/qu"
  13  )
  14  
  15  // maxFailedAttempts is the maximum number of successive failed connection
  16  // attempts after which network failure is assumed and new connections will be
  17  // delayed by the configured retry duration.
  18  const maxFailedAttempts = 3
  19  
  20  // ErrDialNil is used to indicate that Dial cannot be nil in the configuration.
  21  var ErrDialNil = errors.New("config: Dial cannot be nil")
  22  
  23  // maxRetryDuration is the max duration of time retrying of a persistent
  24  // connection is allowed to grow to. This is necessary since the retry logic
  25  // uses a backoff mechanism which increases the interval base times the number
  26  // of retries that have been done.
  27  var maxRetryDuration = time.Hour
  28  
  29  // defaultRetryDuration is the default duration of time for retrying persistent
  30  // connections.
  31  var defaultRetryDuration = time.Second * 60
  32  
  33  // defaultTargetOutbound is the default number of outbound connections to
  34  // maintain.
  35  var defaultTargetOutbound = uint32(9)
  36  
  37  // ConnState represents the state of the requested connection.
  38  type ConnState uint8
  39  
  40  // ConnState can be either pending, established, disconnected or failed. When a
  41  // new connection is requested, it is attempted and categorized as established
  42  // or failed depending on the connection result. An established connection which
  43  // was disconnected is categorized as disconnected.
  44  const (
  45  	ConnPending ConnState = iota
  46  	ConnFailing
  47  	ConnCanceled
  48  	ConnEstablished
  49  	ConnDisconnected
  50  )
  51  
  52  // ConnReq is the connection request to a network address. If permanent, the
  53  // connection will be retried on disconnection.
  54  type ConnReq struct {
  55  	// The following variables must only be used atomically.
  56  	id         uint64
  57  	Addr       net.Addr
  58  	Permanent  bool
  59  	conn       net.Conn
  60  	state      ConnState
  61  	stateMtx   sync.RWMutex
  62  	retryCount uint32
  63  }
  64  
  65  // updateState updates the state of the connection request.
  66  func (c *ConnReq) updateState(state ConnState) {
  67  	c.stateMtx.Lock()
  68  	c.state = state
  69  	c.stateMtx.Unlock()
  70  }
  71  
  72  // ID returns a unique identifier for the connection request.
  73  func (c *ConnReq) ID() uint64 {
  74  	return atomic.LoadUint64(&c.id)
  75  }
  76  
  77  // State is the connection state of the requested connection.
  78  func (c *ConnReq) State() ConnState {
  79  	c.stateMtx.RLock()
  80  	state := c.state
  81  	c.stateMtx.RUnlock()
  82  	return state
  83  }
  84  
  85  // String returns a human-readable string for the connection request.
  86  func (c *ConnReq) String() string {
  87  	if c.Addr == nil || c.Addr.String() == "" {
  88  		return fmt.Sprintf("reqid %d", atomic.LoadUint64(&c.id))
  89  	}
  90  	return fmt.Sprintf("%s (reqid %d)", c.Addr, atomic.LoadUint64(&c.id))
  91  }
  92  
// Config holds the configuration options related to the connection manager.
//
// New rejects a Config whose Dial is nil and fills in defaults for
// RetryDuration and TargetOutbound. It writes those defaults into the caller's
// struct before taking its own copy.
type Config struct {
	// Listeners defines a slice of listeners for which the connection manager will
	// take ownership of and accept connections.
	//
	// When a connection is accepted, the OnAccept handler will be invoked with the
	// connection in a new goroutine.
	//
	// Since the connection manager takes ownership of these listeners, they will be
	// closed when the connection manager is stopped.
	//
	// This field will not have any effect if the OnAccept field is not specified.
	// It may be nil if the caller does not wish to listen for incoming connections.
	//
	// Connect also refuses to dial any address whose string form equals one of
	// these listeners' addresses.
	Listeners []net.Listener
	// OnAccept is a callback that is fired, in a new goroutine, when an inbound
	// connection is accepted.
	//
	// It is the caller's responsibility to close the connection: the connection
	// manager does not track accepted connections after handing them over.
	//
	// This field will not have any effect if the Listeners field is not also
	// specified since there couldn't possibly be any accepted connections in that
	// case.
	OnAccept func(net.Conn)
	// TargetOutbound is the number of outbound network connections to maintain.
	// New replaces zero with defaultTargetOutbound (9).
	TargetOutbound uint32
	// RetryDuration is the base delay before retrying connection requests. A
	// permanent request waits RetryDuration times its retry count, capped at
	// maxRetryDuration. Once maxFailedAttempts consecutive automatic attempts
	// have failed, each replacement attempt waits RetryDuration. New replaces
	// values of 1ns or less with defaultRetryDuration (60s).
	RetryDuration time.Duration
	// OnConnection is a callback that is fired, in a new goroutine, when a new
	// outbound connection is established.
	OnConnection func(*ConnReq, net.Conn)
	// OnDisconnection is a callback that is fired, in a new goroutine, when an
	// established outbound connection is disconnected or removed. The connection
	// manager closes the connection before starting the callback.
	OnDisconnection func(*ConnReq)
	// GetNewAddress is a way to get an address to make a network connection to. If
	// nil, no new connections will be made automatically.
	GetNewAddress func() (net.Addr, error)
	// Dial opens a connection to the given address. It cannot be nil; New
	// returns ErrDialNil if it is.
	Dial func(net.Addr) (net.Conn, error)
}
 136  
// registerPending asks the connection handler to record c as a pending
// connection attempt. Registering before dialing lets callers cancel an attempt
// (via Remove) before it succeeds, or once it is no longer wanted. The handler
// signals done after c has been recorded.
type registerPending struct {
	c    *ConnReq
	done qu.C
}
 145  
// handleConnected reports to the connection handler that dialing c succeeded
// and produced conn.
type handleConnected struct {
	c    *ConnReq
	conn net.Conn
}
 151  
// handleDisconnected asks the connection handler to drop the request with the
// given id. An established connection is closed; a still-pending attempt is
// canceled. retry selects whether an established request may be attempted
// again.
type handleDisconnected struct {
	id    uint64
	retry bool
}
 157  
// handleFailed reports to the connection handler that an attempt for the
// pending request c failed with error e.
type handleFailed struct {
	c *ConnReq
	e error
}
 163  
// ConnManager maintains a target number of outbound connections, optionally
// accepts inbound connections on configured listeners, and retries permanent
// connection requests. Create one with New, then call Start.
type ConnManager struct {
	// connReqCount, start and stop are only accessed through sync/atomic.
	// Keeping connReqCount as the first field satisfies sync/atomic's 64-bit
	// alignment requirement on 32-bit platforms, so do not reorder it.
	// connReqCount is the source of ConnReq ids; start and stop become nonzero
	// once Start and Stop, respectively, have been called.
	connReqCount   uint64
	start          int32
	stop           int32
	// Cfg is the configuration given to New, with defaults filled in.
	Cfg            Config
	// wg tracks connHandler and the listener goroutines launched by Start.
	wg             sync.WaitGroup
	// failedAttempts counts consecutive failed automatic connection attempts.
	// It is not accessed atomically; only the connection handler goroutine
	// (connHandler and the handleFailedConn calls it makes) touches it.
	failedAttempts uint64
	// requests carries messages to connHandler; quit is signaled by Stop.
	requests       chan interface{}
	quit           qu.C
}
 176  
 177  // handleFailedConn handles a connection failed due to a disconnect or any other failure.
 178  //
 179  // If permanent, it retries the connection after the configured retry duration.
 180  //
 181  // Otherwise, if required, it makes a new connection request.
 182  //
 183  // After maxFailedConnectionAttempts new connections will be retried after the configured retry duration.
 184  func (cm *ConnManager) handleFailedConn(c *ConnReq) {
 185  	if atomic.LoadInt32(&cm.stop) != 0 {
 186  		return
 187  	}
 188  	if c.Permanent {
 189  		c.retryCount++
 190  		d := time.Duration(c.retryCount) * cm.Cfg.RetryDuration
 191  		if d > maxRetryDuration {
 192  			d = maxRetryDuration
 193  		}
 194  		T.F("retrying connection to %v in %v", c, d)
 195  		time.AfterFunc(
 196  			d, func() {
 197  				cm.Connect(c)
 198  			},
 199  		)
 200  	} else if cm.Cfg.GetNewAddress != nil {
 201  		cm.failedAttempts++
 202  		if cm.failedAttempts >= maxFailedAttempts {
 203  			T.F(
 204  				"max failed connection attempts reached: [%d] -- retrying connection in: %v",
 205  				maxFailedAttempts,
 206  				cm.Cfg.RetryDuration,
 207  			)
 208  			time.AfterFunc(
 209  				cm.Cfg.RetryDuration, func() {
 210  					cm.NewConnReq()
 211  				},
 212  			)
 213  		} else {
 214  			go cm.NewConnReq()
 215  		}
 216  	}
 217  }
 218  
// connHandler is the connection manager's state machine. It must be run as a
// goroutine. The pending and conns maps are local to it, so the messages
// received on cm.requests are the only way other goroutines affect them:
//
//   - registerPending: record the request as pending and signal msg.done.
//   - handleConnected: promote a pending request to established, reset its
//     retry counters and fire Cfg.OnConnection. A connection for a request
//     that is no longer pending (e.g. canceled via Remove) is closed and
//     dropped.
//   - handleDisconnected: close and forget an established connection (firing
//     Cfg.OnDisconnection), or cancel a still-pending request regardless of
//     msg.retry. For an established request with msg.retry set, the request is
//     re-registered as pending and passed to handleFailedConn, but only if it
//     is permanent or the manager is below Cfg.TargetOutbound.
//   - handleFailed: mark a pending request as failing and pass it to
//     handleFailedConn.
//
// It returns once cm.quit is signaled, marking cm.wg done.
//
// NOTE(review): for non-permanent requests, handleFailedConn starts a brand-new
// request (new id) rather than reusing the old one. Yet the old request is left
// in (or re-added to) the pending map, so pending appears to grow with every
// failed or dropped automatic connection. Confirm no caller re-uses such
// requests before deleting them here.
func (cm *ConnManager) connHandler() {
	var (
		// pending holds all registered conn requests that have yet to succeed.
		pending = make(map[uint64]*ConnReq)
		// conns represents the set of all actively connected peers.
		conns = make(map[uint64]*ConnReq, cm.Cfg.TargetOutbound)
	)
out:
	for {
		select {
		case req := <-cm.requests:
			switch msg := req.(type) {
			case registerPending:
				connReq := msg.c
				connReq.updateState(ConnPending)
				pending[msg.c.id] = connReq
				// Unblock the registering goroutine (NewConnReq / Connect).
				msg.done.Q()
			case handleConnected:
				connReq := msg.c
				if _, ok := pending[connReq.id]; !ok {
					// The request was canceled while dialing; nobody wants this
					// connection any more.
					if msg.conn != nil {
						if e := msg.conn.Close(); E.Chk(e) {
						}
					}
					D.Ln("ignoring connection for canceled connreq", connReq)
					continue
				}
				connReq.updateState(ConnEstablished)
				connReq.conn = msg.conn
				conns[connReq.id] = connReq
				T.Ln("connected to ", connReq)
				// A success resets both the per-request backoff and the global
				// consecutive-failure counter used by handleFailedConn.
				connReq.retryCount = 0
				cm.failedAttempts = 0
				delete(pending, connReq.id)
				if cm.Cfg.OnConnection != nil {
					go cm.Cfg.OnConnection(connReq, msg.conn)
				}
			case handleDisconnected:
				connReq, ok := conns[msg.id]
				if !ok {
					connReq, ok = pending[msg.id]
					if !ok {
						E.Ln("unknown connid", msg.id)
						continue
					}
					// Pending connection was found: remove it from the pending map so
					// that a later, successful connection for it is ignored.
					connReq.updateState(ConnCanceled)
					D.Ln("canceling:", connReq)
					delete(pending, msg.id)
					continue
				}
				// An existing connection was located: forget it, close it and run the
				// disconnection callback. Its state is only set to disconnected below
				// when it will not be retried.
				T.Ln("disconnected from", connReq)
				delete(conns, msg.id)
				if connReq.conn != nil {
					if e := connReq.conn.Close(); E.Chk(e) {
					}
				}
				if cm.Cfg.OnDisconnection != nil {
					go cm.Cfg.OnDisconnection(connReq)
				}
				// All internal state has been cleaned up. If this connection is being
				// removed, we will make no further attempts with this request.
				if !msg.retry {
					connReq.updateState(ConnDisconnected)
					continue
				}
				// Otherwise, attempt a reconnection if we do not have enough peers, or
				// if this is a persistent peer. The request is re-added to the pending
				// map so that subsequent connection and failure messages for it are
				// not ignored. If neither condition holds, the request is simply
				// dropped and keeps its previous state.
				if uint32(len(conns)) < cm.Cfg.TargetOutbound ||
					connReq.Permanent {
					connReq.updateState(ConnPending)
					pending[msg.id] = connReq
					cm.handleFailedConn(connReq)
				}
			case handleFailed:
				connReq := msg.c
				if _, ok := pending[connReq.id]; !ok {
					D.Ln("ignoring connection for canceled conn req:", connReq)
					continue
				}
				connReq.updateState(ConnFailing)
				// msg.e is not logged here; Connect traces dial errors before
				// reporting them, but GetNewAddress errors from NewConnReq are not
				// logged anywhere.
				cm.handleFailedConn(connReq)
			}
		case <-cm.quit.Wait():
			break out
		}
	}
	cm.wg.Done()
}
 316  
 317  // NewConnReq creates a new connection request and connects to the corresponding address.
 318  func (cm *ConnManager) NewConnReq() {
 319  	T.Ln("creating new connreq @", log.Caller("thingy", 1))
 320  	if atomic.LoadInt32(&cm.stop) != 0 {
 321  		return
 322  	}
 323  	if cm.Cfg.GetNewAddress == nil {
 324  		return
 325  	}
 326  	c := &ConnReq{}
 327  	atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
 328  	// Submit a request of a pending connection attempt to the connection manager. By registering the id before the
 329  	// connection is even established, we'll be able to later cancel the connection via the Remove method.
 330  	done := qu.T()
 331  	select {
 332  	case cm.requests <- registerPending{c, done}:
 333  	case <-cm.quit.Wait():
 334  		return
 335  	}
 336  	// Wait for the registration to successfully add the pending conn req to the conn manager's internal state.
 337  	select {
 338  	case <-done.Wait():
 339  	case <-cm.quit.Wait():
 340  		return
 341  	}
 342  	addr, e := cm.Cfg.GetNewAddress()
 343  	if e != nil {
 344  		// T.Ln(e)
 345  		select {
 346  		case cm.requests <- handleFailed{c, e}:
 347  		case <-cm.quit.Wait():
 348  		}
 349  		return
 350  	}
 351  	c.Addr = addr
 352  	cm.Connect(c)
 353  }
 354  
 355  // Connect assigns an id and dials a connection to the address of the connection request.
 356  func (cm *ConnManager) Connect(c *ConnReq) {
 357  	if atomic.LoadInt32(&cm.stop) != 0 {
 358  		return
 359  	}
 360  	for i := range cm.Cfg.Listeners {
 361  		if cm.Cfg.Listeners[i].Addr().String() == c.Addr.String() {
 362  			D.Ln("not making outbound connection to our own listener address")
 363  			return
 364  		}
 365  	}
 366  	if atomic.LoadUint64(&c.id) == 0 {
 367  		atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
 368  		// Submit a request of a pending connection attempt to the connection manager. By registering the id before the
 369  		// connection is even established, we'll be able to later cancel the connection via the Remove method.
 370  		T.Ln("sending request to register connection")
 371  		done := qu.T()
 372  		select {
 373  		case cm.requests <- registerPending{c, done}:
 374  		case <-cm.quit.Wait():
 375  			return
 376  		}
 377  		T.Ln("waiting for response")
 378  		// Wait for the registration to successfully add the pending conn req to the conn manager's internal state.
 379  		select {
 380  		case <-done.Wait():
 381  		case <-cm.quit.Wait():
 382  			return
 383  		}
 384  	}
 385  	T.Ln("response received")
 386  	if len(cm.Cfg.Listeners) > 0 {
 387  		T.F("%s attempting to connect to '%s'", cm.Cfg.Listeners[0].Addr(), c.Addr)
 388  	}
 389  	// Traces(cm.Cfg.Dial)
 390  	conn, e := cm.Cfg.Dial(c.Addr)
 391  	// E.Ln(err, c.Addr)
 392  	if e != nil {
 393  		T.Ln(e)
 394  		select {
 395  		case cm.requests <- handleFailed{c, e}:
 396  		case <-cm.quit.Wait():
 397  		}
 398  		return
 399  	}
 400  	select {
 401  	case cm.requests <- handleConnected{c, conn}:
 402  	case <-cm.quit.Wait():
 403  	}
 404  }
 405  
 406  // Disconnect disconnects the connection corresponding to the given connection id. If permanent, the connection will be
 407  // retried with an increasing backoff duration.
 408  func (cm *ConnManager) Disconnect(id uint64) {
 409  	if atomic.LoadInt32(&cm.stop) != 0 {
 410  		return
 411  	}
 412  	select {
 413  	case cm.requests <- handleDisconnected{id, true}:
 414  	case <-cm.quit.Wait():
 415  	}
 416  }
 417  
 418  // Remove removes the connection corresponding to the given connection id from known connections.
 419  //
 420  // NOTE: This method can also be used to cancel a lingering connection attempt that hasn't yet succeeded.
 421  func (cm *ConnManager) Remove(id uint64) {
 422  	if atomic.LoadInt32(&cm.stop) != 0 {
 423  		return
 424  	}
 425  	select {
 426  	case cm.requests <- handleDisconnected{id, false}:
 427  	case <-cm.quit.Wait():
 428  	}
 429  }
 430  
 431  // listenHandler accepts incoming connections on a given listener.
 432  //
 433  // It must be run as a goroutine.
 434  func (cm *ConnManager) listenHandler(listener net.Listener) {
 435  	I.C(
 436  		func() string {
 437  			return fmt.Sprint("node listening on ", listener.Addr())
 438  		},
 439  	)
 440  	for atomic.LoadInt32(&cm.stop) == 0 {
 441  		conn, e := listener.Accept()
 442  		if e != nil {
 443  			T.Ln(e)
 444  			// Only log the error if not forcibly shutting down.
 445  			if atomic.LoadInt32(&cm.stop) == 0 {
 446  				E.Ln("can't accept connection:", e)
 447  			}
 448  			continue
 449  		}
 450  		go cm.Cfg.OnAccept(conn)
 451  	}
 452  	cm.wg.Done()
 453  	if e := listener.Close(); E.Chk(e) {
 454  	}
 455  	T.Ln(fmt.Sprint("listener handler done for ", listener.Addr()))
 456  }
 457  
 458  // Start launches the connection manager and begins connecting to the network.
 459  func (cm *ConnManager) Start() {
 460  	// Already started?
 461  	if atomic.AddInt32(&cm.start, 1) != 1 {
 462  		return
 463  	}
 464  	cm.wg.Add(1)
 465  	go cm.connHandler()
 466  	// Start all the listeners so long as the caller requested them and provided a callback to be invoked when
 467  	// connections are accepted.
 468  	if cm.Cfg.OnAccept != nil {
 469  		for _, listner := range cm.Cfg.Listeners {
 470  			cm.wg.Add(1)
 471  			go cm.listenHandler(listner)
 472  		}
 473  	}
 474  	for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.Cfg.TargetOutbound); i++ {
 475  		go cm.NewConnReq()
 476  	}
 477  }
 478  
 479  // Wait blocks until the connection manager halts gracefully.
 480  func (cm *ConnManager) Wait() {
 481  	cm.wg.Wait()
 482  }
 483  
 484  // Stop gracefully shuts down the connection manager.
 485  func (cm *ConnManager) Stop() {
 486  	if atomic.AddInt32(&cm.stop, 1) != 1 {
 487  		D.Ln("connection manager already stopped")
 488  		return
 489  	}
 490  	// Stop all the listeners. There will not be any listeners if listening is disabled.
 491  	for _, listener := range cm.Cfg.Listeners {
 492  		// Ignore the error since this is shutdown and there is no way to recover anyways.
 493  		_ = listener.Close()
 494  	}
 495  	cm.quit.Q()
 496  }
 497  
 498  // New returns a new connection manager. Use Start to start connecting to the network.
 499  func New(cfg *Config) (*ConnManager, error) {
 500  	if cfg.Dial == nil {
 501  		E.Ln("Cfg.Dial is nil")
 502  		return nil, ErrDialNil
 503  	}
 504  	// Default to sane values
 505  	if cfg.RetryDuration <= 1 {
 506  		cfg.RetryDuration = defaultRetryDuration
 507  	}
 508  	if cfg.TargetOutbound < 1 {
 509  		cfg.TargetOutbound = defaultTargetOutbound
 510  	}
 511  	cm := ConnManager{
 512  		Cfg:      *cfg, // Copy so caller can't mutate
 513  		requests: make(chan interface{}),
 514  		quit:     qu.T(),
 515  	}
 516  	return &cm, nil
 517  }
 518