package server import ( "bytes" "fmt" "runtime" "time" "smesh.lol/pkg/pool" "smesh.lol/pkg/relay/wire" "smesh.lol/pkg/transport" ) // --- Worker lifecycle --- func (s *Server) startProxyWorkers(n int) { s.proxyIn = []chan wire.ProxyRequest{:n} s.proxyOut = []chan wire.ProxyResponse{:n} s.proxyBusyTime = []int64{:n} s.proxyPool = pool.NewPool(n) for i := 0; i < n; i++ { s.proxyIn[i] = chan wire.ProxyRequest{} s.proxyOut[i] = chan wire.ProxyResponse{} spawn(wire.ProxyWorker, s.proxyIn[i], s.proxyOut[i]) fd := runtime.LastSpawnedParentFd() s.proxyPool.FDs = append(s.proxyPool.FDs, fd) s.t.RegisterFD(fd) } } func (s *Server) startBlossomWorkers(n int) { s.blossomIn = []chan wire.BlossomRequest{:n} s.blossomOut = []chan wire.BlossomResponse{:n} s.blossomPool = pool.NewPool(n) for i := 0; i < n; i++ { s.blossomIn[i] = chan wire.BlossomRequest{} s.blossomOut[i] = chan wire.BlossomResponse{} spawn(wire.BlossomWorker, s.blossomIn[i], s.blossomOut[i]) fd := runtime.LastSpawnedParentFd() s.blossomPool.FDs = append(s.blossomPool.FDs, fd) s.t.RegisterFD(fd) } } func (s *Server) respawnProxyWorker(i int) { s.t.RemoveFD(s.proxyPool.FDs[i]) s.proxyIn[i] = chan wire.ProxyRequest{} s.proxyOut[i] = chan wire.ProxyResponse{} spawn(wire.ProxyWorker, s.proxyIn[i], s.proxyOut[i]) fd := runtime.LastSpawnedParentFd() s.proxyPool.FDs[i] = fd s.proxyPool.Busy[i] = false s.proxyBusyTime[i] = 0 s.proxyPool.Readers[i].Reset() s.t.RegisterFD(fd) fmt.Println("respawned proxy worker", i) } func (s *Server) respawnBlossomWorker(i int) { s.t.RemoveFD(s.blossomPool.FDs[i]) s.blossomIn[i] = chan wire.BlossomRequest{} s.blossomOut[i] = chan wire.BlossomResponse{} spawn(wire.BlossomWorker, s.blossomIn[i], s.blossomOut[i]) fd := runtime.LastSpawnedParentFd() s.blossomPool.FDs[i] = fd s.blossomPool.Busy[i] = false s.blossomPool.Readers[i].Reset() s.t.RegisterFD(fd) fmt.Println("respawned blossom worker", i) } // --- Proxy dispatch --- func (s *Server) doDispatchProxy(fd int, path string, headers map[string]string) bool { target := path[len("/proxy/"):] if target == "" { s.t.SendHTTP(fd, 400, map[string]string{"Content-Type": "text/plain"}, []byte("missing url\n")) return true } if transport.HasPrefix(target, s.selfHost) { direct := target[len(s.selfHost)-1:] s.t.SendHTTP(fd, 302, map[string]string{ "Location": direct, "Access-Control-Allow-Origin": "*", }, nil) return true } url := []byte("https://" | target) s.nextAsyncID++ rid := s.nextAsyncID connClose := headers["connection"] == "close" req := wire.ProxyRequest{ReqID: rid, MaxBytes: 32 * 1024 * 1024, URL: url} s.proxyReapStuck() if i := s.proxyPool.IdleIndex(); i >= 0 { s.asyncPending[rid] = asyncHTTPEntry{connFD: fd, connClose: connClose, createdAt: time.Now().UnixNano()} s.proxyPool.Busy[i] = true s.proxyBusyTime[i] = time.Now().UnixNano() if !s.proxyDispatchFrame(i, req) { s.proxyPool.Busy[i] = false s.proxyBusyTime[i] = 0 delete(s.asyncPending, rid) } else { return true } } if len(s.proxyQueue) < 64 { s.proxyQueue = append(s.proxyQueue, pendingProxy{connFD: fd, connClose: connClose, req: req}) return true } s.t.SendHTTP(fd, 503, map[string]string{ "Content-Type": "text/plain", "Retry-After": "1", }, []byte("proxy workers busy\n")) return true } func (s *Server) drainProxyQueue() { for len(s.proxyQueue) > 0 { i := s.proxyPool.IdleIndex() if i < 0 { return } p := s.proxyQueue[0] s.proxyQueue = s.proxyQueue[1:] s.asyncPending[p.req.ReqID] = asyncHTTPEntry{connFD: p.connFD, connClose: p.connClose, createdAt: time.Now().UnixNano()} s.proxyPool.Busy[i] = true s.proxyBusyTime[i] = time.Now().UnixNano() if !s.proxyDispatchFrame(i, p.req) { s.proxyPool.Busy[i] = false s.proxyBusyTime[i] = 0 delete(s.asyncPending, p.req.ReqID) s.t.SendHTTP(p.connFD, 502, map[string]string{"Content-Type": "text/plain"}, []byte("proxy dispatch failed\n")) } } } func (s *Server) proxyReapStuck() { now := time.Now().UnixNano() reaped := false for i := 0; i < s.proxyPool.Len(); i++ { if s.proxyPool.Busy[i] && s.proxyBusyTime[i] > 0 { elapsed := now - s.proxyBusyTime[i] if elapsed > 15_000_000_000 { fmt.Println("proxy worker", i, "stuck", elapsed/1_000_000_000, "s, respawning") s.respawnProxyWorker(i) reaped = true } } } if reaped { s.drainProxyQueue() } } func (s *Server) proxyDispatchFrame(i int, req wire.ProxyRequest) bool { var buf bytes.Buffer if err := req.EncodeTo(&buf); err != nil { return false } return pool.WriteFrame(s.proxyPool.FDs[i], 0, buf.Bytes()) } // --- Blossom dispatch --- func (s *Server) doDispatchBlossom(fd int, method, path string, headers map[string]string, body []byte) bool { bpath := path[len("/blossom"):] i := s.blossomPool.IdleIndex() if i >= 0 { s.nextAsyncID++ rid := s.nextAsyncID req := wire.BlossomRequest{ ReqID: rid, Dir: []byte(s.cfg.BlossomDir), Method: []byte(method), Path: []byte(bpath), ContentType: []byte(headers["content-type"]), Body: body, Upstream: []byte(s.cfg.BlossomUpstream), } var buf bytes.Buffer if err := req.EncodeTo(&buf); err == nil { if pool.WriteFrame(s.blossomPool.FDs[i], 1, buf.Bytes()) { connClose := headers["connection"] == "close" s.asyncPending[rid] = asyncHTTPEntry{connFD: fd, connClose: connClose, createdAt: time.Now().UnixNano()} s.blossomPool.Busy[i] = true return true } } } s.t.SendHTTP(fd, 503, map[string]string{ "Content-Type": "text/plain", "Retry-After": "1", }, []byte("blossom worker busy\n")) return true } // asyncReapStuck removes asyncPending entries older than 60s. // Does NOT send a response — connFD may have been reused by another // connection by the time 60s have elapsed. func (s *Server) asyncReapStuck() { cutoff := time.Now().UnixNano() - 60_000_000_000 for rid, entry := range s.asyncPending { if entry.createdAt > 0 && entry.createdAt < cutoff { fmt.Println("asyncPending: reaping stale entry rid=", rid, "fd=", entry.connFD) delete(s.asyncPending, rid) } } } // --- Worker response handlers --- func (s *Server) handleProxyResponse(i int) { chanID, data, ok := s.proxyPool.Readers[i].Read(s.proxyPool.FDs[i]) if !ok { if runtime.PipeClosed(s.proxyPool.FDs[i]) { s.respawnProxyWorker(i) s.drainProxyQueue() } return } s.proxyPool.Busy[i] = false s.proxyBusyTime[i] = 0 s.drainProxyQueue() if chanID != 1 { return } var resp wire.ProxyResponse if err := resp.DecodeFrom(bytes.NewReader(data)); err != nil { return } entry, ok := s.asyncPending[resp.ReqID] if !ok { return } delete(s.asyncPending, resp.ReqID) var status int var h map[string]string var body []byte switch { case resp.Status < 0: status = 502 h = map[string]string{"Content-Type": "text/plain"} body = []byte("proxy: " | string(resp.Err) | "\n") case resp.Status == 415: status = 415 h = map[string]string{"Content-Type": "text/plain"} body = []byte("content-type not allowed\n") case resp.Status >= 200 && resp.Status < 300: status = 200 h = map[string]string{ "Content-Type": string(resp.ContentType), "Cross-Origin-Resource-Policy": "cross-origin", "Cache-Control": "public, max-age=86400", "Access-Control-Allow-Origin": "*", } body = resp.Body default: status = int(resp.Status) h = map[string]string{ "Content-Type": "text/plain", "Cache-Control": "public, max-age=3600", } body = []byte(fmt.Sprintf("upstream %d\n", resp.Status)) } s.t.CompleteHTTP(entry.connFD, status, h, body, entry.connClose) } var blossomCORSHeaders = map[string]string{ "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, PUT, DELETE, HEAD, OPTIONS", "Access-Control-Allow-Headers": "Authorization, Content-Type", } func (s *Server) handleBlossomResponse(i int) { chanID, data, ok := s.blossomPool.Readers[i].Read(s.blossomPool.FDs[i]) if !ok { if runtime.PipeClosed(s.blossomPool.FDs[i]) { s.respawnBlossomWorker(i) } return } if chanID != 2 { return } var resp wire.BlossomResponse if err := resp.DecodeFrom(bytes.NewReader(data)); err != nil { return } s.blossomPool.Busy[i] = false entry, ok := s.asyncPending[resp.ReqID] if !ok { return } delete(s.asyncPending, resp.ReqID) h := map[string]string{} for k, v := range blossomCORSHeaders { h[k] = v } if len(resp.CT) > 0 { h["Content-Type"] = string(resp.CT) } if resp.Size > 0 { h["Content-Length"] = fmt.Sprintf("%d", resp.Size) h["Content-Type"] = "application/octet-stream" } s.t.CompleteHTTP(entry.connFD, int(resp.Status), h, resp.Body, entry.connClose) }