sprocket.go raw

   1  package app
   2  
   3  import (
   4  	"bufio"
   5  	"context"
   6  	"encoding/json"
   7  	"fmt"
   8  	"io"
   9  	"os"
  10  	"os/exec"
  11  	"path/filepath"
  12  	"strings"
  13  	"sync"
  14  	"time"
  15  
  16  	"github.com/adrg/xdg"
  17  	"next.orly.dev/pkg/lol/chk"
  18  	"next.orly.dev/pkg/lol/log"
  19  	"next.orly.dev/pkg/nostr/encoders/event"
  20  )
  21  
  22  // SprocketResponse represents a response from the sprocket script
  23  type SprocketResponse struct {
  24  	ID     string `json:"id"`
  25  	Action string `json:"action"` // accept, reject, or shadowReject
  26  	Msg    string `json:"msg"`    // NIP-20 response message (only used for reject)
  27  }
  28  
  29  // SprocketManager handles sprocket script execution and management
  30  type SprocketManager struct {
  31  	ctx           context.Context
  32  	cancel        context.CancelFunc
  33  	configDir     string
  34  	scriptPath    string
  35  	currentCmd    *exec.Cmd
  36  	currentCancel context.CancelFunc
  37  	mutex         sync.RWMutex
  38  	isRunning     bool
  39  	enabled       bool
  40  	disabled      bool // true when sprocket is disabled due to failure
  41  	stdin         io.WriteCloser
  42  	stdout        io.ReadCloser
  43  	stderr        io.ReadCloser
  44  	responseChan  chan SprocketResponse
  45  }
  46  
  47  // NewSprocketManager creates a new sprocket manager
  48  func NewSprocketManager(ctx context.Context, appName string, enabled bool) *SprocketManager {
  49  	configDir := filepath.Join(xdg.ConfigHome, appName)
  50  	scriptPath := filepath.Join(configDir, "sprocket.sh")
  51  
  52  	ctx, cancel := context.WithCancel(ctx)
  53  
  54  	sm := &SprocketManager{
  55  		ctx:          ctx,
  56  		cancel:       cancel,
  57  		configDir:    configDir,
  58  		scriptPath:   scriptPath,
  59  		enabled:      enabled,
  60  		disabled:     false,
  61  		responseChan: make(chan SprocketResponse, 100), // Buffered channel for responses
  62  	}
  63  
  64  	// Start the sprocket script if it exists and is enabled
  65  	if enabled {
  66  		go sm.startSprocketIfExists()
  67  		// Start periodic check for sprocket script availability
  68  		go sm.periodicCheck()
  69  	}
  70  
  71  	return sm
  72  }
  73  
  74  // disableSprocket disables sprocket due to failure
  75  func (sm *SprocketManager) disableSprocket() {
  76  	sm.mutex.Lock()
  77  	defer sm.mutex.Unlock()
  78  
  79  	if !sm.disabled {
  80  		sm.disabled = true
  81  		log.W.F("sprocket disabled due to failure - all events will be rejected (script location: %s)", sm.scriptPath)
  82  	}
  83  }
  84  
  85  // enableSprocket re-enables sprocket and attempts to start it
  86  func (sm *SprocketManager) enableSprocket() {
  87  	sm.mutex.Lock()
  88  	defer sm.mutex.Unlock()
  89  
  90  	if sm.disabled {
  91  		sm.disabled = false
  92  		log.I.F("sprocket re-enabled, attempting to start")
  93  
  94  		// Attempt to start sprocket in background
  95  		go func() {
  96  			if _, err := os.Stat(sm.scriptPath); err == nil {
  97  				if err := sm.StartSprocket(); err != nil {
  98  					log.E.F("failed to restart sprocket: %v", err)
  99  					sm.disableSprocket()
 100  				} else {
 101  					log.I.F("sprocket restarted successfully")
 102  				}
 103  			} else {
 104  				log.W.F("sprocket script still not found, keeping disabled")
 105  				sm.disableSprocket()
 106  			}
 107  		}()
 108  	}
 109  }
 110  
 111  // periodicCheck periodically checks if sprocket script becomes available
 112  func (sm *SprocketManager) periodicCheck() {
 113  	ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
 114  	defer ticker.Stop()
 115  
 116  	for {
 117  		select {
 118  		case <-sm.ctx.Done():
 119  			return
 120  		case <-ticker.C:
 121  			sm.mutex.RLock()
 122  			disabled := sm.disabled
 123  			running := sm.isRunning
 124  			sm.mutex.RUnlock()
 125  
 126  			// Only check if sprocket is disabled or not running
 127  			if disabled || !running {
 128  				if _, err := os.Stat(sm.scriptPath); err == nil {
 129  					// Script is available, try to enable/restart
 130  					if disabled {
 131  						sm.enableSprocket()
 132  					} else if !running {
 133  						// Script exists but sprocket isn't running, try to start
 134  						go func() {
 135  							if err := sm.StartSprocket(); err != nil {
 136  								log.E.F("failed to restart sprocket: %v", err)
 137  								sm.disableSprocket()
 138  							} else {
 139  								log.I.F("sprocket restarted successfully")
 140  							}
 141  						}()
 142  					}
 143  				}
 144  			}
 145  		}
 146  	}
 147  }
 148  
 149  // startSprocketIfExists starts the sprocket script if the file exists
 150  func (sm *SprocketManager) startSprocketIfExists() {
 151  	if _, err := os.Stat(sm.scriptPath); err == nil {
 152  		if err := sm.StartSprocket(); err != nil {
 153  			log.E.F("failed to start sprocket: %v", err)
 154  			sm.disableSprocket()
 155  		}
 156  	} else {
 157  		log.W.F("sprocket script not found at %s, disabling sprocket", sm.scriptPath)
 158  		sm.disableSprocket()
 159  	}
 160  }
 161  
 162  // StartSprocket starts the sprocket script
 163  func (sm *SprocketManager) StartSprocket() error {
 164  	sm.mutex.Lock()
 165  	defer sm.mutex.Unlock()
 166  
 167  	if sm.isRunning {
 168  		return fmt.Errorf("sprocket is already running")
 169  	}
 170  
 171  	if _, err := os.Stat(sm.scriptPath); os.IsNotExist(err) {
 172  		return fmt.Errorf("sprocket script does not exist")
 173  	}
 174  
 175  	// Create a new context for this command
 176  	cmdCtx, cmdCancel := context.WithCancel(sm.ctx)
 177  
 178  	// Make the script executable
 179  	if err := os.Chmod(sm.scriptPath, 0755); chk.E(err) {
 180  		cmdCancel()
 181  		return fmt.Errorf("failed to make script executable: %v", err)
 182  	}
 183  
 184  	// Start the script
 185  	cmd := exec.CommandContext(cmdCtx, sm.scriptPath)
 186  	cmd.Dir = sm.configDir
 187  
 188  	// Set up stdio pipes for communication
 189  	stdin, err := cmd.StdinPipe()
 190  	if chk.E(err) {
 191  		cmdCancel()
 192  		return fmt.Errorf("failed to create stdin pipe: %v", err)
 193  	}
 194  
 195  	stdout, err := cmd.StdoutPipe()
 196  	if chk.E(err) {
 197  		cmdCancel()
 198  		stdin.Close()
 199  		return fmt.Errorf("failed to create stdout pipe: %v", err)
 200  	}
 201  
 202  	stderr, err := cmd.StderrPipe()
 203  	if chk.E(err) {
 204  		cmdCancel()
 205  		stdin.Close()
 206  		stdout.Close()
 207  		return fmt.Errorf("failed to create stderr pipe: %v", err)
 208  	}
 209  
 210  	// Start the command
 211  	if err := cmd.Start(); chk.E(err) {
 212  		cmdCancel()
 213  		stdin.Close()
 214  		stdout.Close()
 215  		stderr.Close()
 216  		return fmt.Errorf("failed to start sprocket: %v", err)
 217  	}
 218  
 219  	sm.currentCmd = cmd
 220  	sm.currentCancel = cmdCancel
 221  	sm.stdin = stdin
 222  	sm.stdout = stdout
 223  	sm.stderr = stderr
 224  	sm.isRunning = true
 225  
 226  	// Start response reader in background
 227  	go sm.readResponses()
 228  
 229  	// Log stderr output in background
 230  	go sm.logOutput(stdout, stderr)
 231  
 232  	// Monitor the process
 233  	go sm.monitorProcess()
 234  
 235  	log.I.F("sprocket started (pid=%d)", cmd.Process.Pid)
 236  	return nil
 237  }
 238  
 239  // StopSprocket stops the sprocket script gracefully, with SIGKILL fallback
 240  func (sm *SprocketManager) StopSprocket() error {
 241  	sm.mutex.Lock()
 242  	defer sm.mutex.Unlock()
 243  
 244  	if !sm.isRunning || sm.currentCmd == nil {
 245  		return fmt.Errorf("sprocket is not running")
 246  	}
 247  
 248  	// Close stdin first to signal the script to exit
 249  	if sm.stdin != nil {
 250  		sm.stdin.Close()
 251  	}
 252  
 253  	// Cancel the context
 254  	if sm.currentCancel != nil {
 255  		sm.currentCancel()
 256  	}
 257  
 258  	// Wait for graceful shutdown with timeout
 259  	done := make(chan error, 1)
 260  	go func() {
 261  		done <- sm.currentCmd.Wait()
 262  	}()
 263  
 264  	select {
 265  	case <-done:
 266  		// Process exited gracefully
 267  		log.I.F("sprocket stopped gracefully")
 268  	case <-time.After(5 * time.Second):
 269  		// Force kill after 5 seconds
 270  		log.W.F("sprocket did not stop gracefully, sending SIGKILL")
 271  		if err := sm.currentCmd.Process.Kill(); chk.E(err) {
 272  			log.E.F("failed to kill sprocket process: %v", err)
 273  		}
 274  		<-done // Wait for the kill to complete
 275  	}
 276  
 277  	// Clean up pipes
 278  	if sm.stdin != nil {
 279  		sm.stdin.Close()
 280  		sm.stdin = nil
 281  	}
 282  	if sm.stdout != nil {
 283  		sm.stdout.Close()
 284  		sm.stdout = nil
 285  	}
 286  	if sm.stderr != nil {
 287  		sm.stderr.Close()
 288  		sm.stderr = nil
 289  	}
 290  
 291  	sm.isRunning = false
 292  	sm.currentCmd = nil
 293  	sm.currentCancel = nil
 294  
 295  	return nil
 296  }
 297  
 298  // RestartSprocket stops and starts the sprocket script
 299  func (sm *SprocketManager) RestartSprocket() error {
 300  	if sm.isRunning {
 301  		if err := sm.StopSprocket(); chk.E(err) {
 302  			return fmt.Errorf("failed to stop sprocket: %v", err)
 303  		}
 304  		// Give it a moment to fully stop
 305  		time.Sleep(100 * time.Millisecond)
 306  	}
 307  
 308  	return sm.StartSprocket()
 309  }
 310  
 311  // UpdateSprocket updates the sprocket script and restarts it with zero downtime
 312  func (sm *SprocketManager) UpdateSprocket(scriptContent string) error {
 313  	// Ensure config directory exists
 314  	if err := os.MkdirAll(sm.configDir, 0755); chk.E(err) {
 315  		return fmt.Errorf("failed to create config directory: %v", err)
 316  	}
 317  
 318  	// If script content is empty, delete the script and stop
 319  	if strings.TrimSpace(scriptContent) == "" {
 320  		if sm.isRunning {
 321  			if err := sm.StopSprocket(); chk.E(err) {
 322  				log.E.F("failed to stop sprocket before deletion: %v", err)
 323  			}
 324  		}
 325  
 326  		if _, err := os.Stat(sm.scriptPath); err == nil {
 327  			if err := os.Remove(sm.scriptPath); chk.E(err) {
 328  				return fmt.Errorf("failed to delete sprocket script: %v", err)
 329  			}
 330  			log.I.F("sprocket script deleted")
 331  		}
 332  		return nil
 333  	}
 334  
 335  	// Create backup of existing script if it exists
 336  	if _, err := os.Stat(sm.scriptPath); err == nil {
 337  		timestamp := time.Now().Format("20060102150405")
 338  		backupPath := sm.scriptPath + "." + timestamp
 339  		if err := os.Rename(sm.scriptPath, backupPath); chk.E(err) {
 340  			log.W.F("failed to create backup: %v", err)
 341  		} else {
 342  			log.I.F("created backup: %s", backupPath)
 343  		}
 344  	}
 345  
 346  	// Write new script to temporary file first
 347  	tempPath := sm.scriptPath + ".tmp"
 348  	if err := os.WriteFile(tempPath, []byte(scriptContent), 0755); chk.E(err) {
 349  		return fmt.Errorf("failed to write temporary sprocket script: %v", err)
 350  	}
 351  
 352  	// If sprocket is running, do zero-downtime update
 353  	if sm.isRunning {
 354  		// Atomically replace the script file
 355  		if err := os.Rename(tempPath, sm.scriptPath); chk.E(err) {
 356  			os.Remove(tempPath) // Clean up temp file
 357  			return fmt.Errorf("failed to replace sprocket script: %v", err)
 358  		}
 359  
 360  		log.I.F("sprocket script updated atomically")
 361  
 362  		// Restart the sprocket process
 363  		return sm.RestartSprocket()
 364  	} else {
 365  		// Not running, just replace the file
 366  		if err := os.Rename(tempPath, sm.scriptPath); chk.E(err) {
 367  			os.Remove(tempPath) // Clean up temp file
 368  			return fmt.Errorf("failed to replace sprocket script: %v", err)
 369  		}
 370  
 371  		log.I.F("sprocket script updated")
 372  		return nil
 373  	}
 374  }
 375  
 376  // GetSprocketStatus returns the current status of the sprocket
 377  func (sm *SprocketManager) GetSprocketStatus() map[string]interface{} {
 378  	sm.mutex.RLock()
 379  	defer sm.mutex.RUnlock()
 380  
 381  	status := map[string]interface{}{
 382  		"is_running":    sm.isRunning,
 383  		"script_exists": false,
 384  		"script_path":   sm.scriptPath,
 385  	}
 386  
 387  	if _, err := os.Stat(sm.scriptPath); err == nil {
 388  		status["script_exists"] = true
 389  
 390  		// Get script content
 391  		if content, err := os.ReadFile(sm.scriptPath); err == nil {
 392  			status["script_content"] = string(content)
 393  		}
 394  
 395  		// Get file info
 396  		if info, err := os.Stat(sm.scriptPath); err == nil {
 397  			status["script_modified"] = info.ModTime()
 398  		}
 399  	}
 400  
 401  	if sm.isRunning && sm.currentCmd != nil && sm.currentCmd.Process != nil {
 402  		status["pid"] = sm.currentCmd.Process.Pid
 403  	}
 404  
 405  	return status
 406  }
 407  
 408  // GetSprocketVersions returns a list of all sprocket script versions
 409  func (sm *SprocketManager) GetSprocketVersions() ([]map[string]interface{}, error) {
 410  	versions := []map[string]interface{}{}
 411  
 412  	// Check for current script
 413  	if _, err := os.Stat(sm.scriptPath); err == nil {
 414  		if info, err := os.Stat(sm.scriptPath); err == nil {
 415  			if content, err := os.ReadFile(sm.scriptPath); err == nil {
 416  				versions = append(versions, map[string]interface{}{
 417  					"name":       "sprocket.sh",
 418  					"path":       sm.scriptPath,
 419  					"modified":   info.ModTime(),
 420  					"content":    string(content),
 421  					"is_current": true,
 422  				})
 423  			}
 424  		}
 425  	}
 426  
 427  	// Check for backup versions
 428  	dir := filepath.Dir(sm.scriptPath)
 429  	files, err := os.ReadDir(dir)
 430  	if chk.E(err) {
 431  		return versions, nil
 432  	}
 433  
 434  	for _, file := range files {
 435  		if strings.HasPrefix(file.Name(), "sprocket.sh.") && !file.IsDir() {
 436  			path := filepath.Join(dir, file.Name())
 437  			if info, err := os.Stat(path); err == nil {
 438  				if content, err := os.ReadFile(path); err == nil {
 439  					versions = append(versions, map[string]interface{}{
 440  						"name":       file.Name(),
 441  						"path":       path,
 442  						"modified":   info.ModTime(),
 443  						"content":    string(content),
 444  						"is_current": false,
 445  					})
 446  				}
 447  			}
 448  		}
 449  	}
 450  
 451  	return versions, nil
 452  }
 453  
 454  // DeleteSprocketVersion deletes a specific sprocket version
 455  func (sm *SprocketManager) DeleteSprocketVersion(filename string) error {
 456  	// Don't allow deleting the current script
 457  	if filename == "sprocket.sh" {
 458  		return fmt.Errorf("cannot delete current sprocket script")
 459  	}
 460  
 461  	path := filepath.Join(sm.configDir, filename)
 462  	if err := os.Remove(path); chk.E(err) {
 463  		return fmt.Errorf("failed to delete sprocket version: %v", err)
 464  	}
 465  
 466  	log.I.F("deleted sprocket version: %s", filename)
 467  	return nil
 468  }
 469  
 470  // logOutput logs the output from stdout and stderr
 471  func (sm *SprocketManager) logOutput(stdout, stderr io.ReadCloser) {
 472  	defer stdout.Close()
 473  	defer stderr.Close()
 474  
 475  	// Trace-log stdout lines
 476  	go func() {
 477  		scanner := bufio.NewScanner(stdout)
 478  		for scanner.Scan() {
 479  			line := scanner.Text()
 480  			if line == "" {
 481  				continue
 482  			}
 483  			log.T.F("sprocket stdout: %s", line)
 484  		}
 485  	}()
 486  
 487  	// Trace-log stderr lines
 488  	go func() {
 489  		scanner := bufio.NewScanner(stderr)
 490  		for scanner.Scan() {
 491  			line := scanner.Text()
 492  			if line == "" {
 493  				continue
 494  			}
 495  			log.T.F("sprocket stderr: %s", line)
 496  		}
 497  	}()
 498  }
 499  
 500  // ProcessEvent sends an event to the sprocket script and waits for a response
 501  func (sm *SprocketManager) ProcessEvent(evt *event.E) (*SprocketResponse, error) {
 502  	sm.mutex.RLock()
 503  	if !sm.isRunning || sm.stdin == nil {
 504  		sm.mutex.RUnlock()
 505  		return nil, fmt.Errorf("sprocket is not running")
 506  	}
 507  	stdin := sm.stdin
 508  	sm.mutex.RUnlock()
 509  
 510  	// Serialize the event to JSON
 511  	eventJSON, err := json.Marshal(evt)
 512  	if chk.E(err) {
 513  		return nil, fmt.Errorf("failed to serialize event: %v", err)
 514  	}
 515  
 516  	// Send the event JSON to the sprocket script
 517  	// The final ']' should be the only thing after the event's raw JSON
 518  	if _, err := stdin.Write(eventJSON); chk.E(err) {
 519  		return nil, fmt.Errorf("failed to write event to sprocket: %v", err)
 520  	}
 521  
 522  	// Wait for response with timeout
 523  	select {
 524  	case response := <-sm.responseChan:
 525  		return &response, nil
 526  	case <-time.After(5 * time.Second):
 527  		return nil, fmt.Errorf("sprocket response timeout")
 528  	case <-sm.ctx.Done():
 529  		return nil, fmt.Errorf("sprocket context cancelled")
 530  	}
 531  }
 532  
 533  // readResponses reads JSONL responses from the sprocket script
 534  func (sm *SprocketManager) readResponses() {
 535  	if sm.stdout == nil {
 536  		return
 537  	}
 538  
 539  	scanner := bufio.NewScanner(sm.stdout)
 540  	for scanner.Scan() {
 541  		line := scanner.Text()
 542  		if line == "" {
 543  			continue
 544  		}
 545  
 546  		var response SprocketResponse
 547  		if err := json.Unmarshal([]byte(line), &response); chk.E(err) {
 548  			log.E.F("failed to parse sprocket response: %v", err)
 549  			continue
 550  		}
 551  
 552  		// Send response to channel (non-blocking)
 553  		select {
 554  		case sm.responseChan <- response:
 555  		default:
 556  			log.W.F("sprocket response channel full, dropping response")
 557  		}
 558  	}
 559  
 560  	if err := scanner.Err(); chk.E(err) {
 561  		log.E.F("error reading sprocket responses: %v", err)
 562  	}
 563  }
 564  
 565  // IsEnabled returns whether sprocket is enabled
 566  func (sm *SprocketManager) IsEnabled() bool {
 567  	return sm.enabled
 568  }
 569  
 570  // IsRunning returns whether sprocket is currently running
 571  func (sm *SprocketManager) IsRunning() bool {
 572  	sm.mutex.RLock()
 573  	defer sm.mutex.RUnlock()
 574  	return sm.isRunning
 575  }
 576  
 577  // IsDisabled returns whether sprocket is disabled due to failure
 578  func (sm *SprocketManager) IsDisabled() bool {
 579  	sm.mutex.RLock()
 580  	defer sm.mutex.RUnlock()
 581  	return sm.disabled
 582  }
 583  
 584  // monitorProcess monitors the sprocket process and cleans up when it exits
 585  func (sm *SprocketManager) monitorProcess() {
 586  	if sm.currentCmd == nil {
 587  		return
 588  	}
 589  
 590  	err := sm.currentCmd.Wait()
 591  
 592  	sm.mutex.Lock()
 593  	defer sm.mutex.Unlock()
 594  
 595  	// Clean up pipes
 596  	if sm.stdin != nil {
 597  		sm.stdin.Close()
 598  		sm.stdin = nil
 599  	}
 600  	if sm.stdout != nil {
 601  		sm.stdout.Close()
 602  		sm.stdout = nil
 603  	}
 604  	if sm.stderr != nil {
 605  		sm.stderr.Close()
 606  		sm.stderr = nil
 607  	}
 608  
 609  	sm.isRunning = false
 610  	sm.currentCmd = nil
 611  	sm.currentCancel = nil
 612  
 613  	if err != nil {
 614  		log.E.F("sprocket process exited with error: %v", err)
 615  		// Auto-disable sprocket on failure
 616  		sm.disabled = true
 617  		log.W.F("sprocket disabled due to process failure - all events will be rejected (script location: %s)", sm.scriptPath)
 618  	} else {
 619  		log.I.F("sprocket process exited normally")
 620  	}
 621  }
 622  
 623  // Shutdown gracefully shuts down the sprocket manager
 624  func (sm *SprocketManager) Shutdown() {
 625  	sm.cancel()
 626  	if sm.isRunning {
 627  		sm.StopSprocket()
 628  	}
 629  }
 630