Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
435 lines
15 KiB
Python
435 lines
15 KiB
Python
#!/usr/bin/env python3
"""
Conductor Watchdog - Heartbeat monitoring for Luzia tasks

Monitors active tasks in conductor directories and escalates stalled tasks.

Features:
- Scans heartbeat.json files for liveness
- Reads progress.md for a one-line status summary
- Escalates via the assistant-channel MCP server when a task stalls
- Archives failed/completed tasks

Usage:
    from watchdog import ConductorWatchdog

    watchdog = ConductorWatchdog()
    stalled = watchdog.scan_all_projects()
    for task_id, project, reason in stalled:
        watchdog.escalate(project, task_id, reason)
"""

import json
import math
import os
import re
import shutil
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any


def validate_project_name(project: str) -> bool:
    """
    Validate a project name to prevent path traversal attacks.

    Rules:
    - Must start with an ASCII letter
    - Remaining characters: ASCII letters, digits, hyphens or underscores
      (so no path separators, dots, whitespace or control characters)
    - Must be 1-32 characters

    Args:
        project: Candidate project (home directory) name.

    Returns:
        True if the name is safe to join onto a filesystem path, else False.
    """
    if not isinstance(project, str) or not 1 <= len(project) <= 32:
        return False
    # fullmatch rather than match + "$": "$" also matches just before a
    # trailing newline, which would let "name\n" through. The character
    # class already excludes "/", "\\" and ".", so no extra checks are needed.
    return re.fullmatch(r"[a-zA-Z][a-zA-Z0-9_-]*", project) is not None


class ConductorWatchdog:
    """Monitor conductor tasks for stalls and liveness.

    A project is a directory under PROJECTS_BASE that contains a ``conductor``
    directory. Active tasks live in ``<PROJECTS_BASE>/<project>/conductor/active/<task_id>/``
    and may contain ``meta.json``, ``heartbeat.json`` and ``progress.md``.
    Archived tasks are moved to the sibling ``completed`` or ``failed`` directory.
    """

    STALL_TIMEOUT = 600  # default stall threshold in seconds (10 minutes)
    PROJECTS_BASE = Path("/home")
    QUEUE_CONFIG = Path("/var/lib/luzia/queue/config.json")

    # Home directories that are never treated as projects.
    SYSTEM_USERS = frozenset({"admin", "root", "ubuntu", "lost+found", "guest"})

    # Task IDs are joined onto filesystem paths, so only this charset is allowed.
    _TASK_ID_RE = re.compile(r"[a-zA-Z0-9_-]+")

    # Patterns used by _extract_progress_summary.
    _COMPLETED_RE = re.compile(r"- \[x\] (.+)", re.IGNORECASE)
    _STATUS_RE = re.compile(r"## Current Status\s*\n(.+?)(?:\n#|\Z)", re.DOTALL)

    # Script run in a child python3 to post to the assistant channel.
    # SECURITY: sender and message arrive via argv, never interpolated into
    # the code, so crafted project names or prompts cannot inject code.
    _ESCALATION_SCRIPT = """
import sys
sys.path.insert(0, '/opt/server-agents/mcp-servers/assistant-channel')
from channel import send_message
sender = sys.argv[1]
msg = sys.argv[2]
send_message(sender, msg, 'high')
"""

    def __init__(self, stall_timeout: Optional[int] = None):
        """Create a watchdog.

        Args:
            stall_timeout: Seconds without a heartbeat before a task counts as
                stalled. ``None`` loads ``stall_timeout_seconds`` from
                QUEUE_CONFIG (falling back to STALL_TIMEOUT). An explicit 0 is
                honoured rather than being treated as "unset".
        """
        if stall_timeout is None:
            stall_timeout = self._load_stall_timeout()
        self.stall_timeout = stall_timeout

    # --- Helpers ---

    @staticmethod
    def _load_json(path: Path) -> Any:
        """Parse a UTF-8 JSON file.

        Raises:
            OSError: the file is missing (FileNotFoundError) or unreadable.
            ValueError: invalid JSON or undecodable bytes (JSONDecodeError and
                UnicodeDecodeError both subclass ValueError).
        """
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _as_finite_float(value: Any) -> Optional[float]:
        """Return value as a finite float, or None if it is not a real number.

        bool is rejected even though it subclasses int, and NaN/Infinity
        (which json.loads accepts) are rejected so they cannot disable the
        stall comparison.
        """
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return None
        try:
            result = float(value)
        except OverflowError:  # int too large for a float
            return None
        return result if math.isfinite(result) else None

    @classmethod
    def _validate_target(cls, project: str, task_id: str) -> bool:
        """Check project/task_id before they are joined onto a path.

        Prints the reason and returns False when either value is unsafe.
        """
        if not validate_project_name(project):
            print(f"[watchdog] Invalid project name: {project}")
            return False
        if not isinstance(task_id, str) or not cls._TASK_ID_RE.fullmatch(task_id):
            print(f"[watchdog] Invalid task_id: {task_id}")
            return False
        return True

    def _load_stall_timeout(self) -> int:
        """Load ``stall_timeout_seconds`` from QUEUE_CONFIG.

        Falls back to STALL_TIMEOUT in any of these cases: the file is
        missing or unreadable, the JSON is invalid, the top level is not an
        object, or the value is not a finite positive number.
        """
        try:
            config = self._load_json(self.QUEUE_CONFIG)
        except (OSError, ValueError):
            return self.STALL_TIMEOUT
        raw = config.get("stall_timeout_seconds") if isinstance(config, dict) else None
        value = self._as_finite_float(raw)
        if value is None or value <= 0:
            return self.STALL_TIMEOUT
        return raw

    def _get_project_users(self) -> List[str]:
        """Return the names (sorted) of home dirs that contain a conductor dir.

        Entries in SYSTEM_USERS and directories we cannot inspect are skipped.
        Returns [] if PROJECTS_BASE itself cannot be listed.
        """
        try:
            home_dirs = sorted(self.PROJECTS_BASE.iterdir())
        except OSError as e:
            print(f"[watchdog] Cannot list {self.PROJECTS_BASE}: {e}")
            return []

        projects = []
        for home_dir in home_dirs:
            if home_dir.name in self.SYSTEM_USERS:
                continue
            try:
                if home_dir.is_dir() and (home_dir / "conductor").exists():
                    projects.append(home_dir.name)
            except PermissionError:
                # Skip directories we can't access
                continue
        return projects

    def get_conductor_base(self, project: str) -> Path:
        """Get conductor base directory for a project."""
        return self.PROJECTS_BASE / project / "conductor"

    # --- Task State Reading ---

    def read_task_state(self, task_dir: Path) -> Dict[str, Any]:
        """Read the complete state of one task directory.

        Malformed task files never raise. Problems are reported through the
        returned dict instead.

        Returns:
            Dict with keys:
            - id, path
            - meta: parsed meta.json; {} if missing, or
              {"error": "corrupt meta.json"} if unreadable, invalid or not an
              object.
            - heartbeat, heartbeat_age: age in seconds, None when unknown.
            - progress, progress_summary
            - stalled, stall_reason: one of "missing_heartbeat",
              "corrupt_heartbeat" or "no_heartbeat_<N>s"; None when not
              stalled.
        """
        state = {
            "id": task_dir.name,
            "path": str(task_dir),
            "meta": {},
            "heartbeat": None,
            "heartbeat_age": None,
            "progress": "",
            "progress_summary": "",
            "stalled": False,
            "stall_reason": None,
        }

        # meta.json (optional)
        try:
            meta = self._load_json(task_dir / "meta.json")
        except FileNotFoundError:
            pass
        except (OSError, ValueError):
            state["meta"] = {"error": "corrupt meta.json"}
        else:
            state["meta"] = meta if isinstance(meta, dict) else {"error": "corrupt meta.json"}

        # heartbeat.json: the liveness signal; a missing or unusable one is a stall.
        try:
            heartbeat = self._load_json(task_dir / "heartbeat.json")
        except FileNotFoundError:
            state["stalled"] = True
            state["stall_reason"] = "missing_heartbeat"
        except (OSError, ValueError):
            state["stalled"] = True
            state["stall_reason"] = "corrupt_heartbeat"
        else:
            # A missing "ts" defaults to 0 (the epoch), which always reads as
            # stalled. A non-object payload or a non-numeric ts is corruption.
            ts = (
                self._as_finite_float(heartbeat.get("ts", 0))
                if isinstance(heartbeat, dict)
                else None
            )
            if ts is None:
                state["stalled"] = True
                state["stall_reason"] = "corrupt_heartbeat"
            else:
                state["heartbeat"] = heartbeat
                state["heartbeat_age"] = time.time() - ts
                if state["heartbeat_age"] > self.stall_timeout:
                    state["stalled"] = True
                    state["stall_reason"] = f"no_heartbeat_{int(state['heartbeat_age'])}s"

        # progress.md (optional free-form markdown; undecodable bytes are replaced)
        try:
            progress = (task_dir / "progress.md").read_text(encoding="utf-8", errors="replace")
        except OSError:
            pass  # missing or unreadable: progress fields stay empty
        else:
            state["progress"] = progress
            state["progress_summary"] = self._extract_progress_summary(progress)

        return state

    def _extract_progress_summary(self, progress_md: str) -> str:
        """Summarise progress.md in one line.

        Checks in this order:
        1. The last checked "- [x]" milestone (case-insensitive x).
        2. The first non-blank line of a "## Current Status" section,
           truncated to 100 characters.
        3. Otherwise, "No progress recorded".
        """
        completed = self._COMPLETED_RE.findall(progress_md)
        if completed:
            return f"Completed: {completed[-1].strip()}"

        status_match = self._STATUS_RE.search(progress_md)
        if status_match:
            status = status_match.group(1).strip()
            if status:
                return status.splitlines()[0][:100]

        return "No progress recorded"

    # --- Scanning ---

    def scan_project(self, project: str) -> List[Dict[str, Any]]:
        """Read the state of every task directory in <project>/conductor/active.

        Each state dict (see read_task_state) gains a "project" key. Returns
        [] if the active directory is missing or cannot be listed.

        project is not validated here. Callers handling untrusted input should
        check it with validate_project_name first.
        """
        active_dir = self.get_conductor_base(project) / "active"
        try:
            task_dirs = sorted(p for p in active_dir.iterdir() if p.is_dir())
        except FileNotFoundError:
            return []
        except OSError as e:
            print(f"[watchdog] Cannot scan {active_dir}: {e}")
            return []

        return [
            {**self.read_task_state(task_dir), "project": project}
            for task_dir in task_dirs
        ]

    def scan_all_projects(self) -> List[Tuple[str, str, str]]:
        """
        Scan all projects for stalled tasks.

        Returns: List of (task_id, project, stall_reason)
        """
        return [
            (task["id"], project, task.get("stall_reason") or "unknown")
            for project in self._get_project_users()
            for task in self.scan_project(project)
            if task.get("stalled")
        ]

    def get_all_active_tasks(self) -> List[Dict[str, Any]]:
        """Get all active tasks across all projects."""
        return [
            task
            for project in self._get_project_users()
            for task in self.scan_project(project)
        ]

    # --- Escalation ---

    def escalate(self, project: str, task_id: str, reason: str) -> bool:
        """
        Escalate a stalled task.

        First sends a high-priority message through the assistant-channel
        helper. If that fails, it falls back to writing ``escalation.json``
        into the task directory.

        Returns True if the message was sent or the fallback file was
        written. Returns False for invalid input, or if both mechanisms
        failed.
        """
        # SECURITY: Validate inputs to prevent path traversal
        if not self._validate_target(project, task_id):
            return False

        task_dir = self.get_conductor_base(project) / "active" / task_id
        message = self._format_escalation(project, task_id, reason, self._read_meta(task_dir))

        if self._send_to_channel(project, message):
            print(f"[watchdog] Escalated {task_id} from {project}")
            return True

        return self._write_escalation_file(task_dir, reason, message)

    def _read_meta(self, task_dir: Path) -> Dict[str, Any]:
        """Return task_dir/meta.json as a dict, or {} if missing, unreadable or not an object."""
        try:
            meta = self._load_json(task_dir / "meta.json")
        except (OSError, ValueError):
            return {}
        return meta if isinstance(meta, dict) else {}

    @staticmethod
    def _format_escalation(
        project: str, task_id: str, reason: str, meta: Dict[str, Any]
    ) -> str:
        """Build the markdown alert text for a stalled task.

        The prompt is limited to 200 characters; "..." is appended only when
        it was actually truncated.
        """
        prompt = str(meta.get("prompt", "Unknown task"))
        if len(prompt) > 200:
            prompt = prompt[:200] + "..."
        started = meta.get("started", "Unknown")
        return f"""**Task Stalled Alert**

Project: `{project}`
Task ID: `{task_id}`
Reason: `{reason}`
Started: {started}
Prompt: {prompt}

**Action Required:** Check task status or restart.
"""

    def _send_to_channel(self, project: str, message: str) -> bool:
        """Run the assistant-channel helper in a child python3; True on exit code 0."""
        try:
            result = subprocess.run(
                ["python3", "-c", self._ESCALATION_SCRIPT, f"watchdog-{project}", message],
                capture_output=True,
                text=True,
                timeout=10,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # OSError: interpreter not runnable. ValueError: NUL byte in argv
            # (e.g. from the prompt). SubprocessError: includes TimeoutExpired.
            print(f"[watchdog] Escalation error: {e}")
            return False
        if result.returncode != 0:
            print(f"[watchdog] Escalation failed: {result.stderr}")
            return False
        return True

    @staticmethod
    def _write_escalation_file(task_dir: Path, reason: str, message: str) -> bool:
        """Fallback: write escalation.json into task_dir; True on success."""
        escalation_file = task_dir / "escalation.json"
        escalation = {
            "reason": reason,
            "escalated_at": datetime.now().isoformat(),
            "message": message,
        }
        try:
            escalation_file.write_text(json.dumps(escalation, indent=2))
        except OSError as e:
            print(f"[watchdog] Cannot write escalation: {e}")
            return False
        print(f"[watchdog] Wrote escalation to {escalation_file}")
        return True

    # --- Archive ---

    def archive_task(
        self, project: str, task_id: str, status: str = "completed"
    ) -> bool:
        """
        Move task from active to completed or failed.

        Args:
            project: Project name
            task_id: Task ID
            status: 'completed' or 'failed'

        Returns True if the task directory was moved.

        An existing destination is refused, because shutil.move would nest
        the task inside it. If stamping meta.json fails after the move, the
        failure is reported but the result is still True, since the task has
        already been archived.
        """
        # SECURITY: Validate inputs to prevent path traversal
        if not self._validate_target(project, task_id):
            return False
        if status not in ("completed", "failed"):
            print(f"[watchdog] Invalid status: {status}")
            return False

        conductor_base = self.get_conductor_base(project)
        source = conductor_base / "active" / task_id
        dest = conductor_base / status / task_id

        if not source.exists():
            return False
        if dest.exists():
            print(f"[watchdog] Archive failed: {dest} already exists")
            return False

        try:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(source), str(dest))
        except OSError as e:  # shutil.Error subclasses OSError
            print(f"[watchdog] Archive failed: {e}")
            return False

        self._stamp_archive_meta(dest, status)
        print(f"[watchdog] Archived {task_id} to {status}/")
        return True

    def _stamp_archive_meta(self, task_dir: Path, status: str) -> None:
        """Add archived_at/status to task_dir/meta.json if it exists.

        Problems are printed, not raised.
        """
        meta_file = task_dir / "meta.json"
        try:
            meta = self._load_json(meta_file)
        except FileNotFoundError:
            return
        except (OSError, ValueError) as e:
            print(f"[watchdog] Could not update {meta_file}: {e}")
            return
        if not isinstance(meta, dict):
            print(f"[watchdog] Could not update {meta_file}: not a JSON object")
            return
        meta["archived_at"] = datetime.now().isoformat()
        meta["status"] = status
        try:
            meta_file.write_text(json.dumps(meta, indent=2))
        except OSError as e:
            print(f"[watchdog] Could not update {meta_file}: {e}")

    # --- Heartbeat Update ---

    def update_heartbeat(self, project: str, task_id: str, step: str = "") -> bool:
        """Update heartbeat for a task (called by running agent).

        Writes {"ts": time.time(), "step": step} to a temporary file and
        fsyncs it. It then renames the file over heartbeat.json, so readers
        never see a partially written file.

        Returns False in any of these cases: invalid project or task_id,
        missing task directory, or a write failure.
        """
        # SECURITY: both values are joined onto a path we write to.
        if not self._validate_target(project, task_id):
            return False

        task_dir = self.get_conductor_base(project) / "active" / task_id
        if not task_dir.is_dir():
            return False

        heartbeat_file = task_dir / "heartbeat.json"
        tmp_file = heartbeat_file.with_suffix(".json.tmp")
        heartbeat = {
            "ts": time.time(),
            "step": step,
        }

        try:
            with open(tmp_file, "w") as f:
                json.dump(heartbeat, f)
                f.flush()
                os.fsync(f.fileno())
            # os.replace (unlike os.rename) overwrites an existing target on
            # every platform.
            os.replace(tmp_file, heartbeat_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"[watchdog] Heartbeat update failed: {e}")
            # Don't leave a stale temp file behind.
            try:
                tmp_file.unlink()
            except OSError:
                pass
            return False
        return True


# CLI interface


def _print_usage() -> None:
    """Print CLI usage help."""
    print("Usage:")
    print(" watchdog.py scan Scan all projects for stalled tasks")
    print(" watchdog.py scan <project> Scan specific project")
    print(" watchdog.py list List all active tasks")
    print(" watchdog.py escalate <project> <task_id> <reason>")
    print(" watchdog.py archive <project> <task_id> [status]")
    print(" watchdog.py heartbeat <project> <task_id> [step]")


# Minimum positional arguments (after the command name) each command needs.
_CLI_MIN_ARGS = {"scan": 0, "list": 0, "escalate": 3, "archive": 2, "heartbeat": 2}


def main(argv: Optional[List[str]] = None) -> int:
    """Run the watchdog command-line interface.

    Args:
        argv: Arguments after the program name; defaults to sys.argv[1:].

    Returns:
        Process exit code:
        - 0 on success, or when only the usage text is printed.
        - 1 for an unknown command, missing arguments, an invalid project
          name, or a failed escalate/archive/heartbeat operation.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        _print_usage()
        return 0

    cmd, rest = args[0], args[1:]
    if cmd not in _CLI_MIN_ARGS:
        print(f"Unknown command: {cmd}")
        return 1
    if len(rest) < _CLI_MIN_ARGS[cmd]:
        print(f"Missing arguments for command: {cmd}")
        _print_usage()
        return 1

    watchdog = ConductorWatchdog()

    if cmd == "scan" and rest:
        project = rest[0]
        # scan_project does not validate, so guard user input against traversal.
        if not validate_project_name(project):
            print(f"[watchdog] Invalid project name: {project}")
            return 1
        for task in watchdog.scan_project(project):
            status = "STALLED" if task["stalled"] else "OK"
            print(f"[{status}] {task['id']}: {task.get('progress_summary') or 'No progress'}")
        return 0

    if cmd == "scan":
        stalled = watchdog.scan_all_projects()
        if stalled:
            print(f"Found {len(stalled)} stalled tasks:")
            for task_id, project, reason in stalled:
                print(f" {project}/{task_id}: {reason}")
        else:
            print("No stalled tasks found")
        return 0

    if cmd == "list":
        tasks = watchdog.get_all_active_tasks()
        if tasks:
            print(f"Active tasks ({len(tasks)}):")
            for task in tasks:
                age = task.get("heartbeat_age")
                # "is not None": an age of exactly 0.0 is a real heartbeat.
                age_str = f"{int(age)}s ago" if age is not None else "no heartbeat"
                status = "STALLED" if task["stalled"] else "OK"
                print(f" [{status}] {task['project']}/{task['id']}: {age_str}")
        else:
            print("No active tasks")
        return 0

    if cmd == "escalate":
        project, task_id, reason = rest[:3]
        ok = watchdog.escalate(project, task_id, reason)
    elif cmd == "archive":
        project, task_id = rest[:2]
        status = rest[2] if len(rest) > 2 else "completed"
        ok = watchdog.archive_task(project, task_id, status)
    else:  # heartbeat
        project, task_id = rest[:2]
        step = rest[2] if len(rest) > 2 else ""
        ok = watchdog.update_heartbeat(project, task_id, step)
    return 0 if ok else 1


if __name__ == "__main__":
    sys.exit(main())