sprocket.go raw
1 package app
2
3 import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "os"
10 "os/exec"
11 "path/filepath"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/adrg/xdg"
17 "next.orly.dev/pkg/lol/chk"
18 "next.orly.dev/pkg/lol/log"
19 "next.orly.dev/pkg/nostr/encoders/event"
20 )
21
// SprocketResponse is one decision line emitted by the sprocket script,
// decoded from JSON.
type SprocketResponse struct {
	// ID is decoded from the "id" key of the response.
	ID string `json:"id"`
	// Action is the verdict: accept, reject, or shadowReject.
	Action string `json:"action"`
	// Msg is the NIP-20 response message (only used for reject).
	Msg string `json:"msg"`
}
28
29 // SprocketManager handles sprocket script execution and management
30 type SprocketManager struct {
31 ctx context.Context
32 cancel context.CancelFunc
33 configDir string
34 scriptPath string
35 currentCmd *exec.Cmd
36 currentCancel context.CancelFunc
37 mutex sync.RWMutex
38 isRunning bool
39 enabled bool
40 disabled bool // true when sprocket is disabled due to failure
41 stdin io.WriteCloser
42 stdout io.ReadCloser
43 stderr io.ReadCloser
44 responseChan chan SprocketResponse
45 }
46
47 // NewSprocketManager creates a new sprocket manager
48 func NewSprocketManager(ctx context.Context, appName string, enabled bool) *SprocketManager {
49 configDir := filepath.Join(xdg.ConfigHome, appName)
50 scriptPath := filepath.Join(configDir, "sprocket.sh")
51
52 ctx, cancel := context.WithCancel(ctx)
53
54 sm := &SprocketManager{
55 ctx: ctx,
56 cancel: cancel,
57 configDir: configDir,
58 scriptPath: scriptPath,
59 enabled: enabled,
60 disabled: false,
61 responseChan: make(chan SprocketResponse, 100), // Buffered channel for responses
62 }
63
64 // Start the sprocket script if it exists and is enabled
65 if enabled {
66 go sm.startSprocketIfExists()
67 // Start periodic check for sprocket script availability
68 go sm.periodicCheck()
69 }
70
71 return sm
72 }
73
74 // disableSprocket disables sprocket due to failure
75 func (sm *SprocketManager) disableSprocket() {
76 sm.mutex.Lock()
77 defer sm.mutex.Unlock()
78
79 if !sm.disabled {
80 sm.disabled = true
81 log.W.F("sprocket disabled due to failure - all events will be rejected (script location: %s)", sm.scriptPath)
82 }
83 }
84
85 // enableSprocket re-enables sprocket and attempts to start it
86 func (sm *SprocketManager) enableSprocket() {
87 sm.mutex.Lock()
88 defer sm.mutex.Unlock()
89
90 if sm.disabled {
91 sm.disabled = false
92 log.I.F("sprocket re-enabled, attempting to start")
93
94 // Attempt to start sprocket in background
95 go func() {
96 if _, err := os.Stat(sm.scriptPath); err == nil {
97 if err := sm.StartSprocket(); err != nil {
98 log.E.F("failed to restart sprocket: %v", err)
99 sm.disableSprocket()
100 } else {
101 log.I.F("sprocket restarted successfully")
102 }
103 } else {
104 log.W.F("sprocket script still not found, keeping disabled")
105 sm.disableSprocket()
106 }
107 }()
108 }
109 }
110
111 // periodicCheck periodically checks if sprocket script becomes available
112 func (sm *SprocketManager) periodicCheck() {
113 ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
114 defer ticker.Stop()
115
116 for {
117 select {
118 case <-sm.ctx.Done():
119 return
120 case <-ticker.C:
121 sm.mutex.RLock()
122 disabled := sm.disabled
123 running := sm.isRunning
124 sm.mutex.RUnlock()
125
126 // Only check if sprocket is disabled or not running
127 if disabled || !running {
128 if _, err := os.Stat(sm.scriptPath); err == nil {
129 // Script is available, try to enable/restart
130 if disabled {
131 sm.enableSprocket()
132 } else if !running {
133 // Script exists but sprocket isn't running, try to start
134 go func() {
135 if err := sm.StartSprocket(); err != nil {
136 log.E.F("failed to restart sprocket: %v", err)
137 sm.disableSprocket()
138 } else {
139 log.I.F("sprocket restarted successfully")
140 }
141 }()
142 }
143 }
144 }
145 }
146 }
147 }
148
149 // startSprocketIfExists starts the sprocket script if the file exists
150 func (sm *SprocketManager) startSprocketIfExists() {
151 if _, err := os.Stat(sm.scriptPath); err == nil {
152 if err := sm.StartSprocket(); err != nil {
153 log.E.F("failed to start sprocket: %v", err)
154 sm.disableSprocket()
155 }
156 } else {
157 log.W.F("sprocket script not found at %s, disabling sprocket", sm.scriptPath)
158 sm.disableSprocket()
159 }
160 }
161
162 // StartSprocket starts the sprocket script
163 func (sm *SprocketManager) StartSprocket() error {
164 sm.mutex.Lock()
165 defer sm.mutex.Unlock()
166
167 if sm.isRunning {
168 return fmt.Errorf("sprocket is already running")
169 }
170
171 if _, err := os.Stat(sm.scriptPath); os.IsNotExist(err) {
172 return fmt.Errorf("sprocket script does not exist")
173 }
174
175 // Create a new context for this command
176 cmdCtx, cmdCancel := context.WithCancel(sm.ctx)
177
178 // Make the script executable
179 if err := os.Chmod(sm.scriptPath, 0755); chk.E(err) {
180 cmdCancel()
181 return fmt.Errorf("failed to make script executable: %v", err)
182 }
183
184 // Start the script
185 cmd := exec.CommandContext(cmdCtx, sm.scriptPath)
186 cmd.Dir = sm.configDir
187
188 // Set up stdio pipes for communication
189 stdin, err := cmd.StdinPipe()
190 if chk.E(err) {
191 cmdCancel()
192 return fmt.Errorf("failed to create stdin pipe: %v", err)
193 }
194
195 stdout, err := cmd.StdoutPipe()
196 if chk.E(err) {
197 cmdCancel()
198 stdin.Close()
199 return fmt.Errorf("failed to create stdout pipe: %v", err)
200 }
201
202 stderr, err := cmd.StderrPipe()
203 if chk.E(err) {
204 cmdCancel()
205 stdin.Close()
206 stdout.Close()
207 return fmt.Errorf("failed to create stderr pipe: %v", err)
208 }
209
210 // Start the command
211 if err := cmd.Start(); chk.E(err) {
212 cmdCancel()
213 stdin.Close()
214 stdout.Close()
215 stderr.Close()
216 return fmt.Errorf("failed to start sprocket: %v", err)
217 }
218
219 sm.currentCmd = cmd
220 sm.currentCancel = cmdCancel
221 sm.stdin = stdin
222 sm.stdout = stdout
223 sm.stderr = stderr
224 sm.isRunning = true
225
226 // Start response reader in background
227 go sm.readResponses()
228
229 // Log stderr output in background
230 go sm.logOutput(stdout, stderr)
231
232 // Monitor the process
233 go sm.monitorProcess()
234
235 log.I.F("sprocket started (pid=%d)", cmd.Process.Pid)
236 return nil
237 }
238
239 // StopSprocket stops the sprocket script gracefully, with SIGKILL fallback
240 func (sm *SprocketManager) StopSprocket() error {
241 sm.mutex.Lock()
242 defer sm.mutex.Unlock()
243
244 if !sm.isRunning || sm.currentCmd == nil {
245 return fmt.Errorf("sprocket is not running")
246 }
247
248 // Close stdin first to signal the script to exit
249 if sm.stdin != nil {
250 sm.stdin.Close()
251 }
252
253 // Cancel the context
254 if sm.currentCancel != nil {
255 sm.currentCancel()
256 }
257
258 // Wait for graceful shutdown with timeout
259 done := make(chan error, 1)
260 go func() {
261 done <- sm.currentCmd.Wait()
262 }()
263
264 select {
265 case <-done:
266 // Process exited gracefully
267 log.I.F("sprocket stopped gracefully")
268 case <-time.After(5 * time.Second):
269 // Force kill after 5 seconds
270 log.W.F("sprocket did not stop gracefully, sending SIGKILL")
271 if err := sm.currentCmd.Process.Kill(); chk.E(err) {
272 log.E.F("failed to kill sprocket process: %v", err)
273 }
274 <-done // Wait for the kill to complete
275 }
276
277 // Clean up pipes
278 if sm.stdin != nil {
279 sm.stdin.Close()
280 sm.stdin = nil
281 }
282 if sm.stdout != nil {
283 sm.stdout.Close()
284 sm.stdout = nil
285 }
286 if sm.stderr != nil {
287 sm.stderr.Close()
288 sm.stderr = nil
289 }
290
291 sm.isRunning = false
292 sm.currentCmd = nil
293 sm.currentCancel = nil
294
295 return nil
296 }
297
298 // RestartSprocket stops and starts the sprocket script
299 func (sm *SprocketManager) RestartSprocket() error {
300 if sm.isRunning {
301 if err := sm.StopSprocket(); chk.E(err) {
302 return fmt.Errorf("failed to stop sprocket: %v", err)
303 }
304 // Give it a moment to fully stop
305 time.Sleep(100 * time.Millisecond)
306 }
307
308 return sm.StartSprocket()
309 }
310
311 // UpdateSprocket updates the sprocket script and restarts it with zero downtime
312 func (sm *SprocketManager) UpdateSprocket(scriptContent string) error {
313 // Ensure config directory exists
314 if err := os.MkdirAll(sm.configDir, 0755); chk.E(err) {
315 return fmt.Errorf("failed to create config directory: %v", err)
316 }
317
318 // If script content is empty, delete the script and stop
319 if strings.TrimSpace(scriptContent) == "" {
320 if sm.isRunning {
321 if err := sm.StopSprocket(); chk.E(err) {
322 log.E.F("failed to stop sprocket before deletion: %v", err)
323 }
324 }
325
326 if _, err := os.Stat(sm.scriptPath); err == nil {
327 if err := os.Remove(sm.scriptPath); chk.E(err) {
328 return fmt.Errorf("failed to delete sprocket script: %v", err)
329 }
330 log.I.F("sprocket script deleted")
331 }
332 return nil
333 }
334
335 // Create backup of existing script if it exists
336 if _, err := os.Stat(sm.scriptPath); err == nil {
337 timestamp := time.Now().Format("20060102150405")
338 backupPath := sm.scriptPath + "." + timestamp
339 if err := os.Rename(sm.scriptPath, backupPath); chk.E(err) {
340 log.W.F("failed to create backup: %v", err)
341 } else {
342 log.I.F("created backup: %s", backupPath)
343 }
344 }
345
346 // Write new script to temporary file first
347 tempPath := sm.scriptPath + ".tmp"
348 if err := os.WriteFile(tempPath, []byte(scriptContent), 0755); chk.E(err) {
349 return fmt.Errorf("failed to write temporary sprocket script: %v", err)
350 }
351
352 // If sprocket is running, do zero-downtime update
353 if sm.isRunning {
354 // Atomically replace the script file
355 if err := os.Rename(tempPath, sm.scriptPath); chk.E(err) {
356 os.Remove(tempPath) // Clean up temp file
357 return fmt.Errorf("failed to replace sprocket script: %v", err)
358 }
359
360 log.I.F("sprocket script updated atomically")
361
362 // Restart the sprocket process
363 return sm.RestartSprocket()
364 } else {
365 // Not running, just replace the file
366 if err := os.Rename(tempPath, sm.scriptPath); chk.E(err) {
367 os.Remove(tempPath) // Clean up temp file
368 return fmt.Errorf("failed to replace sprocket script: %v", err)
369 }
370
371 log.I.F("sprocket script updated")
372 return nil
373 }
374 }
375
376 // GetSprocketStatus returns the current status of the sprocket
377 func (sm *SprocketManager) GetSprocketStatus() map[string]interface{} {
378 sm.mutex.RLock()
379 defer sm.mutex.RUnlock()
380
381 status := map[string]interface{}{
382 "is_running": sm.isRunning,
383 "script_exists": false,
384 "script_path": sm.scriptPath,
385 }
386
387 if _, err := os.Stat(sm.scriptPath); err == nil {
388 status["script_exists"] = true
389
390 // Get script content
391 if content, err := os.ReadFile(sm.scriptPath); err == nil {
392 status["script_content"] = string(content)
393 }
394
395 // Get file info
396 if info, err := os.Stat(sm.scriptPath); err == nil {
397 status["script_modified"] = info.ModTime()
398 }
399 }
400
401 if sm.isRunning && sm.currentCmd != nil && sm.currentCmd.Process != nil {
402 status["pid"] = sm.currentCmd.Process.Pid
403 }
404
405 return status
406 }
407
408 // GetSprocketVersions returns a list of all sprocket script versions
409 func (sm *SprocketManager) GetSprocketVersions() ([]map[string]interface{}, error) {
410 versions := []map[string]interface{}{}
411
412 // Check for current script
413 if _, err := os.Stat(sm.scriptPath); err == nil {
414 if info, err := os.Stat(sm.scriptPath); err == nil {
415 if content, err := os.ReadFile(sm.scriptPath); err == nil {
416 versions = append(versions, map[string]interface{}{
417 "name": "sprocket.sh",
418 "path": sm.scriptPath,
419 "modified": info.ModTime(),
420 "content": string(content),
421 "is_current": true,
422 })
423 }
424 }
425 }
426
427 // Check for backup versions
428 dir := filepath.Dir(sm.scriptPath)
429 files, err := os.ReadDir(dir)
430 if chk.E(err) {
431 return versions, nil
432 }
433
434 for _, file := range files {
435 if strings.HasPrefix(file.Name(), "sprocket.sh.") && !file.IsDir() {
436 path := filepath.Join(dir, file.Name())
437 if info, err := os.Stat(path); err == nil {
438 if content, err := os.ReadFile(path); err == nil {
439 versions = append(versions, map[string]interface{}{
440 "name": file.Name(),
441 "path": path,
442 "modified": info.ModTime(),
443 "content": string(content),
444 "is_current": false,
445 })
446 }
447 }
448 }
449 }
450
451 return versions, nil
452 }
453
454 // DeleteSprocketVersion deletes a specific sprocket version
455 func (sm *SprocketManager) DeleteSprocketVersion(filename string) error {
456 // Don't allow deleting the current script
457 if filename == "sprocket.sh" {
458 return fmt.Errorf("cannot delete current sprocket script")
459 }
460
461 path := filepath.Join(sm.configDir, filename)
462 if err := os.Remove(path); chk.E(err) {
463 return fmt.Errorf("failed to delete sprocket version: %v", err)
464 }
465
466 log.I.F("deleted sprocket version: %s", filename)
467 return nil
468 }
469
470 // logOutput logs the output from stdout and stderr
471 func (sm *SprocketManager) logOutput(stdout, stderr io.ReadCloser) {
472 defer stdout.Close()
473 defer stderr.Close()
474
475 // Trace-log stdout lines
476 go func() {
477 scanner := bufio.NewScanner(stdout)
478 for scanner.Scan() {
479 line := scanner.Text()
480 if line == "" {
481 continue
482 }
483 log.T.F("sprocket stdout: %s", line)
484 }
485 }()
486
487 // Trace-log stderr lines
488 go func() {
489 scanner := bufio.NewScanner(stderr)
490 for scanner.Scan() {
491 line := scanner.Text()
492 if line == "" {
493 continue
494 }
495 log.T.F("sprocket stderr: %s", line)
496 }
497 }()
498 }
499
500 // ProcessEvent sends an event to the sprocket script and waits for a response
501 func (sm *SprocketManager) ProcessEvent(evt *event.E) (*SprocketResponse, error) {
502 sm.mutex.RLock()
503 if !sm.isRunning || sm.stdin == nil {
504 sm.mutex.RUnlock()
505 return nil, fmt.Errorf("sprocket is not running")
506 }
507 stdin := sm.stdin
508 sm.mutex.RUnlock()
509
510 // Serialize the event to JSON
511 eventJSON, err := json.Marshal(evt)
512 if chk.E(err) {
513 return nil, fmt.Errorf("failed to serialize event: %v", err)
514 }
515
516 // Send the event JSON to the sprocket script
517 // The final ']' should be the only thing after the event's raw JSON
518 if _, err := stdin.Write(eventJSON); chk.E(err) {
519 return nil, fmt.Errorf("failed to write event to sprocket: %v", err)
520 }
521
522 // Wait for response with timeout
523 select {
524 case response := <-sm.responseChan:
525 return &response, nil
526 case <-time.After(5 * time.Second):
527 return nil, fmt.Errorf("sprocket response timeout")
528 case <-sm.ctx.Done():
529 return nil, fmt.Errorf("sprocket context cancelled")
530 }
531 }
532
533 // readResponses reads JSONL responses from the sprocket script
534 func (sm *SprocketManager) readResponses() {
535 if sm.stdout == nil {
536 return
537 }
538
539 scanner := bufio.NewScanner(sm.stdout)
540 for scanner.Scan() {
541 line := scanner.Text()
542 if line == "" {
543 continue
544 }
545
546 var response SprocketResponse
547 if err := json.Unmarshal([]byte(line), &response); chk.E(err) {
548 log.E.F("failed to parse sprocket response: %v", err)
549 continue
550 }
551
552 // Send response to channel (non-blocking)
553 select {
554 case sm.responseChan <- response:
555 default:
556 log.W.F("sprocket response channel full, dropping response")
557 }
558 }
559
560 if err := scanner.Err(); chk.E(err) {
561 log.E.F("error reading sprocket responses: %v", err)
562 }
563 }
564
565 // IsEnabled returns whether sprocket is enabled
566 func (sm *SprocketManager) IsEnabled() bool {
567 return sm.enabled
568 }
569
570 // IsRunning returns whether sprocket is currently running
571 func (sm *SprocketManager) IsRunning() bool {
572 sm.mutex.RLock()
573 defer sm.mutex.RUnlock()
574 return sm.isRunning
575 }
576
577 // IsDisabled returns whether sprocket is disabled due to failure
578 func (sm *SprocketManager) IsDisabled() bool {
579 sm.mutex.RLock()
580 defer sm.mutex.RUnlock()
581 return sm.disabled
582 }
583
584 // monitorProcess monitors the sprocket process and cleans up when it exits
585 func (sm *SprocketManager) monitorProcess() {
586 if sm.currentCmd == nil {
587 return
588 }
589
590 err := sm.currentCmd.Wait()
591
592 sm.mutex.Lock()
593 defer sm.mutex.Unlock()
594
595 // Clean up pipes
596 if sm.stdin != nil {
597 sm.stdin.Close()
598 sm.stdin = nil
599 }
600 if sm.stdout != nil {
601 sm.stdout.Close()
602 sm.stdout = nil
603 }
604 if sm.stderr != nil {
605 sm.stderr.Close()
606 sm.stderr = nil
607 }
608
609 sm.isRunning = false
610 sm.currentCmd = nil
611 sm.currentCancel = nil
612
613 if err != nil {
614 log.E.F("sprocket process exited with error: %v", err)
615 // Auto-disable sprocket on failure
616 sm.disabled = true
617 log.W.F("sprocket disabled due to process failure - all events will be rejected (script location: %s)", sm.scriptPath)
618 } else {
619 log.I.F("sprocket process exited normally")
620 }
621 }
622
623 // Shutdown gracefully shuts down the sprocket manager
624 func (sm *SprocketManager) Shutdown() {
625 sm.cancel()
626 if sm.isRunning {
627 sm.StopSprocket()
628 }
629 }
630