Files
luzia/lib/watchdog.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

435 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Conductor Watchdog - Heartbeat monitoring for Luzia tasks
Monitors active tasks in conductor directories and escalates stalled tasks.
Features:
- Scans heartbeat.json files for liveness
- Reads progress.md for semantic status
- Escalates via assistant-channel MCP when stalled
- Archives failed/completed tasks
Usage:
    from watchdog import ConductorWatchdog

    watchdog = ConductorWatchdog()
    stalled = watchdog.scan_all_projects()
    for task_id, project, reason in stalled:
        watchdog.escalate(project, task_id, reason)
"""
import json
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
def validate_project_name(project: str) -> bool:
    """Return True if *project* is safe to use as a single path component.

    Used to prevent path traversal when a project name is joined onto a
    base directory.

    Rules:
    - Must be a string of 1-32 characters
    - Must start with an ASCII letter
    - Remaining characters may be ASCII letters, digits, hyphens or underscores
    - Cannot contain path separators or dots

    Any other input (including non-string values) yields False rather than
    raising.
    """
    if not isinstance(project, str) or not 1 <= len(project) <= 32:
        return False
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "name\n" through.
    if re.fullmatch(r"[a-zA-Z][a-zA-Z0-9_-]*", project) is None:
        return False
    # Defense in depth: the pattern above already excludes these, but keep an
    # explicit check so a future pattern change cannot reintroduce traversal.
    if "/" in project or "\\" in project or ".." in project:
        return False
    return True
class ConductorWatchdog:
    """Monitor conductor tasks for stalls and liveness.

    Tasks are looked up under
    ``PROJECTS_BASE/<project>/conductor/active/<task_id>``. A task directory
    may contain ``meta.json``, ``heartbeat.json`` and ``progress.md``.
    """

    STALL_TIMEOUT = 600  # 10 minutes
    PROJECTS_BASE = Path("/home")
    QUEUE_CONFIG = Path("/var/lib/luzia/queue/config.json")

    # Entries directly under PROJECTS_BASE that are never treated as projects.
    _SYSTEM_USERS = frozenset({"admin", "root", "ubuntu", "lost+found", "guest"})
    # A task ID becomes a path component, so only a conservative charset is allowed.
    _TASK_ID_RE = re.compile(r"[a-zA-Z0-9_-]+")

    def __init__(self, stall_timeout: Optional[int] = None):
        """Create a watchdog.

        Args:
            stall_timeout: Seconds without a heartbeat before a task counts as
                stalled. A falsy value (None or 0) means "use the value from
                QUEUE_CONFIG, or STALL_TIMEOUT if that is unavailable".
        """
        self.stall_timeout = stall_timeout or self._load_stall_timeout()

    def _load_stall_timeout(self) -> int:
        """Load the stall timeout from the queue config.

        Returns STALL_TIMEOUT when the config file is missing, unreadable, not
        a JSON object, or holds a non-numeric / non-positive
        ``stall_timeout_seconds``.
        """
        try:
            config = json.loads(self.QUEUE_CONFIG.read_text())
        except (OSError, ValueError):
            # Missing/unreadable file, or invalid JSON / encoding.
            return self.STALL_TIMEOUT
        if not isinstance(config, dict):
            return self.STALL_TIMEOUT
        timeout = config.get("stall_timeout_seconds", self.STALL_TIMEOUT)
        # bool is a subclass of int; reject it explicitly.
        if isinstance(timeout, bool) or not isinstance(timeout, (int, float)):
            return self.STALL_TIMEOUT
        if timeout <= 0:
            return self.STALL_TIMEOUT
        return timeout

    @classmethod
    def _is_valid_task_id(cls, task_id: Any) -> bool:
        """Return True if *task_id* is a non-empty string of [a-zA-Z0-9_-]."""
        # fullmatch so that a trailing newline cannot pass the check.
        return isinstance(task_id, str) and cls._TASK_ID_RE.fullmatch(task_id) is not None

    def _get_project_users(self) -> List[str]:
        """Get list of project users.

        A project user is a directory under PROJECTS_BASE that is not in the
        system-user skip list and contains a ``conductor`` entry. Returns an
        empty list if PROJECTS_BASE cannot be listed.
        """
        try:
            home_dirs = sorted(self.PROJECTS_BASE.iterdir())
        except OSError as e:
            print(f"[watchdog] Cannot list {self.PROJECTS_BASE}: {e}")
            return []
        projects = []
        for home_dir in home_dirs:
            if home_dir.name in self._SYSTEM_USERS:
                continue
            try:
                if home_dir.is_dir() and (home_dir / "conductor").exists():
                    projects.append(home_dir.name)
            except PermissionError:
                # Skip directories we can't access
                continue
        return projects

    def get_conductor_base(self, project: str) -> Path:
        """Get conductor base directory for a project."""
        return self.PROJECTS_BASE / project / "conductor"

    # --- Task State Reading ---

    def read_task_state(self, task_dir: Path) -> Dict[str, Any]:
        """Read complete task state from a task directory.

        Returns a dict with keys ``id``, ``path``, ``meta``, ``heartbeat``,
        ``heartbeat_age``, ``progress``, ``progress_summary``, ``stalled`` and
        ``stall_reason``. A task is marked stalled when its heartbeat file is
        missing (``missing_heartbeat``), unusable (``corrupt_heartbeat``) or
        older than ``self.stall_timeout`` (``no_heartbeat_<age>s``).
        """
        state: Dict[str, Any] = {
            "id": task_dir.name,
            "path": str(task_dir),
            "meta": {},
            "heartbeat": None,
            "heartbeat_age": None,
            "progress": "",
            "progress_summary": "",
            "stalled": False,
            "stall_reason": None,
        }

        # Read meta.json
        meta_file = task_dir / "meta.json"
        if meta_file.exists():
            try:
                meta = json.loads(meta_file.read_text())
            except (OSError, ValueError):
                meta = None
            # Valid JSON that is not an object is as unusable as invalid JSON.
            state["meta"] = meta if isinstance(meta, dict) else {"error": "corrupt meta.json"}

        # Read heartbeat.json
        heartbeat_file = task_dir / "heartbeat.json"
        if not heartbeat_file.exists():
            state["stalled"] = True
            state["stall_reason"] = "missing_heartbeat"
        else:
            try:
                heartbeat = json.loads(heartbeat_file.read_text())
            except (OSError, ValueError):
                heartbeat = None
            # A missing "ts" is treated as timestamp 0, i.e. a very old heartbeat.
            ts = heartbeat.get("ts", 0) if isinstance(heartbeat, dict) else None
            if isinstance(ts, bool) or not isinstance(ts, (int, float)):
                state["stalled"] = True
                state["stall_reason"] = "corrupt_heartbeat"
            else:
                age = time.time() - ts
                state["heartbeat"] = heartbeat
                state["heartbeat_age"] = age
                if age > self.stall_timeout:
                    state["stalled"] = True
                    state["stall_reason"] = f"no_heartbeat_{int(age)}s"

        # Read progress.md
        progress_file = task_dir / "progress.md"
        if progress_file.exists():
            try:
                state["progress"] = progress_file.read_text()
            except (OSError, ValueError):
                # Unreadable or undecodable progress must not abort the scan.
                state["progress"] = ""
            state["progress_summary"] = self._extract_progress_summary(state["progress"])
        return state

    def _extract_progress_summary(self, progress_md: str) -> str:
        """Extract last milestone or current status from progress.md text.

        Preference order: the last checked item (``- [x] ...``, case
        insensitive), then the first line (max 100 chars) of the
        ``## Current Status`` section, then ``"No progress recorded"``.
        """
        completed = re.findall(r"- \[x\] (.+)", progress_md, re.IGNORECASE)
        if completed:
            return f"Completed: {completed[-1]}"
        status_match = re.search(
            r"## Current Status\s*\n(.+?)(?:\n#|\Z)", progress_md, re.DOTALL
        )
        if status_match:
            first_line = status_match.group(1).strip().split("\n")[0]
            return first_line[:100]
        return "No progress recorded"

    # --- Scanning ---

    def scan_project(self, project: str) -> List[Dict[str, Any]]:
        """Scan all active tasks for a project.

        Returns one state dict per task directory (see read_task_state), each
        with an added ``project`` key. Returns an empty list when the
        project's ``active`` directory is missing or cannot be listed.
        """
        active_dir = self.get_conductor_base(project) / "active"
        try:
            if not active_dir.exists():
                return []
            task_dirs = sorted(p for p in active_dir.iterdir() if p.is_dir())
        except OSError as e:
            print(f"[watchdog] Cannot scan {active_dir}: {e}")
            return []
        tasks = []
        for task_dir in task_dirs:
            state = self.read_task_state(task_dir)
            state["project"] = project
            tasks.append(state)
        return tasks

    def scan_all_projects(self) -> List[Tuple[str, str, str]]:
        """
        Scan all projects for stalled tasks.

        Returns: List of (task_id, project, stall_reason)
        """
        return [
            (task["id"], project, task.get("stall_reason", "unknown"))
            for project in self._get_project_users()
            for task in self.scan_project(project)
            if task.get("stalled")
        ]

    def get_all_active_tasks(self) -> List[Dict[str, Any]]:
        """Get all active tasks across all projects."""
        tasks: List[Dict[str, Any]] = []
        for project in self._get_project_users():
            tasks.extend(self.scan_project(project))
        return tasks

    # --- Escalation ---

    def escalate(self, project: str, task_id: str, reason: str) -> bool:
        """
        Escalate stalled task via assistant-channel.

        The message is sent by a child ``python3`` process; if that fails, the
        escalation is written to ``escalation.json`` in the task directory
        instead.

        Returns True if the escalation was sent or written to the fallback
        file, False if the inputs are invalid or both attempts failed.
        """
        # SECURITY: Validate inputs to prevent path traversal
        if not validate_project_name(project):
            print(f"[watchdog] Invalid project name: {project}")
            return False
        if not self._is_valid_task_id(task_id):
            print(f"[watchdog] Invalid task_id: {task_id}")
            return False

        task_dir = self.get_conductor_base(project) / "active" / task_id

        # Read task details (best effort: the alert is still useful without them)
        meta: Dict[str, Any] = {}
        meta_file = task_dir / "meta.json"
        if meta_file.exists():
            try:
                loaded = json.loads(meta_file.read_text())
            except (OSError, ValueError):
                loaded = None
            if isinstance(loaded, dict):
                meta = loaded
        raw_prompt = meta.get("prompt")
        if raw_prompt is None:
            raw_prompt = "Unknown task"
        # meta.json is free-form; tolerate a non-string prompt.
        prompt = str(raw_prompt)[:200]
        started = meta.get("started", "Unknown")

        # Format message
        message = "\n".join([
            "**Task Stalled Alert**",
            f"Project: `{project}`",
            f"Task ID: `{task_id}`",
            f"Reason: `{reason}`",
            f"Started: {started}",
            f"Prompt: {prompt}...",
            "**Action Required:** Check task status or restart.",
            "",
        ])

        # Try to send via assistant-channel
        import subprocess  # local import: only needed on this path

        # SECURITY: Pass arguments via argv, not string interpolation
        # This prevents RCE via malicious project names or prompts
        escalation_script = "\n".join([
            "",
            "import sys",
            "sys.path.insert(0, '/opt/server-agents/mcp-servers/assistant-channel')",
            "from channel import send_message",
            "sender = sys.argv[1]",
            "msg = sys.argv[2]",
            "send_message(sender, msg, 'high')",
            "",
        ])
        try:
            result = subprocess.run(
                ["python3", "-c", escalation_script, f"watchdog-{project}", message],
                capture_output=True,
                text=True,
                timeout=10,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # Interpreter missing, timeout, or an argument the OS rejects.
            print(f"[watchdog] Escalation error: {e}")
        else:
            if result.returncode == 0:
                print(f"[watchdog] Escalated {task_id} from {project}")
                return True
            print(f"[watchdog] Escalation failed: {result.stderr}")

        # Fallback: write escalation to file
        escalation_file = task_dir / "escalation.json"
        escalation = {
            "reason": reason,
            "escalated_at": datetime.now().isoformat(),
            "message": message,
        }
        try:
            escalation_file.write_text(json.dumps(escalation, indent=2))
        except (OSError, TypeError, ValueError) as e:
            print(f"[watchdog] Cannot write escalation: {e}")
            return False
        print(f"[watchdog] Wrote escalation to {escalation_file}")
        return True

    # --- Archive ---

    def archive_task(
        self, project: str, task_id: str, status: str = "completed"
    ) -> bool:
        """
        Move task from active to completed or failed.

        Args:
            project: Project name
            task_id: Task ID
            status: 'completed' or 'failed'

        Returns True if the task directory was moved. Returns False when the
        inputs are invalid, the task is not active, the destination already
        exists, or the move itself fails. Updating ``meta.json`` afterwards is
        best effort and does not affect the return value.
        """
        # SECURITY: Validate inputs to prevent path traversal
        if not validate_project_name(project):
            print(f"[watchdog] Invalid project name: {project}")
            return False
        if not self._is_valid_task_id(task_id):
            print(f"[watchdog] Invalid task_id: {task_id}")
            return False
        if status not in ("completed", "failed"):
            print(f"[watchdog] Invalid status: {status}")
            return False

        conductor_base = self.get_conductor_base(project)
        source = conductor_base / "active" / task_id
        dest = conductor_base / status / task_id
        if not source.exists():
            return False
        # shutil.move() into an existing directory would nest the task inside
        # it (dest/task_id/task_id) instead of failing, so refuse explicitly.
        if dest.exists():
            print(f"[watchdog] Archive failed: {dest} already exists")
            return False

        try:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(source), str(dest))
        except OSError as e:  # shutil.Error is an OSError subclass
            print(f"[watchdog] Archive failed: {e}")
            return False

        # Update meta with archive timestamp. The move already succeeded, so a
        # broken meta.json must not turn the result into a failure.
        meta_file = dest / "meta.json"
        if meta_file.exists():
            try:
                meta = json.loads(meta_file.read_text())
                if isinstance(meta, dict):
                    meta["archived_at"] = datetime.now().isoformat()
                    meta["status"] = status
                    meta_file.write_text(json.dumps(meta, indent=2))
                else:
                    print(f"[watchdog] meta.json for {task_id} is not an object; left unchanged")
            except (OSError, ValueError, TypeError) as e:
                print(f"[watchdog] Could not update meta.json for {task_id}: {e}")

        print(f"[watchdog] Archived {task_id} to {status}/")
        return True

    # --- Heartbeat Update ---

    def update_heartbeat(self, project: str, task_id: str, step: str = "") -> bool:
        """Update heartbeat for a task (called by running agent).

        Writes ``{"ts": <now>, "step": step}`` to the task's heartbeat.json via
        a temp file that is then renamed over the target.

        Returns True on success, False when the inputs are invalid, the task
        directory does not exist, or the write fails.
        """
        # SECURITY: Validate inputs to prevent path traversal (same checks as
        # escalate/archive_task, since both values become path components).
        if not validate_project_name(project):
            print(f"[watchdog] Invalid project name: {project}")
            return False
        if not self._is_valid_task_id(task_id):
            print(f"[watchdog] Invalid task_id: {task_id}")
            return False

        heartbeat_file = (
            self.get_conductor_base(project) / "active" / task_id / "heartbeat.json"
        )
        if not heartbeat_file.parent.exists():
            return False

        heartbeat = {
            "ts": time.time(),
            "step": step,
        }
        tmp_file = heartbeat_file.with_suffix(".json.tmp")
        try:
            # Write to a temp file and rename so readers never see a partial file
            with open(tmp_file, "w") as f:
                json.dump(heartbeat, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_file, heartbeat_file)
            return True
        except (OSError, TypeError, ValueError) as e:
            print(f"[watchdog] Heartbeat update failed: {e}")
            # Best-effort cleanup so a failed write leaves no stray temp file.
            try:
                tmp_file.unlink()
            except OSError:
                pass
            return False
# CLI interface: thin command dispatcher over ConductorWatchdog.
# Exits 0 after printing usage when no command is given, and 1 for an unknown
# command or a known command with too few arguments.
if __name__ == "__main__":
    import sys

    watchdog = ConductorWatchdog()
    if len(sys.argv) < 2:
        print("Usage:")
        print(" watchdog.py scan Scan all projects for stalled tasks")
        print(" watchdog.py scan <project> Scan specific project")
        print(" watchdog.py list List all active tasks")
        print(" watchdog.py escalate <project> <task_id> <reason>")
        print(" watchdog.py archive <project> <task_id> [status]")
        print(" watchdog.py heartbeat <project> <task_id> [step]")
        sys.exit(0)

    cmd = sys.argv[1]
    if cmd == "scan":
        if len(sys.argv) > 2:
            project = sys.argv[2]
            tasks = watchdog.scan_project(project)
            for task in tasks:
                status = "STALLED" if task["stalled"] else "OK"
                # progress_summary is always present but may be empty, so a
                # .get() default would never apply; fall back on falsiness.
                summary = task.get("progress_summary") or "No progress"
                print(f"[{status}] {task['id']}: {summary}")
        else:
            stalled = watchdog.scan_all_projects()
            if stalled:
                print(f"Found {len(stalled)} stalled tasks:")
                for task_id, project, reason in stalled:
                    print(f" {project}/{task_id}: {reason}")
            else:
                print("No stalled tasks found")
    elif cmd == "list":
        tasks = watchdog.get_all_active_tasks()
        if tasks:
            print(f"Active tasks ({len(tasks)}):")
            for task in tasks:
                age = task.get("heartbeat_age")
                # Compare against None: an age of 0 is a valid, fresh heartbeat.
                age_str = f"{int(age)}s ago" if age is not None else "no heartbeat"
                status = "STALLED" if task["stalled"] else "OK"
                print(f" [{status}] {task['project']}/{task['id']}: {age_str}")
        else:
            print("No active tasks")
    elif cmd == "escalate" and len(sys.argv) >= 5:
        project, task_id, reason = sys.argv[2], sys.argv[3], sys.argv[4]
        watchdog.escalate(project, task_id, reason)
    elif cmd == "archive" and len(sys.argv) >= 4:
        project, task_id = sys.argv[2], sys.argv[3]
        status = sys.argv[4] if len(sys.argv) > 4 else "completed"
        watchdog.archive_task(project, task_id, status)
    elif cmd == "heartbeat" and len(sys.argv) >= 4:
        project, task_id = sys.argv[2], sys.argv[3]
        step = sys.argv[4] if len(sys.argv) > 4 else ""
        watchdog.update_heartbeat(project, task_id, step)
    else:
        if cmd in ("escalate", "archive", "heartbeat"):
            # Known command, but the argument-count guard above failed.
            print(f"Missing arguments for command: {cmd}")
        else:
            print(f"Unknown command: {cmd}")
        sys.exit(1)