channels.go raw
1 package transport
2
3 import (
4 "crypto/cipher"
5 "errors"
6 "fmt"
7 "net"
8 "runtime"
9 "strings"
10 "time"
11
12 "github.com/p9c/p9/pkg/log"
13
14 "github.com/p9c/p9/pkg/qu"
15
16 "github.com/p9c/p9/pkg/fec"
17 "github.com/p9c/p9/pkg/gcm"
18 "github.com/p9c/p9/pkg/multicast"
19 )
20
21 const (
22 UDPMulticastAddress = "224.0.0.1"
23 success int = iota // this is implicit zero of an int but starts the iota
24 closed
25 other
26 DefaultPort = 11049
27 )
28
29 var DefaultIP = net.IPv4(224, 0, 0, 1)
30 var MulticastAddress = &net.UDPAddr{IP: DefaultIP, Port: DefaultPort}
31
32 type (
33 MsgBuffer struct {
34 Buffers [][]byte
35 First time.Time
36 Decoded bool
37 Source net.Addr
38 }
39 // HandlerFunc is a function that is used to process a received message
40 HandlerFunc func(
41 ctx interface{}, src net.Addr, dst string, b []byte,
42 ) (e error)
43 Handlers map[string]HandlerFunc
44 Channel struct {
45 buffers map[string]*MsgBuffer
46 Ready qu.C
47 context interface{}
48 Creator string
49 firstSender *string
50 lastSent *time.Time
51 MaxDatagramSize int
52 receiveCiph cipher.AEAD
53 Receiver *net.UDPConn
54 sendCiph cipher.AEAD
55 Sender *net.UDPConn
56 }
57 )
58
59 // SetDestination changes the address the outbound connection of a multicast
60 // directs to
61 func (c *Channel) SetDestination(dst string) (e error) {
62 D.Ln("sending to", dst)
63 if c.Sender, e = NewSender(dst, c.MaxDatagramSize); E.Chk(e) {
64 }
65 return
66 }
67
68 // Send fires off some data through the configured multicast's outbound.
69 func (c *Channel) Send(magic []byte, nonce []byte, data []byte) (
70 n int, e error,
71 ) {
72 if len(data) == 0 {
73 e = errors.New("not sending empty packet")
74 E.Ln(e)
75 return
76 }
77 var msg []byte
78 if msg, e = EncryptMessage(c.Creator, c.sendCiph, magic, nonce, data); E.Chk(e) {
79 }
80 n, e = c.Sender.Write(msg)
81 // D.Ln(msg)
82 return
83 }
84
85 // SendMany sends a BufIter of shards as produced by GetShards
86 func (c *Channel) SendMany(magic []byte, b [][]byte) (e error) {
87 D.Ln("magic", string(magic), log.Caller("sending from", 1))
88 var nonce []byte
89 if nonce, e = GetNonce(c.sendCiph); E.Chk(e) {
90 } else {
91 for i := 0; i < len(b); i++ {
92 // D.Ln(i)
93 // D.Ln("segment length", len(b[i]))
94 if _, e = c.Send(magic, nonce, b[i]); E.Chk(e) {
95 // debug.PrintStack()
96 }
97 }
98 // T.Ln(c.Creator, "sent packets", string(magic), hex.EncodeToString(nonce), c.Sender.LocalAddr(), c.Sender.RemoteAddr())
99 }
100 return
101 }
102
103 // Close the multicast
104 func (c *Channel) Close() (e error) {
105 // if e = c.Sender.Close(); E.Chk(e) {
106 // }
107 // if e = c.Receiver.Close(); E.Chk(e) {
108 // }
109 return
110 }
111
112 // GetShards returns a buffer iterator to feed to Channel.SendMany containing
113 // fec encoded shards built from the provided buffer
114 func GetShards(data []byte) (shards [][]byte) {
115 var e error
116 if shards, e = fec.Encode(data); E.Chk(e) {
117 }
118 return
119 }
120
121 // NewUnicastChannel sets up a listener and sender for a specified destination
122 func NewUnicastChannel(
123 creator string, ctx interface{}, key []byte, sender, receiver string,
124 maxDatagramSize int,
125 handlers Handlers, quit qu.C,
126 ) (channel *Channel, e error) {
127 channel = &Channel{
128 Creator: creator,
129 MaxDatagramSize: maxDatagramSize,
130 buffers: make(map[string]*MsgBuffer),
131 context: ctx,
132 }
133 var magics []string
134 bytes := make([]byte, len(key))
135 copy(bytes, key)
136 for i := range handlers {
137 magics = append(magics, i)
138 }
139 if channel.sendCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
140 }
141 copy(bytes, key)
142 if channel.receiveCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
143 }
144 for i := range bytes {
145 bytes[i] = 0
146 key[i] = 0
147 }
148 if channel.Receiver, e = Listen(receiver, channel, maxDatagramSize, handlers, quit); E.Chk(e) {
149 }
150 if channel.Sender, e = NewSender(sender, maxDatagramSize); E.Chk(e) {
151 }
152 D.Ln("starting unicast multicast:", channel.Creator, sender, receiver, magics)
153 return
154 }
155
156 // NewSender creates a new UDP connection to a specified address
157 func NewSender(address string, maxDatagramSize int) (
158 conn *net.UDPConn, e error,
159 ) {
160 var addr *net.UDPAddr
161 if addr, e = net.ResolveUDPAddr("udp4", address); E.Chk(e) {
162 return
163 } else if conn, e = net.DialUDP("udp4", nil, addr); E.Chk(e) {
164 // debug.PrintStack()
165 return
166 }
167 D.Ln("started new sender on", conn.LocalAddr(), "->", conn.RemoteAddr())
168 if e = conn.SetWriteBuffer(maxDatagramSize); E.Chk(e) {
169 }
170 return
171 }
172
173 // Listen binds to the UDP Address and port given and writes packets received
174 // from that Address to a buffer which is passed to a handler
175 func Listen(
176 address string, channel *Channel, maxDatagramSize int, handlers Handlers,
177 quit qu.C,
178 ) (conn *net.UDPConn, e error) {
179 var addr *net.UDPAddr
180 if addr, e = net.ResolveUDPAddr("udp4", address); E.Chk(e) {
181 return
182 } else if conn, e = net.ListenUDP("udp4", addr); E.Chk(e) {
183 return
184 } else if conn == nil {
185 return nil, errors.New("unable to start connection ")
186 }
187 D.Ln("starting listener on", conn.LocalAddr(), "->", conn.RemoteAddr())
188 if e = conn.SetReadBuffer(maxDatagramSize); E.Chk(e) {
189 // not a critical error but should not happen
190 }
191 go Handle(address, channel, handlers, maxDatagramSize, quit)
192 return
193 }
194
195 // NewBroadcastChannel returns a broadcaster and listener with a given handler
196 // on a multicast address and specified port. The handlers define the messages
197 // that will be processed and any other messages are ignored
198 func NewBroadcastChannel(
199 creator string, ctx interface{}, key []byte, port int, maxDatagramSize int,
200 handlers Handlers,
201 quit qu.C,
202 ) (channel *Channel, e error) {
203 channel = &Channel{
204 Creator: creator,
205 MaxDatagramSize: maxDatagramSize,
206 buffers: make(map[string]*MsgBuffer),
207 context: ctx,
208 Ready: qu.T(),
209 }
210 bytes := make([]byte, len(key))
211 copy(bytes, key)
212 if channel.sendCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
213 panic(e)
214 }
215 copy(bytes, key)
216 if channel.receiveCiph, e = gcm.GetCipher(bytes); E.Chk(e) {
217 panic(e)
218 }
219 for i := range bytes {
220 key[i] = 0
221 bytes[i] = 0
222 }
223 if channel.Receiver, e = ListenBroadcast(port, channel, maxDatagramSize, handlers, quit); E.Chk(e) {
224 }
225 if channel.Sender, e = NewBroadcaster(port, maxDatagramSize); E.Chk(e) {
226 }
227 channel.Ready.Q()
228 return
229 }
230
231 // NewBroadcaster creates a new UDP multicast connection on which to broadcast
232 func NewBroadcaster(port int, maxDatagramSize int) (
233 conn *net.UDPConn, e error,
234 ) {
235 address := net.JoinHostPort(UDPMulticastAddress, fmt.Sprint(port))
236 if conn, e = NewSender(address, maxDatagramSize); E.Chk(e) {
237 }
238 return
239 }
240
241 // ListenBroadcast binds to the UDP Address and port given and writes packets
242 // received from that Address to a buffer which is passed to a handler
243 func ListenBroadcast(
244 port int,
245 channel *Channel,
246 maxDatagramSize int,
247 handlers Handlers,
248 quit qu.C,
249 ) (conn *net.UDPConn, e error) {
250 if conn, e = multicast.Conn(port); E.Chk(e) {
251 return
252 }
253 address := conn.LocalAddr().String()
254 var magics []string
255 for i := range handlers {
256 magics = append(magics, i)
257 }
258 // D.S(handlers)
259 // D.Ln("magics", magics, PrevCallers())
260 D.Ln("starting broadcast listener", channel.Creator, address, magics)
261 if e = conn.SetReadBuffer(maxDatagramSize); E.Chk(e) {
262 }
263 channel.Receiver = conn
264 go Handle(address, channel, handlers, maxDatagramSize, quit)
265 return
266 }
267
268 func handleNetworkError(address string, e error) (result int) {
269 if len(strings.Split(e.Error(), "use of closed network connection")) >= 2 {
270 D.Ln("connection closed", address)
271 result = closed
272 } else {
273 E.F("ReadFromUDP failed: '%s'", e)
274 result = other
275 }
276 return
277 }
278
279 // Handle listens for messages, decodes them, aggregates them, recovers the data
280 // from the reed solomon fec shards received and invokes the handler provided
281 // matching the magic on the complete received messages
282 func Handle(
283 address string, channel *Channel,
284 handlers Handlers, maxDatagramSize int, quit qu.C,
285 ) {
286 buffer := make([]byte, maxDatagramSize)
287 T.Ln("starting handler for", channel.Creator, "listener")
288 // Loop forever reading from the socket until it is closed
289 // seenNonce := ""
290 var e error
291 var numBytes int
292 var src net.Addr
293 // var seenNonce string
294 <-channel.Ready
295 out:
296 for {
297 select {
298 case <-quit.Wait():
299 break out
300 default:
301 }
302 if numBytes, src, e = channel.Receiver.ReadFromUDP(buffer); E.Chk(e) {
303 switch handleNetworkError(address, e) {
304 case closed:
305 break out
306 case other:
307 continue
308 case success:
309 }
310 }
311 // Filter messages by magic, if there is no match in the map the packet is
312 // ignored
313 magic := string(buffer[:4])
314 if handler, ok := handlers[magic]; ok {
315 if channel.lastSent != nil && channel.firstSender != nil {
316 *channel.lastSent = time.Now()
317 }
318 msg := buffer[:numBytes]
319 nL := channel.receiveCiph.NonceSize()
320 nonceBytes := msg[4 : 4+nL]
321 nonce := string(nonceBytes)
322 var shard []byte
323 if shard, e = channel.receiveCiph.Open(nil, nonceBytes, msg[4+len(nonceBytes):], nil); e != nil {
324 continue
325 }
326 // D.Ln("read", numBytes, "from", src, e, hex.EncodeToString(msg))
327 if bn, ok := channel.buffers[nonce]; ok {
328 if !bn.Decoded {
329 bn.Buffers = append(bn.Buffers, shard)
330 if len(bn.Buffers) >= 3 {
331 // try to decode it
332 var cipherText []byte
333 if cipherText, e = fec.Decode(bn.Buffers); E.Chk(e) {
334 continue
335 }
336 // D.F("received packet with magic %s from %s len %d bytes", magic, src.String(), len(cipherText))
337 bn.Decoded = true
338 if e = handler(channel.context, src, address, cipherText); E.Chk(e) {
339 continue
340 }
341 // src = nil
342 // buffer = buffer[:0]
343 }
344 } else {
345 for i := range channel.buffers {
346 if i != nonce && channel.buffers[i].Decoded {
347 // superseded messages can be deleted from the buffers, we don't add more data
348 // for the already decoded. todo: this will be changed to track stats for the
349 // puncture rate and redundancy scaling
350 delete(channel.buffers, i)
351 }
352 }
353 }
354 } else {
355 channel.buffers[nonce] = &MsgBuffer{
356 [][]byte{},
357 time.Now(), false, src,
358 }
359 channel.buffers[nonce].Buffers = append(
360 channel.buffers[nonce].
361 Buffers, shard,
362 )
363 }
364 }
365 }
366 }
367
368 func PrevCallers() (out string) {
369 for i := 0; i < 10; i++ {
370 _, loc, iline, _ := runtime.Caller(i)
371 out += fmt.Sprintf("%s:%d \n", loc, iline)
372 }
373 return
374 }
375