server_workers.mx raw
1 package server
2
3 import (
4 "bytes"
5 "fmt"
6 "runtime"
7 "time"
8
9 "smesh.lol/pkg/pool"
10 "smesh.lol/pkg/relay/wire"
11 "smesh.lol/pkg/transport"
12 )
13
14 // --- Worker lifecycle ---
15
16 func (s *Server) startProxyWorkers(n int) {
17 s.proxyIn = []chan wire.ProxyRequest{:n}
18 s.proxyOut = []chan wire.ProxyResponse{:n}
19 s.proxyBusyTime = []int64{:n}
20 s.proxyPool = pool.NewPool(n)
21 for i := 0; i < n; i++ {
22 s.proxyIn[i] = chan wire.ProxyRequest{}
23 s.proxyOut[i] = chan wire.ProxyResponse{}
24 spawn(wire.ProxyWorker, s.proxyIn[i], s.proxyOut[i])
25 fd := runtime.LastSpawnedParentFd()
26 s.proxyPool.FDs = append(s.proxyPool.FDs, fd)
27 s.t.RegisterFD(fd)
28 }
29 }
30
31 func (s *Server) startBlossomWorkers(n int) {
32 s.blossomIn = []chan wire.BlossomRequest{:n}
33 s.blossomOut = []chan wire.BlossomResponse{:n}
34 s.blossomPool = pool.NewPool(n)
35 for i := 0; i < n; i++ {
36 s.blossomIn[i] = chan wire.BlossomRequest{}
37 s.blossomOut[i] = chan wire.BlossomResponse{}
38 spawn(wire.BlossomWorker, s.blossomIn[i], s.blossomOut[i])
39 fd := runtime.LastSpawnedParentFd()
40 s.blossomPool.FDs = append(s.blossomPool.FDs, fd)
41 s.t.RegisterFD(fd)
42 }
43 }
44
45 func (s *Server) respawnProxyWorker(i int) {
46 s.t.RemoveFD(s.proxyPool.FDs[i])
47 s.proxyIn[i] = chan wire.ProxyRequest{}
48 s.proxyOut[i] = chan wire.ProxyResponse{}
49 spawn(wire.ProxyWorker, s.proxyIn[i], s.proxyOut[i])
50 fd := runtime.LastSpawnedParentFd()
51 s.proxyPool.FDs[i] = fd
52 s.proxyPool.Busy[i] = false
53 s.proxyBusyTime[i] = 0
54 s.proxyPool.Readers[i].Reset()
55 s.t.RegisterFD(fd)
56 fmt.Println("respawned proxy worker", i)
57 }
58
59 func (s *Server) respawnBlossomWorker(i int) {
60 s.t.RemoveFD(s.blossomPool.FDs[i])
61 s.blossomIn[i] = chan wire.BlossomRequest{}
62 s.blossomOut[i] = chan wire.BlossomResponse{}
63 spawn(wire.BlossomWorker, s.blossomIn[i], s.blossomOut[i])
64 fd := runtime.LastSpawnedParentFd()
65 s.blossomPool.FDs[i] = fd
66 s.blossomPool.Busy[i] = false
67 s.blossomPool.Readers[i].Reset()
68 s.t.RegisterFD(fd)
69 fmt.Println("respawned blossom worker", i)
70 }
71
72 // --- Proxy dispatch ---
73
74 func (s *Server) doDispatchProxy(fd int, path string, headers map[string]string) bool {
75 target := path[len("/proxy/"):]
76 if target == "" {
77 s.t.SendHTTP(fd, 400, map[string]string{"Content-Type": "text/plain"}, []byte("missing url\n"))
78 return true
79 }
80 if transport.HasPrefix(target, s.selfHost) {
81 direct := target[len(s.selfHost)-1:]
82 s.t.SendHTTP(fd, 302, map[string]string{
83 "Location": direct,
84 "Access-Control-Allow-Origin": "*",
85 }, nil)
86 return true
87 }
88 url := []byte("https://" | target)
89 s.nextAsyncID++
90 rid := s.nextAsyncID
91 connClose := headers["connection"] == "close"
92 req := wire.ProxyRequest{ReqID: rid, MaxBytes: 32 * 1024 * 1024, URL: url}
93 s.proxyReapStuck()
94 if i := s.proxyPool.IdleIndex(); i >= 0 {
95 s.asyncPending[rid] = asyncHTTPEntry{connFD: fd, connClose: connClose, createdAt: time.Now().UnixNano()}
96 s.proxyPool.Busy[i] = true
97 s.proxyBusyTime[i] = time.Now().UnixNano()
98 if !s.proxyDispatchFrame(i, req) {
99 s.proxyPool.Busy[i] = false
100 s.proxyBusyTime[i] = 0
101 delete(s.asyncPending, rid)
102 } else {
103 return true
104 }
105 }
106 if len(s.proxyQueue) < 64 {
107 s.proxyQueue = append(s.proxyQueue, pendingProxy{connFD: fd, connClose: connClose, req: req})
108 return true
109 }
110 s.t.SendHTTP(fd, 503, map[string]string{
111 "Content-Type": "text/plain",
112 "Retry-After": "1",
113 }, []byte("proxy workers busy\n"))
114 return true
115 }
116
117 func (s *Server) drainProxyQueue() {
118 for len(s.proxyQueue) > 0 {
119 i := s.proxyPool.IdleIndex()
120 if i < 0 {
121 return
122 }
123 p := s.proxyQueue[0]
124 s.proxyQueue = s.proxyQueue[1:]
125 s.asyncPending[p.req.ReqID] = asyncHTTPEntry{connFD: p.connFD, connClose: p.connClose, createdAt: time.Now().UnixNano()}
126 s.proxyPool.Busy[i] = true
127 s.proxyBusyTime[i] = time.Now().UnixNano()
128 if !s.proxyDispatchFrame(i, p.req) {
129 s.proxyPool.Busy[i] = false
130 s.proxyBusyTime[i] = 0
131 delete(s.asyncPending, p.req.ReqID)
132 s.t.SendHTTP(p.connFD, 502, map[string]string{"Content-Type": "text/plain"}, []byte("proxy dispatch failed\n"))
133 }
134 }
135 }
136
137 func (s *Server) proxyReapStuck() {
138 now := time.Now().UnixNano()
139 reaped := false
140 for i := 0; i < s.proxyPool.Len(); i++ {
141 if s.proxyPool.Busy[i] && s.proxyBusyTime[i] > 0 {
142 elapsed := now - s.proxyBusyTime[i]
143 if elapsed > 15_000_000_000 {
144 fmt.Println("proxy worker", i, "stuck", elapsed/1_000_000_000, "s, respawning")
145 s.respawnProxyWorker(i)
146 reaped = true
147 }
148 }
149 }
150 if reaped {
151 s.drainProxyQueue()
152 }
153 }
154
155 func (s *Server) proxyDispatchFrame(i int, req wire.ProxyRequest) bool {
156 var buf bytes.Buffer
157 if err := req.EncodeTo(&buf); err != nil {
158 return false
159 }
160 return pool.WriteFrame(s.proxyPool.FDs[i], 0, buf.Bytes())
161 }
162
163 // --- Blossom dispatch ---
164
165 func (s *Server) doDispatchBlossom(fd int, method, path string, headers map[string]string, body []byte) bool {
166 bpath := path[len("/blossom"):]
167 i := s.blossomPool.IdleIndex()
168 if i >= 0 {
169 s.nextAsyncID++
170 rid := s.nextAsyncID
171 req := wire.BlossomRequest{
172 ReqID: rid,
173 Dir: []byte(s.cfg.BlossomDir),
174 Method: []byte(method),
175 Path: []byte(bpath),
176 ContentType: []byte(headers["content-type"]),
177 Body: body,
178 Upstream: []byte(s.cfg.BlossomUpstream),
179 }
180 var buf bytes.Buffer
181 if err := req.EncodeTo(&buf); err == nil {
182 if pool.WriteFrame(s.blossomPool.FDs[i], 1, buf.Bytes()) {
183 connClose := headers["connection"] == "close"
184 s.asyncPending[rid] = asyncHTTPEntry{connFD: fd, connClose: connClose, createdAt: time.Now().UnixNano()}
185 s.blossomPool.Busy[i] = true
186 return true
187 }
188 }
189 }
190 s.t.SendHTTP(fd, 503, map[string]string{
191 "Content-Type": "text/plain",
192 "Retry-After": "1",
193 }, []byte("blossom worker busy\n"))
194 return true
195 }
196
197 // asyncReapStuck removes asyncPending entries older than 60s.
198 // Does NOT send a response — connFD may have been reused by another
199 // connection by the time 60s have elapsed.
200 func (s *Server) asyncReapStuck() {
201 cutoff := time.Now().UnixNano() - 60_000_000_000
202 for rid, entry := range s.asyncPending {
203 if entry.createdAt > 0 && entry.createdAt < cutoff {
204 fmt.Println("asyncPending: reaping stale entry rid=", rid, "fd=", entry.connFD)
205 delete(s.asyncPending, rid)
206 }
207 }
208 }
209
210 // --- Worker response handlers ---
211
212 func (s *Server) handleProxyResponse(i int) {
213 chanID, data, ok := s.proxyPool.Readers[i].Read(s.proxyPool.FDs[i])
214 if !ok {
215 if runtime.PipeClosed(s.proxyPool.FDs[i]) {
216 s.respawnProxyWorker(i)
217 s.drainProxyQueue()
218 }
219 return
220 }
221 s.proxyPool.Busy[i] = false
222 s.proxyBusyTime[i] = 0
223 s.drainProxyQueue()
224 if chanID != 1 {
225 return
226 }
227 var resp wire.ProxyResponse
228 if err := resp.DecodeFrom(bytes.NewReader(data)); err != nil {
229 return
230 }
231 entry, ok := s.asyncPending[resp.ReqID]
232 if !ok {
233 return
234 }
235 delete(s.asyncPending, resp.ReqID)
236 var status int
237 var h map[string]string
238 var body []byte
239 switch {
240 case resp.Status < 0:
241 status = 502
242 h = map[string]string{"Content-Type": "text/plain"}
243 body = []byte("proxy: " | string(resp.Err) | "\n")
244 case resp.Status == 415:
245 status = 415
246 h = map[string]string{"Content-Type": "text/plain"}
247 body = []byte("content-type not allowed\n")
248 case resp.Status >= 200 && resp.Status < 300:
249 status = 200
250 h = map[string]string{
251 "Content-Type": string(resp.ContentType),
252 "Cross-Origin-Resource-Policy": "cross-origin",
253 "Cache-Control": "public, max-age=86400",
254 "Access-Control-Allow-Origin": "*",
255 }
256 body = resp.Body
257 default:
258 status = int(resp.Status)
259 h = map[string]string{
260 "Content-Type": "text/plain",
261 "Cache-Control": "public, max-age=3600",
262 }
263 body = []byte(fmt.Sprintf("upstream %d\n", resp.Status))
264 }
265 s.t.CompleteHTTP(entry.connFD, status, h, body, entry.connClose)
266 }
267
// blossomCORSHeaders holds the CORS headers that handleBlossomResponse copies
// into every blossom reply. It is copied per response, never handed out
// directly.
var blossomCORSHeaders = map[string]string{
	"Access-Control-Allow-Origin":  "*",
	"Access-Control-Allow-Methods": "GET, PUT, DELETE, HEAD, OPTIONS",
	"Access-Control-Allow-Headers": "Authorization, Content-Type",
}
273
274 func (s *Server) handleBlossomResponse(i int) {
275 chanID, data, ok := s.blossomPool.Readers[i].Read(s.blossomPool.FDs[i])
276 if !ok {
277 if runtime.PipeClosed(s.blossomPool.FDs[i]) {
278 s.respawnBlossomWorker(i)
279 }
280 return
281 }
282 if chanID != 2 {
283 return
284 }
285 var resp wire.BlossomResponse
286 if err := resp.DecodeFrom(bytes.NewReader(data)); err != nil {
287 return
288 }
289 s.blossomPool.Busy[i] = false
290 entry, ok := s.asyncPending[resp.ReqID]
291 if !ok {
292 return
293 }
294 delete(s.asyncPending, resp.ReqID)
295 h := map[string]string{}
296 for k, v := range blossomCORSHeaders {
297 h[k] = v
298 }
299 if len(resp.CT) > 0 {
300 h["Content-Type"] = string(resp.CT)
301 }
302 if resp.Size > 0 {
303 h["Content-Length"] = fmt.Sprintf("%d", resp.Size)
304 h["Content-Type"] = "application/octet-stream"
305 }
306 s.t.CompleteHTTP(entry.connFD, int(resp.Status), h, resp.Body, entry.connClose)
307 }
308