smesh3.go raw
1 package app
2
import (
	"context"
	"embed"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io/fs"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.smesh.lol/orly/pkg/lol/log"
	"github.com/fsnotify/fsnotify"
)
21
// smesh3FS holds the smesh3 directory, embedded into the binary at build
// time. Start serves from it (rooted at "smesh3") when no disk directory is
// configured.
//
//go:embed smesh3
var smesh3FS embed.FS
24
// Smesh3Server serves the smesh web client with optional hot-reload.
// When dir is set, serves from disk and watches for changes via fsnotify.
// Connected clients receive version updates over SSE.
type Smesh3Server struct {
	server   *http.Server      // primary HTTP server; created by Start
	listener net.Listener      // listener for the primary server; opened by Start
	watcher  *fsnotify.Watcher // filesystem watcher; created by startWatcher in disk mode
	port     int               // TCP port every listener binds to
	dir      string            // directory to serve from disk; empty selects the embedded files

	deployPub []byte // 32-byte x-only pubkey for /__deploy auth
	clientTag string // client tag for published events (NIP-89)

	mu           sync.RWMutex             // taken around accesses to version, clients and pendingFiles
	version      int64                    // Unix-millisecond timestamp, refreshed whenever clients are notified
	clients      map[chan string]struct{} // one channel per connected SSE client
	pendingFiles map[string]struct{}      // URL paths changed since the last flush
	cancelFn     context.CancelFunc       // cancels the context derived in Start
}
44
45 // NewSmesh3Server creates a new smesh HTTP server.
46 // If dir is non-empty, files are served from disk with hot-reload.
47 // deployPubHex is the hex-encoded pubkey authorized for /__deploy (empty disables).
48 func NewSmesh3Server(port int, dir, deployPubHex, clientTag string) *Smesh3Server {
49 s := &Smesh3Server{
50 port: port,
51 dir: dir,
52 clientTag: clientTag,
53 version: time.Now().UnixMilli(),
54 clients: make(map[chan string]struct{}),
55 pendingFiles: make(map[string]struct{}),
56 }
57 if len(deployPubHex) == 64 {
58 pub, err := hex.DecodeString(deployPubHex)
59 if err == nil && len(pub) == 32 {
60 s.deployPub = pub
61 }
62 }
63 return s
64 }
65
66 // Start begins serving the smesh client.
67 func (s *Smesh3Server) Start(ctx context.Context) error {
68 ctx, s.cancelFn = context.WithCancel(ctx)
69
70 var fileHandler http.Handler
71 if s.dir != "" {
72 fileHandler = http.FileServer(http.Dir(s.dir))
73 if err := s.startWatcher(ctx); err != nil {
74 log.W.F("smesh: fsnotify failed, hot-reload disabled: %v", err)
75 }
76 } else {
77 webDist, err := fs.Sub(smesh3FS, "smesh3")
78 if err != nil {
79 return fmt.Errorf("failed to load embedded smesh app: %w", err)
80 }
81 fileHandler = http.FileServer(http.FS(webDist))
82 }
83
84 mux := http.NewServeMux()
85
86 // SSE endpoint for version updates.
87 mux.HandleFunc("/__sse", s.handleSSE)
88
89 // Version endpoint (quick poll fallback).
90 mux.HandleFunc("/__version", s.handleVersion)
91
92 // Client tag for published events.
93 mux.HandleFunc("/__client-tag", func(w http.ResponseWriter, r *http.Request) {
94 w.Header().Set("Content-Type", "text/plain")
95 w.Header().Set("Cache-Control", "no-cache")
96 fmt.Fprint(w, s.clientTag)
97 })
98
99 // Signed bundle deploy endpoint.
100 if len(s.deployPub) == 32 {
101 mux.HandleFunc("/__deploy", s.handleDeploy)
102 log.I.F("smesh: /__deploy enabled for pubkey %x", s.deployPub)
103 }
104
105 // Satellite SW loader pages — serve minimal HTML that registers the SW
106 // and periodically posts a keepalive message to prevent browser from
107 // terminating the SW. Without this, BroadcastChannel messages are lost.
108 for _, swDir := range []string{"$sw-relay"} {
109 dir := swDir
110 mux.HandleFunc("/"+dir+"/loader.html", func(w http.ResponseWriter, r *http.Request) {
111 w.Header().Set("Content-Type", "text/html; charset=utf-8")
112 w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
113 _ = s.version
114 fmt.Fprintf(w, `<!DOCTYPE html><html><body>
115 <script>
116 if('serviceWorker' in navigator){
117 function sendPort(w){
118 var ch=new MessageChannel();
119 w.postMessage({type:'bus-port'},[ch.port2]);
120 ch.port1.onmessage=function(ev){
121 var d=ev.data;
122 if(typeof d==='string'&&d.length>0&&d[0]==='{')
123 window.parent.postMessage(d,'*');
124 };
125 console.log('%s port sent to SW');
126 }
127 async function boot(){
128 var regs=await navigator.serviceWorker.getRegistrations();
129 for(var r of regs){if(r.scope.includes('/%s/'))await r.unregister();}
130 var reg=await navigator.serviceWorker.register('/%s/$entry.mjs',{type:'module',scope:'/%s/'});
131 console.log('%s SW registered');
132 var sw=reg.active;
133 if(sw) sendPort(sw);
134 reg.addEventListener('updatefound',function(){
135 var n=reg.installing;
136 if(n) n.addEventListener('statechange',function(){
137 if(n.state==='activated') sendPort(n);
138 });
139 });
140 if(!sw){
141 sw=reg.installing||reg.waiting;
142 if(sw) sw.addEventListener('statechange',function(){if(sw.state==='activated')sendPort(sw);});
143 }
144 }
145 boot().catch(e=>console.error('%s SW boot failed:',e));
146 window.addEventListener('message',ev=>{
147 var d=ev.data;
148 if(typeof d==='string'&&d.length>0&&d[0]==='{'&&navigator.serviceWorker.controller){
149 navigator.serviceWorker.controller.postMessage(d);
150 }
151 });
152 setInterval(()=>{
153 if(navigator.serviceWorker.controller)navigator.serviceWorker.controller.postMessage('keepalive');
154 },20000);
155 }
156 </script>
157 </body></html>`, dir, dir, dir, dir, dir, dir)
158 })
159 }
160
161 // File serving with MIME fix and SPA fallback.
162 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
163 path := r.URL.Path
164
165 if strings.HasSuffix(path, ".mjs") {
166 w.Header().Set("Content-Type", "application/javascript")
167 }
168 if strings.Contains(path, "$sw") {
169 w.Header().Set("Service-Worker-Allowed", "/")
170 }
171
172 // No caching in disk/dev mode; short cache in embedded/prod.
173 if s.dir != "" {
174 w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
175 } else if path == "/" || strings.HasSuffix(path, ".html") || path == "/sw.js" {
176 w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
177 }
178
179 if s.dir != "" {
180 // Disk mode: check file exists, SPA fallback.
181 cleanPath := filepath.Join(s.dir, filepath.Clean(path))
182 if _, err := os.Stat(cleanPath); err != nil && path != "/" {
183 r.URL.Path = "/"
184 }
185 fileHandler.ServeHTTP(w, r)
186 return
187 }
188
189 // Embedded mode.
190 if path == "/" {
191 fileHandler.ServeHTTP(w, r)
192 return
193 }
194 cleanPath := strings.TrimPrefix(path, "/")
195 webDist, _ := fs.Sub(smesh3FS, "smesh3")
196 if f, err := webDist.Open(cleanPath); err == nil {
197 f.Close()
198 fileHandler.ServeHTTP(w, r)
199 return
200 }
201 r.URL.Path = "/"
202 fileHandler.ServeHTTP(w, r)
203 })
204
205 addr := fmt.Sprintf("127.0.0.1:%d", s.port)
206 s.server = &http.Server{
207 Addr: addr,
208 Handler: mux,
209 ReadTimeout: 15 * time.Second,
210 WriteTimeout: 0, // SSE needs no write timeout
211 IdleTimeout: 120 * time.Second,
212 }
213
214 var err error
215 s.listener, err = net.Listen("tcp", addr)
216 if err != nil {
217 return fmt.Errorf("smesh: failed to listen on %s: %w", addr, err)
218 }
219
220 go func() {
221 <-ctx.Done()
222 shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
223 defer cancel()
224 if err := s.server.Shutdown(shutdownCtx); err != nil {
225 log.W.F("smesh server shutdown error: %v", err)
226 }
227 }()
228
229 mode := "embedded"
230 if s.dir != "" {
231 mode = "disk:" + s.dir
232 }
233 log.I.F("smesh web client serving on http://%s (%s)", addr, mode)
234
235 go func() {
236 if err := s.server.Serve(s.listener); err != nil && err != http.ErrServerClosed {
237 log.E.F("smesh server error: %v", err)
238 }
239 }()
240
241 // Extra listeners for SW origin isolation (127.0.0.2=marmot, 127.0.0.3=relay, 127.0.0.4=crypto)
242 for _, ip := range []string{"127.0.0.2", "127.0.0.3", "127.0.0.4"} {
243 swAddr := fmt.Sprintf("%s:%d", ip, s.port)
244 ln, err := net.Listen("tcp", swAddr)
245 if err != nil {
246 log.W.F("smesh: failed to listen on %s: %v", swAddr, err)
247 continue
248 }
249 srv := &http.Server{
250 Handler: mux,
251 ReadTimeout: 15 * time.Second,
252 WriteTimeout: 0,
253 IdleTimeout: 120 * time.Second,
254 }
255 log.I.F("smesh SW origin serving on http://%s", swAddr)
256 go func() {
257 if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
258 log.E.F("smesh SW server error on %s: %v", swAddr, err)
259 }
260 }()
261 go func() {
262 <-ctx.Done()
263 shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
264 defer cancel()
265 srv.Shutdown(shutdownCtx)
266 }()
267 }
268
269 return nil
270 }
271
272 // Stop shuts down the smesh server.
273 func (s *Smesh3Server) Stop() {
274 if s.cancelFn != nil {
275 s.cancelFn()
276 }
277 if s.watcher != nil {
278 s.watcher.Close()
279 }
280 if s.server != nil {
281 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
282 defer cancel()
283 s.server.Shutdown(ctx)
284 }
285 }
286
287 // startWatcher sets up fsnotify on the disk directory.
288 func (s *Smesh3Server) startWatcher(ctx context.Context) error {
289 w, err := fsnotify.NewWatcher()
290 if err != nil {
291 return err
292 }
293 s.watcher = w
294
295 // Watch the root dir and all subdirectories (2 levels).
296 if err := w.Add(s.dir); err != nil {
297 return err
298 }
299 entries, _ := os.ReadDir(s.dir)
300 for _, e := range entries {
301 if e.IsDir() {
302 subdir := filepath.Join(s.dir, e.Name())
303 w.Add(subdir)
304 sub2, _ := os.ReadDir(subdir)
305 for _, e2 := range sub2 {
306 if e2.IsDir() {
307 w.Add(filepath.Join(subdir, e2.Name()))
308 }
309 }
310 }
311 }
312
313 go s.watchLoop(ctx)
314 log.I.F("smesh: watching %s for changes", s.dir)
315 return nil
316 }
317
318 // watchLoop debounces fsnotify events, tracks changed files, and notifies clients.
319 func (s *Smesh3Server) watchLoop(ctx context.Context) {
320 var debounce *time.Timer
321 for {
322 select {
323 case <-ctx.Done():
324 return
325 case ev, ok := <-s.watcher.Events:
326 if !ok {
327 return
328 }
329 if ev.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename|fsnotify.Chmod) == 0 {
330 continue
331 }
332 // Record the changed file path.
333 rel, err := filepath.Rel(s.dir, ev.Name)
334 if err == nil {
335 path := "/" + filepath.ToSlash(rel)
336 if path == "/index.html" {
337 path = "/"
338 }
339 s.mu.Lock()
340 s.pendingFiles[path] = struct{}{}
341 s.mu.Unlock()
342 }
343 // Debounce: wait 200ms for batch writes (rsync).
344 if debounce != nil {
345 debounce.Stop()
346 }
347 debounce = time.AfterFunc(200*time.Millisecond, func() {
348 s.flushChanges()
349 })
350 case err, ok := <-s.watcher.Errors:
351 if !ok {
352 return
353 }
354 log.W.F("smesh: fsnotify error: %v", err)
355 }
356 }
357 }
358
359 // flushChanges collects pending file changes, bumps version, and notifies SSE clients.
360 func (s *Smesh3Server) flushChanges() {
361 s.mu.Lock()
362 s.version = time.Now().UnixMilli()
363 v := s.version
364 files := make([]string, 0, len(s.pendingFiles))
365 for f := range s.pendingFiles {
366 files = append(files, f)
367 }
368 s.pendingFiles = make(map[string]struct{})
369 clients := make([]chan string, 0, len(s.clients))
370 for ch := range s.clients {
371 clients = append(clients, ch)
372 }
373 s.mu.Unlock()
374
375 // Build JSON: {"v":123,"files":["/smesh3.mjs"]}
376 var sb strings.Builder
377 sb.WriteString(`{"v":`)
378 sb.WriteString(strconv.FormatInt(v, 10))
379 sb.WriteString(`,"files":[`)
380 for i, f := range files {
381 if i > 0 {
382 sb.WriteByte(',')
383 }
384 sb.WriteByte('"')
385 sb.WriteString(f)
386 sb.WriteByte('"')
387 }
388 sb.WriteString("]}")
389 msg := sb.String()
390
391 log.I.F("smesh: %d files changed, version=%d, notifying %d clients", len(files), v, len(clients))
392 for _, ch := range clients {
393 select {
394 case ch <- msg:
395 default:
396 }
397 }
398 }
399
400 // notifyFullRefresh bumps version and notifies with empty file list (full refresh).
401 func (s *Smesh3Server) notifyFullRefresh() {
402 s.mu.Lock()
403 s.version = time.Now().UnixMilli()
404 v := s.version
405 clients := make([]chan string, 0, len(s.clients))
406 for ch := range s.clients {
407 clients = append(clients, ch)
408 }
409 s.mu.Unlock()
410
411 msg := `{"v":` + strconv.FormatInt(v, 10) + `}`
412
413 log.I.F("smesh: full refresh, version=%d, notifying %d clients", v, len(clients))
414 for _, ch := range clients {
415 select {
416 case ch <- msg:
417 default:
418 }
419 }
420 }
421
422 // handleVersion returns the current version as plain text.
423 func (s *Smesh3Server) handleVersion(w http.ResponseWriter, r *http.Request) {
424 s.mu.RLock()
425 v := s.version
426 s.mu.RUnlock()
427 w.Header().Set("Content-Type", "text/plain")
428 w.Header().Set("Cache-Control", "no-cache")
429 fmt.Fprintf(w, "%d", v)
430 }
431
432 // handleSSE maintains a Server-Sent Events connection.
433 // Sends the current version on connect, then pushes on each change.
434 func (s *Smesh3Server) handleSSE(w http.ResponseWriter, r *http.Request) {
435 flusher, ok := w.(http.Flusher)
436 if !ok {
437 http.Error(w, "SSE not supported", http.StatusInternalServerError)
438 return
439 }
440
441 w.Header().Set("Content-Type", "text/event-stream")
442 w.Header().Set("Cache-Control", "no-cache")
443 w.Header().Set("Connection", "keep-alive")
444 w.Header().Set("Access-Control-Allow-Origin", "*")
445
446 ch := make(chan string, 4)
447
448 // Register client.
449 s.mu.Lock()
450 s.clients[ch] = struct{}{}
451 v := s.version
452 s.mu.Unlock()
453
454 // Send current version immediately (no files on connect).
455 fmt.Fprintf(w, "data: {\"v\":%d}\n\n", v)
456 flusher.Flush()
457
458 defer func() {
459 s.mu.Lock()
460 delete(s.clients, ch)
461 s.mu.Unlock()
462 }()
463
464 ctx := r.Context()
465 // Heartbeat to detect dead connections.
466 ticker := time.NewTicker(30 * time.Second)
467 defer ticker.Stop()
468
469 for {
470 select {
471 case <-ctx.Done():
472 return
473 case msg := <-ch:
474 fmt.Fprintf(w, "data: %s\n\n", msg)
475 flusher.Flush()
476 case <-ticker.C:
477 fmt.Fprintf(w, ": heartbeat\n\n")
478 flusher.Flush()
479 }
480 }
481 }
482
483