Files
luzia/bin/luzia.backup-20260108-123231
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

3372 lines
115 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Luzia - Unified Access Point for All Tasks
Pattern-based routing dispatcher:
luzia <project> <task> Execute task in project's Docker container
luzia work on <project> Interactive session (delegates to subagent)
luzia list/status/stop Management commands
luzia think deep <topic> Deep reasoning via Zen + Gemini 3
luzia history <project> View project change history
Maintenance Commands:
luzia cleanup Full maintenance (jobs + containers + logs)
luzia cleanup jobs Clean old job directories only
luzia cleanup containers Stop stale containers only
luzia cleanup --dry-run Preview without deleting
luzia maintenance Show maintenance status and recommendations
luzia jobs [job_id] List jobs or show specific job
luzia kill <job_id> Kill a running agent
Failure Management (Smart Retry):
luzia failures List recent failures with exit codes
luzia failures <job_id> Show failure details
luzia failures --summary Summary by exit code
luzia failures --auto-retry Auto-retry all fixable failures
luzia retry <job_id> Retry a specific failed job
QA & Documentation (Knowledge Graph):
luzia qa Run QA validation checks
luzia qa --sync Sync code to knowledge graph
luzia docs <query> Search all knowledge graphs
luzia docs sysadmin <query> Search sysadmin domain
luzia docs --show <name> Show entity details
luzia docs --stats Show KG statistics
luzia docs --sync Sync .md files to KG
Research Commands (3-Phase Flow with Knowledge Graph):
luzia research [project] <topic> Start research (context -> search -> synthesize)
luzia deep research [project] <topic> Same as research
luzia web research [project] <topic> Same as research
luzia research-list [project] List research sessions
luzia research-show <session_id> Show research session details
luzia research-knowledge [project] Show project knowledge graph
Research Management (called during flow):
luzia research-update <id> <phase> <json> Update research phase
luzia research-graph <id> <json> Add entities to knowledge graph
Use --verbose flag for detailed output.
"""
import json
import os
import sys
import subprocess
import re
import sqlite3
import uuid
import time as time_module
import shutil
from pathlib import Path
from typing import Optional, Dict, Any, Tuple, Callable
from datetime import datetime
# Add lib to path - resolve symlinks to get real path
script_path = Path(__file__).resolve()
lib_path = script_path.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
# ANSI color codes
class Color:
@staticmethod
def hex_to_ansi(hex_color: str) -> str:
"""Convert hex color to ANSI 256 color code"""
hex_color = hex_color.lstrip('#')
r, g, b = int(hex_color[0:2], 16), int(hex_color[2:4], 16), int(hex_color[4:6], 16)
return f"\033[38;2;{r};{g};{b}m"
@staticmethod
def reset() -> str:
return "\033[0m"
@staticmethod
def bold(text: str, color: str = "") -> str:
return f"\033[1m{color}{text}{Color.reset()}"
@staticmethod
def output(text: str, color: str) -> str:
return f"{color}{text}{Color.reset()}"
try:
from docker_bridge import DockerBridge, cleanup_idle_containers, list_project_containers
except ImportError as e:
print(f"Error: Could not import docker_bridge module: {e}")
print(f"Lib path: {lib_path}")
print("Make sure /opt/server-agents/orchestrator/lib/docker_bridge.py exists")
sys.exit(1)
CONFIG_PATH = Path("/opt/server-agents/orchestrator/config.json")
LOG_DIR = Path("/var/log/luz-orchestrator")
JOBS_DIR = Path("/var/log/luz-orchestrator/jobs")
PROJECTS_KG_PATH = Path("/etc/zen-swarm/memory/projects.db")
# Global state
LOG_DIR.mkdir(parents=True, exist_ok=True)
JOBS_DIR.mkdir(parents=True, exist_ok=True)
VERBOSE = False
BACKGROUND = True # Default: dispatch immediately
# --- Knowledge Graph Functions ---
def _kg_get_or_create_entity(conn, name: str, entity_type: str = None) -> str:
"""Get or create an entity in the knowledge graph"""
c = conn.cursor()
c.execute("SELECT id FROM entities WHERE name = ?", (name,))
row = c.fetchone()
if row:
return row[0]
entity_id = str(uuid.uuid4())
c.execute("INSERT INTO entities (id, name, type, created_at) VALUES (?, ?, ?, ?)",
(entity_id, name, entity_type, time_module.time()))
return entity_id
# Retention: keep max 100 changes per project, 30 days max age
KG_MAX_CHANGES_PER_PROJECT = 100
KG_MAX_AGE_DAYS = 30
# Job maintenance settings
JOB_MAX_AGE_DAYS = 3 # Keep completed jobs for 3 days
JOB_FAILED_MAX_AGE_DAYS = 7 # Keep failed jobs longer for debugging
JOB_MAX_COUNT = 50 # Always keep at least last 50 jobs
CONTAINER_MAX_LIFETIME_HOURS = 24 # Max container lifetime
NOTIFICATION_LOG_MAX_LINES = 1000 # Max lines in notifications.log
# Research knowledge graph path (separate from project changes)
RESEARCH_KG_PATH = Path("/etc/zen-swarm/memory/research.db")
def _kg_prune_old_changes(conn, project_id: str):
"""Prune old change events for a project (retention policy)"""
c = conn.cursor()
now = time_module.time()
max_age_seconds = KG_MAX_AGE_DAYS * 24 * 60 * 60
# Delete relations older than max age
c.execute('''
DELETE FROM relations
WHERE source_id = ? AND created_at < ?
''', (project_id, now - max_age_seconds))
# Keep only the most recent N changes per project
c.execute('''
DELETE FROM relations WHERE id IN (
SELECT r.id FROM relations r
WHERE r.source_id = ?
ORDER BY r.created_at DESC
LIMIT -1 OFFSET ?
)
''', (project_id, KG_MAX_CHANGES_PER_PROJECT))
# Clean up orphaned change_event entities (no relations pointing to them)
c.execute('''
DELETE FROM entities WHERE type = 'change_event' AND id NOT IN (
SELECT target_id FROM relations
)
''')
def log_project_change(project: str, change_type: str, description: str, details: str = None):
"""
Log a change to a project's knowledge graph.
Automatically prunes old entries (>30 days or >100 per project).
Args:
project: Project name (e.g., 'musica', 'overbits')
change_type: Type of change (e.g., 'config_update', 'file_modified', 'deployment')
description: Human-readable description of the change
details: Optional additional details/context
"""
try:
# Ensure KB exists
PROJECTS_KG_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(PROJECTS_KG_PATH)
c = conn.cursor()
# Ensure tables exist
c.execute('''CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY, name TEXT UNIQUE NOT NULL, type TEXT, created_at REAL
)''')
c.execute('''CREATE TABLE IF NOT EXISTS relations (
id TEXT PRIMARY KEY, source_id TEXT, target_id TEXT, relation TEXT NOT NULL,
weight INTEGER DEFAULT 1, context TEXT, created_at REAL
)''')
# Create entities
project_id = _kg_get_or_create_entity(conn, project, "project")
change_name = f"{project}:{change_type}:{datetime.now().strftime('%Y%m%d_%H%M%S')}"
change_id = _kg_get_or_create_entity(conn, change_name, "change_event")
# Build context with timestamp and details
context = json.dumps({
"timestamp": datetime.now().isoformat(),
"description": description,
"details": details,
"source": "luzia"
})
# Create relation: project -> has_change -> change_event
rel_id = str(uuid.uuid4())
c.execute('''INSERT INTO relations (id, source_id, target_id, relation, weight, context, created_at)
VALUES (?, ?, ?, ?, 1, ?, ?)''',
(rel_id, project_id, change_id, f"has_{change_type}", context, time_module.time()))
# Prune old entries (retention policy)
_kg_prune_old_changes(conn, project_id)
conn.commit()
conn.close()
_log(f" [KB] Logged {change_type} for {project}", verbose_only=True)
return True
except Exception as e:
_log(f" [KB] Warning: Could not log to knowledge graph: {e}", verbose_only=True)
return False
def get_project_changes(project: str, limit: int = 10) -> list:
"""Get recent changes for a project from the knowledge graph"""
try:
if not PROJECTS_KG_PATH.exists():
return []
conn = sqlite3.connect(PROJECTS_KG_PATH)
c = conn.cursor()
c.execute('''
SELECT e2.name, r.relation, r.context, r.created_at
FROM entities e1
JOIN relations r ON e1.id = r.source_id
JOIN entities e2 ON r.target_id = e2.id
WHERE e1.name = ? AND e1.type = 'project'
ORDER BY r.created_at DESC
LIMIT ?
''', (project, limit))
results = []
for row in c.fetchall():
try:
ctx = json.loads(row[2]) if row[2] else {}
except:
ctx = {"raw": row[2]}
results.append({
"event": row[0],
"relation": row[1],
"context": ctx,
"timestamp": row[3]
})
conn.close()
return results
except Exception as e:
return []
# --- Research Knowledge Graph Functions ---
def _init_research_db():
"""Initialize research knowledge graph database"""
RESEARCH_KG_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(RESEARCH_KG_PATH)
c = conn.cursor()
# Research sessions table
c.execute('''CREATE TABLE IF NOT EXISTS research_sessions (
id TEXT PRIMARY KEY,
project TEXT NOT NULL,
topic TEXT NOT NULL,
status TEXT DEFAULT 'pending',
created_at REAL,
updated_at REAL,
phase TEXT DEFAULT 'init',
context_expansion TEXT,
search_branches TEXT,
final_synthesis TEXT
)''')
# Research findings table (linked to sessions)
c.execute('''CREATE TABLE IF NOT EXISTS research_findings (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
phase TEXT NOT NULL,
finding_type TEXT,
content TEXT,
source TEXT,
confidence REAL DEFAULT 0.5,
created_at REAL,
FOREIGN KEY (session_id) REFERENCES research_sessions(id)
)''')
# Research graph nodes (concepts, entities discovered)
c.execute('''CREATE TABLE IF NOT EXISTS research_nodes (
id TEXT PRIMARY KEY,
session_id TEXT,
project TEXT,
name TEXT NOT NULL,
node_type TEXT,
description TEXT,
embedding TEXT,
created_at REAL
)''')
# Research graph edges (relationships between nodes)
c.execute('''CREATE TABLE IF NOT EXISTS research_edges (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL,
target_id TEXT NOT NULL,
relation TEXT NOT NULL,
weight REAL DEFAULT 1.0,
context TEXT,
created_at REAL,
FOREIGN KEY (source_id) REFERENCES research_nodes(id),
FOREIGN KEY (target_id) REFERENCES research_nodes(id)
)''')
# Index for faster lookups
c.execute('CREATE INDEX IF NOT EXISTS idx_sessions_project ON research_sessions(project)')
c.execute('CREATE INDEX IF NOT EXISTS idx_findings_session ON research_findings(session_id)')
c.execute('CREATE INDEX IF NOT EXISTS idx_nodes_project ON research_nodes(project)')
conn.commit()
return conn
def create_research_session(project: str, topic: str) -> str:
"""Create a new research session for a project"""
conn = _init_research_db()
c = conn.cursor()
session_id = str(uuid.uuid4())[:8]
now = time_module.time()
c.execute('''INSERT INTO research_sessions
(id, project, topic, status, created_at, updated_at, phase)
VALUES (?, ?, ?, 'active', ?, ?, 'init')''',
(session_id, project, topic, now, now))
conn.commit()
conn.close()
return session_id
def update_research_phase(session_id: str, phase: str, data: dict):
"""Update research session with phase results"""
conn = _init_research_db()
c = conn.cursor()
now = time_module.time()
if phase == 'context_expansion':
c.execute('''UPDATE research_sessions
SET phase = ?, context_expansion = ?, updated_at = ?
WHERE id = ?''',
(phase, json.dumps(data), now, session_id))
elif phase == 'search_branches':
c.execute('''UPDATE research_sessions
SET phase = ?, search_branches = ?, updated_at = ?
WHERE id = ?''',
(phase, json.dumps(data), now, session_id))
elif phase == 'final_synthesis':
c.execute('''UPDATE research_sessions
SET phase = ?, final_synthesis = ?, status = 'completed', updated_at = ?
WHERE id = ?''',
(phase, json.dumps(data), now, session_id))
conn.commit()
conn.close()
def add_research_finding(session_id: str, phase: str, finding_type: str,
content: str, source: str = None, confidence: float = 0.5):
"""Add a finding to a research session"""
conn = _init_research_db()
c = conn.cursor()
finding_id = str(uuid.uuid4())
now = time_module.time()
c.execute('''INSERT INTO research_findings
(id, session_id, phase, finding_type, content, source, confidence, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
(finding_id, session_id, phase, finding_type, content, source, confidence, now))
conn.commit()
conn.close()
return finding_id
def add_research_node(session_id: str, project: str, name: str,
node_type: str, description: str = None) -> str:
"""Add a concept/entity node to the research graph"""
conn = _init_research_db()
c = conn.cursor()
# Check if node already exists for this project
c.execute('SELECT id FROM research_nodes WHERE project = ? AND name = ?',
(project, name))
existing = c.fetchone()
if existing:
conn.close()
return existing[0]
node_id = str(uuid.uuid4())
now = time_module.time()
c.execute('''INSERT INTO research_nodes
(id, session_id, project, name, node_type, description, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)''',
(node_id, session_id, project, name, node_type, description, now))
conn.commit()
conn.close()
return node_id
def add_research_edge(source_id: str, target_id: str, relation: str,
context: str = None, weight: float = 1.0):
"""Add a relationship edge between research nodes"""
conn = _init_research_db()
c = conn.cursor()
edge_id = str(uuid.uuid4())
now = time_module.time()
c.execute('''INSERT INTO research_edges
(id, source_id, target_id, relation, weight, context, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)''',
(edge_id, source_id, target_id, relation, weight, context, now))
conn.commit()
conn.close()
return edge_id
def get_project_research_context(project: str, limit: int = 5) -> list:
"""Get recent research sessions and their findings for a project"""
try:
if not RESEARCH_KG_PATH.exists():
return []
conn = sqlite3.connect(RESEARCH_KG_PATH)
c = conn.cursor()
c.execute('''SELECT id, topic, status, phase, context_expansion,
search_branches, final_synthesis, created_at
FROM research_sessions
WHERE project = ?
ORDER BY created_at DESC
LIMIT ?''', (project, limit))
sessions = []
for row in c.fetchall():
session = {
"id": row[0],
"topic": row[1],
"status": row[2],
"phase": row[3],
"context_expansion": json.loads(row[4]) if row[4] else None,
"search_branches": json.loads(row[5]) if row[5] else None,
"final_synthesis": json.loads(row[6]) if row[6] else None,
"created_at": row[7]
}
sessions.append(session)
conn.close()
return sessions
except Exception as e:
return []
def get_research_graph(project: str) -> dict:
"""Get the research knowledge graph for a project"""
try:
if not RESEARCH_KG_PATH.exists():
return {"nodes": [], "edges": []}
conn = sqlite3.connect(RESEARCH_KG_PATH)
c = conn.cursor()
# Get nodes
c.execute('''SELECT id, name, node_type, description
FROM research_nodes WHERE project = ?''', (project,))
nodes = [{"id": r[0], "name": r[1], "type": r[2], "description": r[3]}
for r in c.fetchall()]
# Get edges for these nodes
node_ids = [n["id"] for n in nodes]
if node_ids:
placeholders = ','.join('?' * len(node_ids))
c.execute(f'''SELECT source_id, target_id, relation, weight
FROM research_edges
WHERE source_id IN ({placeholders})''', node_ids)
edges = [{"source": r[0], "target": r[1], "relation": r[2], "weight": r[3]}
for r in c.fetchall()]
else:
edges = []
conn.close()
return {"nodes": nodes, "edges": edges}
except Exception as e:
return {"nodes": [], "edges": []}
def load_config() -> dict:
"""Load orchestrator configuration"""
try:
with open(CONFIG_PATH) as f:
return json.load(f)
except Exception as e:
print(f"Error loading config: {e}")
sys.exit(1)
def _log(msg: str, verbose_only: bool = False):
"""Conditionally print verbose messages"""
if verbose_only and not VERBOSE:
return
print(msg)
# --- Maintenance Functions ---
def _get_actual_job_status(job_dir: Path) -> str:
"""Get actual job status by checking output.log for exit code.
This is needed because meta.json status isn't updated when job completes.
The job's shell script appends "exit:<code>" to output.log on completion.
"""
output_file = job_dir / "output.log"
meta_file = job_dir / "meta.json"
# Start with meta.json status
status = "unknown"
if meta_file.exists():
try:
with open(meta_file) as f:
meta = json.load(f)
status = meta.get("status", "unknown")
except:
pass
# Check output.log for actual completion
if output_file.exists():
try:
content = output_file.read_text()
if "exit:" in content:
# Find exit code to determine if failed
lines = content.strip().split("\n")
for line in reversed(lines):
if line.startswith("exit:"):
exit_code = int(line.split(":")[1])
if exit_code == 0:
return "completed"
elif exit_code == -9:
return "killed"
else:
return "failed"
except:
pass
return status
def cleanup_old_jobs(dry_run: bool = False) -> dict:
"""
Clean up old job directories based on retention policy.
Policy:
- Never delete running jobs
- Keep last JOB_MAX_COUNT jobs regardless of age
- Delete completed jobs older than JOB_MAX_AGE_DAYS
- Delete failed jobs older than JOB_FAILED_MAX_AGE_DAYS
Returns dict with cleanup statistics.
"""
stats = {"checked": 0, "deleted": 0, "kept": 0, "errors": 0, "bytes_freed": 0}
if not JOBS_DIR.exists():
return stats
# Collect all jobs with metadata
jobs = []
for job_dir in JOBS_DIR.iterdir():
if not job_dir.is_dir():
continue
meta_file = job_dir / "meta.json"
if not meta_file.exists():
continue
try:
with open(meta_file) as f:
meta = json.load(f)
# Get actual status by checking output.log
actual_status = _get_actual_job_status(job_dir)
meta["status"] = actual_status
# Calculate directory size
dir_size = sum(f.stat().st_size for f in job_dir.rglob('*') if f.is_file())
jobs.append({
"dir": job_dir,
"meta": meta,
"size": dir_size,
"started": meta.get("started", "1970-01-01T00:00:00")
})
except Exception as e:
_log(f" Warning: Could not read {meta_file}: {e}", verbose_only=True)
stats["errors"] += 1
# Sort by start time (newest first)
jobs.sort(key=lambda x: x["started"], reverse=True)
now = datetime.now()
kept_count = 0
for job in jobs:
stats["checked"] += 1
job_dir = job["dir"]
meta = job["meta"]
status = meta.get("status", "unknown")
# Parse start time
try:
started = datetime.fromisoformat(meta.get("started", "1970-01-01T00:00:00"))
except:
started = datetime.fromtimestamp(0)
age_days = (now - started).total_seconds() / 86400
# Decision logic
should_delete = False
reason = ""
# Never delete running jobs
if status == "running":
reason = "running"
# Always keep first JOB_MAX_COUNT jobs
elif kept_count < JOB_MAX_COUNT:
reason = "within_limit"
kept_count += 1
# Age-based deletion
else:
if status == "failed" and age_days > JOB_FAILED_MAX_AGE_DAYS:
should_delete = True
reason = f"failed_old ({age_days:.1f}d)"
elif status != "failed" and age_days > JOB_MAX_AGE_DAYS:
should_delete = True
reason = f"completed_old ({age_days:.1f}d)"
else:
reason = "recent"
kept_count += 1
if should_delete:
if dry_run:
_log(f" [DRY] Would delete {job_dir.name} ({reason}, {job['size']/1024:.1f}KB)")
else:
try:
shutil.rmtree(job_dir)
stats["deleted"] += 1
stats["bytes_freed"] += job["size"]
_log(f" Deleted {job_dir.name} ({reason})", verbose_only=True)
except Exception as e:
_log(f" Error deleting {job_dir.name}: {e}")
stats["errors"] += 1
else:
stats["kept"] += 1
return stats
def cleanup_stale_containers(max_lifetime_hours: int = CONTAINER_MAX_LIFETIME_HOURS) -> dict:
"""
Stop containers that have exceeded maximum lifetime.
Also cleans up orphaned containers (no matching job record).
Returns dict with cleanup statistics.
"""
stats = {"checked": 0, "stopped": 0, "orphaned": 0, "errors": 0}
containers = list_project_containers()
now = datetime.now()
for container in containers:
stats["checked"] += 1
name = container.get("name", "")
# Parse container creation time
created_str = container.get("created", "")
try:
# Docker returns format like "2025-01-07 16:31:45 +0000 UTC"
created = datetime.strptime(created_str[:19], "%Y-%m-%d %H:%M:%S")
except:
_log(f" Warning: Could not parse creation time for {name}", verbose_only=True)
continue
age_hours = (now - created).total_seconds() / 3600
if age_hours > max_lifetime_hours:
_log(f" Stopping {name} (age: {age_hours:.1f}h > {max_lifetime_hours}h)", verbose_only=True)
try:
subprocess.run(["docker", "stop", name], capture_output=True, timeout=30)
subprocess.run(["docker", "rm", name], capture_output=True, timeout=10)
stats["stopped"] += 1
except Exception as e:
_log(f" Error stopping {name}: {e}")
stats["errors"] += 1
return stats
def rotate_notifications_log(max_lines: int = NOTIFICATION_LOG_MAX_LINES) -> dict:
"""
Rotate notifications.log to keep only the last max_lines.
Returns dict with rotation statistics.
"""
stats = {"rotated": False, "lines_before": 0, "lines_after": 0}
notify_file = LOG_DIR / "notifications.log"
if not notify_file.exists():
return stats
try:
with open(notify_file, "r") as f:
lines = f.readlines()
stats["lines_before"] = len(lines)
if len(lines) > max_lines:
# Keep only last max_lines
with open(notify_file, "w") as f:
f.writelines(lines[-max_lines:])
stats["lines_after"] = max_lines
stats["rotated"] = True
_log(f" Rotated notifications.log: {len(lines)} -> {max_lines} lines", verbose_only=True)
else:
stats["lines_after"] = len(lines)
except Exception as e:
_log(f" Error rotating notifications.log: {e}")
return stats
def get_maintenance_status() -> dict:
"""
Get current maintenance status including:
- Job statistics
- Container status
- Disk usage
- Log file sizes
"""
status = {
"jobs": {"total": 0, "running": 0, "completed": 0, "failed": 0, "oldest_days": 0},
"containers": {"total": 0, "oldest_hours": 0},
"disk": {"jobs_mb": 0, "logs_mb": 0},
"notifications": {"lines": 0}
}
# Job statistics
if JOBS_DIR.exists():
now = datetime.now()
oldest_age = 0
for job_dir in JOBS_DIR.iterdir():
if not job_dir.is_dir():
continue
meta_file = job_dir / "meta.json"
if not meta_file.exists():
continue
try:
with open(meta_file) as f:
meta = json.load(f)
status["jobs"]["total"] += 1
# Get actual status by checking output.log (meta.json isn't updated)
job_status = _get_actual_job_status(job_dir)
if job_status == "running":
status["jobs"]["running"] += 1
elif job_status in ("failed", "killed"):
status["jobs"]["failed"] += 1
else:
status["jobs"]["completed"] += 1
# Calculate age
try:
started = datetime.fromisoformat(meta.get("started", "1970-01-01"))
age_days = (now - started).total_seconds() / 86400
oldest_age = max(oldest_age, age_days)
except:
pass
except:
pass
status["jobs"]["oldest_days"] = round(oldest_age, 1)
# Calculate disk usage
try:
jobs_size = sum(f.stat().st_size for f in JOBS_DIR.rglob('*') if f.is_file())
status["disk"]["jobs_mb"] = round(jobs_size / (1024 * 1024), 2)
except:
pass
# Container statistics
containers = list_project_containers()
status["containers"]["total"] = len(containers)
if containers:
now = datetime.now()
oldest_hours = 0
for c in containers:
try:
created = datetime.strptime(c.get("created", "")[:19], "%Y-%m-%d %H:%M:%S")
age_hours = (now - created).total_seconds() / 3600
oldest_hours = max(oldest_hours, age_hours)
except:
pass
status["containers"]["oldest_hours"] = round(oldest_hours, 1)
# Notification log
notify_file = LOG_DIR / "notifications.log"
if notify_file.exists():
try:
with open(notify_file, "r") as f:
status["notifications"]["lines"] = sum(1 for _ in f)
except:
pass
# Log directory size
try:
logs_size = sum(f.stat().st_size for f in LOG_DIR.glob('*.log') if f.is_file())
status["disk"]["logs_mb"] = round(logs_size / (1024 * 1024), 2)
except:
pass
return status
def run_maintenance(dry_run: bool = False) -> dict:
"""
Run full maintenance cycle:
1. Clean old jobs
2. Stop stale containers
3. Rotate logs
4. Run idle container cleanup
Returns combined statistics.
"""
results = {
"jobs": cleanup_old_jobs(dry_run=dry_run),
"containers": cleanup_stale_containers() if not dry_run else {"skipped": True},
"logs": rotate_notifications_log() if not dry_run else {"skipped": True},
"idle_cleanup": {"done": False}
}
# Also run idle container cleanup
if not dry_run:
try:
cleanup_idle_containers(timeout_minutes=10)
results["idle_cleanup"]["done"] = True
except Exception as e:
results["idle_cleanup"]["error"] = str(e)
return results
def spawn_background_job(project: str, command: str, log_file: Path, job_type: str = "docker") -> str:
"""Spawn a background job, return job ID immediately"""
job_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(command) & 0xffff)[2:]
job_dir = JOBS_DIR / job_id
job_dir.mkdir(exist_ok=True)
# Write job metadata
with open(job_dir / "meta.json", "w") as f:
json.dump({
"id": job_id,
"project": project,
"command": command,
"type": job_type,
"started": datetime.now().isoformat(),
"status": "running"
}, f)
output_file = job_dir / "output.log"
# Spawn fully detached via nohup - parent exits immediately
os.system(
f'nohup sh -c \'docker exec luzia-{project} bash -c "{command}" > "{output_file}" 2>&1; '
f'echo "exit:$?" >> "{output_file}"\' >/dev/null 2>&1 &'
)
return job_id
def is_claude_dev_task(task: str) -> bool:
"""Detect if a task is related to Claude development (skills, plugins, agents, etc.)
When detected, agents should run with --debug flag for better visibility.
"""
task_lower = task.lower()
# Keywords that indicate Claude/agent development work
claude_dev_keywords = [
# Skills and plugins
'skill', 'plugin', 'command',
# Agent development
'sub-agent', 'subagent', 'agent',
# MCP development
'mcp', 'mcp server', 'mcp-server',
# Claude config
'.claude', 'claude.md', 'claude.json',
# Hooks
'hook',
# Luzia itself
'luzia', 'orchestrat',
# Debug explicitly requested
'debug mode', 'debug flag', 'with debug',
]
return any(kw in task_lower for kw in claude_dev_keywords)
def spawn_claude_agent(project: str, task: str, context: str, config: dict) -> str:
"""Spawn a detached Claude agent to handle a natural language task.
IMPORTANT: Agents run with full permissions (--dangerously-skip-permissions)
regardless of how the parent session was started. This ensures autonomous
background execution without blocking on approval prompts.
SMART DEBUG: For Claude development tasks (skills, plugins, agents, MCP),
automatically enables --debug flag for better visibility.
AUTO-MAINTENANCE: Cleans up old jobs before spawning new ones to prevent
unbounded growth of job directories.
"""
# Run lightweight maintenance before spawning (non-blocking)
# Only clean if we have many jobs to avoid overhead on every spawn
try:
job_count = sum(1 for d in JOBS_DIR.iterdir() if d.is_dir()) if JOBS_DIR.exists() else 0
if job_count > JOB_MAX_COUNT:
cleanup_old_jobs(dry_run=False)
_log(f" [Auto-cleanup] Pruned old jobs (was {job_count})", verbose_only=True)
except Exception as e:
_log(f" [Auto-cleanup] Warning: {e}", verbose_only=True)
job_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]
job_dir = JOBS_DIR / job_id
job_dir.mkdir(exist_ok=True)
project_config = config["projects"].get(project, {})
project_path = project_config.get("path", f"/home/{project}")
# Detect Claude development tasks - enable debug for better visibility
debug_mode = is_claude_dev_task(task)
# Build the prompt for the agent
prompt = f"""You are a project agent working on the **{project}** project.
{context}
## Your Task
{task}
## Execution Environment
- You are running directly in the project directory: {project_path}
- You have FULL permission to read, write, and execute files in this directory
- Use standard Claude tools (Read, Write, Edit, Bash) directly - no need for luzia subcommands
- All file operations are pre-authorized - proceed without asking for permission
## Guidelines
- Complete the task autonomously
- If you encounter errors, debug and fix them
- Provide a summary of what was done when complete"""
output_file = job_dir / "output.log"
prompt_file = job_dir / "prompt.txt"
pid_file = job_dir / "pid"
# Write prompt to file for claude to read
with open(prompt_file, "w") as f:
f.write(prompt)
# Spawn Claude agent detached - runs independently of admin CLI
# CRITICAL: Use --dangerously-skip-permissions for autonomous background execution
# This ensures agents don't block on approval prompts regardless of parent session settings
# Track PID, notify on completion
notify_cmd = f'echo "[$(date +%H:%M:%S)] Agent {job_id} finished (exit $exit_code)" >> /var/log/luz-orchestrator/notifications.log'
# Build claude command with appropriate flags
# - Always: --dangerously-skip-permissions (full autonomy)
# - Always: --add-dir for project path (allow file operations in project)
# - Claude dev tasks: --debug (better visibility for skill/plugin/agent work)
debug_flag = "--debug " if debug_mode else ""
# Add project path AND /opt/server-agents to allowed directories
# This ensures agents can read/write project files and access orchestrator tools
claude_cmd = f'claude --dangerously-skip-permissions --add-dir "{project_path}" --add-dir /opt/server-agents {debug_flag}-p'
# Create isolated config directory for this agent to prevent race conditions
# when multiple agents run concurrently (they'd all write to the same .claude.json)
agent_config_dir = job_dir / "claude-config"
agent_config_dir.mkdir(exist_ok=True)
# Copy essential config files to agent's isolated directory
home_claude_json = Path.home() / ".claude.json"
home_claude_dir = Path.home() / ".claude"
home_claude_settings = home_claude_dir / "settings.json"
home_claude_creds = home_claude_dir / ".credentials.json"
if home_claude_json.exists():
shutil.copy(home_claude_json, agent_config_dir / ".claude.json")
# Create .claude subdirectory in agent config
agent_claude_subdir = agent_config_dir / ".claude"
agent_claude_subdir.mkdir(exist_ok=True)
if home_claude_settings.exists():
shutil.copy(home_claude_settings, agent_claude_subdir / "settings.json")
if home_claude_creds.exists():
shutil.copy(home_claude_creds, agent_claude_subdir / ".credentials.json")
# Set CLAUDE_CONFIG_DIR to isolate this agent's config
env_setup = f'export CLAUDE_CONFIG_DIR="{agent_config_dir}"; '
os.system(
f'nohup sh -c \''
f'echo $$ > "{pid_file}"; '
f'{env_setup}'
f'cd "{project_path}" && cat "{prompt_file}" | {claude_cmd} > "{output_file}" 2>&1; '
f'exit_code=$?; echo "exit:$exit_code" >> "{output_file}"; '
f'{notify_cmd}'
f'\' >/dev/null 2>&1 &'
)
# Wait briefly for PID file
import time
time.sleep(0.2)
pid = None
if pid_file.exists():
pid = pid_file.read_text().strip()
# Write job metadata with PID
with open(job_dir / "meta.json", "w") as f:
json.dump({
"id": job_id,
"project": project,
"task": task,
"type": "agent",
"pid": pid,
"started": datetime.now().isoformat(),
"status": "running",
"debug": debug_mode
}, f)
# Log to project knowledge graph
log_project_change(
project=project,
change_type="agent_task",
description=f"Agent task dispatched: {task[:100]}{'...' if len(task) > 100 else ''}",
details=json.dumps({"job_id": job_id, "full_task": task})
)
return job_id
def get_job_status(job_id: str) -> dict:
"""Get status of a background job"""
job_dir = JOBS_DIR / job_id
if not job_dir.exists():
return {"error": f"Job {job_id} not found"}
meta_file = job_dir / "meta.json"
output_file = job_dir / "output.log"
with open(meta_file) as f:
meta = json.load(f)
# Check if completed (look for exit code in output)
if output_file.exists():
content = output_file.read_text()
if "exit:" in content:
lines = content.strip().split("\n")
for line in reversed(lines):
if line.startswith("exit:"):
meta["status"] = "completed"
meta["exit_code"] = int(line.split(":")[1])
break
return meta
def list_jobs() -> list:
"""List all jobs"""
jobs = []
for job_dir in sorted(JOBS_DIR.iterdir(), reverse=True):
if job_dir.is_dir():
meta_file = job_dir / "meta.json"
if meta_file.exists():
status = get_job_status(job_dir.name)
jobs.append(status)
return jobs[:20] # Last 20
def kill_agent(job_id: str) -> dict:
"""Kill a running agent by job ID"""
job_dir = JOBS_DIR / job_id
if not job_dir.exists():
return {"error": f"Job {job_id} not found"}
meta_file = job_dir / "meta.json"
pid_file = job_dir / "pid"
output_file = job_dir / "output.log"
with open(meta_file) as f:
meta = json.load(f)
if meta.get("status") == "completed":
return {"error": f"Job {job_id} already completed"}
# Try to kill by PID
killed = False
if pid_file.exists():
pid = pid_file.read_text().strip()
try:
os.kill(int(pid), 9)
killed = True
except (ProcessLookupError, ValueError):
pass
# Also try to find and kill claude process for this job
result = subprocess.run(
["pgrep", "-f", f"{job_id}"],
capture_output=True, text=True
)
for pid in result.stdout.strip().split("\n"):
if pid:
try:
os.kill(int(pid), 9)
killed = True
except (ProcessLookupError, ValueError):
pass
# Update metadata
meta["status"] = "killed"
meta["killed_at"] = datetime.now().isoformat()
with open(meta_file, "w") as f:
json.dump(meta, f)
# Append to output
with open(output_file, "a") as f:
f.write(f"\n[KILLED at {datetime.now().strftime('%H:%M:%S')}]\nexit:-9\n")
# Notify
notify_file = LOG_DIR / "notifications.log"
with open(notify_file, "a") as f:
f.write(f"[{datetime.now().strftime('%H:%M:%S')}] Agent {job_id} KILLED by user\n")
return {"success": True, "job_id": job_id, "killed": killed}
def get_notifications(limit: int = 10) -> list:
"""Get recent notifications"""
notify_file = LOG_DIR / "notifications.log"
if not notify_file.exists():
return []
lines = notify_file.read_text().strip().split("\n")
return lines[-limit:] if lines else []
# --- Exit Code Classification for Smart Retry ---
# Classify exit codes to determine if failure is retryable
EXIT_CODE_INFO = {
0: {"meaning": "Success", "retryable": False},
1: {"meaning": "General error", "retryable": True, "reason": "Task error - may succeed on retry"},
2: {"meaning": "Shell misuse", "retryable": False, "reason": "Syntax or usage error"},
126: {"meaning": "Permission denied", "retryable": False, "reason": "File not executable"},
127: {"meaning": "Command not found", "retryable": False, "reason": "Missing binary/command"},
128: {"meaning": "Invalid exit code", "retryable": False},
130: {"meaning": "SIGINT (Ctrl+C)", "retryable": True, "reason": "Interrupted - may complete on retry"},
137: {"meaning": "SIGKILL (OOM)", "retryable": True, "reason": "Out of memory - may succeed with less load"},
143: {"meaning": "SIGTERM", "retryable": True, "reason": "Terminated - may succeed on retry"},
254: {"meaning": "Claude CLI error", "retryable": True, "reason": "Claude CLI issue - often transient"},
255: {"meaning": "Exit status out of range", "retryable": False},
-9: {"meaning": "Killed by user", "retryable": False, "reason": "Manually killed - don't auto-retry"},
}
def get_exit_code_info(exit_code: int) -> dict:
"""Get information about an exit code"""
if exit_code in EXIT_CODE_INFO:
return EXIT_CODE_INFO[exit_code]
if 128 <= exit_code <= 192:
signal_num = exit_code - 128
return {"meaning": f"Signal {signal_num}", "retryable": signal_num in [1, 2, 15]}
return {"meaning": "Unknown", "retryable": False}
def is_failure_retryable(exit_code: int) -> tuple:
"""Check if a failure is retryable.
Returns (is_retryable: bool, reason: str)
"""
info = get_exit_code_info(exit_code)
is_retryable = info.get("retryable", False)
reason = info.get("reason", info.get("meaning", "Unknown"))
return is_retryable, reason
def list_failed_jobs(limit: int = 20) -> list:
"""List failed jobs with exit code analysis.
Returns list of failed jobs sorted by time (newest first).
"""
failed_jobs = []
if not JOBS_DIR.exists():
return failed_jobs
for job_dir in sorted(JOBS_DIR.iterdir(), reverse=True):
if not job_dir.is_dir():
continue
meta_file = job_dir / "meta.json"
output_file = job_dir / "output.log"
if not meta_file.exists():
continue
try:
with open(meta_file) as f:
meta = json.load(f)
# Check actual status
actual_status = _get_actual_job_status(job_dir)
if actual_status not in ["failed", "killed"]:
continue
# Extract exit code
exit_code = None
last_output_lines = []
if output_file.exists():
content = output_file.read_text()
lines = content.strip().split("\n")
last_output_lines = lines[-10:] if len(lines) > 10 else lines
for line in reversed(lines):
if line.startswith("exit:"):
exit_code = int(line.split(":")[1])
break
# Get exit code info
exit_info = get_exit_code_info(exit_code) if exit_code is not None else {}
is_retryable, retry_reason = is_failure_retryable(exit_code) if exit_code is not None else (False, "No exit code")
failed_jobs.append({
"id": job_dir.name,
"project": meta.get("project", "unknown"),
"task": meta.get("task", "")[:100],
"started": meta.get("started", "unknown"),
"status": actual_status,
"exit_code": exit_code,
"exit_meaning": exit_info.get("meaning", "Unknown"),
"retryable": is_retryable,
"retry_reason": retry_reason,
"last_output": last_output_lines
})
if len(failed_jobs) >= limit:
break
except Exception as e:
_log(f" Warning: Could not process {job_dir.name}: {e}", verbose_only=True)
return failed_jobs
def get_failure_summary() -> dict:
"""Get summary of failures by exit code"""
summary = {
"total": 0,
"retryable": 0,
"by_exit_code": {},
"by_project": {}
}
if not JOBS_DIR.exists():
return summary
for job_dir in JOBS_DIR.iterdir():
if not job_dir.is_dir():
continue
actual_status = _get_actual_job_status(job_dir)
if actual_status not in ["failed", "killed"]:
continue
meta_file = job_dir / "meta.json"
output_file = job_dir / "output.log"
try:
with open(meta_file) as f:
meta = json.load(f)
project = meta.get("project", "unknown")
exit_code = None
if output_file.exists():
content = output_file.read_text()
for line in reversed(content.strip().split("\n")):
if line.startswith("exit:"):
exit_code = int(line.split(":")[1])
break
summary["total"] += 1
# By exit code
code_str = str(exit_code) if exit_code is not None else "none"
if code_str not in summary["by_exit_code"]:
info = get_exit_code_info(exit_code) if exit_code is not None else {"meaning": "No exit code"}
summary["by_exit_code"][code_str] = {
"count": 0,
"meaning": info.get("meaning", "Unknown"),
"retryable": info.get("retryable", False)
}
summary["by_exit_code"][code_str]["count"] += 1
# By project
if project not in summary["by_project"]:
summary["by_project"][project] = 0
summary["by_project"][project] += 1
# Count retryable
if exit_code is not None:
is_retryable, _ = is_failure_retryable(exit_code)
if is_retryable:
summary["retryable"] += 1
except Exception:
pass
return summary
def retry_job(job_id: str, config: dict) -> dict:
"""Retry a failed job by re-spawning it with the same task.
Returns dict with success status and new job_id or error.
"""
job_dir = JOBS_DIR / job_id
if not job_dir.exists():
return {"success": False, "error": f"Job {job_id} not found"}
meta_file = job_dir / "meta.json"
output_file = job_dir / "output.log"
try:
with open(meta_file) as f:
meta = json.load(f)
except Exception as e:
return {"success": False, "error": f"Could not read job metadata: {e}"}
# Check status
actual_status = _get_actual_job_status(job_dir)
if actual_status == "running":
return {"success": False, "error": "Job is still running"}
# Get exit code
exit_code = None
if output_file.exists():
content = output_file.read_text()
for line in reversed(content.strip().split("\n")):
if line.startswith("exit:"):
exit_code = int(line.split(":")[1])
break
# Check if retryable
if exit_code is not None:
is_retryable, reason = is_failure_retryable(exit_code)
if not is_retryable:
return {"success": False, "error": f"Not retryable: {reason} (exit {exit_code})"}
# Get original task details
project = meta.get("project")
task = meta.get("task")
if not project or not task:
return {"success": False, "error": "Missing project or task in job metadata"}
if project not in config.get("projects", {}):
return {"success": False, "error": f"Unknown project: {project}"}
# Build context and spawn new job
context = get_project_context(project, config)
new_job_id = spawn_claude_agent(project, task, context, config)
# Mark original as retried
meta["retried_at"] = datetime.now().isoformat()
meta["retried_as"] = new_job_id
with open(meta_file, "w") as f:
json.dump(meta, f)
return {
"success": True,
"original_job": job_id,
"new_job": new_job_id,
"project": project,
"task": task[:100]
}
def auto_retry_failures(config: dict, limit: int = 5) -> list:
"""Automatically retry recent retryable failures.
Only retries jobs that:
- Failed with a retryable exit code
- Haven't been retried already
- Are within the last 24 hours
Returns list of retry results.
"""
results = []
now = datetime.now()
failed = list_failed_jobs(limit=50) # Check more to find retryable ones
for job in failed:
if len(results) >= limit:
break
if not job["retryable"]:
continue
job_dir = JOBS_DIR / job["id"]
meta_file = job_dir / "meta.json"
try:
with open(meta_file) as f:
meta = json.load(f)
# Skip if already retried
if meta.get("retried_as"):
continue
# Skip if too old (>24h)
started = datetime.fromisoformat(meta.get("started", "1970-01-01T00:00:00"))
if (now - started).total_seconds() > 86400:
continue
# Attempt retry
result = retry_job(job["id"], config)
results.append({
"original": job["id"],
"project": job["project"],
"exit_code": job["exit_code"],
"retry_result": result
})
except Exception as e:
results.append({
"original": job["id"],
"error": str(e)
})
return results
def route_failures(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia failures [job_id] [--summary] [--retry] [--auto-retry]
Commands:
luzia failures - List recent failures
luzia failures <job_id> - Show details of specific failure
luzia failures --summary - Show failure summary by exit code
luzia failures --retry <id> - Retry a specific failed job
luzia failures --auto-retry - Auto-retry all retryable recent failures
"""
# Parse options
show_summary = "--summary" in args
do_retry = "--retry" in args
do_auto_retry = "--auto-retry" in args
args = [a for a in args if not a.startswith("--")]
if show_summary:
summary = get_failure_summary()
print("\n=== Failure Summary ===\n")
print(f"Total failures: {summary['total']}")
print(f"Retryable: {summary['retryable']}")
print("\nBy Exit Code:")
for code, info in sorted(summary["by_exit_code"].items(), key=lambda x: -x[1]["count"]):
retry_mark = "" if info["retryable"] else ""
print(f" {code:>4}: {info['count']:>3}x - {info['meaning']:<20} [{retry_mark} retry]")
print("\nBy Project:")
for project, count in sorted(summary["by_project"].items(), key=lambda x: -x[1]):
print(f" {project:<15}: {count}x")
return 0
if do_auto_retry:
print("Auto-retrying recent fixable failures...")
results = auto_retry_failures(config, limit=5)
if not results:
print("No retryable failures found.")
return 0
for r in results:
if r.get("error"):
print(f"{r['original']}: {r['error']}")
elif r.get("retry_result", {}).get("success"):
print(f"{r['original']} -> {r['retry_result']['new_job']} ({r['project']})")
else:
print(f"{r['original']}: {r.get('retry_result', {}).get('error', 'Unknown error')}")
return 0
if do_retry:
if not args:
print("Usage: luzia failures --retry <job_id>")
return 1
result = retry_job(args[0], config)
if result["success"]:
print(f"✓ Retrying {result['original_job']} as {result['new_job']}")
print(f" Project: {result['project']}")
print(f" Task: {result['task']}...")
else:
print(f"✗ Could not retry: {result['error']}")
return 0 if result["success"] else 1
# Show specific failure
if args:
job_id = args[0]
failed = list_failed_jobs(limit=100)
job = next((j for j in failed if j["id"] == job_id), None)
if not job:
print(f"Failure not found: {job_id}")
return 1
print(f"\n=== Failed Job: {job['id']} ===\n")
print(f"Project: {job['project']}")
print(f"Started: {job['started']}")
print(f"Exit Code: {job['exit_code']} ({job['exit_meaning']})")
print(f"Retryable: {'Yes - ' + job['retry_reason'] if job['retryable'] else 'No - ' + job['retry_reason']}")
print(f"\nTask:")
print(f" {job['task']}")
print(f"\nLast Output:")
for line in job["last_output"]:
print(f" {line[:100]}")
if job['retryable']:
print(f"\nTo retry: luzia failures --retry {job['id']}")
return 0
# List recent failures
failed = list_failed_jobs(limit=20)
if not failed:
print("No failures found.")
return 0
print("\n=== Recent Failures ===\n")
print(f"{'ID':<18} {'Project':<12} {'Exit':<6} {'Retryable':<10} Started")
print("-" * 75)
for job in failed:
retry_mark = "Yes" if job["retryable"] else "No"
exit_str = str(job["exit_code"]) if job["exit_code"] is not None else "?"
started_short = job["started"][11:19] if len(job["started"]) > 19 else job["started"]
print(f"{job['id']:<18} {job['project']:<12} {exit_str:<6} {retry_mark:<10} {started_short}")
summary = get_failure_summary()
print(f"\nTotal: {summary['total']} failures ({summary['retryable']} retryable)")
print("\nCommands:")
print(" luzia failures <job_id> - Show failure details")
print(" luzia failures --summary - Summary by exit code")
print(" luzia failures --retry <id> - Retry specific job")
print(" luzia failures --auto-retry - Auto-retry all fixable failures")
return 0
def route_retry(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia retry <job_id>
Shortcut for: luzia failures --retry <job_id>
"""
if not args:
print("Usage: luzia retry <job_id>")
return 1
result = retry_job(args[0], config)
if result["success"]:
print(f"✓ Retrying {result['original_job']} as {result['new_job']}")
print(f" Project: {result['project']}")
print(f" Task: {result['task']}...")
print(f"\n Monitor: luzia jobs {result['new_job']}")
else:
print(f"✗ Could not retry: {result['error']}")
return 0 if result["success"] else 1
# --- QA Validation Functions ---
def qa_validate_syntax() -> dict:
"""Check Python syntax of luzia script"""
script_path = Path(__file__).resolve()
result = subprocess.run(
["python3", "-m", "py_compile", str(script_path)],
capture_output=True, text=True
)
return {
"check": "syntax",
"passed": result.returncode == 0,
"error": result.stderr if result.returncode != 0 else None
}
def qa_validate_routes() -> dict:
"""Check that all route handlers have matching matchers"""
script_path = Path(__file__).resolve()
content = script_path.read_text()
# Find all route_ functions
route_funcs = set(re.findall(r'def (route_\w+)\(', content))
# Find all _match_ methods
match_methods = set(re.findall(r'def (_match_\w+)\(', content))
# Find routes registered in Router
registered = set(re.findall(r'self\.(_match_\w+),\s*(route_\w+)', content))
issues = []
# Check each route has a matcher
for route in route_funcs:
expected_matcher = "_match_" + route.replace("route_", "")
# Some routes use self._route_ pattern (internal)
if route.startswith("route_") and expected_matcher not in match_methods:
# Check if it's registered differently
found = any(r[1] == route for r in registered)
if not found and route not in ["route_project_task"]: # Special case
issues.append(f"Route {route} may not have a matcher")
return {
"check": "routes",
"passed": len(issues) == 0,
"route_count": len(route_funcs),
"matcher_count": len(match_methods),
"registered_count": len(registered),
"issues": issues if issues else None
}
def qa_validate_docstring() -> dict:
"""Check that script docstring matches implemented commands"""
script_path = Path(__file__).resolve()
content = script_path.read_text()
# Extract docstring (after shebang line)
docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL)
if not docstring_match:
return {"check": "docstring", "passed": False, "error": "No docstring found"}
docstring = docstring_match.group(1)
# Find commands mentioned in docstring
doc_commands = set(re.findall(r'luzia (\w+)', docstring))
# Find actual route commands
route_commands = set()
for match in re.findall(r'def _match_(\w+)\(', content):
if match not in ["project_task", "exec", "write", "read", "context"]:
route_commands.add(match.replace("_", "-"))
# Simple commands (list, status, stop, etc.)
simple = {"list", "status", "stop", "cleanup", "maintenance", "jobs", "kill",
"failures", "retry", "notify", "history", "logs", "fix", "qa"}
# Multi-word commands that are in docstring as "luzia <word1> <word2>"
multi_word = {"think-deep", "work-on"}
missing_in_doc = route_commands - doc_commands - simple - multi_word
# Filter out internal commands
missing_in_doc = {c for c in missing_in_doc if not c.startswith("research-")}
return {
"check": "docstring",
"passed": len(missing_in_doc) == 0,
"doc_commands": len(doc_commands),
"route_commands": len(route_commands),
"missing": list(missing_in_doc) if missing_in_doc else None
}
def qa_validate_config() -> dict:
"""Check config.json is valid and projects exist"""
issues = []
if not CONFIG_PATH.exists():
return {"check": "config", "passed": False, "error": "config.json not found"}
try:
with open(CONFIG_PATH) as f:
config = json.load(f)
except json.JSONDecodeError as e:
return {"check": "config", "passed": False, "error": f"Invalid JSON: {e}"}
projects = config.get("projects", {})
for name, info in projects.items():
path = info.get("path", f"/home/{name}")
try:
if not Path(path).exists():
issues.append(f"Project {name}: path {path} does not exist")
else:
claude_md = Path(path) / "CLAUDE.md"
try:
if not claude_md.exists():
issues.append(f"Project {name}: missing CLAUDE.md")
except PermissionError:
# Can't check - skip silently (different user's home)
pass
except PermissionError:
# Can't check - skip silently
pass
return {
"check": "config",
"passed": len(issues) == 0,
"project_count": len(projects),
"issues": issues if issues else None
}
def qa_validate_directories() -> dict:
"""Check required directories exist"""
required = [
LOG_DIR,
JOBS_DIR,
Path("/opt/server-agents/orchestrator/lib"),
Path("/opt/server-agents/docs"),
]
missing = [str(d) for d in required if not d.exists()]
return {
"check": "directories",
"passed": len(missing) == 0,
"missing": missing if missing else None
}
def qa_run_all() -> list:
"""Run all QA validations"""
return [
qa_validate_syntax(),
qa_validate_routes(),
qa_validate_docstring(),
qa_validate_config(),
qa_validate_directories(),
]
def qa_update_docs() -> dict:
"""Update LUZIA-REFERENCE.md with current command info"""
ref_path = Path("/opt/server-agents/docs/LUZIA-REFERENCE.md")
if not ref_path.exists():
return {"success": False, "error": "LUZIA-REFERENCE.md not found"}
# Read current doc
content = ref_path.read_text()
# Update timestamp
today = datetime.now().strftime("%Y-%m-%d")
content = re.sub(
r'\*\*Last Updated:\*\* \d{4}-\d{2}-\d{2}',
f'**Last Updated:** {today}',
content
)
# Update project list from config
try:
with open(CONFIG_PATH) as f:
config = json.load(f)
projects = config.get("projects", {})
project_table = "| Project | Description | Focus |\n|---------|-------------|-------|\n"
for name, info in sorted(projects.items()):
desc = info.get("description", "")[:30]
focus = info.get("focus", "")[:25]
project_table += f"| {name} | {desc} | {focus} |\n"
# Replace project table
content = re.sub(
r'## Registered Projects\n\n\|.*?\n\n---',
f'## Registered Projects\n\n{project_table}\n---',
content,
flags=re.DOTALL
)
except Exception as e:
return {"success": False, "error": f"Could not update projects: {e}"}
# Write back
ref_path.write_text(content)
return {"success": True, "path": str(ref_path), "updated": today}
def route_qa(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia qa [--update-docs] [--test-all]
QA validation for Luzia itself:
luzia qa - Run all validations
luzia qa --update-docs - Update LUZIA-REFERENCE.md
luzia qa --test-all - Run tests with verbose output
"""
update_docs = "--update-docs" in args
test_all = "--test-all" in args
verbose = VERBOSE or test_all
if update_docs:
print("Updating documentation...")
result = qa_update_docs()
if result["success"]:
print(f"✓ Updated {result['path']}")
print(f" Timestamp: {result['updated']}")
else:
print(f"✗ Failed: {result['error']}")
return 0 if result["success"] else 1
# Run all validations
print("\n=== Luzia QA Validation ===\n")
results = qa_run_all()
all_passed = True
for r in results:
check = r["check"]
passed = r["passed"]
status = "" if passed else ""
if not passed:
all_passed = False
print(f"{status} {check}")
if verbose or not passed:
for key, value in r.items():
if key not in ["check", "passed"] and value:
if isinstance(value, list):
for item in value:
print(f" - {item}")
else:
print(f" {key}: {value}")
print()
if all_passed:
print("All validations passed.")
else:
print("Some validations failed. Run with --test-all for details.")
print("\nCommands:")
print(" luzia qa --update-docs Update reference documentation")
print(" luzia qa --test-all Verbose validation output")
print(" luzia qa --sync Sync code to knowledge graph")
return 0 if all_passed else 1
def route_docs(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia docs [domain] [query] [--show <name>] [--stats]
Query documentation from knowledge graphs:
luzia docs <query> - Search all domains
luzia docs sysadmin <query> - Search sysadmin domain
luzia docs projects <query> - Search projects domain
luzia docs --show <name> - Show entity details
luzia docs --stats - Show KG statistics
luzia docs --sync - Sync .md files to KG
"""
# Import KG module
try:
sys.path.insert(0, str(lib_path))
from knowledge_graph import KnowledgeGraph, search_all, get_all_stats, KG_PATHS
from doc_sync import run_migration
except ImportError as e:
print(f"Error: Knowledge graph module not available: {e}")
return 1
# Parse options
show_stats = "--stats" in args
show_entity = "--show" in args
do_sync = "--sync" in args
args = [a for a in args if not a.startswith("--")]
if show_stats:
print("\n=== Knowledge Graph Statistics ===\n")
for domain, stats in get_all_stats().items():
if "error" in stats:
print(f"{domain}: {stats['error']}")
else:
print(f"{domain}:")
print(f" Entities: {stats['entities']}")
print(f" Relations: {stats['relations']}")
print(f" Observations: {stats['observations']}")
if stats.get("by_type"):
print(f" By type: {stats['by_type']}")
return 0
if do_sync:
print("Syncing documentation to knowledge graphs...")
# Run the doc sync
try:
from doc_sync import DocSync
from qa_validator import QAValidator
sync = DocSync()
validator = QAValidator()
# Sync routes to sysadmin KG
print("\nSyncing luzia commands...")
result = validator.sync_routes_to_kg()
if "error" in result:
print(f" Error: {result['error']}")
else:
print(f" Commands: {result['added']} added, {result['updated']} updated")
# Sync projects
print("\nSyncing projects...")
result = validator.sync_projects_to_kg()
if "error" in result:
print(f" Error: {result['error']}")
else:
print(f" Projects: {result['added']} added, {result['updated']} updated")
print("\nDone. Use 'luzia docs --stats' to see results.")
except Exception as e:
print(f"Error: {e}")
return 1
return 0
if show_entity:
# Show specific entity
if not args:
print("Usage: luzia docs --show <entity_name>")
return 1
name = args[0]
found = False
for domain in KG_PATHS.keys():
try:
kg = KnowledgeGraph(domain)
entity = kg.get_entity(name)
if entity:
found = True
print(f"\n=== {entity['name']} ({domain}) ===\n")
print(f"Type: {entity['type']}")
print(f"Updated: {datetime.fromtimestamp(entity['updated_at']).strftime('%Y-%m-%d %H:%M')}")
if entity.get('source'):
print(f"Source: {entity['source']}")
print(f"\n{entity['content'][:1000]}")
if len(entity['content']) > 1000:
print(f"\n... ({len(entity['content']) - 1000} more characters)")
# Show relations
relations = kg.get_relations(name)
if relations:
print(f"\nRelations:")
for r in relations[:10]:
print(f" - {r['relation']}: {r.get('target_name', r.get('source_name', '?'))}")
# Show observations
observations = kg.get_observations(name)
if observations:
print(f"\nObservations:")
for o in observations[:5]:
print(f" [{o['observer']}] {o['content'][:100]}")
break
except Exception:
pass
if not found:
print(f"Entity not found: {name}")
return 1
return 0
# Search
if not args:
print("Usage: luzia docs <query>")
print(" luzia docs <domain> <query>")
print(" luzia docs --show <name>")
print(" luzia docs --stats")
print(" luzia docs --sync")
print(f"\nDomains: {', '.join(KG_PATHS.keys())}")
return 0
# Check if first arg is a domain
query_domain = None
query = ""
if args[0] in KG_PATHS:
query_domain = args[0]
query = " ".join(args[1:])
else:
query = " ".join(args)
if not query:
print("Please provide a search query")
return 1
# Perform search
print(f"\nSearching for: {query}\n")
if query_domain:
kg = KnowledgeGraph(query_domain)
results = kg.search(query)
if results:
print(f"{query_domain}:")
for e in results[:10]:
print(f" [{e['type']}] {e['name']}")
if e.get('content'):
preview = e['content'][:80].replace('\n', ' ')
print(f" {preview}...")
else:
print(f"No results in {query_domain}")
else:
all_results = search_all(query)
total = 0
for domain, results in all_results.items():
if results and not results[0].get("error"):
print(f"{domain}:")
for e in results[:5]:
print(f" [{e['type']}] {e['name']}")
total += len(results)
if total == 0:
print("No results found")
return 0
def get_project_context(project: str, config: dict) -> str:
"""Build context prompt for project from config and CLAUDE.md"""
project_config = config["projects"].get(project, {})
context_parts = [
f"You are working on the **{project}** project.",
f"Description: {project_config.get('description', 'Project user')}",
f"Focus: {project_config.get('focus', 'General development')}",
"",
"**IMPORTANT**: All commands execute inside a Docker container as the project user.",
"Files you create/modify will be owned by the correct user.",
"Working directory: /workspace (mounted from project home)",
""
]
# Try to load project CLAUDE.md
project_path = project_config.get("path", f"/home/{project}")
claude_md = Path(project_path) / "CLAUDE.md"
if claude_md.exists():
try:
with open(claude_md) as f:
context_parts.append("## Project Guidelines (from CLAUDE.md):")
context_parts.append(f.read())
except:
pass
return "\n".join(context_parts)
def route_list(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia list"""
projects = config.get("projects", {})
containers = {c["name"]: c for c in list_project_containers()}
if VERBOSE:
print("Available Projects:\n")
for name, info in sorted(projects.items()):
container_name = f"luzia-{name}"
container = containers.get(container_name, {})
status = "RUN" if "Up" in container.get("status", "") else "---"
color_hex = info.get("color", "#808080")
color_code = Color.hex_to_ansi(color_hex)
colored_name = Color.bold(f"{name:15}", color_code)
desc = info.get('description', '')[:40]
if VERBOSE:
print(f" [{status}] {colored_name} {desc}")
print(f" Focus: {info.get('focus', 'N/A')[:50]}")
else:
print(f" [{status}] {colored_name} {desc}")
return 0
def route_status(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia status [project]"""
project = args[0] if args else None
containers = list_project_containers()
if not containers:
print("No containers running")
return 0
if VERBOSE:
print(f"{'Container':<20} {'Status':<30} {'Created'}")
print("-" * 70)
for c in containers:
if project and f"luzia-{project}" != c["name"]:
continue
print(f"{c['name']:<20} {c['status']:<30} {c['created'][:19]}")
return 0
def route_stop(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia stop <project>"""
if not args:
print("Usage: luzia stop <project>")
return 1
project = args[0]
project_config = config["projects"].get(project)
if not project_config:
print(f"Unknown project: {project}")
return 1
bridge = DockerBridge(
project=project,
host_path=project_config.get("path", f"/home/{project}")
)
if bridge._is_running():
bridge.stop()
print(f"Stopped {project}")
else:
print(f"{project} not running")
return 0
def route_cleanup(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia cleanup [jobs|containers|all] [--dry-run]
Subcommands:
luzia cleanup - Full maintenance (jobs + containers + logs)
luzia cleanup jobs - Clean old job directories only
luzia cleanup containers - Stop stale containers only
luzia cleanup all - Same as no subcommand
Options:
--dry-run - Preview what would be cleaned without deleting
"""
dry_run = "--dry-run" in args
args = [a for a in args if a != "--dry-run"]
subcommand = args[0] if args else "all"
if subcommand == "jobs":
print("Cleaning old jobs...")
result = cleanup_old_jobs(dry_run=dry_run)
print(f" Checked: {result['checked']}, Deleted: {result['deleted']}, Kept: {result['kept']}")
if result['bytes_freed'] > 0:
print(f" Freed: {result['bytes_freed'] / 1024:.1f} KB")
if result['errors'] > 0:
print(f" Errors: {result['errors']}")
elif subcommand == "containers":
print("Stopping stale containers...")
result = cleanup_stale_containers()
print(f" Checked: {result['checked']}, Stopped: {result['stopped']}")
if result['errors'] > 0:
print(f" Errors: {result['errors']}")
else: # "all" or empty
print("Running full maintenance..." + (" (dry-run)" if dry_run else ""))
results = run_maintenance(dry_run=dry_run)
print(f"\nJobs:")
print(f" Checked: {results['jobs']['checked']}, Deleted: {results['jobs']['deleted']}, Kept: {results['jobs']['kept']}")
if results['jobs']['bytes_freed'] > 0:
print(f" Freed: {results['jobs']['bytes_freed'] / 1024:.1f} KB")
if not dry_run:
print(f"\nContainers:")
print(f" Checked: {results['containers']['checked']}, Stopped: {results['containers']['stopped']}")
print(f"\nLogs:")
if results['logs'].get('rotated'):
print(f" Rotated notifications.log: {results['logs']['lines_before']} -> {results['logs']['lines_after']} lines")
else:
print(f" Notifications.log: {results['logs'].get('lines_after', 0)} lines (no rotation needed)")
print("\nDone.")
return 0
def route_maintenance(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia maintenance
Show maintenance status and resource usage.
"""
status = get_maintenance_status()
print("\n=== Luzia Maintenance Status ===\n")
# Jobs
print(f"Jobs ({JOBS_DIR}):")
print(f" Total: {status['jobs']['total']}")
print(f" Running: {status['jobs']['running']}")
print(f" Completed: {status['jobs']['completed']}")
print(f" Failed: {status['jobs']['failed']}")
print(f" Oldest: {status['jobs']['oldest_days']} days")
print(f" Disk: {status['disk']['jobs_mb']} MB")
# Retention policy
print(f"\n Retention Policy:")
print(f" Keep last {JOB_MAX_COUNT} jobs")
print(f" Delete completed after {JOB_MAX_AGE_DAYS} days")
print(f" Delete failed after {JOB_FAILED_MAX_AGE_DAYS} days")
# Containers
print(f"\nContainers:")
print(f" Running: {status['containers']['total']}")
print(f" Oldest: {status['containers']['oldest_hours']} hours")
print(f" Max Lifetime: {CONTAINER_MAX_LIFETIME_HOURS} hours")
# Logs
print(f"\nLogs:")
print(f" Notifications: {status['notifications']['lines']} lines (max {NOTIFICATION_LOG_MAX_LINES})")
print(f" Logs Dir: {status['disk']['logs_mb']} MB")
# Recommendations
print(f"\nRecommendations:")
needs_cleanup = False
if status['jobs']['total'] > JOB_MAX_COUNT * 1.5:
print(f" ⚠ High job count ({status['jobs']['total']}), consider: luzia cleanup jobs")
needs_cleanup = True
if status['containers']['oldest_hours'] > CONTAINER_MAX_LIFETIME_HOURS:
print(f" ⚠ Stale containers ({status['containers']['oldest_hours']}h), consider: luzia cleanup containers")
needs_cleanup = True
if status['disk']['jobs_mb'] > 100:
print(f" ⚠ High disk usage ({status['disk']['jobs_mb']}MB), consider: luzia cleanup")
needs_cleanup = True
if not needs_cleanup:
print(" ✓ All systems nominal")
print()
return 0
def route_project_task(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia <project> <task>"""
if not args or len(args) < 2:
# Just project name - show project info
project = args[0] if args else None
if not project or project not in config["projects"]:
print("Usage: luzia <project> <task>")
return 1
project_config = config["projects"][project]
bridge = DockerBridge(project, project_config.get("path", f"/home/{project}"))
status = bridge.status()
color_hex = project_config.get("color", "#808080")
color_code = Color.hex_to_ansi(color_hex)
print(Color.bold(f"{project}", color_code))
if VERBOSE:
print(f" Description: {project_config.get('description', 'N/A')}")
print(f" Path: {project_config.get('path', f'/home/{project}')}")
print(f" Focus: {project_config.get('focus', 'N/A')}")
print(Color.output(f" {'Running' if status.get('running') else 'Stopped'}", color_code))
return 0
project = args[0]
task = " ".join(args[1:])
project_config = config["projects"].get(project)
if not project_config:
print(f"Unknown project: {project}")
return 1
color_hex = project_config.get("color", "#808080")
color_code = Color.hex_to_ansi(color_hex)
bridge = DockerBridge(
project=project,
host_path=project_config.get("path", f"/home/{project}"),
extra_mounts=project_config.get("extra_mounts", [])
)
context = get_project_context(project, config)
task_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]
log_file = LOG_DIR / f"{project}-{task_id}.log"
if VERBOSE:
print(Color.bold(f"Task for {project}", color_code))
print(f" Container: luzia-{project}")
print(f" Log: {log_file}")
print()
was_started = bridge.ensure_running()
if VERBOSE and was_started:
print(f"Started container luzia-{project}")
# Detect if task is a direct shell command (not natural language)
# These must be followed by space, args, or be the entire command
command_starters = ['npm ', 'node ', 'python ', 'pip ', 'git ', 'ls ', 'ls$', 'cat ',
'grep ', 'find ', 'make ', 'make$', 'cargo ', 'go ', 'yarn ', 'pnpm ',
'docker ', 'cd ', 'pwd', 'echo ', 'touch ', 'mkdir ', 'rm ', 'cp ', 'mv ',
'curl ', 'wget ', 'which ', 'env ', 'env$', 'export ', 'source ', 'bash ',
'./', 'sh ', 'test ', './']
task_lower = task.lower()
is_command = any(
task_lower.startswith(cmd.rstrip('$')) and (cmd.endswith('$') or cmd.endswith(' ') or len(task_lower) == len(cmd.rstrip('$')))
for cmd in command_starters
)
if is_command:
# Background mode - dispatch and return immediately
if BACKGROUND:
job_id = spawn_background_job(project, task, log_file)
print(f"{project}:{job_id}")
return 0
# Direct command execution (foreground)
result = bridge.execute(task)
if result["output"]:
print(result["output"], end='')
if result["error"]:
print(result["error"], file=sys.stderr, end='')
# Log result
with open(log_file, 'w') as f:
f.write(f"Task: {task}\n")
f.write(f"Exit: {result['exit_code']}\n\n")
f.write(result["output"])
if result["error"]:
f.write(f"\nSTDERR:\n{result['error']}")
return 0 if result["success"] else 1
else:
# Natural language task - spawn independent Claude agent
job_id = spawn_claude_agent(project, task, context, config)
# Show debug indicator if Claude dev task detected
debug_indicator = " [DEBUG]" if is_claude_dev_task(task) else ""
print(f"agent:{project}:{job_id}{debug_indicator}")
return 0
def route_work_on(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia work on <project> [task]"""
if not args:
print("Usage: luzia work on <project>")
return 1
project = args[0]
task = " ".join(args[1:]) if len(args) > 1 else None
if project not in config["projects"]:
print(f"Unknown project: {project}")
return 1
if task:
return route_project_task(config, [project, task], kwargs)
else:
# Interactive mode - show project info
project_config = config["projects"][project]
color_hex = project_config.get("color", "#808080")
color_code = Color.hex_to_ansi(color_hex)
print(Color.bold(f"Working on {project}", color_code))
print(project_config.get("description", ""))
return 0
def route_think_deep(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia think deep <topic>
Enhanced Deep Logic Framework combining:
- First Principles Thinking: Break down to fundamental truths
- Refraction Thinking: Multi-lens perspective analysis
- Structured synthesis and validation
Outputs instructions for the calling Claude session to use Zen MCP with Gemini 3.
"""
if not args:
print("Usage: luzia think deep <topic>")
print(" luzia think deep --code <code problem>")
print(" luzia think deep --bug <bug description>")
print(" luzia think deep --arch <architecture question>")
return 1
# Detect mode from flags
mode = "general"
topic_args = args
if args[0] == "--code":
mode = "code"
topic_args = args[1:]
elif args[0] == "--bug":
mode = "bug"
topic_args = args[1:]
elif args[0] == "--arch":
mode = "architecture"
topic_args = args[1:]
elif args[0] == "--qa":
mode = "qa"
topic_args = args[1:]
topic = " ".join(topic_args)
# Mode-specific lens emphasis
lens_emphasis = {
"general": "all lenses equally",
"code": "Technical, Scale, Integration lenses primarily",
"bug": "Technical, Failure, Temporal lenses primarily",
"architecture": "Technical, Scale, Integration, Temporal lenses primarily",
"qa": "User, Failure, Scale, Integration lenses primarily"
}
# Output structured instruction with Deep Logic Framework
print(f"""LUZIA_DIRECTIVE: DEEP_LOGIC_FRAMEWORK
TOPIC: {topic}
MODE: {mode}
MODEL: gemini-3-pro-preview
THINKING_MODE: max
═══════════════════════════════════════════════════════════════════
DEEP LOGIC FRAMEWORK ANALYSIS
═══════════════════════════════════════════════════════════════════
Execute this 4-stage analysis using mcp__zen__thinkdeep:
## STAGE 1: DECOMPOSITION (First Principles)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Break down "{topic}" to fundamental truths:
1. State the problem/question precisely
2. List ALL assumptions (explicit and implicit)
3. Challenge each assumption: Is this actually true?
4. Identify irreducible components (axioms)
5. Ask "Why?" repeatedly until you reach bedrock truths
Key Questions:
- What do we KNOW to be true vs. ASSUME to be true?
- If we were starting from scratch, what would we do?
- What would this look like if it were easy?
## STAGE 2: MULTI-LENS ANALYSIS (Refraction Thinking)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Examine through seven lenses (emphasize: {lens_emphasis[mode]}):
| Lens | Key Questions |
|------|---------------|
| 🔧 Technical | What are the technical constraints and implications? |
| 👤 User | Who is affected and how? What are their goals? |
| 💼 Business | What is the cost, value, and risk? |
| ⏰ Temporal | What happened before? What happens long-term? |
| 📈 Scale | How does this behave at 10x scale? |
| ⚠️ Failure | What can go wrong? How do we detect and recover? |
| 🔗 Integration | What systems/dependencies are involved? |
## STAGE 3: SYNTHESIS
━━━━━━━━━━━━━━━━━━━━
Combine insights from Stages 1 and 2:
1. Identify patterns across lenses
2. Resolve contradictions
3. Reconstruct solution from first principles only
4. Generate 2-3 solution options with trade-offs
5. Provide recommendation with confidence level (low/medium/high/very high)
## STAGE 4: VALIDATION CHECKLIST
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
□ Solution addresses root cause (not symptoms)
□ All relevant lenses considered
□ Assumptions documented and challenged
□ Trade-offs are explicit
□ Failure modes identified
□ Test/validation strategy defined
□ Rollback plan exists (if applicable)
═══════════════════════════════════════════════════════════════════
Execute with mcp__zen__thinkdeep:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "max",
"step": "Deep Logic Framework analysis of: {topic}. Execute all 4 stages: (1) First Principles Decomposition - break to fundamental truths, challenge assumptions, (2) Refraction Analysis through 7 lenses with emphasis on {lens_emphasis[mode]}, (3) Synthesis - combine insights, resolve contradictions, generate solutions, (4) Validation checklist.",
"step_number": 1,
"total_steps": 2,
"next_step_required": true,
"findings": "",
"focus_areas": ["first principles", "refraction analysis", "synthesis", "validation"],
"problem_context": "Deep Logic Framework analysis for: {topic}"
}}
After analysis, provide output in this format:
## Deep Logic Analysis: {topic}
### Stage 1: First Principles Decomposition
[Problem statement, challenged assumptions, fundamental truths]
### Stage 2: Lens Analysis
[Table of observations from each lens]
### Stage 3: Synthesis
[Root cause, solution options, recommendation]
### Stage 4: Validation
[Checklist results, test strategy, next steps]""")
return 0
def route_research(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia research [project] <topic>
3-Phase Research Flow:
1. Context Expansion - Infer related concepts, expand search scope
2. Branching Web Search - Multiple parallel searches on expanded concepts
3. Final Synthesis - Consolidate findings into knowledge graph
All research is stored in project-specific knowledge graph.
Uses Zen MCP with Gemini 3 for all phases.
"""
if not args:
print("Usage: luzia research [project] <topic>")
print(" luzia deep research [project] <topic>")
print(" luzia web research [project] <topic>")
print("\nExamples:")
print(" luzia research admin multi-agent orchestration")
print(" luzia deep research musica audio synthesis libraries")
return 1
# Check if first arg is a project name
projects = set(config.get("projects", {}).keys())
if args[0] in projects:
project = args[0]
topic = " ".join(args[1:])
else:
project = "admin" # Default project for general research
topic = " ".join(args)
if not topic:
print("Error: No research topic provided")
return 1
# Create research session
session_id = create_research_session(project, topic)
# Get prior research context for this project
prior_research = get_project_research_context(project, limit=3)
prior_context = ""
if prior_research:
prior_topics = [r["topic"] for r in prior_research if r.get("final_synthesis")]
if prior_topics:
prior_context = f"\n\nPrior research in this project: {', '.join(prior_topics[:5])}"
# Get existing knowledge graph nodes
graph = get_research_graph(project)
existing_concepts = [n["name"] for n in graph.get("nodes", [])][:20]
graph_context = ""
if existing_concepts:
graph_context = f"\n\nExisting knowledge concepts: {', '.join(existing_concepts)}"
# Output structured 3-phase research directive
print(f"""LUZIA_DIRECTIVE: RESEARCH_FLOW
SESSION_ID: {session_id}
PROJECT: {project}
TOPIC: {topic}
MODEL: gemini-3-pro-preview
{prior_context}{graph_context}
Execute this 3-phase research flow using Zen MCP with Gemini 3:
═══════════════════════════════════════════════════════════════════
PHASE 1: CONTEXT EXPANSION (mcp__zen__thinkdeep)
═══════════════════════════════════════════════════════════════════
Goal: Expand the research topic into related concepts and search branches
Parameters:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "high",
"step": "Context expansion for research topic: {topic}. Identify: 1) Core concepts and terminology, 2) Related fields and disciplines, 3) Key questions to answer, 4) Potential search branches (5-8 specific queries), 5) Expected sources (academic, industry, open source)",
"step_number": 1,
"total_steps": 3,
"next_step_required": true,
"findings": "",
"focus_areas": ["concept mapping", "terminology", "related fields", "search strategy"],
"problem_context": "Research context expansion for: {topic}"
}}
After Phase 1, call: luzia research-update {session_id} context_expansion "<json_data>"
═══════════════════════════════════════════════════════════════════
PHASE 2: BRANCHING WEB SEARCH (mcp__zen__thinkdeep + WebSearch)
═══════════════════════════════════════════════════════════════════
Goal: Execute multiple parallel web searches on expanded concepts
For each search branch from Phase 1:
1. Use WebSearch tool with specific queries
2. Use mcp__zen__thinkdeep to analyze and extract key findings
3. Identify entities (people, companies, projects, concepts)
4. Note relationships between entities
Parameters for each branch analysis:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "medium",
"step": "Analyze search results for branch: <branch_topic>",
"step_number": 2,
"total_steps": 3,
"next_step_required": true,
"findings": "<search_results_summary>",
"focus_areas": ["key findings", "entities", "relationships", "sources"]
}}
After Phase 2, call: luzia research-update {session_id} search_branches "<json_data>"
═══════════════════════════════════════════════════════════════════
PHASE 3: FINAL SYNTHESIS (mcp__zen__thinkdeep)
═══════════════════════════════════════════════════════════════════
Goal: Consolidate all findings into coherent research output
Parameters:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "max",
"step": "Final synthesis of research on: {topic}. Consolidate all branch findings into: 1) Executive summary, 2) Key concepts and definitions, 3) Current state of the field, 4) Major players and projects, 5) Trends and future directions, 6) Recommendations, 7) Knowledge graph entities to store",
"step_number": 3,
"total_steps": 3,
"next_step_required": false,
"findings": "<consolidated_branch_findings>",
"focus_areas": ["synthesis", "recommendations", "knowledge extraction"]
}}
After Phase 3, call: luzia research-update {session_id} final_synthesis "<json_data>"
Then call: luzia research-graph {session_id} "<entities_and_relations_json>"
═══════════════════════════════════════════════════════════════════
OUTPUT FORMAT
═══════════════════════════════════════════════════════════════════
Final output should include:
1. Research summary (2-3 paragraphs)
2. Key findings (bulleted list)
3. Knowledge graph additions (entities and relationships)
4. Sources cited
5. Follow-up research suggestions""")
return 0
def route_research_update(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia research-update <session_id> <phase> <json_data>
Update a research session with phase results.
"""
if len(args) < 3:
print("Usage: luzia research-update <session_id> <phase> <json_data>")
print("Phases: context_expansion, search_branches, final_synthesis")
return 1
session_id = args[0]
phase = args[1]
json_data = " ".join(args[2:])
try:
data = json.loads(json_data)
except json.JSONDecodeError:
# Try to parse as simple key-value if not valid JSON
data = {"raw": json_data}
update_research_phase(session_id, phase, data)
print(f"Updated session {session_id} phase: {phase}")
return 0
def route_research_graph(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia research-graph <session_id> <entities_json>
Add entities and relationships to the research knowledge graph.
Expected JSON format:
{
"project": "admin",
"entities": [
{"name": "AutoGen", "type": "framework", "description": "..."},
...
],
"relationships": [
{"source": "AutoGen", "target": "Microsoft", "relation": "developed_by"},
...
]
}
"""
if len(args) < 2:
print("Usage: luzia research-graph <session_id> <entities_json>")
return 1
session_id = args[0]
json_data = " ".join(args[1:])
try:
data = json.loads(json_data)
except json.JSONDecodeError:
print(f"Error: Invalid JSON data")
return 1
project = data.get("project", "admin")
entities = data.get("entities", [])
relationships = data.get("relationships", [])
# Add nodes
node_map = {} # name -> id
for entity in entities:
node_id = add_research_node(
session_id=session_id,
project=project,
name=entity.get("name"),
node_type=entity.get("type", "concept"),
description=entity.get("description")
)
node_map[entity.get("name")] = node_id
# Add edges
for rel in relationships:
source_name = rel.get("source")
target_name = rel.get("target")
relation = rel.get("relation", "related_to")
# Ensure both nodes exist
if source_name not in node_map:
node_map[source_name] = add_research_node(session_id, project, source_name, "concept")
if target_name not in node_map:
node_map[target_name] = add_research_node(session_id, project, target_name, "concept")
add_research_edge(
source_id=node_map[source_name],
target_id=node_map[target_name],
relation=relation,
context=rel.get("context")
)
print(f"Added {len(entities)} entities and {len(relationships)} relationships to {project} knowledge graph")
return 0
def route_research_list(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia research-list [project]
List research sessions for a project.
"""
project = args[0] if args else "admin"
sessions = get_project_research_context(project, limit=20)
if not sessions:
print(f"No research sessions for project: {project}")
return 0
print(f"\nResearch sessions for {project}:")
print("-" * 60)
for s in sessions:
status_icon = "" if s["status"] == "completed" else ""
ts = datetime.fromtimestamp(s["created_at"]).strftime("%Y-%m-%d %H:%M")
print(f" [{status_icon}] {s['id']} | {ts} | {s['topic'][:40]}")
print(f" Phase: {s['phase']}")
return 0
def route_research_show(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia research-show <session_id>
Show details of a research session.
"""
if not args:
print("Usage: luzia research-show <session_id>")
return 1
session_id = args[0]
# Find session across all projects
conn = _init_research_db()
c = conn.cursor()
c.execute('SELECT * FROM research_sessions WHERE id = ?', (session_id,))
row = c.fetchone()
conn.close()
if not row:
print(f"Session not found: {session_id}")
return 1
print(f"\nResearch Session: {row[0]}")
print(f"Project: {row[1]}")
print(f"Topic: {row[2]}")
print(f"Status: {row[3]}")
print(f"Phase: {row[6]}")
print(f"Created: {datetime.fromtimestamp(row[4]).strftime('%Y-%m-%d %H:%M')}")
if row[7]: # context_expansion
print(f"\n--- Context Expansion ---")
print(json.dumps(json.loads(row[7]), indent=2)[:500])
if row[8]: # search_branches
print(f"\n--- Search Branches ---")
print(json.dumps(json.loads(row[8]), indent=2)[:500])
if row[9]: # final_synthesis
print(f"\n--- Final Synthesis ---")
print(json.dumps(json.loads(row[9]), indent=2)[:1000])
return 0
def route_research_knowledge(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia research-knowledge [project]
Show the knowledge graph for a project.
"""
project = args[0] if args else "admin"
graph = get_research_graph(project)
if not graph["nodes"]:
print(f"No knowledge graph for project: {project}")
return 0
print(f"\nKnowledge Graph for {project}:")
print(f"Nodes: {len(graph['nodes'])} | Edges: {len(graph['edges'])}")
print("-" * 60)
print("\nEntities:")
for node in graph["nodes"][:30]:
desc = (node.get("description") or "")[:50]
print(f" [{node['type']}] {node['name']}: {desc}")
if graph["edges"]:
print("\nRelationships:")
# Build name lookup
node_names = {n["id"]: n["name"] for n in graph["nodes"]}
for edge in graph["edges"][:20]:
src = node_names.get(edge["source"], edge["source"][:8])
tgt = node_names.get(edge["target"], edge["target"][:8])
print(f" {src} --[{edge['relation']}]--> {tgt}")
return 0
def route_fix(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia fix <issue>"""
if not args:
print("Usage: luzia fix <issue>")
return 1
issue = " ".join(args)
troubleshooting = config.get("troubleshooting", {})
# Search for matching issue patterns
for problem, details in troubleshooting.items():
patterns = details.get("error_patterns", [])
if any(p.lower() in issue.lower() for p in patterns):
print(f"Issue: {issue}")
print(f"Problem: {problem}")
print(f"Fix: {details.get('fix', 'N/A')}")
if VERBOSE and details.get('source_script'):
print(f"Script: {details.get('source_script')}")
return 0
print(f"Unknown issue: {issue}")
print("Run 'luzia fix <keyword>' for troubleshooting.")
print("Available categories: configuration, builds, containers")
return 1
def route_logs(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia logs <project|job_id>"""
if not args:
print("Usage: luzia logs <project|job_id>")
return 1
target = args[0]
# Check if it's a job ID
job_dir = JOBS_DIR / target
if job_dir.exists():
output_file = job_dir / "output.log"
if output_file.exists():
print(output_file.read_text())
else:
print("Job running, no output yet")
return 0
# Otherwise treat as project
log_files = sorted(LOG_DIR.glob(f"{target}-*.log"), reverse=True)
if log_files:
with open(log_files[0]) as f:
print(f.read())
else:
print(f"No logs for {target}")
return 0
def route_jobs(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia jobs [job_id]"""
if args:
# Show specific job
job = get_job_status(args[0])
if "error" in job:
print(job["error"])
return 1
print(f"Job: {job['id']}")
print(f"Project: {job['project']}")
print(f"Command: {job['command']}")
print(f"Status: {job['status']}")
if "exit_code" in job:
print(f"Exit: {job['exit_code']}")
return 0
# List all jobs
jobs = list_jobs()
if not jobs:
print("No jobs")
return 0
for job in jobs:
status = "" if job.get("status") == "completed" else ""
exit_code = job.get("exit_code", "")
exit_str = f" ({exit_code})" if exit_code != "" else ""
job_type = job.get("type", "docker")
type_indicator = "🤖" if job_type == "agent" else "📦"
desc = job.get("task", job.get("command", ""))[:40]
print(f" [{status}] {type_indicator} {job['id']} {job['project']} {desc}{exit_str}")
return 0
def route_kill(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia kill <job_id>"""
if not args:
print("Usage: luzia kill <job_id>")
return 1
result = kill_agent(args[0])
if "error" in result:
print(result["error"])
return 1
print(f"Killed: {args[0]}")
return 0
def route_notify(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia notify [limit]"""
limit = int(args[0]) if args else 10
notifications = get_notifications(limit)
if not notifications:
print("No notifications")
return 0
for n in notifications:
print(n)
return 0
def route_history(config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia history <project> [limit]
Show recent changes/activity for a project from the knowledge graph.
"""
if not args:
print("Usage: luzia history <project> [limit]")
print("Example: luzia history musica 20")
return 1
project = args[0]
limit = int(args[1]) if len(args) > 1 else 10
# Verify project exists
if project not in config.get("projects", {}):
print(f"Unknown project: {project}")
print(f"Available: {', '.join(config.get('projects', {}).keys())}")
return 1
project_config = config["projects"][project]
color = Color.hex_to_ansi(project_config.get("color", "#888888"))
changes = get_project_changes(project, limit)
if not changes:
print(f"No recorded changes for {Color.bold(project, color)}")
return 0
print(f"\n{Color.bold(f'Recent changes for {project}:', color)}")
print("-" * 60)
for change in changes:
ctx = change.get("context", {})
ts = ctx.get("timestamp", "unknown")
desc = ctx.get("description", change.get("event", ""))
relation = change.get("relation", "").replace("has_", "")
# Format timestamp
try:
dt = datetime.fromisoformat(ts)
ts_fmt = dt.strftime("%Y-%m-%d %H:%M")
except:
ts_fmt = ts[:16] if len(ts) > 16 else ts
print(f" [{ts_fmt}] {Color.bold(relation, color)}: {desc}")
print()
return 0
def cmd_exec_raw(config: dict, project: str, command: str):
"""Execute a raw command in the container (for subagent use)"""
project_config = config["projects"].get(project)
if not project_config:
return {"error": f"Unknown project: {project}"}
bridge = DockerBridge(
project=project,
host_path=project_config.get("path", f"/home/{project}"),
extra_mounts=project_config.get("extra_mounts", [])
)
return bridge.execute(command)
def cmd_write_file(config: dict, project: str, path: str, content: str):
"""Write a file in the project container (for subagent use)"""
project_config = config["projects"].get(project)
if not project_config:
return {"error": f"Unknown project: {project}"}
bridge = DockerBridge(
project=project,
host_path=project_config.get("path", f"/home/{project}"),
extra_mounts=project_config.get("extra_mounts", [])
)
return bridge.write_file(path, content)
def cmd_read_file(config: dict, project: str, path: str):
"""Read a file from the project container (for subagent use)"""
project_config = config["projects"].get(project)
if not project_config:
return {"error": f"Unknown project: {project}"}
bridge = DockerBridge(
project=project,
host_path=project_config.get("path", f"/home/{project}"),
extra_mounts=project_config.get("extra_mounts", [])
)
return bridge.read_file(path)
def print_help():
"""Print help message"""
print(__doc__)
class Router:
"""Pattern-based routing dispatcher"""
def __init__(self, config: dict):
self.config = config
self.projects = set(config.get("projects", {}).keys())
# Define routes: (pattern_fn, handler_fn, description)
self.routes = [
(self._match_list, route_list, "List projects"),
(self._match_status, route_status, "Show status"),
(self._match_stop, route_stop, "Stop container"),
(self._match_logs, route_logs, "View logs"),
(self._match_cleanup, route_cleanup, "Cleanup/maintenance"),
(self._match_maintenance, route_maintenance, "Maintenance status"),
(self._match_jobs, route_jobs, "Job management"),
(self._match_kill, route_kill, "Kill agent"),
(self._match_failures, route_failures, "List/retry failures"),
(self._match_retry, route_retry, "Retry failed job"),
(self._match_qa, route_qa, "QA validation"),
(self._match_docs, route_docs, "Documentation KG"),
(self._match_notify, route_notify, "View notifications"),
(self._match_history, route_history, "Project history"),
(self._match_work_on, route_work_on, "Interactive work"),
(self._match_think_deep, route_think_deep, "Deep reasoning"),
# Research commands (order matters - specific before general)
(self._match_research_update, route_research_update, "Update research phase"),
(self._match_research_graph, route_research_graph, "Add to knowledge graph"),
(self._match_research_list, route_research_list, "List research sessions"),
(self._match_research_show, route_research_show, "Show research session"),
(self._match_research_knowledge, route_research_knowledge, "Show knowledge graph"),
(self._match_research, route_research, "Research (3-phase flow)"),
(self._match_fix, route_fix, "Troubleshooting"),
(self._match_project_task, route_project_task, "Project task"),
# Internal (JSON output)
(self._match_exec, self._route_exec, "Raw execution"),
(self._match_write, self._route_write, "File write"),
(self._match_read, self._route_read, "File read"),
(self._match_context, self._route_context, "Get context"),
]
def _match_list(self, args: list) -> Optional[list]:
if args and args[0] == "list":
return []
return None
def _match_status(self, args: list) -> Optional[list]:
if args and args[0] == "status":
return args[1:]
return None
def _match_stop(self, args: list) -> Optional[list]:
if args and args[0] == "stop":
return args[1:]
return None
def _match_cleanup(self, args: list) -> Optional[list]:
if args and args[0] == "cleanup":
return args[1:] # Pass subcommands (jobs, containers, all, --dry-run)
return None
def _match_maintenance(self, args: list) -> Optional[list]:
if args and args[0] == "maintenance":
return args[1:]
return None
def _match_logs(self, args: list) -> Optional[list]:
if args and args[0] == "logs":
return args[1:]
return None
def _match_jobs(self, args: list) -> Optional[list]:
if args and args[0] == "jobs":
return args[1:]
return None
def _match_kill(self, args: list) -> Optional[list]:
if args and args[0] == "kill":
return args[1:]
return None
def _match_failures(self, args: list) -> Optional[list]:
if args and args[0] == "failures":
return args[1:]
return None
def _match_retry(self, args: list) -> Optional[list]:
if args and args[0] == "retry":
return args[1:]
return None
def _match_qa(self, args: list) -> Optional[list]:
if args and args[0] == "qa":
return args[1:]
return None
def _match_docs(self, args: list) -> Optional[list]:
if args and args[0] == "docs":
return args[1:]
return None
def _match_notify(self, args: list) -> Optional[list]:
if args and args[0] in ["notify", "notifications"]:
return args[1:]
return None
def _match_history(self, args: list) -> Optional[list]:
if args and args[0] == "history":
return args[1:]
return None
def _match_work_on(self, args: list) -> Optional[list]:
if len(args) >= 3 and args[0] == "work" and args[1] == "on":
return args[2:]
return None
def _match_think_deep(self, args: list) -> Optional[list]:
if len(args) >= 3 and args[0] == "think" and args[1] == "deep":
return args[2:]
return None
def _match_research(self, args: list) -> Optional[list]:
# Match: research <topic>
if args and args[0] == "research":
return args[1:]
# Match: deep research <topic>
if len(args) >= 2 and args[0] == "deep" and args[1] == "research":
return args[2:]
# Match: web research <topic>
if len(args) >= 2 and args[0] == "web" and args[1] == "research":
return args[2:]
return None
def _match_research_update(self, args: list) -> Optional[list]:
if args and args[0] == "research-update":
return args[1:]
return None
def _match_research_graph(self, args: list) -> Optional[list]:
if args and args[0] == "research-graph":
return args[1:]
return None
def _match_research_list(self, args: list) -> Optional[list]:
if args and args[0] == "research-list":
return args[1:]
return None
def _match_research_show(self, args: list) -> Optional[list]:
if args and args[0] == "research-show":
return args[1:]
return None
def _match_research_knowledge(self, args: list) -> Optional[list]:
if args and args[0] == "research-knowledge":
return args[1:]
return None
def _match_fix(self, args: list) -> Optional[list]:
if args and args[0] == "fix":
return args[1:]
return None
def _match_project_task(self, args: list) -> Optional[list]:
if args and args[0] in self.projects:
return args # [project, task, ...]
return None
def _match_exec(self, args: list) -> Optional[list]:
if args and args[0] == "--exec":
return args[1:]
return None
def _match_write(self, args: list) -> Optional[list]:
if args and args[0] == "--write":
return args[1:]
return None
def _match_read(self, args: list) -> Optional[list]:
if args and args[0] == "--read":
return args[1:]
return None
def _match_context(self, args: list) -> Optional[list]:
if args and args[0] == "--context":
return args[1:]
return None
def _route_exec(self, config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia --exec <project> <command>"""
if len(args) < 2:
print(json.dumps({"error": "Usage: luzia --exec <project> <command>"}))
return 1
result = cmd_exec_raw(config, args[0], " ".join(args[1:]))
print(json.dumps(result))
return 0 if result.get("success") else 1
def _route_write(self, config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia --write <project> <path> <content>"""
if len(args) < 3:
print(json.dumps({"error": "Usage: luzia --write <project> <path> <content>"}))
return 1
if args[2] == "-":
content = sys.stdin.read()
else:
content = " ".join(args[2:])
result = cmd_write_file(config, args[0], args[1], content)
print(json.dumps(result))
return 0 if result.get("success") else 1
def _route_read(self, config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia --read <project> <path>"""
if len(args) < 2:
print(json.dumps({"error": "Usage: luzia --read <project> <path>"}))
return 1
result = cmd_read_file(config, args[0], args[1])
print(json.dumps(result))
return 0 if result.get("success") else 1
def _route_context(self, config: dict, args: list, kwargs: dict) -> int:
"""Handler: luzia --context <project>"""
if not args:
print(json.dumps({"error": "Usage: luzia --context <project>"}))
return 1
context = get_project_context(args[0], config)
print(json.dumps({"context": context}))
return 0
def dispatch(self, args: list) -> int:
"""Route and dispatch to appropriate handler"""
for pattern_fn, handler_fn, desc in self.routes:
matched_args = pattern_fn(args)
if matched_args is not None:
return handler_fn(self.config, matched_args, {})
# No match found
if args:
print(f"Unknown: {args[0]}")
print("Run 'luzia --help' for usage")
return 1
def main():
global VERBOSE, BACKGROUND
args = sys.argv[1:]
# Check for flags
if "--verbose" in args:
VERBOSE = True
args = [a for a in args if a != "--verbose"]
if "--fg" in args:
BACKGROUND = False
args = [a for a in args if a != "--fg"]
if not args or args[0] in ["-h", "--help", "help"]:
print_help()
return 0
config = load_config()
router = Router(config)
return router.dispatch(args)
if __name__ == "__main__":
sys.exit(main() or 0)