server_workers.mx raw

   1  package server
   2  
   3  import (
   4  	"bytes"
   5  	"fmt"
   6  	"runtime"
   7  	"time"
   8  
   9  	"smesh.lol/pkg/pool"
  10  	"smesh.lol/pkg/relay/wire"
  11  	"smesh.lol/pkg/transport"
  12  )
  13  
  14  // --- Worker lifecycle ---
  15  
  16  func (s *Server) startProxyWorkers(n int) {
  17  	s.proxyIn = []chan wire.ProxyRequest{:n}
  18  	s.proxyOut = []chan wire.ProxyResponse{:n}
  19  	s.proxyBusyTime = []int64{:n}
  20  	s.proxyPool = pool.NewPool(n)
  21  	for i := 0; i < n; i++ {
  22  		s.proxyIn[i] = chan wire.ProxyRequest{}
  23  		s.proxyOut[i] = chan wire.ProxyResponse{}
  24  		spawn(wire.ProxyWorker, s.proxyIn[i], s.proxyOut[i])
  25  		fd := runtime.LastSpawnedParentFd()
  26  		s.proxyPool.FDs = append(s.proxyPool.FDs, fd)
  27  		s.t.RegisterFD(fd)
  28  	}
  29  }
  30  
  31  func (s *Server) startBlossomWorkers(n int) {
  32  	s.blossomIn = []chan wire.BlossomRequest{:n}
  33  	s.blossomOut = []chan wire.BlossomResponse{:n}
  34  	s.blossomPool = pool.NewPool(n)
  35  	for i := 0; i < n; i++ {
  36  		s.blossomIn[i] = chan wire.BlossomRequest{}
  37  		s.blossomOut[i] = chan wire.BlossomResponse{}
  38  		spawn(wire.BlossomWorker, s.blossomIn[i], s.blossomOut[i])
  39  		fd := runtime.LastSpawnedParentFd()
  40  		s.blossomPool.FDs = append(s.blossomPool.FDs, fd)
  41  		s.t.RegisterFD(fd)
  42  	}
  43  }
  44  
  45  func (s *Server) respawnProxyWorker(i int) {
  46  	s.t.RemoveFD(s.proxyPool.FDs[i])
  47  	s.proxyIn[i] = chan wire.ProxyRequest{}
  48  	s.proxyOut[i] = chan wire.ProxyResponse{}
  49  	spawn(wire.ProxyWorker, s.proxyIn[i], s.proxyOut[i])
  50  	fd := runtime.LastSpawnedParentFd()
  51  	s.proxyPool.FDs[i] = fd
  52  	s.proxyPool.Busy[i] = false
  53  	s.proxyBusyTime[i] = 0
  54  	s.proxyPool.Readers[i].Reset()
  55  	s.t.RegisterFD(fd)
  56  	fmt.Println("respawned proxy worker", i)
  57  }
  58  
  59  func (s *Server) respawnBlossomWorker(i int) {
  60  	s.t.RemoveFD(s.blossomPool.FDs[i])
  61  	s.blossomIn[i] = chan wire.BlossomRequest{}
  62  	s.blossomOut[i] = chan wire.BlossomResponse{}
  63  	spawn(wire.BlossomWorker, s.blossomIn[i], s.blossomOut[i])
  64  	fd := runtime.LastSpawnedParentFd()
  65  	s.blossomPool.FDs[i] = fd
  66  	s.blossomPool.Busy[i] = false
  67  	s.blossomPool.Readers[i].Reset()
  68  	s.t.RegisterFD(fd)
  69  	fmt.Println("respawned blossom worker", i)
  70  }
  71  
  72  // --- Proxy dispatch ---
  73  
  74  func (s *Server) doDispatchProxy(fd int, path string, headers map[string]string) bool {
  75  	target := path[len("/proxy/"):]
  76  	if target == "" {
  77  		s.t.SendHTTP(fd, 400, map[string]string{"Content-Type": "text/plain"}, []byte("missing url\n"))
  78  		return true
  79  	}
  80  	if transport.HasPrefix(target, s.selfHost) {
  81  		direct := target[len(s.selfHost)-1:]
  82  		s.t.SendHTTP(fd, 302, map[string]string{
  83  			"Location":                    direct,
  84  			"Access-Control-Allow-Origin": "*",
  85  		}, nil)
  86  		return true
  87  	}
  88  	url := []byte("https://" | target)
  89  	s.nextAsyncID++
  90  	rid := s.nextAsyncID
  91  	connClose := headers["connection"] == "close"
  92  	req := wire.ProxyRequest{ReqID: rid, MaxBytes: 32 * 1024 * 1024, URL: url}
  93  	s.proxyReapStuck()
  94  	if i := s.proxyPool.IdleIndex(); i >= 0 {
  95  		s.asyncPending[rid] = asyncHTTPEntry{connFD: fd, connClose: connClose, createdAt: time.Now().UnixNano()}
  96  		s.proxyPool.Busy[i] = true
  97  		s.proxyBusyTime[i] = time.Now().UnixNano()
  98  		if !s.proxyDispatchFrame(i, req) {
  99  			s.proxyPool.Busy[i] = false
 100  			s.proxyBusyTime[i] = 0
 101  			delete(s.asyncPending, rid)
 102  		} else {
 103  			return true
 104  		}
 105  	}
 106  	if len(s.proxyQueue) < 64 {
 107  		s.proxyQueue = append(s.proxyQueue, pendingProxy{connFD: fd, connClose: connClose, req: req})
 108  		return true
 109  	}
 110  	s.t.SendHTTP(fd, 503, map[string]string{
 111  		"Content-Type": "text/plain",
 112  		"Retry-After":  "1",
 113  	}, []byte("proxy workers busy\n"))
 114  	return true
 115  }
 116  
 117  func (s *Server) drainProxyQueue() {
 118  	for len(s.proxyQueue) > 0 {
 119  		i := s.proxyPool.IdleIndex()
 120  		if i < 0 {
 121  			return
 122  		}
 123  		p := s.proxyQueue[0]
 124  		s.proxyQueue = s.proxyQueue[1:]
 125  		s.asyncPending[p.req.ReqID] = asyncHTTPEntry{connFD: p.connFD, connClose: p.connClose, createdAt: time.Now().UnixNano()}
 126  		s.proxyPool.Busy[i] = true
 127  		s.proxyBusyTime[i] = time.Now().UnixNano()
 128  		if !s.proxyDispatchFrame(i, p.req) {
 129  			s.proxyPool.Busy[i] = false
 130  			s.proxyBusyTime[i] = 0
 131  			delete(s.asyncPending, p.req.ReqID)
 132  			s.t.SendHTTP(p.connFD, 502, map[string]string{"Content-Type": "text/plain"}, []byte("proxy dispatch failed\n"))
 133  		}
 134  	}
 135  }
 136  
 137  func (s *Server) proxyReapStuck() {
 138  	now := time.Now().UnixNano()
 139  	reaped := false
 140  	for i := 0; i < s.proxyPool.Len(); i++ {
 141  		if s.proxyPool.Busy[i] && s.proxyBusyTime[i] > 0 {
 142  			elapsed := now - s.proxyBusyTime[i]
 143  			if elapsed > 15_000_000_000 {
 144  				fmt.Println("proxy worker", i, "stuck", elapsed/1_000_000_000, "s, respawning")
 145  				s.respawnProxyWorker(i)
 146  				reaped = true
 147  			}
 148  		}
 149  	}
 150  	if reaped {
 151  		s.drainProxyQueue()
 152  	}
 153  }
 154  
 155  func (s *Server) proxyDispatchFrame(i int, req wire.ProxyRequest) bool {
 156  	var buf bytes.Buffer
 157  	if err := req.EncodeTo(&buf); err != nil {
 158  		return false
 159  	}
 160  	return pool.WriteFrame(s.proxyPool.FDs[i], 0, buf.Bytes())
 161  }
 162  
 163  // --- Blossom dispatch ---
 164  
 165  func (s *Server) doDispatchBlossom(fd int, method, path string, headers map[string]string, body []byte) bool {
 166  	bpath := path[len("/blossom"):]
 167  	i := s.blossomPool.IdleIndex()
 168  	if i >= 0 {
 169  		s.nextAsyncID++
 170  		rid := s.nextAsyncID
 171  		req := wire.BlossomRequest{
 172  			ReqID:       rid,
 173  			Dir:         []byte(s.cfg.BlossomDir),
 174  			Method:      []byte(method),
 175  			Path:        []byte(bpath),
 176  			ContentType: []byte(headers["content-type"]),
 177  			Body:        body,
 178  			Upstream:    []byte(s.cfg.BlossomUpstream),
 179  		}
 180  		var buf bytes.Buffer
 181  		if err := req.EncodeTo(&buf); err == nil {
 182  			if pool.WriteFrame(s.blossomPool.FDs[i], 1, buf.Bytes()) {
 183  				connClose := headers["connection"] == "close"
 184  				s.asyncPending[rid] = asyncHTTPEntry{connFD: fd, connClose: connClose, createdAt: time.Now().UnixNano()}
 185  				s.blossomPool.Busy[i] = true
 186  				return true
 187  			}
 188  		}
 189  	}
 190  	s.t.SendHTTP(fd, 503, map[string]string{
 191  		"Content-Type": "text/plain",
 192  		"Retry-After":  "1",
 193  	}, []byte("blossom worker busy\n"))
 194  	return true
 195  }
 196  
 197  // asyncReapStuck removes asyncPending entries older than 60s.
 198  // Does NOT send a response — connFD may have been reused by another
 199  // connection by the time 60s have elapsed.
 200  func (s *Server) asyncReapStuck() {
 201  	cutoff := time.Now().UnixNano() - 60_000_000_000
 202  	for rid, entry := range s.asyncPending {
 203  		if entry.createdAt > 0 && entry.createdAt < cutoff {
 204  			fmt.Println("asyncPending: reaping stale entry rid=", rid, "fd=", entry.connFD)
 205  			delete(s.asyncPending, rid)
 206  		}
 207  	}
 208  }
 209  
 210  // --- Worker response handlers ---
 211  
 212  func (s *Server) handleProxyResponse(i int) {
 213  	chanID, data, ok := s.proxyPool.Readers[i].Read(s.proxyPool.FDs[i])
 214  	if !ok {
 215  		if runtime.PipeClosed(s.proxyPool.FDs[i]) {
 216  			s.respawnProxyWorker(i)
 217  			s.drainProxyQueue()
 218  		}
 219  		return
 220  	}
 221  	s.proxyPool.Busy[i] = false
 222  	s.proxyBusyTime[i] = 0
 223  	s.drainProxyQueue()
 224  	if chanID != 1 {
 225  		return
 226  	}
 227  	var resp wire.ProxyResponse
 228  	if err := resp.DecodeFrom(bytes.NewReader(data)); err != nil {
 229  		return
 230  	}
 231  	entry, ok := s.asyncPending[resp.ReqID]
 232  	if !ok {
 233  		return
 234  	}
 235  	delete(s.asyncPending, resp.ReqID)
 236  	var status int
 237  	var h map[string]string
 238  	var body []byte
 239  	switch {
 240  	case resp.Status < 0:
 241  		status = 502
 242  		h = map[string]string{"Content-Type": "text/plain"}
 243  		body = []byte("proxy: " | string(resp.Err) | "\n")
 244  	case resp.Status == 415:
 245  		status = 415
 246  		h = map[string]string{"Content-Type": "text/plain"}
 247  		body = []byte("content-type not allowed\n")
 248  	case resp.Status >= 200 && resp.Status < 300:
 249  		status = 200
 250  		h = map[string]string{
 251  			"Content-Type":                 string(resp.ContentType),
 252  			"Cross-Origin-Resource-Policy": "cross-origin",
 253  			"Cache-Control":                "public, max-age=86400",
 254  			"Access-Control-Allow-Origin":  "*",
 255  		}
 256  		body = resp.Body
 257  	default:
 258  		status = int(resp.Status)
 259  		h = map[string]string{
 260  			"Content-Type":  "text/plain",
 261  			"Cache-Control": "public, max-age=3600",
 262  		}
 263  		body = []byte(fmt.Sprintf("upstream %d\n", resp.Status))
 264  	}
 265  	s.t.CompleteHTTP(entry.connFD, status, h, body, entry.connClose)
 266  }
 267  
 268  var blossomCORSHeaders = map[string]string{
 269  	"Access-Control-Allow-Origin":  "*",
 270  	"Access-Control-Allow-Methods": "GET, PUT, DELETE, HEAD, OPTIONS",
 271  	"Access-Control-Allow-Headers": "Authorization, Content-Type",
 272  }
 273  
 274  func (s *Server) handleBlossomResponse(i int) {
 275  	chanID, data, ok := s.blossomPool.Readers[i].Read(s.blossomPool.FDs[i])
 276  	if !ok {
 277  		if runtime.PipeClosed(s.blossomPool.FDs[i]) {
 278  			s.respawnBlossomWorker(i)
 279  		}
 280  		return
 281  	}
 282  	if chanID != 2 {
 283  		return
 284  	}
 285  	var resp wire.BlossomResponse
 286  	if err := resp.DecodeFrom(bytes.NewReader(data)); err != nil {
 287  		return
 288  	}
 289  	s.blossomPool.Busy[i] = false
 290  	entry, ok := s.asyncPending[resp.ReqID]
 291  	if !ok {
 292  		return
 293  	}
 294  	delete(s.asyncPending, resp.ReqID)
 295  	h := map[string]string{}
 296  	for k, v := range blossomCORSHeaders {
 297  		h[k] = v
 298  	}
 299  	if len(resp.CT) > 0 {
 300  		h["Content-Type"] = string(resp.CT)
 301  	}
 302  	if resp.Size > 0 {
 303  		h["Content-Length"] = fmt.Sprintf("%d", resp.Size)
 304  		h["Content-Type"] = "application/octet-stream"
 305  	}
 306  	s.t.CompleteHTTP(entry.connFD, int(resp.Status), h, resp.Body, entry.connClose)
 307  }
 308