channels.go raw

   1  package transport
   2  
   3  import (
   4  	"crypto/cipher"
   5  	"errors"
   6  	"fmt"
   7  	"net"
   8  	"runtime"
   9  	"strings"
  10  	"time"
  11  
  12  	"github.com/p9c/p9/pkg/log"
  13  
  14  	"github.com/p9c/p9/pkg/qu"
  15  
  16  	"github.com/p9c/p9/pkg/fec"
  17  	"github.com/p9c/p9/pkg/gcm"
  18  	"github.com/p9c/p9/pkg/multicast"
  19  )
  20  
  21  const (
  22  	UDPMulticastAddress     = "224.0.0.1"
  23  	success             int = iota // this is implicit zero of an int but starts the iota
  24  	closed
  25  	other
  26  	DefaultPort = 11049
  27  )
  28  
  29  var DefaultIP = net.IPv4(224, 0, 0, 1)
  30  var MulticastAddress = &net.UDPAddr{IP: DefaultIP, Port: DefaultPort}
  31  
  32  type (
  33  	MsgBuffer struct {
  34  		Buffers [][]byte
  35  		First   time.Time
  36  		Decoded bool
  37  		Source  net.Addr
  38  	}
  39  	// HandlerFunc is a function that is used to process a received message
  40  	HandlerFunc func(
  41  		ctx interface{}, src net.Addr, dst string, b []byte,
  42  	) (e error)
  43  	Handlers    map[string]HandlerFunc
  44  	Channel     struct {
  45  		buffers         map[string]*MsgBuffer
  46  		Ready           qu.C
  47  		context         interface{}
  48  		Creator         string
  49  		firstSender     *string
  50  		lastSent        *time.Time
  51  		MaxDatagramSize int
  52  		receiveCiph     cipher.AEAD
  53  		Receiver        *net.UDPConn
  54  		sendCiph        cipher.AEAD
  55  		Sender          *net.UDPConn
  56  	}
  57  )
  58  
  59  // SetDestination changes the address the outbound connection of a multicast
  60  // directs to
  61  func (c *Channel) SetDestination(dst string) (e error) {
  62  	D.Ln("sending to", dst)
  63  	if c.Sender, e = NewSender(dst, c.MaxDatagramSize); E.Chk(e) {
  64  	}
  65  	return
  66  }
  67  
  68  // Send fires off some data through the configured multicast's outbound.
  69  func (c *Channel) Send(magic []byte, nonce []byte, data []byte) (
  70  	n int, e error,
  71  ) {
  72  	if len(data) == 0 {
  73  		e = errors.New("not sending empty packet")
  74  		E.Ln(e)
  75  		return
  76  	}
  77  	var msg []byte
  78  	if msg, e = EncryptMessage(c.Creator, c.sendCiph, magic, nonce, data); E.Chk(e) {
  79  	}
  80  	n, e = c.Sender.Write(msg)
  81  	// D.Ln(msg)
  82  	return
  83  }
  84  
  85  // SendMany sends a BufIter of shards as produced by GetShards
  86  func (c *Channel) SendMany(magic []byte, b [][]byte) (e error) {
  87  	D.Ln("magic", string(magic), log.Caller("sending from", 1))
  88  	var nonce []byte
  89  	if nonce, e = GetNonce(c.sendCiph); E.Chk(e) {
  90  	} else {
  91  		for i := 0; i < len(b); i++ {
  92  			// D.Ln(i)
  93  			// D.Ln("segment length", len(b[i]))
  94  			if _, e = c.Send(magic, nonce, b[i]); E.Chk(e) {
  95  				// debug.PrintStack()
  96  			}
  97  		}
  98  		// T.Ln(c.Creator, "sent packets", string(magic), hex.EncodeToString(nonce), c.Sender.LocalAddr(), c.Sender.RemoteAddr())
  99  	}
 100  	return
 101  }
 102  
 103  // Close the multicast
 104  func (c *Channel) Close() (e error) {
 105  	// if e = c.Sender.Close(); E.Chk(e) {
 106  	// }
 107  	// if e = c.Receiver.Close(); E.Chk(e) {
 108  	// }
 109  	return
 110  }
 111  
 112  // GetShards returns a buffer iterator to feed to Channel.SendMany containing
 113  // fec encoded shards built from the provided buffer
 114  func GetShards(data []byte) (shards [][]byte) {
 115  	var e error
 116  	if shards, e = fec.Encode(data); E.Chk(e) {
 117  	}
 118  	return
 119  }
 120  
 121  // NewUnicastChannel sets up a listener and sender for a specified destination
 122  func NewUnicastChannel(
 123  	creator string, ctx interface{}, key []byte, sender, receiver string,
 124  	maxDatagramSize int,
 125  	handlers Handlers, quit qu.C,
 126  ) (channel *Channel, e error) {
 127  	channel = &Channel{
 128  		Creator:         creator,
 129  		MaxDatagramSize: maxDatagramSize,
 130  		buffers:         make(map[string]*MsgBuffer),
 131  		context:         ctx,
 132  	}
 133  	var magics []string
 134  	bytes := make([]byte, len(key))
 135  	copy(bytes, key)
 136  	for i := range handlers {
 137  		magics = append(magics, i)
 138  	}
 139  	if channel.sendCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
 140  	}
 141  	copy(bytes, key)
 142  	if channel.receiveCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
 143  	}
 144  	for i := range bytes {
 145  		bytes[i] = 0
 146  		key[i] = 0
 147  	}
 148  	if channel.Receiver, e = Listen(receiver, channel, maxDatagramSize, handlers, quit); E.Chk(e) {
 149  	}
 150  	if channel.Sender, e = NewSender(sender, maxDatagramSize); E.Chk(e) {
 151  	}
 152  	D.Ln("starting unicast multicast:", channel.Creator, sender, receiver, magics)
 153  	return
 154  }
 155  
 156  // NewSender creates a new UDP connection to a specified address
 157  func NewSender(address string, maxDatagramSize int) (
 158  	conn *net.UDPConn, e error,
 159  ) {
 160  	var addr *net.UDPAddr
 161  	if addr, e = net.ResolveUDPAddr("udp4", address); E.Chk(e) {
 162  		return
 163  	} else if conn, e = net.DialUDP("udp4", nil, addr); E.Chk(e) {
 164  		// debug.PrintStack()
 165  		return
 166  	}
 167  	D.Ln("started new sender on", conn.LocalAddr(), "->", conn.RemoteAddr())
 168  	if e = conn.SetWriteBuffer(maxDatagramSize); E.Chk(e) {
 169  	}
 170  	return
 171  }
 172  
 173  // Listen binds to the UDP Address and port given and writes packets received
 174  // from that Address to a buffer which is passed to a handler
 175  func Listen(
 176  	address string, channel *Channel, maxDatagramSize int, handlers Handlers,
 177  	quit qu.C,
 178  ) (conn *net.UDPConn, e error) {
 179  	var addr *net.UDPAddr
 180  	if addr, e = net.ResolveUDPAddr("udp4", address); E.Chk(e) {
 181  		return
 182  	} else if conn, e = net.ListenUDP("udp4", addr); E.Chk(e) {
 183  		return
 184  	} else if conn == nil {
 185  		return nil, errors.New("unable to start connection ")
 186  	}
 187  	D.Ln("starting listener on", conn.LocalAddr(), "->", conn.RemoteAddr())
 188  	if e = conn.SetReadBuffer(maxDatagramSize); E.Chk(e) {
 189  		// not a critical error but should not happen
 190  	}
 191  	go Handle(address, channel, handlers, maxDatagramSize, quit)
 192  	return
 193  }
 194  
 195  // NewBroadcastChannel returns a broadcaster and listener with a given handler
 196  // on a multicast address and specified port. The handlers define the messages
 197  // that will be processed and any other messages are ignored
 198  func NewBroadcastChannel(
 199  	creator string, ctx interface{}, key []byte, port int, maxDatagramSize int,
 200  	handlers Handlers,
 201  	quit qu.C,
 202  ) (channel *Channel, e error) {
 203  	channel = &Channel{
 204  		Creator:         creator,
 205  		MaxDatagramSize: maxDatagramSize,
 206  		buffers:         make(map[string]*MsgBuffer),
 207  		context:         ctx,
 208  		Ready:           qu.T(),
 209  	}
 210  	bytes := make([]byte, len(key))
 211  	copy(bytes, key)
 212  	if channel.sendCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
 213  		panic(e)
 214  	}
 215  	copy(bytes, key)
 216  	if channel.receiveCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
 217  		panic(e)
 218  	}
 219  	for i := range bytes {
 220  		key[i] = 0
 221  		bytes[i] = 0
 222  	}
 223  	if channel.Receiver, e = ListenBroadcast(port, channel, maxDatagramSize, handlers, quit); E.Chk(e) {
 224  	}
 225  	if channel.Sender, e = NewBroadcaster(port, maxDatagramSize); E.Chk(e) {
 226  	}
 227  	channel.Ready.Q()
 228  	return
 229  }
 230  
 231  // NewBroadcaster creates a new UDP multicast connection on which to broadcast
 232  func NewBroadcaster(port int, maxDatagramSize int) (
 233  	conn *net.UDPConn, e error,
 234  ) {
 235  	address := net.JoinHostPort(UDPMulticastAddress, fmt.Sprint(port))
 236  	if conn, e = NewSender(address, maxDatagramSize); E.Chk(e) {
 237  	}
 238  	return
 239  }
 240  
 241  // ListenBroadcast binds to the UDP Address and port given and writes packets
 242  // received from that Address to a buffer which is passed to a handler
 243  func ListenBroadcast(
 244  	port int,
 245  	channel *Channel,
 246  	maxDatagramSize int,
 247  	handlers Handlers,
 248  	quit qu.C,
 249  ) (conn *net.UDPConn, e error) {
 250  	if conn, e = multicast.Conn(port); E.Chk(e) {
 251  		return
 252  	}
 253  	address := conn.LocalAddr().String()
 254  	var magics []string
 255  	for i := range handlers {
 256  		magics = append(magics, i)
 257  	}
 258  	// D.S(handlers)
 259  	// D.Ln("magics", magics, PrevCallers())
 260  	D.Ln("starting broadcast listener", channel.Creator, address, magics)
 261  	if e = conn.SetReadBuffer(maxDatagramSize); E.Chk(e) {
 262  	}
 263  	channel.Receiver = conn
 264  	go Handle(address, channel, handlers, maxDatagramSize, quit)
 265  	return
 266  }
 267  
 268  func handleNetworkError(address string, e error) (result int) {
 269  	if len(strings.Split(e.Error(), "use of closed network connection")) >= 2 {
 270  		D.Ln("connection closed", address)
 271  		result = closed
 272  	} else {
 273  		E.F("ReadFromUDP failed: '%s'", e)
 274  		result = other
 275  	}
 276  	return
 277  }
 278  
 279  // Handle listens for messages, decodes them, aggregates them, recovers the data
 280  // from the reed solomon fec shards received and invokes the handler provided
 281  // matching the magic on the complete received messages
 282  func Handle(
 283  	address string, channel *Channel,
 284  	handlers Handlers, maxDatagramSize int, quit qu.C,
 285  ) {
 286  	buffer := make([]byte, maxDatagramSize)
 287  	T.Ln("starting handler for", channel.Creator, "listener")
 288  	// Loop forever reading from the socket until it is closed
 289  	// seenNonce := ""
 290  	var e error
 291  	var numBytes int
 292  	var src net.Addr
 293  	// var seenNonce string
 294  	<-channel.Ready
 295  out:
 296  	for {
 297  		select {
 298  		case <-quit.Wait():
 299  			break out
 300  		default:
 301  		}
 302  		if numBytes, src, e = channel.Receiver.ReadFromUDP(buffer); E.Chk(e) {
 303  			switch handleNetworkError(address, e) {
 304  			case closed:
 305  				break out
 306  			case other:
 307  				continue
 308  			case success:
 309  			}
 310  		}
 311  		// Filter messages by magic, if there is no match in the map the packet is
 312  		// ignored
 313  		magic := string(buffer[:4])
 314  		if handler, ok := handlers[magic]; ok {
 315  			if channel.lastSent != nil && channel.firstSender != nil {
 316  				*channel.lastSent = time.Now()
 317  			}
 318  			msg := buffer[:numBytes]
 319  			nL := channel.receiveCiph.NonceSize()
 320  			nonceBytes := msg[4 : 4+nL]
 321  			nonce := string(nonceBytes)
 322  			var shard []byte
 323  			if shard, e = channel.receiveCiph.Open(nil, nonceBytes, msg[4+len(nonceBytes):], nil); e != nil {
 324  				continue
 325  			}
 326  			// D.Ln("read", numBytes, "from", src, e, hex.EncodeToString(msg))
 327  			if bn, ok := channel.buffers[nonce]; ok {
 328  				if !bn.Decoded {
 329  					bn.Buffers = append(bn.Buffers, shard)
 330  					if len(bn.Buffers) >= 3 {
 331  						// try to decode it
 332  						var cipherText []byte
 333  						if cipherText, e = fec.Decode(bn.Buffers); E.Chk(e) {
 334  							continue
 335  						}
 336  						// D.F("received packet with magic %s from %s len %d bytes", magic, src.String(), len(cipherText))
 337  						bn.Decoded = true
 338  						if e = handler(channel.context, src, address, cipherText); E.Chk(e) {
 339  							continue
 340  						}
 341  						// src = nil
 342  						// buffer = buffer[:0]
 343  					}
 344  				} else {
 345  					for i := range channel.buffers {
 346  						if i != nonce && channel.buffers[i].Decoded {
 347  							// superseded messages can be deleted from the buffers, we don't add more data
 348  							// for the already decoded. todo: this will be changed to track stats for the
 349  							// puncture rate and redundancy scaling
 350  							delete(channel.buffers, i)
 351  						}
 352  					}
 353  				}
 354  			} else {
 355  				channel.buffers[nonce] = &MsgBuffer{
 356  					[][]byte{},
 357  					time.Now(), false, src,
 358  				}
 359  				channel.buffers[nonce].Buffers = append(
 360  					channel.buffers[nonce].
 361  						Buffers, shard,
 362  				)
 363  			}
 364  		}
 365  	}
 366  }
 367  
 368  func PrevCallers() (out string) {
 369  	for i := 0; i < 10; i++ {
 370  		_, loc, iline, _ := runtime.Caller(i)
 371  		out += fmt.Sprintf("%s:%d \n", loc, iline)
 372  	}
 373  	return
 374  }
 375