transport.go raw

   1  package transport
   2  
   3  import (
   4  	"context"
   5  	"crypto/cipher"
   6  	"crypto/rand"
   7  	"errors"
   8  	"io"
   9  	"net"
  10  	"sync"
  11  	"time"
  12  	
  13  	"github.com/p9c/p9/pkg/fec"
  14  )
  15  
  16  // HandleFunc is a map of handlers for working on received, decoded packets
  17  type HandleFunc map[string]func(ctx interface{}) func(b []byte) (e error)
  18  
  19  // Connection is the state and working memory references for a simple reliable UDP lan transport, encrypted by a GCM AES
  20  // cipher, with the simple protocol of sending out 9 packets containing encrypted FEC shards containing a slice of
  21  // bytes.
  22  //
  23  // This protocol probably won't work well outside of a multicast lan in adverse conditions but it is designed for local
  24  // network control systems todo: it is if the updated fec segmenting code is put in
  25  type Connection struct {
  26  	maxDatagramSize int
  27  	buffers         map[string]*MsgBuffer
  28  	sendAddress     *net.UDPAddr
  29  	SendConn        net.Conn
  30  	listenAddress   *net.UDPAddr
  31  	listenConn      *net.PacketConn
  32  	ciph            cipher.AEAD
  33  	ctx             context.Context
  34  	mx              *sync.Mutex
  35  }
  36  
  37  //
  38  // // NewConnection creates a new connection with a defined default send
  39  // // connection and listener and pre shared key password for encryption on the
  40  // // local network
  41  // func NewConnection(send, listen, preSharedKey string,
  42  // 	maxDatagramSize int, ctx context.Context) (c *Connection, e error) {
  43  // 	sendAddr := &net.UDPAddr{}
  44  // 	var sendConn net.Conn
  45  // 	listenAddr := &net.UDPAddr{}
  46  // 	var listenConn net.PacketConn
  47  // 	if listen != "" {
  48  // 		config := &net.ListenConfig{Control: reusePort}
  49  // 		listenConn, e = config.ListenPacket(context.Background(), "udp4", listen)
  50  // 		if e != nil  {
  51  // 			E.Ln(e)
  52  // 		}
  53  // 	}
  54  // 	if send != "" {
  55  // 		// sendAddr, e = net.ResolveUDPAddr("udp4", send)
  56  // 		// if e != nil  {
  57  // 		// 	E.Ln(e)
  58  // 		// }
  59  // 		sendConn, e = net.Dial("udp4", send)
  60  // 		if e != nil  {
  61  // 			Error(err, sendAddr)
  62  // 		}
  63  // 		// L.Spew(sendConn)
  64  // 	}
  65  // 	var ciph cipher.AEAD
  66  // 	if ciph, e = gcm.GetCipher(preSharedKey); E.Chk(e) {
  67  // 	}
  68  // 	return &Connection{
  69  // 		maxDatagramSize: maxDatagramSize,
  70  // 		buffers:         make(map[string]*MsgBuffer),
  71  // 		sendAddress:     sendAddr,
  72  // 		SendConn:        sendConn,
  73  // 		listenAddress:   listenAddr,
  74  // 		listenConn:      &listenConn,
  75  // 		ciph:            ciph, // gcm.GetCipher(*cx.Config.MinerPass),
  76  // 		ctx:             ctx,
  77  // 		mx:              &sync.Mutex{},
  78  // 	}, err
  79  // }
  80  
  81  // SetSendConn sets up an outbound connection
  82  func (c *Connection) SetSendConn(ad string) (e error) {
  83  	// c.sendAddress, e = net.ResolveUDPAddr("udp4", ad)
  84  	// if e != nil  {
  85  	// 		// }
  86  	var sC net.Conn
  87  	if sC, e = net.Dial("udp4", ad); !E.Chk(e) {
  88  		c.SendConn = sC
  89  	}
  90  	return
  91  }
  92  
  93  // CreateShards takes a slice of bites and generates 3
  94  func (c *Connection) CreateShards(b, magic []byte) (
  95  	shards [][]byte,
  96  	e error,
  97  ) {
  98  	magicLen := 4
  99  	// get a nonce for the packet, it is both message ID and salt
 100  	nonceLen := c.ciph.NonceSize()
 101  	nonce := make([]byte, nonceLen)
 102  	if _, e = io.ReadFull(rand.Reader, nonce); E.Chk(e) {
 103  		return
 104  	}
 105  	// generate the shards
 106  	if shards, e = fec.Encode(b); E.Chk(e) {
 107  	}
 108  	for i := range shards {
 109  		encryptedShard := c.ciph.Seal(nil, nonce, shards[i], nil)
 110  		shardLen := len(encryptedShard)
 111  		// assemble the packet: magic, nonce, and encrypted shard
 112  		outBytes := make([]byte, shardLen+magicLen+nonceLen)
 113  		copy(outBytes, magic[:magicLen])
 114  		copy(outBytes[magicLen:], nonce)
 115  		copy(outBytes[magicLen+nonceLen:], encryptedShard)
 116  		shards[i] = outBytes
 117  	}
 118  	return
 119  }
 120  
 121  func send(shards [][]byte, sendConn net.Conn) (e error) {
 122  	for i := range shards {
 123  		if _, e = sendConn.Write(shards[i]); E.Chk(e) {
 124  		}
 125  	}
 126  	return
 127  }
 128  
 129  func (c *Connection) Send(b, magic []byte) (e error) {
 130  	if len(magic) != 4 {
 131  		e = errors.New("magic must be 4 bytes long")
 132  		return
 133  	}
 134  	var shards [][]byte
 135  	shards, e = c.CreateShards(b, magic)
 136  	if e = send(shards, c.SendConn); E.Chk(e) {
 137  	}
 138  	return
 139  }
 140  
 141  func (c *Connection) SendTo(addr *net.UDPAddr, b, magic []byte) (e error) {
 142  	if len(magic) != 4 {
 143  		if e = errors.New("magic must be 4 bytes long"); E.Chk(e) {
 144  			return
 145  		}
 146  	}
 147  	var sendConn *net.UDPConn
 148  	if sendConn, e = net.DialUDP("udp", nil, addr); E.Chk(e) {
 149  		return
 150  	}
 151  	var shards [][]byte
 152  	if shards, e = c.CreateShards(b, magic); E.Chk(e) {
 153  	}
 154  	if e = send(shards, sendConn); E.Chk(e) {
 155  	}
 156  	return
 157  }
 158  
 159  func (c *Connection) SendShards(shards [][]byte) (e error) {
 160  	if e = send(shards, c.SendConn); E.Chk(e) {
 161  	}
 162  	return
 163  }
 164  
 165  func (c *Connection) SendShardsTo(shards [][]byte, addr *net.UDPAddr) (e error) {
 166  	var sendConn *net.UDPConn
 167  	if sendConn, e = net.DialUDP("udp", nil, addr); !E.Chk(e) {
 168  		if e = send(shards, sendConn); E.Chk(e) {
 169  		}
 170  	}
 171  	return
 172  }
 173  
 174  // Listen runs a goroutine that collects and attempts to decode the FEC shards
 175  // once it has enough intact pieces
 176  func (c *Connection) Listen(handlers HandleFunc, ifc interface{}, lastSent *time.Time, firstSender *string,) (e error) {
 177  	F.Ln("setting read buffer")
 178  	buffer := make([]byte, c.maxDatagramSize)
 179  	go func() {
 180  		F.Ln("starting connection handler")
 181  	out:
 182  		// read from socket until context is cancelled
 183  		for {
 184  			var src net.Addr
 185  			var n int
 186  			n, src, e = (*c.listenConn).ReadFrom(buffer)
 187  			buf := buffer[:n]
 188  			if E.Chk(e) {
 189  				// Error("ReadFromUDP failed:", e)
 190  				continue
 191  			}
 192  			magic := string(buf[:4])
 193  			if _, ok := handlers[magic]; ok {
 194  				// if caller needs to know the liveness status of the controller it is working on, the code below
 195  				if lastSent != nil && firstSender != nil {
 196  					*lastSent = time.Now()
 197  				}
 198  				nonceBytes := buf[4:16]
 199  				nonce := string(nonceBytes)
 200  				// decipher
 201  				var shard []byte
 202  				if shard, e = c.ciph.Open(nil, nonceBytes, buf[16:], nil); E.Chk(e) {
 203  					// corrupted or irrelevant message
 204  					continue
 205  				}
 206  				var bn *MsgBuffer
 207  				if bn, ok = c.buffers[nonce]; ok {
 208  					if !bn.Decoded {
 209  						bn.Buffers = append(bn.Buffers, shard)
 210  						if len(bn.Buffers) >= 3 {
 211  							// try to decode it
 212  							var cipherText []byte
 213  							if cipherText, e = fec.Decode(bn.Buffers); E.Chk(e) {
 214  								continue
 215  							}
 216  							bn.Decoded = true
 217  							if e = handlers[magic](ifc)(cipherText); E.Chk(e) {
 218  								continue
 219  							}
 220  						}
 221  					} else {
 222  						for i := range c.buffers {
 223  							if i != nonce {
 224  								// superseded messages can be deleted from the buffers, we don't add more data
 225  								// for the already decoded.
 226  								// F.Ln("deleting superseded buffer", hex.EncodeToString([]byte(i)))
 227  								delete(c.buffers, i)
 228  							}
 229  						}
 230  					}
 231  				} else {
 232  					// F.Ln("new message arriving",
 233  					// 	hex.EncodeToString([]byte(nonce)))
 234  					c.buffers[nonce] = &MsgBuffer{
 235  						[][]byte{},
 236  						time.Now(), false, src,
 237  					}
 238  					c.buffers[nonce].Buffers = append(
 239  						c.buffers[nonce].
 240  							Buffers, shard,
 241  					)
 242  				}
 243  			}
 244  			select {
 245  			case <-c.ctx.Done():
 246  				break out
 247  			default:
 248  			}
 249  		}
 250  	}()
 251  	return
 252  }
 253  
 254  //
 255  // func GetUDPAddr(address string) (sendAddr *net.UDPAddr) {
 256  // 	sendHost, sendPort, e := net.SplitHostPort(address)
 257  // 	if e != nil  {
 258  // 		// 		return
 259  // 	}
 260  // 	sendPortI, e := strconv.ParseInt(sendPort, 10, 64)
 261  // 	if e != nil  {
 262  // 		// 		return
 263  // 	}
 264  // 	sendAddr = &net.UDPAddr{IP: net.ParseIP(sendHost),
 265  // 		Port: int(sendPortI)}
 266  // 	// D.Ln("multicast", Address)
 267  // 	// L.Spew(sendAddr)
 268  // 	return
 269  // }
 270