smesh3.go raw

   1  package app
   2  
import (
	"context"
	"embed"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io/fs"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.smesh.lol/orly/pkg/lol/log"
	"github.com/fsnotify/fsnotify"
)
  21  
// smesh3FS holds the contents of the smesh3 directory embedded at build time.
// Start serves the client from it when no disk directory is configured.
//
//go:embed smesh3
var smesh3FS embed.FS
  24  
// Smesh3Server serves the smesh web client with optional hot-reload.
// When dir is set, serves from disk and watches for changes via fsnotify.
// Connected clients receive version updates over SSE.
type Smesh3Server struct {
	// server and listener are the primary HTTP server and its TCP listener
	// on 127.0.0.1:port; both are created in Start.
	server   *http.Server
	listener net.Listener
	// watcher is the fsnotify watcher created by startWatcher; it is only
	// set up when dir is non-empty.
	watcher  *fsnotify.Watcher
	// port is the TCP port every listener binds to.
	port     int
	// dir is the directory served from disk; empty selects the embedded assets.
	dir      string

	deployPub []byte // 32-byte x-only pubkey for /__deploy auth
	clientTag string // client tag for published events (NIP-89)

	// mu guards version, clients and pendingFiles.
	mu           sync.RWMutex
	// version is a Unix-millisecond timestamp, set at construction and
	// replaced on every change notification.
	version      int64
	// clients is the set of channels feeding the connected SSE handlers.
	clients      map[chan string]struct{}
	// pendingFiles is the set of changed URL paths collected since the last flush.
	pendingFiles map[string]struct{}
	// cancelFn cancels the context derived in Start.
	cancelFn     context.CancelFunc
}
  44  
  45  // NewSmesh3Server creates a new smesh HTTP server.
  46  // If dir is non-empty, files are served from disk with hot-reload.
  47  // deployPubHex is the hex-encoded pubkey authorized for /__deploy (empty disables).
  48  func NewSmesh3Server(port int, dir, deployPubHex, clientTag string) *Smesh3Server {
  49  	s := &Smesh3Server{
  50  		port:      port,
  51  		dir:       dir,
  52  		clientTag: clientTag,
  53  		version:      time.Now().UnixMilli(),
  54  		clients:      make(map[chan string]struct{}),
  55  		pendingFiles: make(map[string]struct{}),
  56  	}
  57  	if len(deployPubHex) == 64 {
  58  		pub, err := hex.DecodeString(deployPubHex)
  59  		if err == nil && len(pub) == 32 {
  60  			s.deployPub = pub
  61  		}
  62  	}
  63  	return s
  64  }
  65  
  66  // Start begins serving the smesh client.
  67  func (s *Smesh3Server) Start(ctx context.Context) error {
  68  	ctx, s.cancelFn = context.WithCancel(ctx)
  69  
  70  	var fileHandler http.Handler
  71  	if s.dir != "" {
  72  		fileHandler = http.FileServer(http.Dir(s.dir))
  73  		if err := s.startWatcher(ctx); err != nil {
  74  			log.W.F("smesh: fsnotify failed, hot-reload disabled: %v", err)
  75  		}
  76  	} else {
  77  		webDist, err := fs.Sub(smesh3FS, "smesh3")
  78  		if err != nil {
  79  			return fmt.Errorf("failed to load embedded smesh app: %w", err)
  80  		}
  81  		fileHandler = http.FileServer(http.FS(webDist))
  82  	}
  83  
  84  	mux := http.NewServeMux()
  85  
  86  	// SSE endpoint for version updates.
  87  	mux.HandleFunc("/__sse", s.handleSSE)
  88  
  89  	// Version endpoint (quick poll fallback).
  90  	mux.HandleFunc("/__version", s.handleVersion)
  91  
  92  	// Client tag for published events.
  93  	mux.HandleFunc("/__client-tag", func(w http.ResponseWriter, r *http.Request) {
  94  		w.Header().Set("Content-Type", "text/plain")
  95  		w.Header().Set("Cache-Control", "no-cache")
  96  		fmt.Fprint(w, s.clientTag)
  97  	})
  98  
  99  	// Signed bundle deploy endpoint.
 100  	if len(s.deployPub) == 32 {
 101  		mux.HandleFunc("/__deploy", s.handleDeploy)
 102  		log.I.F("smesh: /__deploy enabled for pubkey %x", s.deployPub)
 103  	}
 104  
 105  	// Satellite SW loader pages — serve minimal HTML that registers the SW
 106  	// and periodically posts a keepalive message to prevent browser from
 107  	// terminating the SW. Without this, BroadcastChannel messages are lost.
 108  	for _, swDir := range []string{"$sw-relay"} {
 109  		dir := swDir
 110  		mux.HandleFunc("/"+dir+"/loader.html", func(w http.ResponseWriter, r *http.Request) {
 111  			w.Header().Set("Content-Type", "text/html; charset=utf-8")
 112  			w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
 113  			_ = s.version
 114  			fmt.Fprintf(w, `<!DOCTYPE html><html><body>
 115  <script>
 116  if('serviceWorker' in navigator){
 117    function sendPort(w){
 118      var ch=new MessageChannel();
 119      w.postMessage({type:'bus-port'},[ch.port2]);
 120      ch.port1.onmessage=function(ev){
 121        var d=ev.data;
 122        if(typeof d==='string'&&d.length>0&&d[0]==='{')
 123          window.parent.postMessage(d,'*');
 124      };
 125      console.log('%s port sent to SW');
 126    }
 127    async function boot(){
 128      var regs=await navigator.serviceWorker.getRegistrations();
 129      for(var r of regs){if(r.scope.includes('/%s/'))await r.unregister();}
 130      var reg=await navigator.serviceWorker.register('/%s/$entry.mjs',{type:'module',scope:'/%s/'});
 131      console.log('%s SW registered');
 132      var sw=reg.active;
 133      if(sw) sendPort(sw);
 134      reg.addEventListener('updatefound',function(){
 135        var n=reg.installing;
 136        if(n) n.addEventListener('statechange',function(){
 137          if(n.state==='activated') sendPort(n);
 138        });
 139      });
 140      if(!sw){
 141        sw=reg.installing||reg.waiting;
 142        if(sw) sw.addEventListener('statechange',function(){if(sw.state==='activated')sendPort(sw);});
 143      }
 144    }
 145    boot().catch(e=>console.error('%s SW boot failed:',e));
 146    window.addEventListener('message',ev=>{
 147      var d=ev.data;
 148      if(typeof d==='string'&&d.length>0&&d[0]==='{'&&navigator.serviceWorker.controller){
 149        navigator.serviceWorker.controller.postMessage(d);
 150      }
 151    });
 152    setInterval(()=>{
 153      if(navigator.serviceWorker.controller)navigator.serviceWorker.controller.postMessage('keepalive');
 154    },20000);
 155  }
 156  </script>
 157  </body></html>`, dir, dir, dir, dir, dir, dir)
 158  		})
 159  	}
 160  
 161  	// File serving with MIME fix and SPA fallback.
 162  	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
 163  		path := r.URL.Path
 164  
 165  		if strings.HasSuffix(path, ".mjs") {
 166  			w.Header().Set("Content-Type", "application/javascript")
 167  		}
 168  		if strings.Contains(path, "$sw") {
 169  			w.Header().Set("Service-Worker-Allowed", "/")
 170  		}
 171  
 172  		// No caching in disk/dev mode; short cache in embedded/prod.
 173  		if s.dir != "" {
 174  			w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
 175  		} else if path == "/" || strings.HasSuffix(path, ".html") || path == "/sw.js" {
 176  			w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
 177  		}
 178  
 179  		if s.dir != "" {
 180  			// Disk mode: check file exists, SPA fallback.
 181  			cleanPath := filepath.Join(s.dir, filepath.Clean(path))
 182  			if _, err := os.Stat(cleanPath); err != nil && path != "/" {
 183  				r.URL.Path = "/"
 184  			}
 185  			fileHandler.ServeHTTP(w, r)
 186  			return
 187  		}
 188  
 189  		// Embedded mode.
 190  		if path == "/" {
 191  			fileHandler.ServeHTTP(w, r)
 192  			return
 193  		}
 194  		cleanPath := strings.TrimPrefix(path, "/")
 195  		webDist, _ := fs.Sub(smesh3FS, "smesh3")
 196  		if f, err := webDist.Open(cleanPath); err == nil {
 197  			f.Close()
 198  			fileHandler.ServeHTTP(w, r)
 199  			return
 200  		}
 201  		r.URL.Path = "/"
 202  		fileHandler.ServeHTTP(w, r)
 203  	})
 204  
 205  	addr := fmt.Sprintf("127.0.0.1:%d", s.port)
 206  	s.server = &http.Server{
 207  		Addr:         addr,
 208  		Handler:      mux,
 209  		ReadTimeout:  15 * time.Second,
 210  		WriteTimeout: 0, // SSE needs no write timeout
 211  		IdleTimeout:  120 * time.Second,
 212  	}
 213  
 214  	var err error
 215  	s.listener, err = net.Listen("tcp", addr)
 216  	if err != nil {
 217  		return fmt.Errorf("smesh: failed to listen on %s: %w", addr, err)
 218  	}
 219  
 220  	go func() {
 221  		<-ctx.Done()
 222  		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 223  		defer cancel()
 224  		if err := s.server.Shutdown(shutdownCtx); err != nil {
 225  			log.W.F("smesh server shutdown error: %v", err)
 226  		}
 227  	}()
 228  
 229  	mode := "embedded"
 230  	if s.dir != "" {
 231  		mode = "disk:" + s.dir
 232  	}
 233  	log.I.F("smesh web client serving on http://%s (%s)", addr, mode)
 234  
 235  	go func() {
 236  		if err := s.server.Serve(s.listener); err != nil && err != http.ErrServerClosed {
 237  			log.E.F("smesh server error: %v", err)
 238  		}
 239  	}()
 240  
 241  	// Extra listeners for SW origin isolation (127.0.0.2=marmot, 127.0.0.3=relay, 127.0.0.4=crypto)
 242  	for _, ip := range []string{"127.0.0.2", "127.0.0.3", "127.0.0.4"} {
 243  		swAddr := fmt.Sprintf("%s:%d", ip, s.port)
 244  		ln, err := net.Listen("tcp", swAddr)
 245  		if err != nil {
 246  			log.W.F("smesh: failed to listen on %s: %v", swAddr, err)
 247  			continue
 248  		}
 249  		srv := &http.Server{
 250  			Handler:      mux,
 251  			ReadTimeout:  15 * time.Second,
 252  			WriteTimeout: 0,
 253  			IdleTimeout:  120 * time.Second,
 254  		}
 255  		log.I.F("smesh SW origin serving on http://%s", swAddr)
 256  		go func() {
 257  			if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
 258  				log.E.F("smesh SW server error on %s: %v", swAddr, err)
 259  			}
 260  		}()
 261  		go func() {
 262  			<-ctx.Done()
 263  			shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 264  			defer cancel()
 265  			srv.Shutdown(shutdownCtx)
 266  		}()
 267  	}
 268  
 269  	return nil
 270  }
 271  
 272  // Stop shuts down the smesh server.
 273  func (s *Smesh3Server) Stop() {
 274  	if s.cancelFn != nil {
 275  		s.cancelFn()
 276  	}
 277  	if s.watcher != nil {
 278  		s.watcher.Close()
 279  	}
 280  	if s.server != nil {
 281  		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 282  		defer cancel()
 283  		s.server.Shutdown(ctx)
 284  	}
 285  }
 286  
 287  // startWatcher sets up fsnotify on the disk directory.
 288  func (s *Smesh3Server) startWatcher(ctx context.Context) error {
 289  	w, err := fsnotify.NewWatcher()
 290  	if err != nil {
 291  		return err
 292  	}
 293  	s.watcher = w
 294  
 295  	// Watch the root dir and all subdirectories (2 levels).
 296  	if err := w.Add(s.dir); err != nil {
 297  		return err
 298  	}
 299  	entries, _ := os.ReadDir(s.dir)
 300  	for _, e := range entries {
 301  		if e.IsDir() {
 302  			subdir := filepath.Join(s.dir, e.Name())
 303  			w.Add(subdir)
 304  			sub2, _ := os.ReadDir(subdir)
 305  			for _, e2 := range sub2 {
 306  				if e2.IsDir() {
 307  					w.Add(filepath.Join(subdir, e2.Name()))
 308  				}
 309  			}
 310  		}
 311  	}
 312  
 313  	go s.watchLoop(ctx)
 314  	log.I.F("smesh: watching %s for changes", s.dir)
 315  	return nil
 316  }
 317  
 318  // watchLoop debounces fsnotify events, tracks changed files, and notifies clients.
 319  func (s *Smesh3Server) watchLoop(ctx context.Context) {
 320  	var debounce *time.Timer
 321  	for {
 322  		select {
 323  		case <-ctx.Done():
 324  			return
 325  		case ev, ok := <-s.watcher.Events:
 326  			if !ok {
 327  				return
 328  			}
 329  			if ev.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename|fsnotify.Chmod) == 0 {
 330  				continue
 331  			}
 332  			// Record the changed file path.
 333  			rel, err := filepath.Rel(s.dir, ev.Name)
 334  			if err == nil {
 335  				path := "/" + filepath.ToSlash(rel)
 336  				if path == "/index.html" {
 337  					path = "/"
 338  				}
 339  				s.mu.Lock()
 340  				s.pendingFiles[path] = struct{}{}
 341  				s.mu.Unlock()
 342  			}
 343  			// Debounce: wait 200ms for batch writes (rsync).
 344  			if debounce != nil {
 345  				debounce.Stop()
 346  			}
 347  			debounce = time.AfterFunc(200*time.Millisecond, func() {
 348  				s.flushChanges()
 349  			})
 350  		case err, ok := <-s.watcher.Errors:
 351  			if !ok {
 352  				return
 353  			}
 354  			log.W.F("smesh: fsnotify error: %v", err)
 355  		}
 356  	}
 357  }
 358  
 359  // flushChanges collects pending file changes, bumps version, and notifies SSE clients.
 360  func (s *Smesh3Server) flushChanges() {
 361  	s.mu.Lock()
 362  	s.version = time.Now().UnixMilli()
 363  	v := s.version
 364  	files := make([]string, 0, len(s.pendingFiles))
 365  	for f := range s.pendingFiles {
 366  		files = append(files, f)
 367  	}
 368  	s.pendingFiles = make(map[string]struct{})
 369  	clients := make([]chan string, 0, len(s.clients))
 370  	for ch := range s.clients {
 371  		clients = append(clients, ch)
 372  	}
 373  	s.mu.Unlock()
 374  
 375  	// Build JSON: {"v":123,"files":["/smesh3.mjs"]}
 376  	var sb strings.Builder
 377  	sb.WriteString(`{"v":`)
 378  	sb.WriteString(strconv.FormatInt(v, 10))
 379  	sb.WriteString(`,"files":[`)
 380  	for i, f := range files {
 381  		if i > 0 {
 382  			sb.WriteByte(',')
 383  		}
 384  		sb.WriteByte('"')
 385  		sb.WriteString(f)
 386  		sb.WriteByte('"')
 387  	}
 388  	sb.WriteString("]}")
 389  	msg := sb.String()
 390  
 391  	log.I.F("smesh: %d files changed, version=%d, notifying %d clients", len(files), v, len(clients))
 392  	for _, ch := range clients {
 393  		select {
 394  		case ch <- msg:
 395  		default:
 396  		}
 397  	}
 398  }
 399  
 400  // notifyFullRefresh bumps version and notifies with empty file list (full refresh).
 401  func (s *Smesh3Server) notifyFullRefresh() {
 402  	s.mu.Lock()
 403  	s.version = time.Now().UnixMilli()
 404  	v := s.version
 405  	clients := make([]chan string, 0, len(s.clients))
 406  	for ch := range s.clients {
 407  		clients = append(clients, ch)
 408  	}
 409  	s.mu.Unlock()
 410  
 411  	msg := `{"v":` + strconv.FormatInt(v, 10) + `}`
 412  
 413  	log.I.F("smesh: full refresh, version=%d, notifying %d clients", v, len(clients))
 414  	for _, ch := range clients {
 415  		select {
 416  		case ch <- msg:
 417  		default:
 418  		}
 419  	}
 420  }
 421  
 422  // handleVersion returns the current version as plain text.
 423  func (s *Smesh3Server) handleVersion(w http.ResponseWriter, r *http.Request) {
 424  	s.mu.RLock()
 425  	v := s.version
 426  	s.mu.RUnlock()
 427  	w.Header().Set("Content-Type", "text/plain")
 428  	w.Header().Set("Cache-Control", "no-cache")
 429  	fmt.Fprintf(w, "%d", v)
 430  }
 431  
 432  // handleSSE maintains a Server-Sent Events connection.
 433  // Sends the current version on connect, then pushes on each change.
 434  func (s *Smesh3Server) handleSSE(w http.ResponseWriter, r *http.Request) {
 435  	flusher, ok := w.(http.Flusher)
 436  	if !ok {
 437  		http.Error(w, "SSE not supported", http.StatusInternalServerError)
 438  		return
 439  	}
 440  
 441  	w.Header().Set("Content-Type", "text/event-stream")
 442  	w.Header().Set("Cache-Control", "no-cache")
 443  	w.Header().Set("Connection", "keep-alive")
 444  	w.Header().Set("Access-Control-Allow-Origin", "*")
 445  
 446  	ch := make(chan string, 4)
 447  
 448  	// Register client.
 449  	s.mu.Lock()
 450  	s.clients[ch] = struct{}{}
 451  	v := s.version
 452  	s.mu.Unlock()
 453  
 454  	// Send current version immediately (no files on connect).
 455  	fmt.Fprintf(w, "data: {\"v\":%d}\n\n", v)
 456  	flusher.Flush()
 457  
 458  	defer func() {
 459  		s.mu.Lock()
 460  		delete(s.clients, ch)
 461  		s.mu.Unlock()
 462  	}()
 463  
 464  	ctx := r.Context()
 465  	// Heartbeat to detect dead connections.
 466  	ticker := time.NewTicker(30 * time.Second)
 467  	defer ticker.Stop()
 468  
 469  	for {
 470  		select {
 471  		case <-ctx.Done():
 472  			return
 473  		case msg := <-ch:
 474  			fmt.Fprintf(w, "data: %s\n\n", msg)
 475  			flusher.Flush()
 476  		case <-ticker.C:
 477  			fmt.Fprintf(w, ": heartbeat\n\n")
 478  			flusher.Flush()
 479  		}
 480  	}
 481  }
 482  
 483