1 package connmgr
2 3 import (
4 "errors"
5 "fmt"
6 "github.com/p9c/p9/pkg/log"
7 "net"
8 "sync"
9 "sync/atomic"
10 "time"
11 12 "github.com/p9c/p9/pkg/qu"
13 )
// maxFailedAttempts is the maximum number of successive failed connection
// attempts after which network failure is assumed and new connections will be
// delayed by the configured retry duration.
//
// It is compared against ConnManager.failedAttempts, which is incremented for
// every failed non-permanent request and reset to zero as soon as any
// connection is established.
const maxFailedAttempts = 3
// ErrDialNil is returned by New to indicate that Dial cannot be nil in the
// configuration.
var ErrDialNil = errors.New("config: Dial cannot be nil")
// maxRetryDuration is the upper bound on the delay between retries of a
// permanent connection. This is necessary since the retry logic uses a linear
// backoff: the delay is the configured retry duration multiplied by the number
// of retries that have been done so far, so it would otherwise keep growing.
var maxRetryDuration = time.Hour
// defaultRetryDuration is the retry interval for persistent connections that
// New applies when the configuration does not supply a usable RetryDuration.
var defaultRetryDuration = time.Second * 60
// defaultTargetOutbound is the number of outbound connections to maintain that
// New applies when the configuration's TargetOutbound is zero.
var defaultTargetOutbound = uint32(9)
// ConnState represents the state of the requested connection. The current
// state of a request is read with ConnReq.State.
type ConnState uint8
// The states a connection request can be in, as assigned by the connection
// handler:
//
//   - ConnPending: the request has been registered (or re-queued for a retry)
//     and no connection has been established for it yet.
//   - ConnFailing: a connection attempt for a still-pending request failed.
//   - ConnCanceled: a pending request was removed or disconnected before it
//     connected; a later successful dial for it is discarded.
//   - ConnEstablished: the connection attempt succeeded.
//   - ConnDisconnected: an established connection was removed and will not be
//     retried.
const (
	ConnPending ConnState = iota
	ConnFailing
	ConnCanceled
	ConnEstablished
	ConnDisconnected
)
// ConnReq is the connection request to a network address. If permanent, the
// connection will be retried on disconnection.
type ConnReq struct {
	// id must only be accessed atomically. It is zero until NewConnReq or
	// Connect assigns the request an identifier. It is kept as the first field;
	// NOTE(review): presumably for 64-bit alignment on 32-bit platforms - do not
	// reorder without checking.
	id uint64
	// Addr is the network address that Connect dials.
	Addr net.Addr
	// Permanent marks a request that is retried against the same address, with
	// an increasing delay, instead of being replaced by a new request.
	Permanent bool
	// conn is the established connection. It is set by the connection handler
	// when the dial succeeds and closed by it on disconnection.
	conn net.Conn
	// state is the current state of the request; it is guarded by stateMtx.
	state    ConnState
	stateMtx sync.RWMutex
	// retryCount is the number of retries made for a permanent request since
	// its last successful connection. It is only touched by the connection
	// handler and is not synchronized.
	retryCount uint32
}
64 65 // updateState updates the state of the connection request.
66 func (c *ConnReq) updateState(state ConnState) {
67 c.stateMtx.Lock()
68 c.state = state
69 c.stateMtx.Unlock()
70 }
// ID returns a unique identifier for the connection request.
//
// The identifier is read atomically. It is zero until NewConnReq or Connect
// has assigned one to the request.
func (c *ConnReq) ID() uint64 {
	return atomic.LoadUint64(&c.id)
}
76 77 // State is the connection state of the requested connection.
78 func (c *ConnReq) State() ConnState {
79 c.stateMtx.RLock()
80 state := c.state
81 c.stateMtx.RUnlock()
82 return state
83 }
84 85 // String returns a human-readable string for the connection request.
86 func (c *ConnReq) String() string {
87 if c.Addr == nil || c.Addr.String() == "" {
88 return fmt.Sprintf("reqid %d", atomic.LoadUint64(&c.id))
89 }
90 return fmt.Sprintf("%s (reqid %d)", c.Addr, atomic.LoadUint64(&c.id))
91 }
// Config holds the configuration options related to the connection manager.
type Config struct {
	// Listeners defines a slice of listeners for which the connection manager will
	// take ownership of and accept connections.
	//
	// When a connection is accepted, the OnAccept handler will be invoked with the
	// connection.
	//
	// Since the connection manager takes ownership of these listeners, they will be
	// closed when the connection manager is stopped.
	//
	// This field will not have any effect if the OnAccept field is not specified.
	// It may be nil if the caller does not wish to listen for incoming connections.
	//
	// Connect also refuses to dial an address that equals the address of one of
	// these listeners.
	Listeners []net.Listener
	// OnAccept is a callback that is fired when an inbound connection is accepted.
	// It is invoked in its own goroutine. It is the caller's responsibility to
	// close the connection.
	//
	// Failure to close the connection will result in the connection manager
	// believing the connection is still active and thus have undesirable side
	// effects such as still counting toward maximum connection limits.
	//
	// This field will not have any effect if the Listeners field is not also
	// specified since there couldn't possibly be any accepted connections in that
	// case.
	OnAccept func(net.Conn)
	// TargetOutbound is the number of outbound network connections to maintain.
	// When left at zero, New sets it to defaultTargetOutbound (9).
	TargetOutbound uint32
	// RetryDuration is the duration to wait before retrying connection requests.
	// When it is not greater than one nanosecond, New sets it to
	// defaultRetryDuration (60s).
	RetryDuration time.Duration
	// OnConnection is a callback that is fired, in its own goroutine, when a new
	// outbound connection is established.
	OnConnection func(*ConnReq, net.Conn)
	// OnDisconnection is a callback that is fired, in its own goroutine, when an
	// outbound connection is disconnected.
	OnDisconnection func(*ConnReq)
	// GetNewAddress is a way to get an address to make a network connection to. If
	// nil, no new connections will be made automatically.
	GetNewAddress func() (net.Addr, error)
	// Dial connects to the address on the named network. It cannot be nil; New
	// returns ErrDialNil if it is.
	Dial func(net.Addr) (net.Conn, error)
}
// registerPending is the message used to register a pending connection attempt
// with the connection handler. By registering pending connection attempts we
// allow callers to cancel them before they succeed, or when they are no longer
// wanted.
type registerPending struct {
	// c is the request to record in the handler's pending set.
	c *ConnReq
	// done is signalled by the handler once the request has been recorded.
	done qu.C
}
// handleConnected is the message used to queue a successful connection with
// the connection handler.
type handleConnected struct {
	// c is the request whose dial succeeded.
	c *ConnReq
	// conn is the connection returned by the dial.
	conn net.Conn
}
// handleDisconnected is the message used to remove a connection, or to cancel
// a request that is still pending.
type handleDisconnected struct {
	// id identifies the connection request to act on.
	id uint64
	// retry is true when sent by Disconnect (the request may be retried) and
	// false when sent by Remove (no further attempts are made).
	retry bool
}
// handleFailed is the message used to report a failed attempt for a pending
// connection request to the connection handler.
type handleFailed struct {
	// c is the request whose attempt failed.
	c *ConnReq
	// e is the error that caused the failure.
	e error
}
// ConnManager provides a manager to handle network connections.
type ConnManager struct {
	// The following three variables must only be used atomically.
	//
	// connReqCount is the source of connection request ids; start and stop are
	// counters used to make Start and Stop take effect only once.
	connReqCount uint64
	start        int32
	stop         int32
	// Cfg is the manager's own copy of the configuration passed to New.
	Cfg Config
	// wg tracks the connection handler and listener goroutines for Wait.
	wg sync.WaitGroup
	// failedAttempts counts successive failed non-permanent connection attempts.
	// It is read and written without synchronization, only from the connection
	// handler goroutine (directly, and through handleFailedConn).
	failedAttempts uint64
	// requests carries registerPending, handleConnected, handleDisconnected and
	// handleFailed messages to the connection handler.
	requests chan interface{}
	// quit is signalled by Stop to shut the manager down.
	quit qu.C
}
176 177 // handleFailedConn handles a connection failed due to a disconnect or any other failure.
178 //
179 // If permanent, it retries the connection after the configured retry duration.
180 //
181 // Otherwise, if required, it makes a new connection request.
182 //
183 // After maxFailedConnectionAttempts new connections will be retried after the configured retry duration.
184 func (cm *ConnManager) handleFailedConn(c *ConnReq) {
185 if atomic.LoadInt32(&cm.stop) != 0 {
186 return
187 }
188 if c.Permanent {
189 c.retryCount++
190 d := time.Duration(c.retryCount) * cm.Cfg.RetryDuration
191 if d > maxRetryDuration {
192 d = maxRetryDuration
193 }
194 T.F("retrying connection to %v in %v", c, d)
195 time.AfterFunc(
196 d, func() {
197 cm.Connect(c)
198 },
199 )
200 } else if cm.Cfg.GetNewAddress != nil {
201 cm.failedAttempts++
202 if cm.failedAttempts >= maxFailedAttempts {
203 T.F(
204 "max failed connection attempts reached: [%d] -- retrying connection in: %v",
205 maxFailedAttempts,
206 cm.Cfg.RetryDuration,
207 )
208 time.AfterFunc(
209 cm.Cfg.RetryDuration, func() {
210 cm.NewConnReq()
211 },
212 )
213 } else {
214 go cm.NewConnReq()
215 }
216 }
217 }
// connHandler handles all connection related requests. It must be run as a
// goroutine. The connection handler makes sure that we maintain a pool of
// active outbound connections so that we remain connected to the network.
// Connection requests are processed and mapped by their assigned ids.
//
// It is the sole owner of the pending and conns maps: every state change
// arrives as a message on cm.requests, so the maps need no locking. The loop
// runs until cm.quit is signalled, after which the handler marks itself done
// on cm.wg.
func (cm *ConnManager) connHandler() {
	var (
		// pending holds all registered conn requests that have yet to succeed.
		pending = make(map[uint64]*ConnReq)
		// conns represents the set of all actively connected peers.
		conns = make(map[uint64]*ConnReq, cm.Cfg.TargetOutbound)
	)
out:
	for {
		select {
		case req := <-cm.requests:
			switch msg := req.(type) {
			case registerPending:
				// Record the request as pending and tell the sender that it is now
				// known to the handler (and can therefore be cancelled).
				connReq := msg.c
				connReq.updateState(ConnPending)
				pending[msg.c.id] = connReq
				msg.done.Q()
			case handleConnected:
				connReq := msg.c
				// A request that is no longer pending was cancelled while the dial
				// was in flight: drop the freshly opened connection.
				if _, ok := pending[connReq.id]; !ok {
					if msg.conn != nil {
						if e := msg.conn.Close(); E.Chk(e) {
						}
					}
					D.Ln("ignoring connection for canceled connreq", connReq)
					continue
				}
				// Promote the request from pending to connected.
				connReq.updateState(ConnEstablished)
				connReq.conn = msg.conn
				conns[connReq.id] = connReq
				T.Ln("connected to ", connReq)
				// A success resets both the per-request backoff and the manager-wide
				// failure counter.
				connReq.retryCount = 0
				cm.failedAttempts = 0
				delete(pending, connReq.id)
				// The callback runs in its own goroutine so it cannot block the
				// handler loop.
				if cm.Cfg.OnConnection != nil {
					go cm.Cfg.OnConnection(connReq, msg.conn)
				}
			case handleDisconnected:
				connReq, ok := conns[msg.id]
				if !ok {
					connReq, ok = pending[msg.id]
					if !ok {
						E.Ln("unknown connid", msg.id)
						continue
					}
					// Pending connection was found, remove it from pending map so that a
					// later, successful connection for it is ignored. This happens for
					// both Disconnect and Remove; msg.retry is not consulted here.
					connReq.updateState(ConnCanceled)
					D.Ln("canceling:", connReq)
					delete(pending, msg.id)
					continue
				}
				// An existing connection was located, mark as disconnected and execute
				// disconnection callback.
				T.Ln("disconnected from", connReq)
				delete(conns, msg.id)
				if connReq.conn != nil {
					if e := connReq.conn.Close(); E.Chk(e) {
					}
				}
				if cm.Cfg.OnDisconnection != nil {
					go cm.Cfg.OnDisconnection(connReq)
				}
				// All internal state has been cleaned up, if this connection is being
				// removed, we will make no further attempts with this request.
				if !msg.retry {
					connReq.updateState(ConnDisconnected)
					continue
				}
				// Otherwise, we will attempt a reconnection if we do not have enough
				// peers, or if this is a persistent peer. The connection request is
				// re-added to the pending map, so that subsequent processing of
				// connections and failures does not ignore the request.
				if uint32(len(conns)) < cm.Cfg.TargetOutbound ||
					connReq.Permanent {
					connReq.updateState(ConnPending)
					pending[msg.id] = connReq
					cm.handleFailedConn(connReq)
				}
			case handleFailed:
				connReq := msg.c
				// Failures for requests that are no longer pending (cancelled in the
				// meantime) are dropped.
				if _, ok := pending[connReq.id]; !ok {
					D.Ln("ignoring connection for canceled conn req:", connReq)
					continue
				}
				connReq.updateState(ConnFailing)
				// NOTE: the failure reason (msg.e) is not logged here; the trace call
				// that used to report it is disabled. The request also stays in the
				// pending map while handleFailedConn schedules the follow-up.
				cm.handleFailedConn(connReq)
			}
		case <-cm.quit.Wait():
			break out
		}
	}
	cm.wg.Done()
}
316 317 // NewConnReq creates a new connection request and connects to the corresponding address.
318 func (cm *ConnManager) NewConnReq() {
319 T.Ln("creating new connreq @", log.Caller("thingy", 1))
320 if atomic.LoadInt32(&cm.stop) != 0 {
321 return
322 }
323 if cm.Cfg.GetNewAddress == nil {
324 return
325 }
326 c := &ConnReq{}
327 atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
328 // Submit a request of a pending connection attempt to the connection manager. By registering the id before the
329 // connection is even established, we'll be able to later cancel the connection via the Remove method.
330 done := qu.T()
331 select {
332 case cm.requests <- registerPending{c, done}:
333 case <-cm.quit.Wait():
334 return
335 }
336 // Wait for the registration to successfully add the pending conn req to the conn manager's internal state.
337 select {
338 case <-done.Wait():
339 case <-cm.quit.Wait():
340 return
341 }
342 addr, e := cm.Cfg.GetNewAddress()
343 if e != nil {
344 // T.Ln(e)
345 select {
346 case cm.requests <- handleFailed{c, e}:
347 case <-cm.quit.Wait():
348 }
349 return
350 }
351 c.Addr = addr
352 cm.Connect(c)
353 }
354 355 // Connect assigns an id and dials a connection to the address of the connection request.
356 func (cm *ConnManager) Connect(c *ConnReq) {
357 if atomic.LoadInt32(&cm.stop) != 0 {
358 return
359 }
360 for i := range cm.Cfg.Listeners {
361 if cm.Cfg.Listeners[i].Addr().String() == c.Addr.String() {
362 D.Ln("not making outbound connection to our own listener address")
363 return
364 }
365 }
366 if atomic.LoadUint64(&c.id) == 0 {
367 atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
368 // Submit a request of a pending connection attempt to the connection manager. By registering the id before the
369 // connection is even established, we'll be able to later cancel the connection via the Remove method.
370 T.Ln("sending request to register connection")
371 done := qu.T()
372 select {
373 case cm.requests <- registerPending{c, done}:
374 case <-cm.quit.Wait():
375 return
376 }
377 T.Ln("waiting for response")
378 // Wait for the registration to successfully add the pending conn req to the conn manager's internal state.
379 select {
380 case <-done.Wait():
381 case <-cm.quit.Wait():
382 return
383 }
384 }
385 T.Ln("response received")
386 if len(cm.Cfg.Listeners) > 0 {
387 T.F("%s attempting to connect to '%s'", cm.Cfg.Listeners[0].Addr(), c.Addr)
388 }
389 // Traces(cm.Cfg.Dial)
390 conn, e := cm.Cfg.Dial(c.Addr)
391 // E.Ln(err, c.Addr)
392 if e != nil {
393 T.Ln(e)
394 select {
395 case cm.requests <- handleFailed{c, e}:
396 case <-cm.quit.Wait():
397 }
398 return
399 }
400 select {
401 case cm.requests <- handleConnected{c, conn}:
402 case <-cm.quit.Wait():
403 }
404 }
405 406 // Disconnect disconnects the connection corresponding to the given connection id. If permanent, the connection will be
407 // retried with an increasing backoff duration.
408 func (cm *ConnManager) Disconnect(id uint64) {
409 if atomic.LoadInt32(&cm.stop) != 0 {
410 return
411 }
412 select {
413 case cm.requests <- handleDisconnected{id, true}:
414 case <-cm.quit.Wait():
415 }
416 }
417 418 // Remove removes the connection corresponding to the given connection id from known connections.
419 //
420 // NOTE: This method can also be used to cancel a lingering connection attempt that hasn't yet succeeded.
421 func (cm *ConnManager) Remove(id uint64) {
422 if atomic.LoadInt32(&cm.stop) != 0 {
423 return
424 }
425 select {
426 case cm.requests <- handleDisconnected{id, false}:
427 case <-cm.quit.Wait():
428 }
429 }
430 431 // listenHandler accepts incoming connections on a given listener.
432 //
433 // It must be run as a goroutine.
434 func (cm *ConnManager) listenHandler(listener net.Listener) {
435 I.C(
436 func() string {
437 return fmt.Sprint("node listening on ", listener.Addr())
438 },
439 )
440 for atomic.LoadInt32(&cm.stop) == 0 {
441 conn, e := listener.Accept()
442 if e != nil {
443 T.Ln(e)
444 // Only log the error if not forcibly shutting down.
445 if atomic.LoadInt32(&cm.stop) == 0 {
446 E.Ln("can't accept connection:", e)
447 }
448 continue
449 }
450 go cm.Cfg.OnAccept(conn)
451 }
452 cm.wg.Done()
453 if e := listener.Close(); E.Chk(e) {
454 }
455 T.Ln(fmt.Sprint("listener handler done for ", listener.Addr()))
456 }
457 458 // Start launches the connection manager and begins connecting to the network.
459 func (cm *ConnManager) Start() {
460 // Already started?
461 if atomic.AddInt32(&cm.start, 1) != 1 {
462 return
463 }
464 cm.wg.Add(1)
465 go cm.connHandler()
466 // Start all the listeners so long as the caller requested them and provided a callback to be invoked when
467 // connections are accepted.
468 if cm.Cfg.OnAccept != nil {
469 for _, listner := range cm.Cfg.Listeners {
470 cm.wg.Add(1)
471 go cm.listenHandler(listner)
472 }
473 }
474 for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.Cfg.TargetOutbound); i++ {
475 go cm.NewConnReq()
476 }
477 }
// Wait blocks until the connection manager halts gracefully, that is until the
// connection handler and every listener handler launched by Start have
// finished.
func (cm *ConnManager) Wait() {
	cm.wg.Wait()
}
483 484 // Stop gracefully shuts down the connection manager.
485 func (cm *ConnManager) Stop() {
486 if atomic.AddInt32(&cm.stop, 1) != 1 {
487 D.Ln("connection manager already stopped")
488 return
489 }
490 // Stop all the listeners. There will not be any listeners if listening is disabled.
491 for _, listener := range cm.Cfg.Listeners {
492 // Ignore the error since this is shutdown and there is no way to recover anyways.
493 _ = listener.Close()
494 }
495 cm.quit.Q()
496 }
497 498 // New returns a new connection manager. Use Start to start connecting to the network.
499 func New(cfg *Config) (*ConnManager, error) {
500 if cfg.Dial == nil {
501 E.Ln("Cfg.Dial is nil")
502 return nil, ErrDialNil
503 }
504 // Default to sane values
505 if cfg.RetryDuration <= 1 {
506 cfg.RetryDuration = defaultRetryDuration
507 }
508 if cfg.TargetOutbound < 1 {
509 cfg.TargetOutbound = defaultTargetOutbound
510 }
511 cm := ConnManager{
512 Cfg: *cfg, // Copy so caller can't mutate
513 requests: make(chan interface{}),
514 quit: qu.T(),
515 }
516 return &cm, nil
517 }
518