transport.go raw
1 package transport
2
3 import (
4 "context"
5 "crypto/cipher"
6 "crypto/rand"
7 "errors"
8 "io"
9 "net"
10 "sync"
11 "time"
12
13 "github.com/p9c/p9/pkg/fec"
14 )
15
// HandleFunc is a table of packet handlers keyed by the packet's magic string.
//
// Each entry is a factory: it is called with a caller-supplied context value and returns the function that processes
// the decoded message bytes, reporting any failure through its error result.
type HandleFunc map[string]func(ctx interface{}) func(b []byte) error
18
19 // Connection is the state and working memory references for a simple reliable UDP lan transport, encrypted by a GCM AES
20 // cipher, with the simple protocol of sending out 9 packets containing encrypted FEC shards containing a slice of
21 // bytes.
22 //
23 // This protocol probably won't work well outside of a multicast lan in adverse conditions but it is designed for local
24 // network control systems todo: it is if the updated fec segmenting code is put in
25 type Connection struct {
26 maxDatagramSize int
27 buffers map[string]*MsgBuffer
28 sendAddress *net.UDPAddr
29 SendConn net.Conn
30 listenAddress *net.UDPAddr
31 listenConn *net.PacketConn
32 ciph cipher.AEAD
33 ctx context.Context
34 mx *sync.Mutex
35 }
36
37 //
38 // // NewConnection creates a new connection with a defined default send
39 // // connection and listener and pre shared key password for encryption on the
40 // // local network
41 // func NewConnection(send, listen, preSharedKey string,
42 // maxDatagramSize int, ctx context.Context) (c *Connection, e error) {
43 // sendAddr := &net.UDPAddr{}
44 // var sendConn net.Conn
45 // listenAddr := &net.UDPAddr{}
46 // var listenConn net.PacketConn
47 // if listen != "" {
48 // config := &net.ListenConfig{Control: reusePort}
49 // listenConn, e = config.ListenPacket(context.Background(), "udp4", listen)
50 // if e != nil {
51 // E.Ln(e)
52 // }
53 // }
54 // if send != "" {
55 // // sendAddr, e = net.ResolveUDPAddr("udp4", send)
56 // // if e != nil {
57 // // E.Ln(e)
58 // // }
59 // sendConn, e = net.Dial("udp4", send)
60 // if e != nil {
61 // Error(err, sendAddr)
62 // }
63 // // L.Spew(sendConn)
64 // }
65 // var ciph cipher.AEAD
66 // if ciph, e = gcm.GetCipher(preSharedKey); E.Chk(e) {
67 // }
68 // return &Connection{
69 // maxDatagramSize: maxDatagramSize,
70 // buffers: make(map[string]*MsgBuffer),
71 // sendAddress: sendAddr,
72 // SendConn: sendConn,
73 // listenAddress: listenAddr,
74 // listenConn: &listenConn,
75 // ciph: ciph, // gcm.GetCipher(*cx.Config.MinerPass),
76 // ctx: ctx,
77 // mx: &sync.Mutex{},
78 // }, err
79 // }
80
81 // SetSendConn sets up an outbound connection
82 func (c *Connection) SetSendConn(ad string) (e error) {
83 // c.sendAddress, e = net.ResolveUDPAddr("udp4", ad)
84 // if e != nil {
85 // // }
86 var sC net.Conn
87 if sC, e = net.Dial("udp4", ad); !E.Chk(e) {
88 c.SendConn = sC
89 }
90 return
91 }
92
93 // CreateShards takes a slice of bites and generates 3
94 func (c *Connection) CreateShards(b, magic []byte) (
95 shards [][]byte,
96 e error,
97 ) {
98 magicLen := 4
99 // get a nonce for the packet, it is both message ID and salt
100 nonceLen := c.ciph.NonceSize()
101 nonce := make([]byte, nonceLen)
102 if _, e = io.ReadFull(rand.Reader, nonce); E.Chk(e) {
103 return
104 }
105 // generate the shards
106 if shards, e = fec.Encode(b); E.Chk(e) {
107 }
108 for i := range shards {
109 encryptedShard := c.ciph.Seal(nil, nonce, shards[i], nil)
110 shardLen := len(encryptedShard)
111 // assemble the packet: magic, nonce, and encrypted shard
112 outBytes := make([]byte, shardLen+magicLen+nonceLen)
113 copy(outBytes, magic[:magicLen])
114 copy(outBytes[magicLen:], nonce)
115 copy(outBytes[magicLen+nonceLen:], encryptedShard)
116 shards[i] = outBytes
117 }
118 return
119 }
120
121 func send(shards [][]byte, sendConn net.Conn) (e error) {
122 for i := range shards {
123 if _, e = sendConn.Write(shards[i]); E.Chk(e) {
124 }
125 }
126 return
127 }
128
129 func (c *Connection) Send(b, magic []byte) (e error) {
130 if len(magic) != 4 {
131 e = errors.New("magic must be 4 bytes long")
132 return
133 }
134 var shards [][]byte
135 shards, e = c.CreateShards(b, magic)
136 if e = send(shards, c.SendConn); E.Chk(e) {
137 }
138 return
139 }
140
141 func (c *Connection) SendTo(addr *net.UDPAddr, b, magic []byte) (e error) {
142 if len(magic) != 4 {
143 if e = errors.New("magic must be 4 bytes long"); E.Chk(e) {
144 return
145 }
146 }
147 var sendConn *net.UDPConn
148 if sendConn, e = net.DialUDP("udp", nil, addr); E.Chk(e) {
149 return
150 }
151 var shards [][]byte
152 if shards, e = c.CreateShards(b, magic); E.Chk(e) {
153 }
154 if e = send(shards, sendConn); E.Chk(e) {
155 }
156 return
157 }
158
159 func (c *Connection) SendShards(shards [][]byte) (e error) {
160 if e = send(shards, c.SendConn); E.Chk(e) {
161 }
162 return
163 }
164
165 func (c *Connection) SendShardsTo(shards [][]byte, addr *net.UDPAddr) (e error) {
166 var sendConn *net.UDPConn
167 if sendConn, e = net.DialUDP("udp", nil, addr); !E.Chk(e) {
168 if e = send(shards, sendConn); E.Chk(e) {
169 }
170 }
171 return
172 }
173
174 // Listen runs a goroutine that collects and attempts to decode the FEC shards
175 // once it has enough intact pieces
176 func (c *Connection) Listen(handlers HandleFunc, ifc interface{}, lastSent *time.Time, firstSender *string,) (e error) {
177 F.Ln("setting read buffer")
178 buffer := make([]byte, c.maxDatagramSize)
179 go func() {
180 F.Ln("starting connection handler")
181 out:
182 // read from socket until context is cancelled
183 for {
184 var src net.Addr
185 var n int
186 n, src, e = (*c.listenConn).ReadFrom(buffer)
187 buf := buffer[:n]
188 if E.Chk(e) {
189 // Error("ReadFromUDP failed:", e)
190 continue
191 }
192 magic := string(buf[:4])
193 if _, ok := handlers[magic]; ok {
194 // if caller needs to know the liveness status of the controller it is working on, the code below
195 if lastSent != nil && firstSender != nil {
196 *lastSent = time.Now()
197 }
198 nonceBytes := buf[4:16]
199 nonce := string(nonceBytes)
200 // decipher
201 var shard []byte
202 if shard, e = c.ciph.Open(nil, nonceBytes, buf[16:], nil); E.Chk(e) {
203 // corrupted or irrelevant message
204 continue
205 }
206 var bn *MsgBuffer
207 if bn, ok = c.buffers[nonce]; ok {
208 if !bn.Decoded {
209 bn.Buffers = append(bn.Buffers, shard)
210 if len(bn.Buffers) >= 3 {
211 // try to decode it
212 var cipherText []byte
213 if cipherText, e = fec.Decode(bn.Buffers); E.Chk(e) {
214 continue
215 }
216 bn.Decoded = true
217 if e = handlers[magic](ifc)(cipherText); E.Chk(e) {
218 continue
219 }
220 }
221 } else {
222 for i := range c.buffers {
223 if i != nonce {
224 // superseded messages can be deleted from the buffers, we don't add more data
225 // for the already decoded.
226 // F.Ln("deleting superseded buffer", hex.EncodeToString([]byte(i)))
227 delete(c.buffers, i)
228 }
229 }
230 }
231 } else {
232 // F.Ln("new message arriving",
233 // hex.EncodeToString([]byte(nonce)))
234 c.buffers[nonce] = &MsgBuffer{
235 [][]byte{},
236 time.Now(), false, src,
237 }
238 c.buffers[nonce].Buffers = append(
239 c.buffers[nonce].
240 Buffers, shard,
241 )
242 }
243 }
244 select {
245 case <-c.ctx.Done():
246 break out
247 default:
248 }
249 }
250 }()
251 return
252 }
253
254 //
255 // func GetUDPAddr(address string) (sendAddr *net.UDPAddr) {
256 // sendHost, sendPort, e := net.SplitHostPort(address)
257 // if e != nil {
258 // // return
259 // }
260 // sendPortI, e := strconv.ParseInt(sendPort, 10, 64)
261 // if e != nil {
262 // // return
263 // }
264 // sendAddr = &net.UDPAddr{IP: net.ParseIP(sendHost),
265 // Port: int(sendPortI)}
266 // // D.Ln("multicast", Address)
267 // // L.Spew(sendAddr)
268 // return
269 // }
270