Files
luzia/bin/luzia
admin 45ec8828a1 Add port management skill and CLI commands
- Add lib/port_manager.py for server port allocation
- Rules: dedicated ports, no +1 increment, kill same service on conflict
- Add 'luzia port' CLI commands (list/check/allocate/release/suggest)
- Add .gitignore for __pycache__

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 12:33:33 -03:00

5582 lines
197 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Luzia - Unified Access Point for All Tasks
QUICK START:
luzia --help # Show all commands
luzia list # List all available projects
luzia status # Show current system status
luzia <project> <task> # Run a task in a project
CORE PROJECT COMMANDS:
luzia <project> <task> Execute task in project's Docker container
luzia work on <project> Interactive session (delegates to subagent)
luzia list List all available projects with status
luzia status [project] Show overall or specific project status
luzia stop <project> Stop a running container
luzia history <project> View recent changes in a project
MAINTENANCE & SYSTEM:
luzia cleanup Full maintenance (jobs + containers + logs)
luzia cleanup jobs Clean old job directories only
luzia cleanup containers Stop stale containers only
luzia cleanup --dry-run Preview without deleting
luzia maintenance Show maintenance status and recommendations
luzia jobs [job_id] List all jobs or show specific job
luzia logs [project] View project execution logs
FAILURE MANAGEMENT (Smart Retry):
luzia failures List recent failures with exit codes
luzia failures <job_id> Show detailed failure information
luzia failures --summary Summary breakdown by exit code
luzia failures --auto-retry Auto-retry all fixable failures
luzia retry <job_id> Retry a specific failed job
luzia kill <job_id> Kill a running agent job
KNOWLEDGE GRAPH & QA:
luzia qa Run QA validation checks
luzia qa --sync Sync code to knowledge graph
luzia docs <query> Search all knowledge graphs
luzia docs sysadmin <query> Search sysadmin domain
luzia docs --show <entity> Show entity details from KG
luzia docs --stats Show knowledge graph statistics
luzia docs --sync Sync .md files to KG
PROJECT KNOWLEDGE (Per-Project RAG):
luzia knowledge list List projects with knowledge status
luzia knowledge init <project> Initialize .knowledge/ for a project
luzia knowledge sync <project> Sync from CLAUDE.md to .knowledge/
luzia knowledge search <project> <query> Search project knowledge
luzia knowledge status <project>Show knowledge status for a project
luzia knowledge show <project> Show knowledge contents
RESEARCH (3-Phase Flow):
luzia research [project] <topic> Start research (context → search → synthesize)
luzia deep research [project] <topic> Same as research (alias)
luzia web research [project] <topic> Same as research (alias)
luzia research-list [project] List research sessions
luzia research-show <session_id> Show research session details
luzia research-knowledge [project] Show project knowledge graph
luzia research-update <id> <phase> <json> Update research phase (internal)
luzia research-graph <id> <json> Add to KG (internal)
CODE ANALYSIS:
luzia structure Analyze current orchestrator structure
luzia structure <project> Analyze a specific project
luzia structure . path/src Analyze specific subdirectory
luzia structure --json Output analysis as JSON
luzia structure --no-kg Don't save to knowledge graph
ADVANCED REASONING:
luzia think deep <topic> Deep reasoning via Zen + Gemini 3
luzia fix <issue> Troubleshooting assistant
QUEUE MANAGEMENT:
luzia queue Show queue status
luzia dispatch <job> Dispatch a job to the queue
luzia notify / notifications View notifications
SERVICE MANAGEMENT (Cockpit-friendly):
luzia service start <project> <service> Start a project service
luzia service stop <project> <service> Stop a project service
luzia service status [project] Show running services
luzia service list <project> List available services
TIME METRICS:
luzia metrics Show aggregate task metrics
luzia metrics <project> Show metrics for specific project
luzia metrics --days 30 Show metrics for last 30 days
luzia metrics --by-bucket Show success rate by duration
luzia metrics --baseline Show performance baseline
luzia jobs --timing Show jobs with timing columns
LOW-LEVEL OPERATIONS:
luzia --exec <project> <cmd> Execute raw command (JSON output)
luzia --read <project> <path> Read file contents (JSON output)
luzia --write <project> <path> <content> Write file (JSON output)
luzia --context <project> Get project context (JSON output)
GLOBAL FLAGS:
--help, -h, help Show this help message
--verbose Enable verbose output
--fg Run in foreground (don't background)
--skip-preflight Skip QA preflight checks (use for emergencies)
EXAMPLES:
luzia musica analyze logs
luzia work on overbits
luzia research dss "performance optimization"
luzia failures --summary
luzia cleanup --dry-run
luzia docs "docker setup"
luzia structure --json
See /opt/server-agents/orchestrator/docs/LUZIA_COMMAND_REFERENCE.md for full documentation.
"""
import json
import os
import sys
import subprocess
import re
import sqlite3
import uuid
import time as time_module
import shutil
from pathlib import Path
from typing import Optional, Dict, Any, Tuple, Callable, List
from datetime import datetime
# Add lib to path - resolve symlinks to get real path
script_path = Path(__file__).resolve()
lib_path = script_path.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
# ANSI color codes
class Color:
    """Small helpers for building ANSI terminal escape sequences."""

    @staticmethod
    def hex_to_ansi(hex_color: str) -> str:
        """Convert a '#RRGGBB' hex color to a 24-bit (truecolor) ANSI foreground escape."""
        code = hex_color.lstrip('#')
        red = int(code[0:2], 16)
        green = int(code[2:4], 16)
        blue = int(code[4:6], 16)
        return f"\033[38;2;{red};{green};{blue}m"

    @staticmethod
    def reset() -> str:
        """Escape sequence that resets all terminal attributes."""
        return "\033[0m"

    @staticmethod
    def bold(text: str, color: str = "") -> str:
        """Wrap *text* in bold (optionally colored) escapes, resetting afterwards."""
        return f"\033[1m{color}{text}{Color.reset()}"

    @staticmethod
    def output(text: str, color: str) -> str:
        """Wrap *text* in *color*, resetting afterwards."""
        return f"{color}{text}{Color.reset()}"
try:
from docker_bridge import DockerBridge, cleanup_idle_containers, list_project_containers
except ImportError as e:
print(f"Error: Could not import docker_bridge module: {e}")
print(f"Lib path: {lib_path}")
print("Make sure /opt/server-agents/orchestrator/lib/docker_bridge.py exists")
sys.exit(1)
# Import cockpit module for human-in-the-loop sessions
try:
from cockpit import route_cockpit
COCKPIT_AVAILABLE = True
except ImportError:
COCKPIT_AVAILABLE = False
route_cockpit = None
# Import watchdog module for task monitoring
try:
from task_watchdog import TaskWatchdog
WATCHDOG_AVAILABLE = True
except ImportError:
WATCHDOG_AVAILABLE = False
TaskWatchdog = None
# Import modernized context system (Phase 5 integration)
try:
from luzia_cli_integration import get_project_context_modernized, should_use_new_retriever
MODERNIZED_CONTEXT_AVAILABLE = True
except ImportError:
MODERNIZED_CONTEXT_AVAILABLE = False
# Import time metrics module for task time tracking
try:
from time_metrics import (
create_task_time_metadata,
update_task_completion_metadata,
format_job_with_timing,
format_logs_header,
get_project_metrics,
get_all_projects_metrics,
get_success_by_duration_bucket,
elapsed_since,
format_duration,
format_duration_human,
check_anomaly,
calculate_baseline,
DEFAULT_TIMEZONE
)
TIME_METRICS_AVAILABLE = True
except ImportError as e:
TIME_METRICS_AVAILABLE = False
_log_warning = lambda msg: print(f"Warning: {msg}") # Will be defined later
DEFAULT_TIMEZONE = "America/Montevideo"
# Import QA preflight checks for task validation
try:
from qa_improvements import (
run_preflight_checks,
format_preflight_report,
TimeoutValidator,
PrivilegeChecker,
ServiceHealthChecker,
ContainerCapabilityChecker,
DurationLearner
)
QA_PREFLIGHT_AVAILABLE = True
except ImportError as e:
QA_PREFLIGHT_AVAILABLE = False
run_preflight_checks = None
# Orchestrator configuration and state paths.
CONFIG_PATH = Path("/opt/server-agents/orchestrator/config.json")
LOG_DIR = Path("/var/log/luz-orchestrator")
JOBS_DIR = Path("/var/log/luz-orchestrator/jobs")   # one subdirectory per background job
PROJECTS_KG_PATH = Path("/etc/zen-swarm/memory/projects.db")
# Global state
# Directories are created eagerly at import time so later code can assume they exist.
LOG_DIR.mkdir(parents=True, exist_ok=True)
JOBS_DIR.mkdir(parents=True, exist_ok=True)
VERBOSE = False
BACKGROUND = True # Default: dispatch immediately
# --- Knowledge Graph Functions ---
def _kg_get_or_create_entity(conn, name: str, entity_type: str = None) -> str:
"""Get or create an entity in the knowledge graph"""
c = conn.cursor()
c.execute("SELECT id FROM entities WHERE name = ?", (name,))
row = c.fetchone()
if row:
return row[0]
entity_id = str(uuid.uuid4())
c.execute("INSERT INTO entities (id, name, type, created_at) VALUES (?, ?, ?, ?)",
(entity_id, name, entity_type, time_module.time()))
return entity_id
# Retention: keep max 100 changes per project, 30 days max age
# (enforced by _kg_prune_old_changes on every log_project_change call).
KG_MAX_CHANGES_PER_PROJECT = 100
KG_MAX_AGE_DAYS = 30
# Job maintenance settings (used by cleanup_old_jobs / cleanup_stale_containers)
JOB_MAX_AGE_DAYS = 3 # Keep completed jobs for 3 days
JOB_FAILED_MAX_AGE_DAYS = 7 # Keep failed jobs longer for debugging
JOB_MAX_COUNT = 50 # Always keep at least last 50 jobs
CONTAINER_MAX_LIFETIME_HOURS = 24 # Max container lifetime
NOTIFICATION_LOG_MAX_LINES = 1000 # Max lines in notifications.log
# Research knowledge graph path (separate from project changes)
RESEARCH_KG_PATH = Path("/etc/zen-swarm/memory/research.db")
# =============================================================================
# PERMISSION SYSTEM (Triple-Check)
# =============================================================================
import grp
import pwd
def get_current_user() -> str:
    """Return the login name of the Unix user running this process."""
    uid = os.getuid()
    return pwd.getpwuid(uid).pw_name
def get_user_groups(username: str) -> list:
    """Return every Unix group *username* belongs to.

    Combines the supplementary groups (from /etc/group membership lists)
    with the user's primary group; unknown users yield only the
    supplementary matches (usually an empty list).
    """
    memberships = [entry.gr_name for entry in grp.getgrall() if username in entry.gr_mem]
    try:
        primary_gid = pwd.getpwnam(username).pw_gid
        primary = grp.getgrgid(primary_gid).gr_name
        if primary not in memberships:
            memberships.append(primary)
    except KeyError:
        # Unknown user or unmapped gid: fall through with supplementary groups only.
        pass
    return memberships
def check_project_permission(username: str, project: str) -> tuple:
    """Triple-check permission to access a project.

    Returns (allowed, reason). Access is granted to admin/root, members
    of the 'operators' group, the project's own user account, and
    members of the project's Unix group — checked in that order.
    """
    if username == 'admin' or username == 'root':
        return True, "admin access"
    memberships = get_user_groups(username)
    if 'operators' in memberships:
        return True, "operators group"
    if username == project:
        return True, "own project"
    if project in memberships:
        return True, f"member of {project} group"
    return False, f"user '{username}' not authorized for project '{project}'"
def require_project_permission(project: str) -> None:
    """Exit with status 126 unless the current user may access *project*."""
    user = get_current_user()
    allowed, reason = check_project_permission(user, project)
    if allowed:
        if VERBOSE:
            print(f"Permission granted: {reason}")
        return
    print(f"Permission denied: {reason}")
    sys.exit(126)
# =============================================================================
# GUEST USER RESTRICTIONS
# =============================================================================
# Commands the 'guest' Unix account may run (read-only / informational).
GUEST_ALLOWED_COMMANDS = {
    'list', 'status', 'jobs', 'logs', 'queue', 'docs', 'help', '--help', '-h', 'health',
}
# Explicitly blocked commands; note require_guest_permission denies anything
# outside the allow-list, so this set is documentation of intent.
GUEST_BLOCKED_COMMANDS = {
    'kill', 'cleanup', 'maintenance', 'retry', 'work', 'research', 'think', 'qa', 'dispatch',
}
def is_guest_user() -> bool:
    """True when luzia is being run by the 'guest' Unix account."""
    current = get_current_user()
    return current == 'guest'
def require_guest_permission(command: str, args: list = None) -> None:
    """Enforce guest restrictions: exit(126) for non-allowed commands.

    Non-guest users are never restricted.  For guests, only commands in
    GUEST_ALLOWED_COMMANDS may run; everything else — including the
    explicit GUEST_BLOCKED_COMMANDS set — is denied.

    Args:
        command: Top-level CLI command word (case-insensitive).
        args: Unused; kept for interface compatibility.
    """
    if not is_guest_user():
        return
    cmd = command.lower().strip()
    # FIX: the original tested `cmd in GUEST_BLOCKED_COMMANDS or cmd not in
    # GUEST_ALLOWED_COMMANDS`, which is always true once the allow-list check
    # above failed — the blocked-set test was dead code. The policy is simply:
    # deny anything not explicitly allowed.
    if cmd not in GUEST_ALLOWED_COMMANDS:
        print(f"Guest restriction: '{cmd}' not available to guest users")
        sys.exit(126)
def _kg_prune_old_changes(conn, project_id: str):
    """Prune old change events for a project (retention policy).

    Two limits are applied to the relations fanning out of *project_id*:
    drop anything older than KG_MAX_AGE_DAYS, then keep only the
    KG_MAX_CHANGES_PER_PROJECT newest.  Orphaned change_event entities
    (no relation pointing at them) are then deleted.  Does not commit;
    the caller owns the transaction.
    """
    c = conn.cursor()
    now = time_module.time()
    max_age_seconds = KG_MAX_AGE_DAYS * 24 * 60 * 60
    # Delete relations older than max age
    c.execute('''
        DELETE FROM relations
        WHERE source_id = ? AND created_at < ?
    ''', (project_id, now - max_age_seconds))
    # Keep only the most recent N changes per project
    # (LIMIT -1 OFFSET n is the SQLite idiom for "every row after the first n").
    c.execute('''
        DELETE FROM relations WHERE id IN (
            SELECT r.id FROM relations r
            WHERE r.source_id = ?
            ORDER BY r.created_at DESC
            LIMIT -1 OFFSET ?
        )
    ''', (project_id, KG_MAX_CHANGES_PER_PROJECT))
    # Clean up orphaned change_event entities (no relations pointing to them)
    # NOTE(review): NOT IN matches nothing if any target_id is NULL — confirm
    # target_id is always populated by log_project_change (it appears to be).
    c.execute('''
        DELETE FROM entities WHERE type = 'change_event' AND id NOT IN (
            SELECT target_id FROM relations
        )
    ''')
def log_project_change(project: str, change_type: str, description: str, details: str = None):
    """
    Log a change to a project's knowledge graph.
    Automatically prunes old entries (>30 days or >100 per project).

    Best-effort: returns True on success, False on any failure (the
    failure is logged as a verbose warning, never raised).

    Args:
        project: Project name (e.g., 'musica', 'overbits')
        change_type: Type of change (e.g., 'config_update', 'file_modified', 'deployment')
        description: Human-readable description of the change
        details: Optional additional details/context
    """
    try:
        # Ensure KB exists
        PROJECTS_KG_PATH.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(PROJECTS_KG_PATH)
        c = conn.cursor()
        # Ensure tables exist (same schema get_project_changes reads)
        c.execute('''CREATE TABLE IF NOT EXISTS entities (
            id TEXT PRIMARY KEY, name TEXT UNIQUE NOT NULL, type TEXT, created_at REAL
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS relations (
            id TEXT PRIMARY KEY, source_id TEXT, target_id TEXT, relation TEXT NOT NULL,
            weight INTEGER DEFAULT 1, context TEXT, created_at REAL
        )''')
        # Create entities: one for the project, one per change event.
        # The change name is timestamped to the second, so two identical
        # change types in the same second share one change_event entity.
        project_id = _kg_get_or_create_entity(conn, project, "project")
        change_name = f"{project}:{change_type}:{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        change_id = _kg_get_or_create_entity(conn, change_name, "change_event")
        # Build context with timestamp and details
        context = json.dumps({
            "timestamp": datetime.now().isoformat(),
            "description": description,
            "details": details,
            "source": "luzia"
        })
        # Create relation: project -> has_change -> change_event
        rel_id = str(uuid.uuid4())
        c.execute('''INSERT INTO relations (id, source_id, target_id, relation, weight, context, created_at)
                  VALUES (?, ?, ?, ?, 1, ?, ?)''',
                  (rel_id, project_id, change_id, f"has_{change_type}", context, time_module.time()))
        # Prune old entries (retention policy)
        _kg_prune_old_changes(conn, project_id)
        conn.commit()
        conn.close()
        _log(f" [KB] Logged {change_type} for {project}", verbose_only=True)
        return True
    except Exception as e:
        # KG logging must never break the calling command.
        _log(f" [KB] Warning: Could not log to knowledge graph: {e}", verbose_only=True)
        return False
def get_project_changes(project: str, limit: int = 10) -> list:
    """Return recent change events for *project* from the knowledge graph.

    Each item has keys: event (change-event name), relation, context
    (parsed JSON, or {"raw": ...} when the column isn't valid JSON) and
    timestamp.  Best-effort read path: returns [] on any error (missing
    DB, bad schema) rather than raising.
    """
    try:
        if not PROJECTS_KG_PATH.exists():
            return []
        conn = sqlite3.connect(PROJECTS_KG_PATH)
        c = conn.cursor()
        c.execute('''
            SELECT e2.name, r.relation, r.context, r.created_at
            FROM entities e1
            JOIN relations r ON e1.id = r.source_id
            JOIN entities e2 ON r.target_id = e2.id
            WHERE e1.name = ? AND e1.type = 'project'
            ORDER BY r.created_at DESC
            LIMIT ?
        ''', (project, limit))
        results = []
        for row in c.fetchall():
            # FIX: was a bare `except:` — narrowed to the decode failures
            # this fallback is actually meant to handle.
            try:
                ctx = json.loads(row[2]) if row[2] else {}
            except (json.JSONDecodeError, TypeError):
                ctx = {"raw": row[2]}
            results.append({
                "event": row[0],
                "relation": row[1],
                "context": ctx,
                "timestamp": row[3]
            })
        conn.close()
        return results
    except Exception:
        # Deliberate best-effort: a broken/missing KG must not crash the CLI.
        return []
# --- Research Knowledge Graph Functions ---
def _init_research_db():
    """Initialize the research knowledge graph database.

    Idempotently creates the four tables (sessions, findings, nodes,
    edges) plus their indexes, and returns an OPEN sqlite3 connection
    that the caller is responsible for closing.
    """
    RESEARCH_KG_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(RESEARCH_KG_PATH)
    c = conn.cursor()
    # Research sessions table: one row per 3-phase research run; the three
    # phase columns hold JSON blobs written by update_research_phase.
    c.execute('''CREATE TABLE IF NOT EXISTS research_sessions (
        id TEXT PRIMARY KEY,
        project TEXT NOT NULL,
        topic TEXT NOT NULL,
        status TEXT DEFAULT 'pending',
        created_at REAL,
        updated_at REAL,
        phase TEXT DEFAULT 'init',
        context_expansion TEXT,
        search_branches TEXT,
        final_synthesis TEXT
    )''')
    # Research findings table (linked to sessions)
    c.execute('''CREATE TABLE IF NOT EXISTS research_findings (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        phase TEXT NOT NULL,
        finding_type TEXT,
        content TEXT,
        source TEXT,
        confidence REAL DEFAULT 0.5,
        created_at REAL,
        FOREIGN KEY (session_id) REFERENCES research_sessions(id)
    )''')
    # Research graph nodes (concepts, entities discovered)
    c.execute('''CREATE TABLE IF NOT EXISTS research_nodes (
        id TEXT PRIMARY KEY,
        session_id TEXT,
        project TEXT,
        name TEXT NOT NULL,
        node_type TEXT,
        description TEXT,
        embedding TEXT,
        created_at REAL
    )''')
    # Research graph edges (relationships between nodes)
    c.execute('''CREATE TABLE IF NOT EXISTS research_edges (
        id TEXT PRIMARY KEY,
        source_id TEXT NOT NULL,
        target_id TEXT NOT NULL,
        relation TEXT NOT NULL,
        weight REAL DEFAULT 1.0,
        context TEXT,
        created_at REAL,
        FOREIGN KEY (source_id) REFERENCES research_nodes(id),
        FOREIGN KEY (target_id) REFERENCES research_nodes(id)
    )''')
    # Index for faster lookups
    c.execute('CREATE INDEX IF NOT EXISTS idx_sessions_project ON research_sessions(project)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_findings_session ON research_findings(session_id)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_nodes_project ON research_nodes(project)')
    conn.commit()
    return conn
def create_research_session(project: str, topic: str) -> str:
    """Insert a new 'active' research session; return its short (8-char) id."""
    session_id = str(uuid.uuid4())[:8]
    timestamp = time_module.time()
    conn = _init_research_db()
    conn.execute('''INSERT INTO research_sessions
                    (id, project, topic, status, created_at, updated_at, phase)
                    VALUES (?, ?, ?, 'active', ?, ?, 'init')''',
                 (session_id, project, topic, timestamp, timestamp))
    conn.commit()
    conn.close()
    return session_id
def update_research_phase(session_id: str, phase: str, data: dict):
    """Persist one phase's JSON results onto a research session.

    'final_synthesis' also flips the session status to completed.
    Unrecognized phase names write nothing (the commit is a no-op).
    """
    statements = {
        'context_expansion': '''UPDATE research_sessions
                    SET phase = ?, context_expansion = ?, updated_at = ?
                    WHERE id = ?''',
        'search_branches': '''UPDATE research_sessions
                    SET phase = ?, search_branches = ?, updated_at = ?
                    WHERE id = ?''',
        'final_synthesis': '''UPDATE research_sessions
                    SET phase = ?, final_synthesis = ?, status = 'completed', updated_at = ?
                    WHERE id = ?''',
    }
    conn = _init_research_db()
    c = conn.cursor()
    now = time_module.time()
    sql = statements.get(phase)
    if sql is not None:
        c.execute(sql, (phase, json.dumps(data), now, session_id))
    conn.commit()
    conn.close()
def add_research_finding(session_id: str, phase: str, finding_type: str,
                         content: str, source: str = None, confidence: float = 0.5):
    """Record one finding for a research session; return its uuid."""
    finding_id = str(uuid.uuid4())
    timestamp = time_module.time()
    conn = _init_research_db()
    conn.execute('''INSERT INTO research_findings
                    (id, session_id, phase, finding_type, content, source, confidence, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                 (finding_id, session_id, phase, finding_type, content,
                  source, confidence, timestamp))
    conn.commit()
    conn.close()
    return finding_id
def add_research_node(session_id: str, project: str, name: str,
                      node_type: str, description: str = None) -> str:
    """Add a concept/entity node to the research graph; return its id.

    Nodes are de-duplicated per (project, name): when one already exists
    its id is returned and nothing is inserted.
    """
    conn = _init_research_db()
    cur = conn.cursor()
    cur.execute('SELECT id FROM research_nodes WHERE project = ? AND name = ?',
                (project, name))
    hit = cur.fetchone()
    if hit is not None:
        conn.close()
        return hit[0]
    node_id = str(uuid.uuid4())
    cur.execute('''INSERT INTO research_nodes
                   (id, session_id, project, name, node_type, description, created_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?)''',
                (node_id, session_id, project, name, node_type, description,
                 time_module.time()))
    conn.commit()
    conn.close()
    return node_id
def add_research_edge(source_id: str, target_id: str, relation: str,
                      context: str = None, weight: float = 1.0):
    """Create a relationship edge between two research nodes; return its id."""
    edge_id = str(uuid.uuid4())
    timestamp = time_module.time()
    conn = _init_research_db()
    conn.execute('''INSERT INTO research_edges
                    (id, source_id, target_id, relation, weight, context, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)''',
                 (edge_id, source_id, target_id, relation, weight, context, timestamp))
    conn.commit()
    conn.close()
    return edge_id
def get_project_research_context(project: str, limit: int = 5) -> list:
    """Return up to *limit* most-recent research sessions for *project*.

    Each session dict includes the three phase columns parsed from JSON
    (None when that phase has not been stored).  Best-effort: returns []
    on any error.
    """
    try:
        if not RESEARCH_KG_PATH.exists():
            return []
        conn = sqlite3.connect(RESEARCH_KG_PATH)
        cur = conn.cursor()
        cur.execute('''SELECT id, topic, status, phase, context_expansion,
                              search_branches, final_synthesis, created_at
                       FROM research_sessions
                       WHERE project = ?
                       ORDER BY created_at DESC
                       LIMIT ?''', (project, limit))
        rows = cur.fetchall()
        conn.close()

        def _maybe_json(blob):
            # Phase columns are JSON text or NULL/empty.
            return json.loads(blob) if blob else None

        return [
            {
                "id": row[0],
                "topic": row[1],
                "status": row[2],
                "phase": row[3],
                "context_expansion": _maybe_json(row[4]),
                "search_branches": _maybe_json(row[5]),
                "final_synthesis": _maybe_json(row[6]),
                "created_at": row[7],
            }
            for row in rows
        ]
    except Exception:
        return []
def get_research_graph(project: str) -> dict:
    """Return {"nodes": [...], "edges": [...]} for *project*'s research KG.

    Edges are those whose source node belongs to the project (targets
    are not filtered).  Best-effort: returns an empty graph on error.
    """
    try:
        if not RESEARCH_KG_PATH.exists():
            return {"nodes": [], "edges": []}
        conn = sqlite3.connect(RESEARCH_KG_PATH)
        cur = conn.cursor()
        # Fetch this project's nodes first; edges are keyed off their ids.
        cur.execute('''SELECT id, name, node_type, description
                       FROM research_nodes WHERE project = ?''', (project,))
        nodes = [
            {"id": nid, "name": name, "type": ntype, "description": desc}
            for (nid, name, ntype, desc) in cur.fetchall()
        ]
        edges = []
        ids = [node["id"] for node in nodes]
        if ids:
            marks = ','.join('?' * len(ids))
            cur.execute(f'''SELECT source_id, target_id, relation, weight
                            FROM research_edges
                            WHERE source_id IN ({marks})''', ids)
            edges = [
                {"source": src, "target": dst, "relation": rel, "weight": wt}
                for (src, dst, rel, wt) in cur.fetchall()
            ]
        conn.close()
        return {"nodes": nodes, "edges": edges}
    except Exception:
        return {"nodes": [], "edges": []}
def load_config() -> dict:
    """Load and parse the orchestrator config; exit(1) if unreadable."""
    try:
        with CONFIG_PATH.open() as handle:
            return json.load(handle)
    except Exception as e:
        print(f"Error loading config: {e}")
        sys.exit(1)
def _log(msg: str, verbose_only: bool = False):
"""Conditionally print verbose messages"""
if verbose_only and not VERBOSE:
return
print(msg)
# --- Maintenance Functions ---
def _get_actual_job_status(job_dir: Path) -> str:
"""Get actual job status by checking output.log for exit code.
This is needed because meta.json status isn't updated when job completes.
The job's shell script appends "exit:<code>" to output.log on completion.
"""
output_file = job_dir / "output.log"
meta_file = job_dir / "meta.json"
# Start with meta.json status
status = "unknown"
if meta_file.exists():
try:
with open(meta_file) as f:
meta = json.load(f)
status = meta.get("status", "unknown")
except:
pass
# Check output.log for actual completion
if output_file.exists():
try:
content = output_file.read_text()
if "exit:" in content:
# Find exit code to determine if failed
lines = content.strip().split("\n")
for line in reversed(lines):
if line.startswith("exit:"):
exit_code = int(line.split(":")[1])
if exit_code == 0:
return "completed"
elif exit_code == -9:
return "killed"
else:
return "failed"
except:
pass
return status
def cleanup_old_jobs(dry_run: bool = False) -> dict:
    """
    Clean up old job directories based on retention policy.

    Policy (applied newest-first):
    - Never delete running jobs
    - Keep last JOB_MAX_COUNT jobs regardless of age
    - Delete completed jobs older than JOB_MAX_AGE_DAYS
    - Delete failed jobs older than JOB_FAILED_MAX_AGE_DAYS

    Args:
        dry_run: When True, only log what would be deleted.

    Returns dict with cleanup statistics:
    {"checked", "deleted", "kept", "errors", "bytes_freed"}.
    """
    stats = {"checked": 0, "deleted": 0, "kept": 0, "errors": 0, "bytes_freed": 0}
    if not JOBS_DIR.exists():
        return stats
    # Collect all jobs with metadata (dirs without meta.json are skipped silently)
    jobs = []
    for job_dir in JOBS_DIR.iterdir():
        if not job_dir.is_dir():
            continue
        meta_file = job_dir / "meta.json"
        if not meta_file.exists():
            continue
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            # Get actual status by checking output.log
            # (meta.json's status is never updated after spawn)
            actual_status = _get_actual_job_status(job_dir)
            meta["status"] = actual_status
            # Calculate directory size
            dir_size = sum(f.stat().st_size for f in job_dir.rglob('*') if f.is_file())
            jobs.append({
                "dir": job_dir,
                "meta": meta,
                "size": dir_size,
                "started": meta.get("started", "1970-01-01T00:00:00")
            })
        except Exception as e:
            _log(f" Warning: Could not read {meta_file}: {e}", verbose_only=True)
            stats["errors"] += 1
    # Sort by start time (newest first) — ISO-8601 strings sort chronologically
    jobs.sort(key=lambda x: x["started"], reverse=True)
    now = datetime.now()
    kept_count = 0
    for job in jobs:
        stats["checked"] += 1
        job_dir = job["dir"]
        meta = job["meta"]
        status = meta.get("status", "unknown")
        # Parse start time
        try:
            started = datetime.fromisoformat(meta.get("started", "1970-01-01T00:00:00"))
        except:
            started = datetime.fromtimestamp(0)
        age_days = (now - started).total_seconds() / 86400
        # Decision logic
        should_delete = False
        reason = ""
        # Never delete running jobs (they still count toward "kept" below)
        if status == "running":
            reason = "running"
        # Always keep first JOB_MAX_COUNT jobs (list is newest-first)
        elif kept_count < JOB_MAX_COUNT:
            reason = "within_limit"
            kept_count += 1
        # Age-based deletion for everything beyond the count limit
        else:
            if status == "failed" and age_days > JOB_FAILED_MAX_AGE_DAYS:
                should_delete = True
                reason = f"failed_old ({age_days:.1f}d)"
            elif status != "failed" and age_days > JOB_MAX_AGE_DAYS:
                should_delete = True
                reason = f"completed_old ({age_days:.1f}d)"
            else:
                reason = "recent"
                kept_count += 1
        if should_delete:
            if dry_run:
                _log(f" [DRY] Would delete {job_dir.name} ({reason}, {job['size']/1024:.1f}KB)")
            else:
                try:
                    shutil.rmtree(job_dir)
                    stats["deleted"] += 1
                    stats["bytes_freed"] += job["size"]
                    _log(f" Deleted {job_dir.name} ({reason})", verbose_only=True)
                except Exception as e:
                    _log(f" Error deleting {job_dir.name}: {e}")
                    stats["errors"] += 1
        else:
            stats["kept"] += 1
    return stats
def cleanup_stale_containers(max_lifetime_hours: int = CONTAINER_MAX_LIFETIME_HOURS) -> dict:
    """
    Stop and remove containers that have exceeded the maximum lifetime.

    NOTE(review): the original docstring also claimed orphaned containers
    (no matching job record) are cleaned up, but the 'orphaned' counter is
    never incremented here — confirm whether that logic lives elsewhere.

    Args:
        max_lifetime_hours: Age threshold beyond which a container is
            stopped and removed.

    Returns dict with cleanup statistics:
    {"checked", "stopped", "orphaned", "errors"}.
    """
    stats = {"checked": 0, "stopped": 0, "orphaned": 0, "errors": 0}
    containers = list_project_containers()
    now = datetime.now()
    for container in containers:
        stats["checked"] += 1
        name = container.get("name", "")
        # Parse container creation time
        created_str = container.get("created", "")
        try:
            # Docker returns format like "2025-01-07 16:31:45 +0000 UTC";
            # only the first 19 chars are parsed — the timezone is dropped,
            # so ages are computed against local naive time.
            created = datetime.strptime(created_str[:19], "%Y-%m-%d %H:%M:%S")
        except:
            _log(f" Warning: Could not parse creation time for {name}", verbose_only=True)
            continue
        age_hours = (now - created).total_seconds() / 3600
        if age_hours > max_lifetime_hours:
            _log(f" Stopping {name} (age: {age_hours:.1f}h > {max_lifetime_hours}h)", verbose_only=True)
            try:
                subprocess.run(["docker", "stop", name], capture_output=True, timeout=30)
                subprocess.run(["docker", "rm", name], capture_output=True, timeout=10)
                stats["stopped"] += 1
            except Exception as e:
                _log(f" Error stopping {name}: {e}")
                stats["errors"] += 1
    return stats
def rotate_notifications_log(max_lines: int = NOTIFICATION_LOG_MAX_LINES) -> dict:
    """
    Trim notifications.log down to its last *max_lines* lines.

    Returns {"rotated": bool, "lines_before": int, "lines_after": int}.
    Errors are logged and leave the file untouched.
    """
    stats = {"rotated": False, "lines_before": 0, "lines_after": 0}
    notify_file = LOG_DIR / "notifications.log"
    if not notify_file.exists():
        return stats
    try:
        with open(notify_file, "r") as fh:
            lines = fh.readlines()
        stats["lines_before"] = len(lines)
        if len(lines) <= max_lines:
            # Nothing to do; report the current size.
            stats["lines_after"] = len(lines)
            return stats
        # Too long: rewrite with only the newest max_lines entries.
        with open(notify_file, "w") as fh:
            fh.writelines(lines[-max_lines:])
        stats["lines_after"] = max_lines
        stats["rotated"] = True
        _log(f" Rotated notifications.log: {len(lines)} -> {max_lines} lines", verbose_only=True)
    except Exception as e:
        _log(f" Error rotating notifications.log: {e}")
    return stats
def get_maintenance_status() -> dict:
    """
    Get current maintenance status including:
    - Job statistics
    - Container status
    - Disk usage
    - Log file sizes

    Purely read-only; every probe is individually wrapped so a broken
    job dir or unreadable log cannot fail the whole report.
    """
    status = {
        "jobs": {"total": 0, "running": 0, "completed": 0, "failed": 0, "oldest_days": 0},
        "containers": {"total": 0, "oldest_hours": 0},
        "disk": {"jobs_mb": 0, "logs_mb": 0},
        "notifications": {"lines": 0}
    }
    # Job statistics
    if JOBS_DIR.exists():
        now = datetime.now()
        oldest_age = 0
        for job_dir in JOBS_DIR.iterdir():
            if not job_dir.is_dir():
                continue
            meta_file = job_dir / "meta.json"
            if not meta_file.exists():
                continue
            try:
                with open(meta_file) as f:
                    meta = json.load(f)
                status["jobs"]["total"] += 1
                # Get actual status by checking output.log (meta.json isn't updated)
                job_status = _get_actual_job_status(job_dir)
                if job_status == "running":
                    status["jobs"]["running"] += 1
                elif job_status in ("failed", "killed"):
                    status["jobs"]["failed"] += 1
                else:
                    status["jobs"]["completed"] += 1
                # Calculate age
                try:
                    started = datetime.fromisoformat(meta.get("started", "1970-01-01"))
                    age_days = (now - started).total_seconds() / 86400
                    oldest_age = max(oldest_age, age_days)
                except:
                    pass
            except:
                pass
        status["jobs"]["oldest_days"] = round(oldest_age, 1)
        # Calculate disk usage
        try:
            jobs_size = sum(f.stat().st_size for f in JOBS_DIR.rglob('*') if f.is_file())
            status["disk"]["jobs_mb"] = round(jobs_size / (1024 * 1024), 2)
        except:
            pass
    # Container statistics
    containers = list_project_containers()
    status["containers"]["total"] = len(containers)
    if containers:
        now = datetime.now()
        oldest_hours = 0
        for c in containers:
            try:
                # Docker "created" format: "YYYY-MM-DD HH:MM:SS +0000 UTC" (tz dropped)
                created = datetime.strptime(c.get("created", "")[:19], "%Y-%m-%d %H:%M:%S")
                age_hours = (now - created).total_seconds() / 3600
                oldest_hours = max(oldest_hours, age_hours)
            except:
                pass
        status["containers"]["oldest_hours"] = round(oldest_hours, 1)
    # Notification log line count
    notify_file = LOG_DIR / "notifications.log"
    if notify_file.exists():
        try:
            with open(notify_file, "r") as f:
                status["notifications"]["lines"] = sum(1 for _ in f)
        except:
            pass
    # Log directory size (top-level *.log files only, not recursive)
    try:
        logs_size = sum(f.stat().st_size for f in LOG_DIR.glob('*.log') if f.is_file())
        status["disk"]["logs_mb"] = round(logs_size / (1024 * 1024), 2)
    except:
        pass
    return status
def run_maintenance(dry_run: bool = False) -> dict:
    """
    Run the full maintenance cycle and return combined statistics.

    Steps: prune old jobs, stop stale containers, rotate the notification
    log, then reap idle containers.  With dry_run=True only job cleanup
    runs (in preview mode); the destructive steps are skipped.
    """
    if dry_run:
        return {
            "jobs": cleanup_old_jobs(dry_run=True),
            "containers": {"skipped": True},
            "logs": {"skipped": True},
            "idle_cleanup": {"done": False},
        }
    results = {
        "jobs": cleanup_old_jobs(dry_run=False),
        "containers": cleanup_stale_containers(),
        "logs": rotate_notifications_log(),
        "idle_cleanup": {"done": False},
    }
    # Also run idle container cleanup (best-effort)
    try:
        cleanup_idle_containers(timeout_minutes=10)
        results["idle_cleanup"]["done"] = True
    except Exception as e:
        results["idle_cleanup"]["error"] = str(e)
    return results
def spawn_background_job(project: str, command: str, log_file: Path, job_type: str = "docker") -> str:
    """Spawn a detached background job in the project's container; return its id.

    Writes JOBS_DIR/<job_id>/meta.json (status stays "running" — actual
    completion is later inferred from output.log, see
    _get_actual_job_status), then launches `docker exec` fully detached
    so this process can exit immediately. The shell wrapper appends
    "exit:<code>" to output.log when the command finishes.

    Args:
        project: Target project; the container is named luzia-<project>.
        command: Shell command run inside the container via `bash -c`.
        log_file: Unused; kept for interface compatibility.
        job_type: Stored in meta.json (e.g. "docker").
    """
    import shlex  # local import: only needed for safe shell quoting here

    # Job id: HHMMSS plus a 16-bit hash suffix of the command.
    job_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(command) & 0xffff)[2:]
    job_dir = JOBS_DIR / job_id
    job_dir.mkdir(exist_ok=True)
    # Write job metadata
    with open(job_dir / "meta.json", "w") as f:
        json.dump({
            "id": job_id,
            "project": project,
            "command": command,
            "type": job_type,
            "started": datetime.now().isoformat(),
            "status": "running"
        }, f)
    output_file = job_dir / "output.log"
    # FIX: previously built via os.system() with `command` interpolated into
    # nested double quotes — any quote character in the command broke the
    # quoting (a shell-injection hazard). shlex.quote makes each argument
    # safe, and Popen(start_new_session=True) detaches the child like nohup.
    shell_line = (
        f'docker exec luzia-{project} bash -c {shlex.quote(command)} '
        f'> {shlex.quote(str(output_file))} 2>&1; '
        f'echo "exit:$?" >> {shlex.quote(str(output_file))}'
    )
    subprocess.Popen(
        ["sh", "-c", shell_line],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,  # survive parent exit, own process group
    )
    return job_id
def is_claude_dev_task(task: str) -> bool:
    """Detect whether *task* concerns Claude/agent development work.

    When such a task is detected (skills, plugins, agents, MCP servers,
    hooks, Claude config, luzia itself, or an explicit debug request),
    agents should run with the --debug flag for better visibility.
    """
    lowered = task.lower()
    keyword_groups = (
        ('skill', 'plugin', 'command'),              # skills and plugins
        ('sub-agent', 'subagent', 'agent'),          # agent development
        ('mcp', 'mcp server', 'mcp-server'),         # MCP development
        ('.claude', 'claude.md', 'claude.json'),     # Claude config
        ('hook',),                                   # hooks
        ('luzia', 'orchestrat'),                     # luzia itself
        ('debug mode', 'debug flag', 'with debug'),  # debug explicitly requested
    )
    for group in keyword_groups:
        if any(keyword in lowered for keyword in group):
            return True
    return False
def spawn_claude_agent(project: str, task: str, context: str, config: dict,
                       skip_preflight: bool = False) -> str:
    """Spawn a detached Claude agent to handle a natural language task.

    IMPORTANT: Agents run with full permissions (--dangerously-skip-permissions)
    regardless of how the parent session was started. This ensures autonomous
    background execution without blocking on approval prompts.

    SMART DEBUG: For Claude development tasks (skills, plugins, agents, MCP),
    automatically enables --debug flag for better visibility.

    AUTO-MAINTENANCE: Cleans up old jobs before spawning new ones to prevent
    unbounded growth of job directories.

    QA PREFLIGHT: Runs 5 validation checks before dispatching to catch issues early.
    Use skip_preflight=True or --skip-preflight flag to bypass for emergencies.

    Args:
        project: Registered project name (key in config["projects"]).
        task: Natural-language task text for the agent.
        context: Pre-built project context, embedded into the prompt.
        config: Loaded luzia configuration.
        skip_preflight: Bypass QA preflight checks (used by retries).

    Returns:
        The new job ID, or a string starting with "BLOCKED:" when the
        preflight checks reject the task.
    """
    # --- QA PREFLIGHT CHECKS ---
    # Run preflight validation unless explicitly skipped
    preflight_report = None
    qa_config = config.get("qa_preflight", {})
    # Preflight runs only when enabled in config AND the checker imported
    qa_enabled = qa_config.get("enabled", True) and QA_PREFLIGHT_AVAILABLE
    if qa_enabled and not skip_preflight and run_preflight_checks:
        try:
            preflight_task = {
                'id': f'{project}-pending',
                'title': task[:100],
                'description': task
            }
            approved, preflight_report = run_preflight_checks(preflight_task)
            # Log preflight results
            if preflight_report.get('warnings'):
                for warn in preflight_report['warnings']:
                    _log(f" [Preflight] Warning: {warn}", verbose_only=False)
            if not approved:
                # Task blocked by preflight checks
                for err in preflight_report.get('errors', []):
                    _log(f" [Preflight] BLOCKED: {err}", verbose_only=False)
                # Log to knowledge graph
                log_project_change(
                    project=project,
                    change_type="preflight_blocked",
                    description=f"Task blocked by preflight: {task[:50]}...",
                    details=json.dumps(preflight_report)
                )
                # Return a special job_id indicating blocked task
                return f"BLOCKED:{preflight_report['errors'][0][:50]}"
            # Log successful preflight
            _log(f" [Preflight] Approved (timeout: {preflight_report.get('recommended_timeout', 300)}s)",
                 verbose_only=True)
        except Exception as e:
            # Preflight is advisory: a checker crash never blocks dispatch
            _log(f" [Preflight] Check failed (proceeding anyway): {e}", verbose_only=True)
    # Run lightweight maintenance before spawning (non-blocking)
    # Only clean if we have many jobs to avoid overhead on every spawn
    try:
        job_count = sum(1 for d in JOBS_DIR.iterdir() if d.is_dir()) if JOBS_DIR.exists() else 0
        if job_count > JOB_MAX_COUNT:
            cleanup_old_jobs(dry_run=False)
            _log(f" [Auto-cleanup] Pruned old jobs (was {job_count})", verbose_only=True)
    except Exception as e:
        _log(f" [Auto-cleanup] Warning: {e}", verbose_only=True)
    # Job ID: wall-clock HHMMSS plus a 16-bit hash suffix of the task text
    job_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]
    job_dir = JOBS_DIR / job_id
    job_dir.mkdir(exist_ok=True)
    project_config = config["projects"].get(project, {})
    project_path = project_config.get("path", f"/home/{project}")
    # Determine which user to run as
    # - Use explicit "user" from config if specified
    # - For luzia project, run as admin (infrastructure)
    # - Otherwise run as the project user
    run_as_user = project_config.get("user", project if project != "luzia" else "admin")
    user_home = f"/home/{run_as_user}" if run_as_user != "admin" else "/home/admin"
    # Detect Claude development tasks - enable debug for better visibility
    debug_mode = is_claude_dev_task(task)
    # Build the prompt for the agent
    # Try to load autonomous learning KG tools
    kg_learning_section = ""
    try:
        sys.path.insert(0, "/home/admin/.luzia/learning")
        from kg_mcp_server import get_kg_server
        kg_server = get_kg_server()
        kg_status = kg_server.get_kg_status()
        kg_learning_section = f"""
## Autonomous Learning Knowledge Graph - ENHANCED
The system maintains an autonomous learning knowledge graph with {kg_status['entities']} learned solutions:
**Available Learning Tools:**
- `kg_search`: Search for similar problems/solutions from prior tasks
- `kg_store`: Store solutions or insights learned from this task
- `kg_details`: Get full details about a specific learned solution
- `kg_relate`: Create relationships between learned concepts
**How to use:**
1. **Start task**: Use `kg_search` to find if similar problems have been solved before
2. **During task**: Use findings to inform your approach
3. **After task**: Use `kg_store` to save important learnings for future agents
4. **Advanced**: Use `kg_relate` to connect related solutions
**Important**: These are NOT standard Claude tools. They will only work if you call them with the proper syntax. Assume they're available when working on tasks."""
    except Exception as e:
        kg_learning_section = f"\n## Autonomous Learning (Not Available)\nNote: Learning KG is not available in this session ({str(e)[:50]})"
    prompt = f"""You are a project agent working on the **{project}** project.
{context}
## Your Task
{task}
## Execution Environment
- You are running as user: {run_as_user}
- You are running directly in the project directory: {project_path}
- You have FULL permission to read, write, and execute files in this directory
- Use standard Claude tools (Read, Write, Edit, Bash) directly - no need for luzia subcommands
- All file operations are pre-authorized - proceed without asking for permission
## Knowledge Graph - IMPORTANT
Use the **shared/global knowledge graph** for storing and retrieving knowledge:
- Use `mcp__shared-projects-memory__store_fact` to store facts/relations
- Use `mcp__shared-projects-memory__query_relations` to query knowledge
- Use `mcp__shared-projects-memory__search_context` to search
- Do NOT use `mcp__memory__*` tools (that's personal/local memory)
- The shared KG is at /etc/zen-swarm/memory/ and shared across all agents
{kg_learning_section}
## Guidelines
- Complete the task autonomously
- If you encounter errors, debug and fix them
- Store important findings in the shared knowledge graph
- When available, use learning tools to leverage prior solutions
- Provide a summary of what was done when complete
## Task Completion Protocol (IMPORTANT)
When you finish your work, you MUST complete these steps before ending:
1. **Document Changes**: If you made significant changes, update relevant documentation:
   - Update CLAUDE.md if project behavior changed
   - Update README.md if user-facing features changed
   - Add comments to complex code sections
2. **Run Tests (REQUIRED)**: Before committing, verify your changes don't break existing functionality:
   ```bash
   # Run project tests (if tests/ directory exists)
   if [ -d tests/ ]; then
     pytest tests/ -v --tb=short
   fi
   # If tests fail, FIX THE ISSUES before proceeding to commit
   ```
   - If tests fail, debug and fix the failures
   - If you added new functionality, add corresponding tests
   - Update existing tests if your changes require test modifications
3. **Git Commit**: Commit your changes with a descriptive message:
   ```bash
   git add -A
   git commit -m "Task: <brief description of what was done>"
   ```
4. **Knowledge Graph**: Store key learnings using mcp__shared-projects-memory__store_fact
Note: The project state was automatically snapshot before this task started.
If you need to revert, the previous state can be recovered from git history."""
    output_file = job_dir / "output.log"
    prompt_file = job_dir / "prompt.txt"
    pid_file = job_dir / "pid"
    # Load global context from /etc/claude/GLOBAL.md (non-modifiable by users)
    # This is prepended to the prompt so all agents get server-wide rules
    global_context = ""
    global_context_file = "/etc/claude/GLOBAL.md"
    if os.path.exists(global_context_file):
        try:
            with open(global_context_file, 'r') as f:
                global_context = f"""
<system-reminder>
GLOBAL SERVER CONTEXT (from /etc/claude/GLOBAL.md - read-only, system-wide rules):
{f.read()}
</system-reminder>
"""
        except Exception as e:
            log_warning(f"Could not read global context: {e}")
    # Write prompt to file for claude to read
    # Global context is prepended as a system reminder
    with open(prompt_file, "w") as f:
        f.write(global_context + prompt)
    # Make files readable by target user
    os.chmod(prompt_file, 0o644)
    os.chmod(job_dir, 0o755)
    # Spawn Claude agent detached - runs independently of admin CLI
    # CRITICAL: Use --dangerously-skip-permissions for autonomous background execution
    # This ensures agents don't block on approval prompts regardless of parent session settings
    # Track PID, notify on completion; $exit_code is expanded by the shell
    # inside run.sh (see script template below), not by Python.
    notify_cmd = f'echo "[$(date +%H:%M:%S)] Agent {job_id} finished (exit $exit_code)" >> /var/log/luz-orchestrator/notifications.log && python3 /opt/server-agents/orchestrator/lib/qa_postflight.py {job_id} >> /var/log/luz-orchestrator/postflight.log 2>&1 &'
    # Build claude command with appropriate flags
    # - Always: --dangerously-skip-permissions (full autonomy)
    # - Always: --add-dir for project path (allow file operations in project)
    # - Claude dev tasks: --debug (better visibility for skill/plugin/agent work)
    # - Use --print for non-interactive output mode
    # - Use --verbose for progress visibility
    debug_flag = "--debug " if debug_mode else ""
    # Add project path AND /opt/server-agents to allowed directories
    # This ensures agents can read/write project files and access orchestrator tools
    # Use --permission-mode bypassPermissions to avoid any interactive prompts
    # Global context is appended to the prompt file itself
    # This avoids shell escaping issues with --append-system-prompt
    claude_cmd = f'claude --dangerously-skip-permissions --permission-mode bypassPermissions --add-dir "{project_path}" --add-dir /opt/server-agents --print --verbose {debug_flag}-p'
    # Run as the project user using their home directory config
    # No need to copy configs - user's own ~/.claude/ will be used
    # Use sudo -u to switch to project user
    if run_as_user == get_current_user():
        # Running as same user, no sudo needed
        sudo_prefix = ""
    else:
        # Switch to project user
        sudo_prefix = f"sudo -u {run_as_user} "
    # Create user-specific temp directory to avoid /tmp collisions
    # Claude CLI uses temp files that can conflict between users
    user_tmp_dir = f"{user_home}/.tmp"
    # Build shell script to avoid quote escaping issues
    # Set TMPDIR to user's own temp directory to prevent permission conflicts
    script_file = job_dir / "run.sh"
    with open(script_file, "w") as f:
        f.write(f'''#!/bin/bash
echo $$ > "{pid_file}"
# Create user-specific temp directory if needed
{sudo_prefix}mkdir -p "{user_tmp_dir}"
{sudo_prefix}chmod 700 "{user_tmp_dir}"
# Set TMPDIR to user's home to avoid /tmp collisions between users
export TMPDIR="{user_tmp_dir}"
export TEMP="{user_tmp_dir}"
export TMP="{user_tmp_dir}"
# Also set HOME explicitly for the target user
export HOME="{user_home}"
# PRE-EXECUTION: Snapshot project state before task starts
# This allows reverting if the task causes issues
{sudo_prefix}bash -c 'cd "{project_path}" && if [ -d .git ]; then git add -A 2>/dev/null && git commit -m "Pre-task snapshot: {job_id}" --allow-empty 2>/dev/null || true; fi'
echo "[Pre-task] Project state snapshot created" >> "{output_file}"
# Use stdbuf for unbuffered output so logs are captured in real-time
# Also use script -q to capture all terminal output including prompts
{sudo_prefix}bash -c 'export TMPDIR="{user_tmp_dir}" HOME="{user_home}"; cd "{project_path}" && cat "{prompt_file}" | stdbuf -oL -eL {claude_cmd}' 2>&1 | tee "{output_file}"
exit_code=${{PIPESTATUS[0]}}
echo "" >> "{output_file}"
echo "exit:$exit_code" >> "{output_file}"
{notify_cmd}
''')
    os.chmod(script_file, 0o755)
    # Detach: nohup + & so the agent outlives this CLI process
    os.system(f'nohup "{script_file}" >/dev/null 2>&1 &')
    # Wait briefly for PID file
    import time
    time.sleep(0.2)
    pid = None
    if pid_file.exists():
        pid = pid_file.read_text().strip()
    # Create time metrics for task tracking
    time_metrics_data = {}
    if TIME_METRICS_AVAILABLE:
        try:
            time_metrics_data = create_task_time_metadata(job_id, project)
        except Exception as e:
            _log(f" [Time] Warning: Could not create time metrics: {e}", verbose_only=True)
    # Write job metadata with PID and time metrics
    job_meta = {
        "id": job_id,
        "project": project,
        "task": task,
        "type": "agent",
        "user": run_as_user,
        "pid": pid,
        "started": datetime.now().isoformat(),
        "status": "running",
        "debug": debug_mode
    }
    # Add time metrics if available
    if time_metrics_data:
        job_meta.update(time_metrics_data)
    # Add preflight report if available
    if preflight_report:
        job_meta["preflight"] = {
            "approved": preflight_report.get("approved", True),
            "recommended_timeout": preflight_report.get("recommended_timeout", 300),
            "warnings": preflight_report.get("warnings", []),
            "checks": list(preflight_report.get("checks", {}).keys())
        }
    with open(job_dir / "meta.json", "w") as f:
        json.dump(job_meta, f, indent=2)
    # Log to project knowledge graph
    log_project_change(
        project=project,
        change_type="agent_task",
        description=f"Agent task dispatched: {task[:100]}{'...' if len(task) > 100 else ''}",
        details=json.dumps({"job_id": job_id, "full_task": task})
    )
    return job_id
def get_job_status(job_id: str, update_completion: bool = True) -> dict:
    """Get status of a background job.

    Reads the job's meta.json and scans output.log for a trailing
    "exit:<code>" marker to detect completion.

    Args:
        job_id: Job identifier (directory name under JOBS_DIR).
        update_completion: If True, update meta.json with completion time metrics
            when job is detected as completed for the first time.

    Returns:
        The job metadata dict (with "status", plus "exit_code"/"elapsed"
        when applicable), or {"error": ...} if the job is unknown.
    """
    job_dir = JOBS_DIR / job_id
    if not job_dir.exists():
        return {"error": f"Job {job_id} not found"}
    meta_file = job_dir / "meta.json"
    output_file = job_dir / "output.log"
    # Guard against half-created job directories - previously this raised
    # FileNotFoundError when meta.json was missing.
    if not meta_file.exists():
        return {"error": f"Job {job_id} has no metadata"}
    with open(meta_file) as f:
        meta = json.load(f)
    # Track if status changed to completed
    was_running = meta.get("status") == "running"
    exit_code = None
    # Check if completed (look for exit code in output)
    if output_file.exists():
        content = output_file.read_text()
        if "exit:" in content:
            lines = content.strip().split("\n")
            for line in reversed(lines):
                if line.startswith("exit:"):
                    try:
                        exit_code = int(line.split(":")[1])
                    except ValueError:
                        # Malformed/truncated marker - keep scanning earlier lines
                        continue
                    meta["status"] = "completed"
                    meta["exit_code"] = exit_code
                    break
    # Add completion time metrics if job just completed
    if update_completion and was_running and meta.get("status") == "completed":
        if TIME_METRICS_AVAILABLE and exit_code is not None:
            try:
                completion_data = update_task_completion_metadata(meta, exit_code)
                meta.update(completion_data)
                # Persist updated metadata
                with open(meta_file, "w") as f:
                    json.dump(meta, f, indent=2)
            except Exception as e:
                _log(f" [Time] Warning: Could not update completion metrics: {e}", verbose_only=True)
    # Add elapsed time for running jobs
    if meta.get("status") == "running" and TIME_METRICS_AVAILABLE:
        dispatch_time = meta.get("time_metrics", {}).get("dispatch", {}).get("utc_time")
        if dispatch_time:
            meta["elapsed"] = elapsed_since(dispatch_time)
    return meta
def list_jobs() -> list:
    """List the 20 most recent jobs (newest first).

    Returns an empty list when the jobs directory does not exist yet
    (previously this raised FileNotFoundError). Stops scanning once 20
    jobs are collected instead of computing status for every job dir.
    """
    jobs = []
    if not JOBS_DIR.exists():
        return jobs
    # Job directory names start with HHMMSS, so reverse lexical sort is
    # newest-first (within the same day).
    for job_dir in sorted(JOBS_DIR.iterdir(), reverse=True):
        if not job_dir.is_dir():
            continue
        if not (job_dir / "meta.json").exists():
            continue
        jobs.append(get_job_status(job_dir.name))
        if len(jobs) >= 20:
            break  # Last 20 only - avoid statusing the whole history
    return jobs
def kill_agent(job_id: str) -> dict:
    """Kill a running agent by job ID.

    Sends SIGKILL to the recorded PID and to any other process whose
    command line mentions the job ID, then marks the job as killed in
    its metadata and appends an "exit:-9" marker to the output log.

    Args:
        job_id: Job identifier (directory name under JOBS_DIR).

    Returns:
        {"success": True, "job_id": ..., "killed": bool} on success, or
        {"error": ...} when the job is unknown or already completed.
    """
    job_dir = JOBS_DIR / job_id
    if not job_dir.exists():
        return {"error": f"Job {job_id} not found"}
    meta_file = job_dir / "meta.json"
    pid_file = job_dir / "pid"
    output_file = job_dir / "output.log"
    with open(meta_file) as f:
        meta = json.load(f)
    if meta.get("status") == "completed":
        return {"error": f"Job {job_id} already completed"}
    # Try to kill by PID
    killed = False
    if pid_file.exists():
        pid = pid_file.read_text().strip()
        try:
            os.kill(int(pid), 9)
            killed = True
        except (ProcessLookupError, ValueError, PermissionError):
            # PermissionError: the agent may run as another user (via sudo),
            # in which case we cannot signal it directly - previously this
            # exception escaped and crashed the command.
            pass
    # Also try to find and kill claude process for this job
    result = subprocess.run(
        ["pgrep", "-f", f"{job_id}"],
        capture_output=True, text=True
    )
    own_pid = os.getpid()
    for pid in result.stdout.strip().split("\n"):
        if not pid:
            continue
        try:
            pid_int = int(pid)
        except ValueError:
            continue
        # BUG FIX: `pgrep -f` matches full command lines, and our own
        # "luzia kill <job_id>" command line contains the job ID too -
        # never SIGKILL ourselves.
        if pid_int == own_pid:
            continue
        try:
            os.kill(pid_int, 9)
            killed = True
        except (ProcessLookupError, PermissionError):
            pass
    # Update metadata
    meta["status"] = "killed"
    meta["killed_at"] = datetime.now().isoformat()
    with open(meta_file, "w") as f:
        json.dump(meta, f)
    # Append a sentinel exit marker so status readers see the kill
    with open(output_file, "a") as f:
        f.write(f"\n[KILLED at {datetime.now().strftime('%H:%M:%S')}]\nexit:-9\n")
    # Notify
    notify_file = LOG_DIR / "notifications.log"
    with open(notify_file, "a") as f:
        f.write(f"[{datetime.now().strftime('%H:%M:%S')}] Agent {job_id} KILLED by user\n")
    return {"success": True, "job_id": job_id, "killed": killed}
def get_notifications(limit: int = 10) -> list:
    """Return up to *limit* most recent notification lines (oldest first).

    Returns [] when the log is missing or empty. Blank lines are dropped:
    previously an empty file produced [""] because "".split("\\n") yields
    a single empty string.
    """
    notify_file = LOG_DIR / "notifications.log"
    if not notify_file.exists():
        return []
    lines = [ln for ln in notify_file.read_text().strip().split("\n") if ln]
    return lines[-limit:]
# --- Exit Code Classification for Smart Retry ---
# Classify exit codes to determine if failure is retryable.
# Each entry maps an exit code to:
#   meaning   - human-readable description
#   retryable - whether an automatic retry is worth attempting
#   reason    - (optional) rationale surfaced to the user
EXIT_CODE_INFO = {
    0: {"meaning": "Success", "retryable": False},
    1: {"meaning": "General error", "retryable": True, "reason": "Task error - may succeed on retry"},
    2: {"meaning": "Shell misuse", "retryable": False, "reason": "Syntax or usage error"},
    126: {"meaning": "Permission denied", "retryable": False, "reason": "File not executable"},
    127: {"meaning": "Command not found", "retryable": False, "reason": "Missing binary/command"},
    128: {"meaning": "Invalid exit code", "retryable": False},
    130: {"meaning": "SIGINT (Ctrl+C)", "retryable": True, "reason": "Interrupted - may complete on retry"},
    137: {"meaning": "SIGKILL (OOM)", "retryable": True, "reason": "Out of memory - may succeed with less load"},
    143: {"meaning": "SIGTERM", "retryable": True, "reason": "Terminated - may succeed on retry"},
    254: {"meaning": "Claude CLI error", "retryable": True, "reason": "Claude CLI issue - often transient"},
    255: {"meaning": "Exit status out of range", "retryable": False},
    # -9 is a sentinel written by kill_agent, not a real shell exit status
    -9: {"meaning": "Killed by user", "retryable": False, "reason": "Manually killed - don't auto-retry"},
}
def get_exit_code_info(exit_code: int) -> dict:
    """Describe an exit code: its meaning and whether a retry makes sense.

    Known codes come straight from EXIT_CODE_INFO. Codes in 128-192 are
    decoded as 128+signal (retryable only for HUP/INT/TERM); anything
    else is reported as unknown and non-retryable.
    """
    known = EXIT_CODE_INFO.get(exit_code)
    if known is not None:
        return known
    if 128 <= exit_code <= 192:
        sig = exit_code - 128
        return {"meaning": f"Signal {sig}", "retryable": sig in (1, 2, 15)}
    return {"meaning": "Unknown", "retryable": False}
def is_failure_retryable(exit_code: int) -> tuple:
    """Check if a failure is retryable.

    Returns (is_retryable: bool, reason: str); the reason prefers the
    explicit retry rationale and falls back to the code's meaning.
    """
    info = get_exit_code_info(exit_code)
    if "reason" in info:
        reason = info["reason"]
    else:
        reason = info.get("meaning", "Unknown")
    return info.get("retryable", False), reason
def list_failed_jobs(limit: int = 20) -> list:
    """List failed jobs with exit code analysis.

    Scans job directories newest-first, keeping only those whose actual
    status (per _get_actual_job_status) is "failed" or "killed".

    Args:
        limit: Maximum number of failures to return.

    Returns:
        List of failed jobs sorted by time (newest first); each dict has
        id, project, truncated task, start time, status, exit code and its
        meaning, the retryability verdict, and the last ~10 output lines.
    """
    failed_jobs = []
    if not JOBS_DIR.exists():
        return failed_jobs
    # Job dir names start with HHMMSS, so reverse lexical sort approximates
    # newest-first (within the same day).
    for job_dir in sorted(JOBS_DIR.iterdir(), reverse=True):
        if not job_dir.is_dir():
            continue
        meta_file = job_dir / "meta.json"
        output_file = job_dir / "output.log"
        if not meta_file.exists():
            continue
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            # Check actual status
            actual_status = _get_actual_job_status(job_dir)
            if actual_status not in ["failed", "killed"]:
                continue
            # Extract exit code from the trailing "exit:<code>" marker
            exit_code = None
            last_output_lines = []
            if output_file.exists():
                content = output_file.read_text()
                lines = content.strip().split("\n")
                last_output_lines = lines[-10:] if len(lines) > 10 else lines
                for line in reversed(lines):
                    if line.startswith("exit:"):
                        exit_code = int(line.split(":")[1])
                        break
            # Get exit code info
            exit_info = get_exit_code_info(exit_code) if exit_code is not None else {}
            is_retryable, retry_reason = is_failure_retryable(exit_code) if exit_code is not None else (False, "No exit code")
            failed_jobs.append({
                "id": job_dir.name,
                "project": meta.get("project", "unknown"),
                "task": meta.get("task", "")[:100],
                "started": meta.get("started", "unknown"),
                "status": actual_status,
                "exit_code": exit_code,
                "exit_meaning": exit_info.get("meaning", "Unknown"),
                "retryable": is_retryable,
                "retry_reason": retry_reason,
                "last_output": last_output_lines
            })
            if len(failed_jobs) >= limit:
                break
        except Exception as e:
            # A corrupt job dir must not abort the whole listing
            _log(f" Warning: Could not process {job_dir.name}: {e}", verbose_only=True)
    return failed_jobs
def get_failure_summary() -> dict:
    """Get summary of failures by exit code.

    Returns:
        {"total": int, "retryable": int,
         "by_exit_code": {code_str: {"count", "meaning", "retryable"}},
         "by_project": {project_name: count}}
        Jobs without an exit marker are grouped under code "none" and
        never counted as retryable.
    """
    summary = {
        "total": 0,
        "retryable": 0,
        "by_exit_code": {},
        "by_project": {}
    }
    if not JOBS_DIR.exists():
        return summary
    for job_dir in JOBS_DIR.iterdir():
        if not job_dir.is_dir():
            continue
        actual_status = _get_actual_job_status(job_dir)
        if actual_status not in ["failed", "killed"]:
            continue
        meta_file = job_dir / "meta.json"
        output_file = job_dir / "output.log"
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            project = meta.get("project", "unknown")
            # Exit code comes from the trailing "exit:<code>" marker
            exit_code = None
            if output_file.exists():
                content = output_file.read_text()
                for line in reversed(content.strip().split("\n")):
                    if line.startswith("exit:"):
                        exit_code = int(line.split(":")[1])
                        break
            summary["total"] += 1
            # By exit code
            code_str = str(exit_code) if exit_code is not None else "none"
            if code_str not in summary["by_exit_code"]:
                info = get_exit_code_info(exit_code) if exit_code is not None else {"meaning": "No exit code"}
                summary["by_exit_code"][code_str] = {
                    "count": 0,
                    "meaning": info.get("meaning", "Unknown"),
                    "retryable": info.get("retryable", False)
                }
            summary["by_exit_code"][code_str]["count"] += 1
            # By project
            if project not in summary["by_project"]:
                summary["by_project"][project] = 0
            summary["by_project"][project] += 1
            # Count retryable
            if exit_code is not None:
                is_retryable, _ = is_failure_retryable(exit_code)
                if is_retryable:
                    summary["retryable"] += 1
        except Exception:
            # Best-effort: skip unreadable/corrupt job directories
            pass
    return summary
def retry_job(job_id: str, config: dict) -> dict:
    """Retry a failed job by re-spawning it with the same task.

    Preflight is skipped on the retry (it already ran on the first
    attempt). The original job's metadata is annotated with
    retried_at/retried_as so auto-retry will not pick it up again.

    Args:
        job_id: Identifier of the failed job to retry.
        config: Loaded luzia config; must contain the job's project.

    Returns:
        dict with success status and new job_id or error.
    """
    job_dir = JOBS_DIR / job_id
    if not job_dir.exists():
        return {"success": False, "error": f"Job {job_id} not found"}
    meta_file = job_dir / "meta.json"
    output_file = job_dir / "output.log"
    try:
        with open(meta_file) as f:
            meta = json.load(f)
    except Exception as e:
        return {"success": False, "error": f"Could not read job metadata: {e}"}
    # Check status
    actual_status = _get_actual_job_status(job_dir)
    if actual_status == "running":
        return {"success": False, "error": "Job is still running"}
    # Get exit code from the trailing "exit:<code>" marker
    exit_code = None
    if output_file.exists():
        content = output_file.read_text()
        for line in reversed(content.strip().split("\n")):
            if line.startswith("exit:"):
                exit_code = int(line.split(":")[1])
                break
    # Check if retryable - jobs without an exit marker are allowed through
    if exit_code is not None:
        is_retryable, reason = is_failure_retryable(exit_code)
        if not is_retryable:
            return {"success": False, "error": f"Not retryable: {reason} (exit {exit_code})"}
    # Get original task details
    project = meta.get("project")
    task = meta.get("task")
    if not project or not task:
        return {"success": False, "error": "Missing project or task in job metadata"}
    if project not in config.get("projects", {}):
        return {"success": False, "error": f"Unknown project: {project}"}
    # Build context and spawn new job
    # Retry bypasses preflight since it was already validated on first attempt
    context = get_project_context(project, config, task_query=task)
    new_job_id = spawn_claude_agent(project, task, context, config, skip_preflight=True)
    # Mark original as retried
    meta["retried_at"] = datetime.now().isoformat()
    meta["retried_as"] = new_job_id
    with open(meta_file, "w") as f:
        json.dump(meta, f)
    return {
        "success": True,
        "original_job": job_id,
        "new_job": new_job_id,
        "project": project,
        "task": task[:100]
    }
def auto_retry_failures(config: dict, limit: int = 5) -> list:
    """Automatically retry recent retryable failures.

    Only retries jobs that:
    - Failed with a retryable exit code
    - Haven't been retried already
    - Are within the last 24 hours

    Args:
        config: Loaded luzia config, passed through to retry_job.
        limit: Maximum number of retries attempted in one sweep.

    Returns:
        List of retry results: {"original", "project", "exit_code",
        "retry_result"} per attempt, or {"original", "error"} when the
        job metadata could not be processed.
    """
    results = []
    now = datetime.now()
    failed = list_failed_jobs(limit=50)  # Check more to find retryable ones
    for job in failed:
        if len(results) >= limit:
            break
        if not job["retryable"]:
            continue
        job_dir = JOBS_DIR / job["id"]
        meta_file = job_dir / "meta.json"
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            # Skip if already retried
            if meta.get("retried_as"):
                continue
            # Skip if too old (>24h); a missing timestamp counts as too old
            started = datetime.fromisoformat(meta.get("started", "1970-01-01T00:00:00"))
            if (now - started).total_seconds() > 86400:
                continue
            # Attempt retry
            result = retry_job(job["id"], config)
            results.append({
                "original": job["id"],
                "project": job["project"],
                "exit_code": job["exit_code"],
                "retry_result": result
            })
        except Exception as e:
            # Record the per-job failure rather than aborting the sweep
            results.append({
                "original": job["id"],
                "error": str(e)
            })
    return results
def route_failures(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia failures [job_id] [--summary] [--retry] [--auto-retry]

    Commands:
      luzia failures               - List recent failures
      luzia failures <job_id>      - Show details of specific failure
      luzia failures --summary     - Show failure summary by exit code
      luzia failures --retry <id>  - Retry a specific failed job
      luzia failures --auto-retry  - Auto-retry all retryable recent failures

    Returns:
        0 on success, 1 on error (unknown job, failed retry, missing arg).
    """
    # Parse options, then strip flags so any positional job_id remains
    show_summary = "--summary" in args
    do_retry = "--retry" in args
    do_auto_retry = "--auto-retry" in args
    args = [a for a in args if not a.startswith("--")]
    if show_summary:
        summary = get_failure_summary()
        print("\n=== Failure Summary ===\n")
        print(f"Total failures: {summary['total']}")
        print(f"Retryable: {summary['retryable']}")
        print("\nBy Exit Code:")
        for code, info in sorted(summary["by_exit_code"].items(), key=lambda x: -x[1]["count"]):
            # BUG FIX: both ternary branches previously assigned the empty
            # string, so every row printed an uninformative "[ retry]".
            retry_mark = "✓" if info["retryable"] else "✗"
            print(f" {code:>4}: {info['count']:>3}x - {info['meaning']:<20} [{retry_mark} retry]")
        print("\nBy Project:")
        for project, count in sorted(summary["by_project"].items(), key=lambda x: -x[1]):
            print(f" {project:<15}: {count}x")
        return 0
    if do_auto_retry:
        print("Auto-retrying recent fixable failures...")
        results = auto_retry_failures(config, limit=5)
        if not results:
            print("No retryable failures found.")
            return 0
        for r in results:
            if r.get("error"):
                print(f"{r['original']}: {r['error']}")
            elif r.get("retry_result", {}).get("success"):
                print(f"{r['original']} -> {r['retry_result']['new_job']} ({r['project']})")
            else:
                print(f"{r['original']}: {r.get('retry_result', {}).get('error', 'Unknown error')}")
        return 0
    if do_retry:
        if not args:
            print("Usage: luzia failures --retry <job_id>")
            return 1
        result = retry_job(args[0], config)
        if result["success"]:
            print(f"✓ Retrying {result['original_job']} as {result['new_job']}")
            print(f" Project: {result['project']}")
            print(f" Task: {result['task']}...")
        else:
            print(f"✗ Could not retry: {result['error']}")
        return 0 if result["success"] else 1
    # Show specific failure
    if args:
        job_id = args[0]
        failed = list_failed_jobs(limit=100)
        job = next((j for j in failed if j["id"] == job_id), None)
        if not job:
            print(f"Failure not found: {job_id}")
            return 1
        print(f"\n=== Failed Job: {job['id']} ===\n")
        print(f"Project: {job['project']}")
        print(f"Started: {job['started']}")
        print(f"Exit Code: {job['exit_code']} ({job['exit_meaning']})")
        print(f"Retryable: {'Yes - ' + job['retry_reason'] if job['retryable'] else 'No - ' + job['retry_reason']}")
        print(f"\nTask:")
        print(f" {job['task']}")
        print(f"\nLast Output:")
        for line in job["last_output"]:
            print(f" {line[:100]}")
        if job['retryable']:
            print(f"\nTo retry: luzia failures --retry {job['id']}")
        return 0
    # List recent failures
    failed = list_failed_jobs(limit=20)
    if not failed:
        print("No failures found.")
        return 0
    print("\n=== Recent Failures ===\n")
    print(f"{'ID':<18} {'Project':<12} {'Exit':<6} {'Retryable':<10} Started")
    print("-" * 75)
    for job in failed:
        retry_mark = "Yes" if job["retryable"] else "No"
        exit_str = str(job["exit_code"]) if job["exit_code"] is not None else "?"
        # ISO timestamps carry the time at offset 11:19 (HH:MM:SS)
        started_short = job["started"][11:19] if len(job["started"]) > 19 else job["started"]
        print(f"{job['id']:<18} {job['project']:<12} {exit_str:<6} {retry_mark:<10} {started_short}")
    summary = get_failure_summary()
    print(f"\nTotal: {summary['total']} failures ({summary['retryable']} retryable)")
    print("\nCommands:")
    print(" luzia failures <job_id> - Show failure details")
    print(" luzia failures --summary - Summary by exit code")
    print(" luzia failures --retry <id> - Retry specific job")
    print(" luzia failures --auto-retry - Auto-retry all fixable failures")
    return 0
def route_retry(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia retry <job_id>

    Convenience alias for `luzia failures --retry <job_id>`.
    Returns 0 when the retry was dispatched, 1 otherwise.
    """
    if len(args) == 0:
        print("Usage: luzia retry <job_id>")
        return 1
    outcome = retry_job(args[0], config)
    if not outcome["success"]:
        print(f"✗ Could not retry: {outcome['error']}")
        return 1
    print(f"✓ Retrying {outcome['original_job']} as {outcome['new_job']}")
    print(f" Project: {outcome['project']}")
    print(f" Task: {outcome['task']}...")
    print(f"\n Monitor: luzia jobs {outcome['new_job']}")
    return 0
# --- QA Validation Functions ---
def qa_validate_syntax() -> dict:
    """Byte-compile this script via py_compile and report the outcome."""
    script_path = Path(__file__).resolve()
    proc = subprocess.run(
        ["python3", "-m", "py_compile", str(script_path)],
        capture_output=True, text=True
    )
    ok = proc.returncode == 0
    return {
        "check": "syntax",
        "passed": ok,
        "error": None if ok else proc.stderr,
    }
def qa_validate_routes() -> dict:
    """Cross-check route_* handlers against _match_* matchers in this file.

    A handler passes when a matcher named after it exists, when it is
    registered in the Router, or when it is a known special case.
    """
    source = Path(__file__).resolve().read_text()
    handlers = set(re.findall(r'def (route_\w+)\(', source))
    matchers = set(re.findall(r'def (_match_\w+)\(', source))
    registrations = set(re.findall(r'self\.(_match_\w+),\s*(route_\w+)', source))
    registered_handlers = {pair[1] for pair in registrations}
    problems = []
    for handler in handlers:
        wanted = "_match_" + handler.replace("route_", "")
        # Some routes use the internal self._route_ pattern instead
        if handler.startswith("route_") and wanted not in matchers:
            if handler not in registered_handlers and handler not in ["route_project_task"]:
                problems.append(f"Route {handler} may not have a matcher")
    return {
        "check": "routes",
        "passed": not problems,
        "route_count": len(handlers),
        "matcher_count": len(matchers),
        "registered_count": len(registrations),
        "issues": problems if problems else None
    }
def qa_validate_docstring() -> dict:
    """Verify the module docstring mentions the commands the router implements."""
    source = Path(__file__).resolve().read_text()
    # The first triple-quoted block in the file is treated as the docstring
    doc_match = re.search(r'"""(.*?)"""', source, re.DOTALL)
    if doc_match is None:
        return {"check": "docstring", "passed": False, "error": "No docstring found"}
    doc_text = doc_match.group(1)
    documented = set(re.findall(r'luzia (\w+)', doc_text))
    implemented = set()
    for name in re.findall(r'def _match_(\w+)\(', source):
        if name not in ["project_task", "exec", "write", "read", "context"]:
            implemented.add(name.replace("_", "-"))
    # Simple commands (list, status, stop, etc.)
    simple = {"list", "status", "stop", "cleanup", "maintenance", "jobs", "kill",
              "failures", "retry", "notify", "history", "logs", "fix", "qa"}
    # Multi-word commands documented as "luzia <word1> <word2>"
    multi_word = {"think-deep", "work-on"}
    undocumented = implemented - documented - simple - multi_word
    # Internal research-* commands are intentionally undocumented
    undocumented = {cmd for cmd in undocumented if not cmd.startswith("research-")}
    return {
        "check": "docstring",
        "passed": not undocumented,
        "doc_commands": len(documented),
        "route_commands": len(implemented),
        "missing": list(undocumented) if undocumented else None
    }
def qa_validate_config() -> dict:
    """Validate config.json: parseable JSON, project paths exist, CLAUDE.md present."""
    issues = []
    if not CONFIG_PATH.exists():
        return {"check": "config", "passed": False, "error": "config.json not found"}
    try:
        with open(CONFIG_PATH) as f:
            config = json.load(f)
    except json.JSONDecodeError as e:
        return {"check": "config", "passed": False, "error": f"Invalid JSON: {e}"}
    projects = config.get("projects", {})
    for name, info in projects.items():
        path = info.get("path", f"/home/{name}")
        try:
            if not Path(path).exists():
                issues.append(f"Project {name}: path {path} does not exist")
                continue
            try:
                if not (Path(path) / "CLAUDE.md").exists():
                    issues.append(f"Project {name}: missing CLAUDE.md")
            except PermissionError:
                # Another user's home may be unreadable - skip silently
                pass
        except PermissionError:
            # Cannot even stat the project path - skip silently
            pass
    return {
        "check": "config",
        "passed": not issues,
        "project_count": len(projects),
        "issues": issues or None
    }
def qa_validate_directories() -> dict:
    """Confirm the directories Luzia relies on are present on disk."""
    required = (
        LOG_DIR,
        JOBS_DIR,
        Path("/opt/server-agents/orchestrator/lib"),
        Path("/opt/server-agents/docs"),
    )
    missing = []
    for directory in required:
        if not directory.exists():
            missing.append(str(directory))
    return {
        "check": "directories",
        "passed": not missing,
        "missing": missing or None,
    }
def qa_run_all() -> list:
    """Execute every QA check in order and collect their result dicts."""
    checks = (
        qa_validate_syntax,
        qa_validate_routes,
        qa_validate_docstring,
        qa_validate_config,
        qa_validate_directories,
    )
    return [check() for check in checks]
def qa_update_docs() -> dict:
    """Refresh LUZIA-REFERENCE.md in place.

    Bumps the "Last Updated" stamp to today and regenerates the
    "Registered Projects" markdown table from config.json.

    Returns:
        ``{"success": True, "path": ..., "updated": ...}`` on success,
        otherwise ``{"success": False, "error": ...}``.
    """
    ref_path = Path("/opt/server-agents/docs/LUZIA-REFERENCE.md")
    if not ref_path.exists():
        return {"success": False, "error": "LUZIA-REFERENCE.md not found"}
    content = ref_path.read_text()
    # Rewrite the "Last Updated" stamp to today's date.
    today = datetime.now().strftime("%Y-%m-%d")
    content = re.sub(
        r'\*\*Last Updated:\*\* \d{4}-\d{2}-\d{2}',
        f'**Last Updated:** {today}',
        content
    )
    # Rebuild the project table from config.json (description/focus truncated
    # to keep the table readable).
    try:
        with open(CONFIG_PATH) as fh:
            projects = json.load(fh).get("projects", {})
        rows = ["| Project | Description | Focus |", "|---------|-------------|-------|"]
        for name, info in sorted(projects.items()):
            desc = info.get("description", "")[:30]
            focus = info.get("focus", "")[:25]
            rows.append(f"| {name} | {desc} | {focus} |")
        project_table = "\n".join(rows) + "\n"
        content = re.sub(
            r'## Registered Projects\n\n\|.*?\n\n---',
            f'## Registered Projects\n\n{project_table}\n---',
            content,
            flags=re.DOTALL
        )
    except Exception as e:
        return {"success": False, "error": f"Could not update projects: {e}"}
    ref_path.write_text(content)
    return {"success": True, "path": str(ref_path), "updated": today}
def route_qa(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia qa [--update-docs] [--test-all] [--postflight]

    QA validation for Luzia itself:
        luzia qa                  - Run all validations
        luzia qa --update-docs    - Update LUZIA-REFERENCE.md
        luzia qa --test-all       - Run tests with verbose output
        luzia qa --postflight [N] - Run postflight on last N jobs (default 5)

    Returns 0 on success, 1 when a validation or subaction fails.
    """
    update_docs = "--update-docs" in args
    test_all = "--test-all" in args
    postflight = "--postflight" in args
    verbose = VERBOSE or test_all
    # --postflight: score the N most recently touched job directories.
    if postflight:
        try:
            sys.path.insert(0, str(lib_path))
            # NOTE: qa_postflight exports its own JOBS_DIR; this import
            # shadows the module-level constant inside this handler only.
            from qa_postflight import run_postflight, JOBS_DIR
            # Optional numeric count immediately after --postflight.
            count = 5
            for i, arg in enumerate(args):
                if arg == "--postflight" and i + 1 < len(args):
                    try:
                        count = int(args[i + 1])
                    except ValueError:
                        pass
            print(f"\n=== QA Postflight (last {count} jobs) ===\n")
            jobs = sorted(JOBS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True)[:count]
            for job_dir in jobs:
                job_id = job_dir.name
                report = run_postflight(job_id)
                score = report['quality_score']
                # Green >= 70, yellow >= 50, red below.
                score_color = "\033[92m" if score >= 70 else "\033[93m" if score >= 50 else "\033[91m"
                reset = "\033[0m"
                print(f"{job_id}: {score_color}{score}/100{reset} | errors={len(report['errors'])} | learnings={len(report['learnings'])}")
                if report['recommendations'] and score < 70:
                    for rec in report['recommendations'][:2]:
                        print(f"{rec}")
            print(f"\nReports saved to: /var/log/luz-orchestrator/qa-reports/")
            return 0
        except Exception as e:
            print(f"Postflight error: {e}")
            return 1
    # --update-docs: regenerate LUZIA-REFERENCE.md and report the outcome.
    if update_docs:
        print("Updating documentation...")
        result = qa_update_docs()
        if result["success"]:
            print(f"✓ Updated {result['path']}")
            print(f" Timestamp: {result['updated']}")
        else:
            print(f"✗ Failed: {result['error']}")
        return 0 if result["success"] else 1
    # Default action: run every validation and print a summary.
    print("\n=== Luzia QA Validation ===\n")
    results = qa_run_all()
    all_passed = True
    for r in results:
        check = r["check"]
        passed = r["passed"]
        # FIX: the status glyphs were both empty strings, so pass/fail was
        # indistinguishable in the output; use the same ✓/✗ marks as the
        # --update-docs branch above.
        status = "✓" if passed else "✗"
        if not passed:
            all_passed = False
        print(f"{status} {check}")
        # Detail lines: always for failures, everything when verbose.
        if verbose or not passed:
            for key, value in r.items():
                if key not in ["check", "passed"] and value:
                    if isinstance(value, list):
                        for item in value:
                            print(f" - {item}")
                    else:
                        print(f" {key}: {value}")
    print()
    if all_passed:
        print("All validations passed.")
    else:
        print("Some validations failed. Run with --test-all for details.")
    print("\nCommands:")
    print(" luzia qa --update-docs Update reference documentation")
    print(" luzia qa --test-all Verbose validation output")
    print(" luzia qa --sync Sync code to knowledge graph")
    print(" luzia qa --postflight Run postflight validation on recent jobs")
    return 0 if all_passed else 1
def route_docs(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia docs [domain] [query] [--show <name>] [--stats]

    Query documentation from knowledge graphs:
        luzia docs <query>          - Search all domains
        luzia docs sysadmin <query> - Search sysadmin domain
        luzia docs projects <query> - Search projects domain
        luzia docs --show <name>    - Show entity details
        luzia docs --stats          - Show KG statistics
        luzia docs --sync           - Sync .md files to KG

    Returns 0 on success (including "no results"), 1 on errors/bad usage.
    """
    # Import KG module lazily from the orchestrator lib directory.
    # NOTE(review): run_migration is imported but unused below -- presumably
    # kept as an availability probe; confirm before removing.
    try:
        sys.path.insert(0, str(lib_path))
        from knowledge_graph import KnowledgeGraph, search_all, get_all_stats, KG_PATHS
        from doc_sync import run_migration
    except ImportError as e:
        print(f"Error: Knowledge graph module not available: {e}")
        return 1
    # Parse options; all "--" flags are stripped so the remainder is the query.
    show_stats = "--stats" in args
    show_entity = "--show" in args
    do_sync = "--sync" in args
    args = [a for a in args if not a.startswith("--")]
    if show_stats:
        print("\n=== Knowledge Graph Statistics ===\n")
        for domain, stats in get_all_stats().items():
            if "error" in stats:
                print(f"{domain}: {stats['error']}")
            else:
                print(f"{domain}:")
                print(f" Entities: {stats['entities']}")
                print(f" Relations: {stats['relations']}")
                print(f" Observations: {stats['observations']}")
                if stats.get("by_type"):
                    print(f" By type: {stats['by_type']}")
        return 0
    if do_sync:
        print("Syncing documentation to knowledge graphs...")
        # Run the doc sync: commands and projects into the sysadmin KG,
        # then research markdown files into their domain KGs.
        try:
            from doc_sync import DocSync
            from qa_validator import QAValidator
            sync = DocSync()
            validator = QAValidator()
            # Sync routes to sysadmin KG
            print("\nSyncing luzia commands...")
            result = validator.sync_routes_to_kg()
            if "error" in result:
                print(f" Error: {result['error']}")
            else:
                print(f" Commands: {result['added']} added, {result['updated']} updated")
            # Sync projects
            print("\nSyncing projects...")
            result = validator.sync_projects_to_kg()
            if "error" in result:
                print(f" Error: {result['error']}")
            else:
                print(f" Projects: {result['added']} added, {result['updated']} updated")
            # Sync research files (non-destructive: no archiving, no dry-run)
            print("\nSyncing research files...")
            research_sync = DocSync()
            result = research_sync.migrate_research_dir("/home/admin/research", archive=False, dry_run=False)
            if "error" in result:
                print(f" Error: {result['error']}")
            else:
                print(f" Files: {result['files_processed']}")
                print(f" Entities: {result['entities_created']}")
                print(f" Relations: {result['relations_created']}")
                if result.get("errors"):
                    for err in result["errors"]:
                        print(f" Warning: {err}")
            print("\nDone. Use 'luzia docs --stats' to see results.")
        except Exception as e:
            print(f"Error: {e}")
            import traceback
            traceback.print_exc()
            return 1
        return 0
    if show_entity:
        # Show a specific entity: scan every domain, print the first match.
        if not args:
            print("Usage: luzia docs --show <entity_name>")
            return 1
        name = args[0]
        found = False
        for domain in KG_PATHS.keys():
            try:
                kg = KnowledgeGraph(domain)
                entity = kg.get_entity(name)
                if entity:
                    found = True
                    print(f"\n=== {entity['name']} ({domain}) ===\n")
                    print(f"Type: {entity['type']}")
                    print(f"Updated: {datetime.fromtimestamp(entity['updated_at']).strftime('%Y-%m-%d %H:%M')}")
                    if entity.get('source'):
                        print(f"Source: {entity['source']}")
                    # Content is truncated to 1000 chars with an overflow note.
                    print(f"\n{entity['content'][:1000]}")
                    if len(entity['content']) > 1000:
                        print(f"\n... ({len(entity['content']) - 1000} more characters)")
                    # Show relations (at most 10)
                    relations = kg.get_relations(name)
                    if relations:
                        print(f"\nRelations:")
                        for r in relations[:10]:
                            print(f" - {r['relation']}: {r.get('target_name', r.get('source_name', '?'))}")
                    # Show observations (at most 5)
                    observations = kg.get_observations(name)
                    if observations:
                        print(f"\nObservations:")
                        for o in observations[:5]:
                            print(f" [{o['observer']}] {o['content'][:100]}")
                    break
            except Exception:
                pass  # domain KG unreadable -- try the next one
        if not found:
            print(f"Entity not found: {name}")
            return 1
        return 0
    # Search mode. With no query, print usage instead.
    if not args:
        print("Usage: luzia docs <query>")
        print(" luzia docs <domain> <query>")
        print(" luzia docs --show <name>")
        print(" luzia docs --stats")
        print(" luzia docs --sync")
        print(f"\nDomains: {', '.join(KG_PATHS.keys())}")
        return 0
    # A leading arg that names a known domain scopes the search to it.
    query_domain = None
    query = ""
    if args[0] in KG_PATHS:
        query_domain = args[0]
        query = " ".join(args[1:])
    else:
        query = " ".join(args)
    if not query:
        print("Please provide a search query")
        return 1
    # Perform search
    print(f"\nSearching for: {query}\n")
    if query_domain:
        kg = KnowledgeGraph(query_domain)
        results = kg.search(query)
        if results:
            print(f"{query_domain}:")
            for e in results[:10]:
                print(f" [{e['type']}] {e['name']}")
                if e.get('content'):
                    preview = e['content'][:80].replace('\n', ' ')
                    print(f" {preview}...")
        else:
            print(f"No results in {query_domain}")
    else:
        # Cross-domain search: up to 5 hits shown per domain.
        all_results = search_all(query)
        total = 0
        for domain, results in all_results.items():
            if results and not results[0].get("error"):
                print(f"{domain}:")
                for e in results[:5]:
                    print(f" [{e['type']}] {e['name']}")
                total += len(results)
        if total == 0:
            print("No results found")
    return 0
def route_knowledge(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia knowledge <subcommand> [args]

    Manage per-project knowledge graphs for RAG context injection:
        luzia knowledge list                     - List projects with knowledge status
        luzia knowledge init <project>           - Initialize .knowledge/ for a project
        luzia knowledge sync <project>           - Sync from CLAUDE.md to .knowledge/
        luzia knowledge search <project> <query> - Search project knowledge
        luzia knowledge status <project>         - Show knowledge status for a project
        luzia knowledge show <project>           - Show knowledge contents

    Returns 0 on success, 1 on unknown project/subcommand or failure.
    """
    # Import project knowledge loader (lives in the orchestrator lib dir).
    try:
        sys.path.insert(0, str(lib_path))
        from project_knowledge_loader import ProjectKnowledgeLoader
        loader = ProjectKnowledgeLoader()
    except ImportError as e:
        print(f"Error: Project knowledge loader not available: {e}")
        return 1
    # No subcommand: print usage help and exit cleanly.
    if not args:
        print("Usage: luzia knowledge <subcommand> [args]")
        print("\nSubcommands:")
        print(" list - List projects with knowledge status")
        print(" init <project> - Initialize .knowledge/ for a project")
        print(" sync <project> - Sync from CLAUDE.md to .knowledge/")
        print(" search <project> <query> - Search project knowledge")
        print(" status <project> - Show knowledge status for a project")
        print(" show <project> - Show knowledge contents")
        return 0
    subcommand = args[0]
    sub_args = args[1:]
    if subcommand == "list":
        # List all projects, split by whether .knowledge/ already exists.
        print("\n=== Project Knowledge Status ===\n")
        projects = loader.list_projects_with_knowledge()
        if not projects:
            print("No projects found in config")
            return 1
        has_kg = [p for p in projects if p.get("has_knowledge")]
        no_kg = [p for p in projects if not p.get("has_knowledge")]
        if has_kg:
            print(f"Projects with .knowledge/ ({len(has_kg)}):")
            for p in has_kg:
                entity_count = p.get("entity_count", 0)
                print(f" \033[32m✓\033[0m {p['project']:<20} ({entity_count} entities)")
        if no_kg:
            # Only the first 10 are listed; the rest are summarized.
            print(f"\nProjects without .knowledge/ ({len(no_kg)}):")
            for p in no_kg[:10]:
                print(f" \033[33m○\033[0m {p['project']}")
            if len(no_kg) > 10:
                print(f" ... and {len(no_kg) - 10} more")
        print(f"\nRun 'luzia knowledge init <project>' to initialize knowledge for a project")
        return 0
    elif subcommand == "init":
        if not sub_args:
            print("Usage: luzia knowledge init <project>")
            return 1
        project = sub_args[0]
        # Check if project exists in the registered config
        if project not in config.get("projects", {}):
            print(f"Error: Unknown project '{project}'")
            print(f"Available projects: {', '.join(config['projects'].keys())}")
            return 1
        print(f"Initializing .knowledge/ for {project}...")
        success = loader.initialize_project_knowledge(project)
        if success:
            print(f"\033[32m✓\033[0m Created .knowledge/ directory for {project}")
            print(f"\nFiles created:")
            project_home = config["projects"][project].get("path", f"/home/{project}")
            print(f" {project_home}/.knowledge/")
            print(f" ├── entities.json (project entities)")
            print(f" ├── relations.json (entity relationships)")
            print(f" └── context.md (human-readable context)")
            print(f"\nRun 'luzia knowledge sync {project}' to populate from CLAUDE.md")
        else:
            print(f"\033[31m✗\033[0m Failed to initialize knowledge for {project}")
            return 1
        return 0
    elif subcommand == "sync":
        if not sub_args:
            print("Usage: luzia knowledge sync <project>")
            return 1
        project = sub_args[0]
        if project not in config.get("projects", {}):
            print(f"Error: Unknown project '{project}'")
            return 1
        print(f"Syncing CLAUDE.md to .knowledge/ for {project}...")
        # First ensure .knowledge/ exists; initialize on the fly if missing.
        if not loader.has_knowledge(project):
            print(f"Initializing .knowledge/ first...")
            loader.initialize_project_knowledge(project)
        success = loader.sync_from_claude_md(project)
        if success:
            print(f"\033[32m✓\033[0m Synced knowledge from CLAUDE.md")
            # Show entity/relation counts after the sync
            knowledge = loader.load_project_knowledge(project)
            if knowledge:
                print(f" Entities: {len(knowledge.entities)}")
                print(f" Relations: {len(knowledge.relations)}")
        else:
            # Sync returning falsy is treated as a warning, not an error.
            print(f"\033[33m!\033[0m Sync completed with warnings (CLAUDE.md may not exist)")
        return 0
    elif subcommand == "search":
        if len(sub_args) < 2:
            print("Usage: luzia knowledge search <project> <query>")
            return 1
        project = sub_args[0]
        query = " ".join(sub_args[1:])
        if project not in config.get("projects", {}):
            print(f"Error: Unknown project '{project}'")
            return 1
        if not loader.has_knowledge(project):
            print(f"No knowledge found for {project}")
            print(f"Run 'luzia knowledge init {project}' first")
            return 1
        print(f"\nSearching '{project}' knowledge for: {query}\n")
        results = loader.search_project_knowledge(project, query, top_k=10)
        if results:
            for i, result in enumerate(results, 1):
                name = result.get("name", "Unknown")
                etype = result.get("type", "entity")
                content = result.get("content", "")[:100]
                score = result.get("score", 0)
                print(f"{i}. [{etype}] {name} (score: {score:.2f})")
                if content:
                    print(f" {content}...")
        else:
            print("No matching results found")
        return 0
    elif subcommand in ("status", "show"):
        # "status" prints summary counts; "show" additionally dumps contents.
        if not sub_args:
            print(f"Usage: luzia knowledge {subcommand} <project>")
            return 1
        project = sub_args[0]
        if project not in config.get("projects", {}):
            print(f"Error: Unknown project '{project}'")
            return 1
        knowledge = loader.load_project_knowledge(project)
        if not knowledge:
            print(f"No knowledge found for {project}")
            print(f"Run 'luzia knowledge init {project}' first")
            return 1
        kg_path = loader.get_knowledge_path(project)
        print(f"\n=== Knowledge for {project} ===\n")
        print(f"Path: {kg_path}")
        print(f"Entities: {len(knowledge.entities)}")
        print(f"Relations: {len(knowledge.relations)}")
        print(f"Has context.md: {bool(knowledge.context_md)}")
        if subcommand == "show":
            # Truncated dumps: 15 entities, 10 relations, 500 chars of context.
            if knowledge.entities:
                print(f"\nEntities:")
                for e in knowledge.entities[:15]:
                    etype = getattr(e, 'type', 'entity')
                    ename = getattr(e, 'name', 'Unknown')
                    print(f" [{etype}] {ename}")
                if len(knowledge.entities) > 15:
                    print(f" ... and {len(knowledge.entities) - 15} more")
            if knowledge.relations:
                print(f"\nRelations:")
                for r in knowledge.relations[:10]:
                    src = getattr(r, 'source', '?')
                    rel = getattr(r, 'relation', '?')
                    tgt = getattr(r, 'target', '?')
                    print(f" {src} --[{rel}]--> {tgt}")
                if len(knowledge.relations) > 10:
                    print(f" ... and {len(knowledge.relations) - 10} more")
            if knowledge.context_md:
                print(f"\nContext preview:")
                preview = knowledge.context_md[:500]
                print(f" {preview}...")
        return 0
    else:
        print(f"Unknown subcommand: {subcommand}")
        print("Use 'luzia knowledge' for help")
        return 1
def _search_project_kg(project: str, limit: int = 3) -> List[Dict]:
    """Search knowledge graphs for context relevant to *project*.

    Three sources are consulted in priority order, stopping once
    ``limit * 2`` results have accumulated:
      1. "projects" KG  - matches on the project name ("direct" before "related")
      2. "sysadmin" KG  - infrastructure terms (docker, container, permissions)
      3. learning KG    - previously learned solutions (high-003 implementation)

    Args:
        project: Project name to search for
        limit: Maximum number of results per search

    Returns:
        Up to ``limit * 2`` entity dicts, each tagged with ``domain`` and
        ``relevance`` keys. Empty list when the KG module is unavailable.
        All failures are swallowed: context injection is strictly best-effort.
    """
    try:
        sys.path.insert(0, str(lib_path))
        from knowledge_graph import KnowledgeGraph
    except ImportError:
        return []
    results = []
    # Search in projects domain first (most relevant)
    try:
        kg = KnowledgeGraph("projects")
        project_results = kg.search(project, limit=limit)
        # Two passes over the same hits: name-matching entities are appended
        # first ("direct"), the remainder after ("related"), so ordering
        # encodes relevance.
        for result in project_results:
            # Prioritize results that mention the project name
            if project.lower() in str(result.get('name', '')).lower():
                results.append({
                    'domain': 'projects',
                    'relevance': 'direct',
                    **result
                })
        # Add remaining results with lower relevance
        for result in project_results:
            if project.lower() not in str(result.get('name', '')).lower():
                results.append({
                    'domain': 'projects',
                    'relevance': 'related',
                    **result
                })
    except Exception:
        pass  # projects KG unreadable -- continue with other sources
    # Secondary search in sysadmin domain for infrastructure context
    if len(results) < limit:
        try:
            kg = KnowledgeGraph("sysadmin")
            search_terms = [project, "docker", "container", "permissions"]
            for term in search_terms:
                sysadmin_results = kg.search(term, limit=1)
                for result in sysadmin_results:
                    # Avoid duplicates (compared by entity 'id')
                    if not any(r.get('id') == result.get('id') for r in results):
                        results.append({
                            'domain': 'sysadmin',
                            'relevance': 'infrastructure',
                            **result
                        })
                    if len(results) >= limit * 2:
                        break
                if len(results) >= limit * 2:
                    break
        except Exception:
            pass
    # Tertiary search in autonomous learning KG (high-003 implementation)
    # This allows agents to benefit from learned solutions
    if len(results) < limit * 2:
        try:
            sys.path.insert(0, "/home/admin/.luzia/learning")
            from kg_mcp_server import get_kg_server
            kg_server = get_kg_server()
            # Search learning KG for project/domain-specific solutions
            search_query = f"{project} solution pattern best_practice"
            learning_results = kg_server.kg.search_similar(search_query, top_k=limit)
            for result in learning_results:
                # Convert KG result format to system KG format; only the
                # first two observations are folded into the content summary.
                learning_entry = {
                    'id': result.get('name'),
                    'name': result.get('name'),
                    'type': 'learned_solution',
                    'content': f"Type: {result.get('type')}. Insights: {'. '.join(result.get('observations', [])[:2])}",
                    'domain': 'learning',
                    'relevance': 'learned'
                }
                # Avoid duplicates
                if not any(r.get('id') == learning_entry['id'] for r in results):
                    results.append(learning_entry)
                if len(results) >= limit * 2:
                    break
        except Exception:
            # Gracefully continue if learning KG not available
            pass
    return results[:limit * 2]
def get_project_context(project: str, config: dict, task_query: str = "") -> str:
    """Build context prompt for project using modernized 4-bucket system.

    Phase 5 Integration: Uses new hybrid retriever with graceful fallback.
    Features:
      - 4-bucket architecture (Identity, Grounding, Intelligence, Task)
      - Hybrid search (FTS5 keyword + vector semantic)
      - Domain-aware context injection
      - Backward compatible with fallback to legacy system

    Args:
        project: Project name
        config: Luzia config dictionary
        task_query: Optional task query for better context retrieval

    Returns:
        Formatted context string for prompt injection
    """
    # Preferred path: the modernized retriever (a CLI flag/argv decides
    # whether the new retriever is used). Any failure falls through to the
    # legacy builder below.
    if MODERNIZED_CONTEXT_AVAILABLE:
        try:
            use_new = should_use_new_retriever(sys.argv)
            context = get_project_context_modernized(
                project=project,
                config=config,
                task_query=task_query,
                use_new_retriever=use_new
            )
            return context
        except Exception as e:
            if VERBOSE:
                print(f"[DEBUG] Modernized context failed: {e}, falling back to legacy", file=sys.stderr)
    # Fallback to legacy implementation
    project_config = config["projects"].get(project, {})
    context_parts = [
        f"You are working on the **{project}** project.",
        f"Description: {project_config.get('description', 'Project user')}",
        f"Focus: {project_config.get('focus', 'General development')}",
        "",
        "**IMPORTANT**: All commands execute inside a Docker container as the project user.",
        "Files you create/modify will be owned by the correct user.",
        "Working directory: /workspace (mounted from project home)",
        ""
    ]
    # Try to load project CLAUDE.md (best-effort).
    project_path = project_config.get("path", f"/home/{project}")
    claude_md = Path(project_path) / "CLAUDE.md"
    if claude_md.exists():
        try:
            with open(claude_md) as f:
                context_parts.append("## Project Guidelines (from CLAUDE.md):")
                context_parts.append(f.read())
        except Exception:
            # FIX: was a bare `except:` which also swallowed KeyboardInterrupt
            # and SystemExit; `Exception` keeps the best-effort behavior while
            # letting real interrupts propagate.
            pass
    # Dynamic context injection from knowledge graph (legacy)
    kg_results = _search_project_kg(project, limit=3)
    if kg_results:
        context_parts.append("")
        context_parts.append("## Relevant Knowledge Graph Context:")
        context_parts.append("")
        for i, result in enumerate(kg_results, 1):
            # Format the KG entry with relevance indication
            relevance = result.pop('relevance', 'unknown')
            domain = result.pop('domain', 'unknown')
            entity_type = result.get('type', 'unknown')
            name = result.get('name', 'unknown')
            content = result.get('content', '')
            # Only include non-empty results
            if name and name != 'unknown':
                context_parts.append(f"### {i}. {name} [{domain}:{entity_type}]")
                context_parts.append(f"**Relevance**: {relevance}")
                if content:
                    # Limit content preview to 200 chars
                    preview = content[:200].replace('\n', ' ')
                    if len(content) > 200:
                        preview += "..."
                    context_parts.append(f"{preview}")
                context_parts.append("")
    return "\n".join(context_parts)
def route_list(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia list"""
    projects = config.get("projects", {})
    running = {c["name"]: c for c in list_project_containers()}
    if VERBOSE:
        print("Available Projects:\n")
    for name, info in sorted(projects.items()):
        container = running.get(f"luzia-{name}", {})
        # "RUN" when the matching container reports an "Up ..." status.
        state = "RUN" if "Up" in container.get("status", "") else "---"
        ansi = Color.hex_to_ansi(info.get("color", "#808080"))
        shown_name = Color.bold(f"{name:15}", ansi)
        desc = info.get('description', '')[:40]
        print(f" [{state}] {shown_name} {desc}")
        if VERBOSE:
            print(f" Focus: {info.get('focus', 'N/A')[:50]}")
    return 0
# =============================================================================
# UNIFIED KNOWLEDGE GRAPH SYNC (Phase 5)
# =============================================================================
def sync_task_to_unified_kg(project: str, task_id: str, prompt: str, status: str, skill: str = None):
    """Best-effort push of a finished task's metadata into the unified KG.

    Any failure is swallowed (reported only in verbose mode) so archiving a
    task never breaks because the fabric KG is unavailable.
    """
    try:
        import sys
        sys.path.insert(0, "/opt/server-agents/fabric/lib")
        from unified_kg import sync_task_to_kg
        sync_task_to_kg(project, task_id, prompt, status, skill)
        if VERBOSE:
            print(f"Synced task {task_id} to unified KG")
    except Exception as e:
        if VERBOSE:
            print(f"Warning: Could not sync to unified KG: {e}")
def archive_conductor_task(project: str, task_id: str, status: str = "completed"):
    """Archive a conductor task and sync it to the unified KG.

    Moves ``/home/<project>/conductor/active/<task_id>`` into the
    ``<status>`` bucket (e.g. "completed" or "failed") of the same conductor
    tree, then records the task in the unified knowledge graph.

    Args:
        project: Project (home-directory) name owning the task.
        task_id: Conductor task directory name.
        status: Destination bucket; also recorded in the KG.

    Returns:
        True if the task directory was moved, False otherwise.
    """
    conductor_base = Path(f"/home/{project}/conductor")
    active_dir = conductor_base / "active" / task_id
    target_dir = conductor_base / status / task_id
    if not active_dir.exists():
        return False
    # Read task metadata before archiving (best-effort; the move proceeds
    # with empty metadata if meta.json is missing or unreadable).
    meta_file = active_dir / "meta.json"
    prompt = ""
    skill = None
    if meta_file.exists():
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            prompt = meta.get("prompt", "")
            skill = meta.get("skill")
        except Exception:
            # FIX: was a bare `except:` (swallowed KeyboardInterrupt too);
            # `Exception` keeps the best-effort read without masking interrupts.
            pass
    # Move to completed/failed directory
    target_dir.parent.mkdir(parents=True, exist_ok=True)
    try:
        import shutil
        shutil.move(str(active_dir), str(target_dir))
    except Exception as e:
        if VERBOSE:
            print(f"Warning: Could not archive task: {e}")
        return False
    # Sync to unified KG
    sync_task_to_unified_kg(project, task_id, prompt, status, skill)
    return True
# =============================================================================
# CONDUCTOR STATE READING (Phase 2)
# =============================================================================
def read_conductor_task(task_path: Path) -> Optional[dict]:
    """Load one conductor task (meta.json plus optional heartbeat.json).

    Returns None when meta.json is absent or unparseable. When a heartbeat
    exists but is older than five minutes, the task status is overridden
    with "stale".
    """
    try:
        with open(task_path / "meta.json") as fh:
            meta = json.load(fh)
    except (json.JSONDecodeError, IOError):
        return None
    task = {
        "id": task_path.name,
        "prompt": meta.get("prompt", ""),
        "status": meta.get("status", "unknown"),
        "skill": meta.get("skill", ""),
    }
    heartbeat_file = task_path / "heartbeat.json"
    if heartbeat_file.exists():
        try:
            with open(heartbeat_file) as fh:
                heartbeat = json.load(fh)
        except (json.JSONDecodeError, IOError):
            return task  # unreadable heartbeat: keep the meta-only view
        task["last_heartbeat"] = heartbeat.get("ts", 0)
        task["current_step"] = heartbeat.get("step", "")
        # No heartbeat for more than 5 minutes -> flag as stale.
        if time_module.time() - task["last_heartbeat"] > 300:
            task["status"] = "stale"
    return task
def get_conductor_status(project: str = None) -> dict:
    """Collect conductor task state for one project, or every /home project.

    Only tasks under ``conductor/active`` are scanned; the "completed" and
    "failed" buckets are returned empty. Unreadable homes are skipped.
    """
    result = {"active": [], "completed": [], "failed": []}
    # Resolve the project list: explicit project, or every /home entry that
    # has a conductor/ directory.
    if project:
        targets = [project]
    else:
        targets = []
        for home in Path("/home").iterdir():
            try:
                if home.is_dir() and (home / "conductor").exists():
                    targets.append(home.name)
            except PermissionError:
                pass
    for proj in targets:
        active_dir = Path(f"/home/{proj}/conductor") / "active"
        if not active_dir.exists():
            continue
        try:
            for task_dir in active_dir.iterdir():
                if not task_dir.is_dir():
                    continue
                task = read_conductor_task(task_dir)
                if task:
                    task["project"] = proj
                    result["active"].append(task)
        except PermissionError:
            pass
    return result
def route_status(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia status [project] [--conductor]

    Prints conductor task state and (unless --conductor/-c is given)
    the running container list, optionally filtered to one project.
    """
    conductor_only = any(a in ("--conductor", "-c") for a in args)
    positional = [a for a in args if not a.startswith("-")]
    project = positional[-1] if positional else None
    banner = "=" * 60
    print(banner)
    print("LUZIA STATUS")
    print(banner)
    # Conductor section
    conductor = get_conductor_status(project)
    active_tasks = conductor.get("active", [])
    if not active_tasks:
        print("\nNo active conductor tasks")
    else:
        print("\nACTIVE TASKS (Conductor):")
        for task in active_tasks:
            state = task.get("status")
            status_icon = "running" if state == "running" else "stale" if state == "stale" else "pending"
            skill = f"[{task.get('skill')}]" if task.get("skill") else ""
            print(f" [{status_icon}] {task['project']}/{task['id'][:12]} {skill}")
            print(f" {task.get('prompt', '')[:60]}...")
    # Container section (skipped with --conductor)
    if not conductor_only:
        containers = list_project_containers()
        if not containers:
            print("\nNo containers running")
        else:
            print("\nCONTAINERS:")
            for c in containers:
                if project and c["name"] != f"luzia-{project}":
                    continue
                print(f" {c['name']}: {c['status']}")
    print("\n" + banner)
    return 0
def route_stop(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia stop <project>

    Stops the project's Docker container if it is running.
    Returns 1 on bad usage or unknown project, 0 otherwise.
    """
    if not args:
        print("Usage: luzia stop <project>")
        return 1
    project = args[0]
    project_config = config["projects"].get(project)
    if project_config is None:
        print(f"Unknown project: {project}")
        return 1
    bridge = DockerBridge(
        project=project,
        host_path=project_config.get("path", f"/home/{project}")
    )
    if not bridge._is_running():
        print(f"{project} not running")
    else:
        bridge.stop()
        print(f"Stopped {project}")
    return 0
def route_cleanup(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia cleanup [jobs|containers|conductor|all] [--dry-run]

    Subcommands:
        luzia cleanup            - Full maintenance (jobs + containers + logs)
        luzia cleanup jobs       - Clean old job directories only
        luzia cleanup containers - Stop stale containers only
        luzia cleanup conductor  - Archive stale conductor tasks and sync to KG
        luzia cleanup all        - Same as no subcommand
    Options:
        --dry-run                - Preview what would be cleaned without deleting
    """
    dry_run = "--dry-run" in args
    args = [a for a in args if a != "--dry-run"]
    subcommand = args[0] if args else "all"
    if subcommand == "conductor":
        # Sweep every /home/<project>/conductor/active and archive tasks
        # whose heartbeat is more than one hour old.
        print("Archiving stale conductor tasks...")
        archived = 0
        checked = 0
        for home in Path("/home").iterdir():
            try:
                active_dir = home / "conductor" / "active"
                if not active_dir.exists():
                    continue
                for task_dir in active_dir.iterdir():
                    if not task_dir.is_dir():
                        continue
                    checked += 1
                    # Check heartbeat age
                    heartbeat_file = task_dir / "heartbeat.json"
                    if heartbeat_file.exists():
                        try:
                            with open(heartbeat_file) as f:
                                heartbeat = json.load(f)
                            age = time_module.time() - heartbeat.get("ts", 0)
                            if age > 3600:  # Stale if > 1 hour
                                if dry_run:
                                    print(f" Would archive: {home.name}/{task_dir.name} (stale {int(age/60)}m)")
                                else:
                                    if archive_conductor_task(home.name, task_dir.name, "failed"):
                                        archived += 1
                                        print(f" Archived: {home.name}/{task_dir.name}")
                        except Exception:
                            # FIX: was a bare `except:` -- an unreadable
                            # heartbeat or archive error must not abort the
                            # sweep, but interrupts should still propagate.
                            pass
            except PermissionError:
                pass
        print(f" Checked: {checked}, Archived: {archived}")
        return 0
    if subcommand == "jobs":
        print("Cleaning old jobs...")
        result = cleanup_old_jobs(dry_run=dry_run)
        print(f" Checked: {result['checked']}, Deleted: {result['deleted']}, Kept: {result['kept']}")
        if result['bytes_freed'] > 0:
            print(f" Freed: {result['bytes_freed'] / 1024:.1f} KB")
        if result['errors'] > 0:
            print(f" Errors: {result['errors']}")
    elif subcommand == "containers":
        print("Stopping stale containers...")
        result = cleanup_stale_containers()
        print(f" Checked: {result['checked']}, Stopped: {result['stopped']}")
        if result['errors'] > 0:
            print(f" Errors: {result['errors']}")
    else:  # "all" or empty
        print("Running full maintenance..." + (" (dry-run)" if dry_run else ""))
        results = run_maintenance(dry_run=dry_run)
        print(f"\nJobs:")
        print(f" Checked: {results['jobs']['checked']}, Deleted: {results['jobs']['deleted']}, Kept: {results['jobs']['kept']}")
        if results['jobs']['bytes_freed'] > 0:
            print(f" Freed: {results['jobs']['bytes_freed'] / 1024:.1f} KB")
        # Container/log sections only apply to a real run; dry-run results
        # may not carry these keys.
        if not dry_run:
            print(f"\nContainers:")
            print(f" Checked: {results['containers']['checked']}, Stopped: {results['containers']['stopped']}")
            print(f"\nLogs:")
            if results['logs'].get('rotated'):
                print(f" Rotated notifications.log: {results['logs']['lines_before']} -> {results['logs']['lines_after']} lines")
            else:
                print(f" Notifications.log: {results['logs'].get('lines_after', 0)} lines (no rotation needed)")
        print("\nDone.")
    return 0
def route_maintenance(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia maintenance

    Show maintenance status and resource usage (jobs, containers, logs)
    plus cleanup recommendations based on the retention thresholds.
    Read-only: this never deletes anything itself.
    """
    status = get_maintenance_status()
    print("\n=== Luzia Maintenance Status ===\n")
    # Jobs section
    print(f"Jobs ({JOBS_DIR}):")
    print(f" Total: {status['jobs']['total']}")
    print(f" Running: {status['jobs']['running']}")
    print(f" Completed: {status['jobs']['completed']}")
    print(f" Failed: {status['jobs']['failed']}")
    print(f" Oldest: {status['jobs']['oldest_days']} days")
    print(f" Disk: {status['disk']['jobs_mb']} MB")
    # Retention policy (module-level constants)
    print(f"\n Retention Policy:")
    print(f" Keep last {JOB_MAX_COUNT} jobs")
    print(f" Delete completed after {JOB_MAX_AGE_DAYS} days")
    print(f" Delete failed after {JOB_FAILED_MAX_AGE_DAYS} days")
    # Containers section
    print(f"\nContainers:")
    print(f" Running: {status['containers']['total']}")
    print(f" Oldest: {status['containers']['oldest_hours']} hours")
    print(f" Max Lifetime: {CONTAINER_MAX_LIFETIME_HOURS} hours")
    # Logs section
    print(f"\nLogs:")
    print(f" Notifications: {status['notifications']['lines']} lines (max {NOTIFICATION_LOG_MAX_LINES})")
    print(f" Logs Dir: {status['disk']['logs_mb']} MB")
    # Recommendations: flag each resource past its threshold and point at
    # the matching `luzia cleanup` subcommand.
    print(f"\nRecommendations:")
    needs_cleanup = False
    # Job count over 150% of the retention cap
    if status['jobs']['total'] > JOB_MAX_COUNT * 1.5:
        print(f" ⚠ High job count ({status['jobs']['total']}), consider: luzia cleanup jobs")
        needs_cleanup = True
    # A container has outlived its maximum lifetime
    if status['containers']['oldest_hours'] > CONTAINER_MAX_LIFETIME_HOURS:
        print(f" ⚠ Stale containers ({status['containers']['oldest_hours']}h), consider: luzia cleanup containers")
        needs_cleanup = True
    # Jobs directory larger than 100 MB
    if status['disk']['jobs_mb'] > 100:
        print(f" ⚠ High disk usage ({status['disk']['jobs_mb']}MB), consider: luzia cleanup")
        needs_cleanup = True
    if not needs_cleanup:
        print(" ✓ All systems nominal")
    print()
    return 0
def route_metrics(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia metrics [project] [--days N]

    Shows aggregate task metrics and performance statistics.

    Options:
        --days N     Number of days to analyze (default: 7)
        --by-bucket  Show success rate by duration bucket
        --baseline   Show/calculate performance baseline (single-project view)

    Examples:
        luzia metrics              - Show all projects
        luzia metrics musica       - Show specific project
        luzia metrics --days 30    - Show last 30 days
        luzia metrics --by-bucket  - Success rate by duration
    """
    # Time metrics live in an optional module; degrade gracefully if the
    # import failed at startup.
    if not TIME_METRICS_AVAILABLE:
        print("Time metrics module not available.")
        print("Check that /opt/server-agents/orchestrator/lib/time_metrics.py exists.")
        return 1
    # Parse arguments
    days = 7
    show_buckets = "--by-bucket" in args
    show_baseline = "--baseline" in args
    project = None  # last bare (non-flag) argument wins as project name
    i = 0
    while i < len(args):
        arg = args[i]
        if arg == "--days" and i + 1 < len(args):
            try:
                days = int(args[i + 1])
                i += 2
                continue
            except ValueError:
                # NOTE(review): a non-numeric --days value is not consumed;
                # the next iteration will treat it as a project name — confirm
                # whether it should be skipped instead.
                pass
        elif not arg.startswith("--"):
            project = arg
        i += 1
    print(f"\n=== Luzia Task Metrics (Last {days} Days) ===\n")
    if project:
        # Single project metrics
        metrics = get_project_metrics(project, days)
        if metrics.get("error"):
            print(f"No data for project '{project}' in the last {days} days.")
            return 0
        print(f"Project: {project}")
        print("-" * 40)
        print(f"Total Tasks: {metrics['total_tasks']}")
        print(f"Total Time: {metrics['total_time_formatted']}")
        print(f"Avg Duration: {metrics['avg_duration_formatted']}")
        print(f"Min Duration: {format_duration(metrics['min_duration_seconds'])}")
        print(f"Max Duration: {format_duration(metrics['max_duration_seconds'])}")
        print(f"Success Rate: {metrics['success_rate']}%")
        print(f"Avg System Load: {metrics['avg_system_load']}")
        print(f"Avg Memory: {metrics['avg_memory_percent']}%")
        if show_baseline:
            print(f"\nPerformance Baseline:")
            baseline = calculate_baseline(project)
            if baseline.get("error"):
                print(f" {baseline['error']}")
            else:
                print(f" Average: {format_duration(baseline['avg_duration'])}")
                print(f" Median: {format_duration(baseline['median_duration'])}")
                print(f" P95: {format_duration(baseline['p95_duration'])}")
                print(f" Samples: {baseline['sample_count']}")
    else:
        # All projects summary
        metrics = get_all_projects_metrics(days)
        if not metrics.get("by_project"):
            print("No task data available.")
            print("\nTasks will be tracked automatically when dispatched via luzia.")
            return 0
        print(f"Total Tasks: {metrics['total_tasks']}")
        print(f"Total Time: {metrics['total_time_formatted']}")
        print(f"\nBy Project:")
        print(f"{'Project':<15} {'Tasks':>8} {'Time':>12} {'Avg':>10} {'Success':>8}")
        print("-" * 55)
        # Projects with the most total wall-clock time listed first.
        for proj, data in sorted(metrics['by_project'].items(),
                                 key=lambda x: x[1]['total_time_seconds'],
                                 reverse=True):
            print(f"{proj:<15} {data['total_tasks']:>8} {data['total_time_formatted']:>12} "
                  f"{format_duration(data['avg_duration_seconds']):>10} {data['success_rate']:>7.1f}%")
        if metrics.get("longest_tasks"):
            print(f"\nLongest Running Tasks:")
            for i, task in enumerate(metrics["longest_tasks"][:5], 1):
                print(f" {i}. {task['project']}: {task['duration_formatted']}")
    if show_buckets:
        # Applies to both views; project may be None for the global view —
        # presumably get_success_by_duration_bucket accepts None (confirm).
        print(f"\nSuccess Rate by Duration:")
        buckets = get_success_by_duration_bucket(project)
        print(f"{'Duration':<15} {'Total':>8} {'Success':>8} {'Rate':>8}")
        print("-" * 41)
        bucket_names = {
            "under_1m": "< 1 minute",
            "1_to_5m": "1-5 minutes",
            "5_to_15m": "5-15 minutes",
            "15_to_30m": "15-30 minutes",
            "30_to_60m": "30-60 minutes",
            "over_60m": "> 60 minutes"
        }
        for key in ["under_1m", "1_to_5m", "5_to_15m", "15_to_30m", "30_to_60m", "over_60m"]:
            data = buckets.get(key, {})
            if data.get("total", 0) > 0:
                print(f"{bucket_names[key]:<15} {data['total']:>8} {data['success']:>8} "
                      f"{data['success_rate']:>7.1f}%")
    print()
    return 0
_COMMAND_STARTERS = [
    # Matching rules (see is_shell_command):
    #   trailing space -> prefix match with arguments ("npm install")
    #   trailing '$'   -> the bare word with no arguments ("ls", "make", "env")
    #   './'           -> any relative script invocation ("./run.sh --prod")
    #   anything else  -> the whole task must equal the entry ("pwd")
    'npm ', 'node ', 'python ', 'pip ', 'git ', 'ls ', 'ls$', 'cat ',
    'grep ', 'find ', 'make ', 'make$', 'cargo ', 'go ', 'yarn ', 'pnpm ',
    'docker ', 'cd ', 'pwd', 'echo ', 'touch ', 'mkdir ', 'rm ', 'cp ', 'mv ',
    'curl ', 'wget ', 'which ', 'env ', 'env$', 'export ', 'source ', 'bash ',
    './', 'sh ', 'test ',
]


def is_shell_command(task: str) -> bool:
    """Return True when *task* looks like a direct shell command rather than
    a natural-language instruction.

    Fixes two defects in the previous inline matcher:
    - '$'-suffixed entries (e.g. 'ls$') matched ANY task starting with that
      prefix, so e.g. "lsblk devices" was executed as a shell command; '$'
      now means "the bare command with no arguments".
    - './' only matched the literal two-character task "./" — it now matches
      any relative script invocation such as "./deploy.sh --prod".
    The duplicate './' entry was also removed from the starter list.
    """
    task_lower = task.lower()
    for cmd in _COMMAND_STARTERS:
        if cmd.endswith('$'):
            # Exact bare command, no arguments.
            if task_lower == cmd[:-1]:
                return True
        elif cmd == './':
            # Relative script path, with or without arguments.
            if task_lower.startswith('./'):
                return True
        elif cmd.endswith(' '):
            # Command word followed by arguments.
            if task_lower.startswith(cmd):
                return True
        elif task_lower == cmd:
            # Bare entries like 'pwd' must be the entire task.
            return True
    return False


def route_project_task(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia <project> <task>

    With only a project name, prints project info/status. With a task,
    either runs it as a direct shell command inside the project container,
    or dispatches it as a natural-language task to cockpit / a Claude agent.
    Returns 0 on success, 1 on error or blocked/failed dispatch.
    """
    if not args or len(args) < 2:
        # Just project name - show project info
        project = args[0] if args else None
        if not project or project not in config["projects"]:
            print("Usage: luzia <project> <task>")
            return 1
        project_config = config["projects"][project]
        bridge = DockerBridge(project, project_config.get("path", f"/home/{project}"))
        status = bridge.status()
        color_hex = project_config.get("color", "#808080")
        color_code = Color.hex_to_ansi(color_hex)
        print(Color.bold(f"{project}", color_code))
        if VERBOSE:
            print(f" Description: {project_config.get('description', 'N/A')}")
            print(f" Path: {project_config.get('path', f'/home/{project}')}")
            print(f" Focus: {project_config.get('focus', 'N/A')}")
        print(Color.output(f" {'Running' if status.get('running') else 'Stopped'}", color_code))
        return 0
    project = args[0]
    task = " ".join(args[1:])
    project_config = config["projects"].get(project)
    if not project_config:
        print(f"Unknown project: {project}")
        return 1
    color_hex = project_config.get("color", "#808080")
    color_code = Color.hex_to_ansi(color_hex)
    bridge = DockerBridge(
        project=project,
        host_path=project_config.get("path", f"/home/{project}"),
        extra_mounts=project_config.get("extra_mounts", [])
    )
    context = get_project_context(project, config, task_query=task)
    # Short, mostly-unique id: wall-clock time plus a 16-bit hash of the task.
    task_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]
    log_file = LOG_DIR / f"{project}-{task_id}.log"
    if VERBOSE:
        print(Color.bold(f"Task for {project}", color_code))
        print(f" Container: luzia-{project}")
        print(f" Log: {log_file}")
        print()
    was_started = bridge.ensure_running()
    if VERBOSE and was_started:
        print(f"Started container luzia-{project}")
    # Detect if task is a direct shell command (not natural language)
    if is_shell_command(task):
        # Background mode - dispatch and return immediately
        if BACKGROUND:
            job_id = spawn_background_job(project, task, log_file)
            print(f"{project}:{job_id}")
            return 0
        # Direct command execution (foreground)
        result = bridge.execute(task)
        if result["output"]:
            print(result["output"], end='')
        if result["error"]:
            print(result["error"], file=sys.stderr, end='')
        # Log result
        with open(log_file, 'w') as f:
            f.write(f"Task: {task}\n")
            f.write(f"Exit: {result['exit_code']}\n\n")
            f.write(result["output"])
            if result["error"]:
                f.write(f"\nSTDERR:\n{result['error']}")
        return 0 if result["success"] else 1
    else:
        # Natural language task - use cockpit for human-in-the-loop execution
        if COCKPIT_AVAILABLE:
            from cockpit import cockpit_dispatch_task, load_state, container_running
            # Check if there's an existing session we should continue
            state = load_state(project)
            if state.get("session_started") and container_running(project):
                # Continue existing session
                from cockpit import cockpit_continue
                result = cockpit_continue(project, task, config, show_output=True)
            else:
                # Start new task dispatch via cockpit
                result = cockpit_dispatch_task(project, task, context, config, show_output=True)
            if result.get("success"):
                return 0
            else:
                print(f"Error: {result.get('error', 'Unknown error')}")
                return 1
        else:
            # Fallback to spawn_claude_agent when cockpit not available
            job_id = spawn_claude_agent(project, task, context, config, skip_preflight=SKIP_PREFLIGHT)
            # Check if task was blocked by preflight
            if job_id.startswith("BLOCKED:"):
                error_msg = job_id[8:]  # Remove "BLOCKED:" prefix
                print(f"blocked:{project}:{error_msg}")
                return 1
            # Show debug indicator if Claude dev task detected
            debug_indicator = " [DEBUG]" if is_claude_dev_task(task) else ""
            print(f"agent:{project}:{job_id}{debug_indicator}")
            return 0
def route_work_on(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia work on <project> [task]

    With a task, delegates to the regular project-task handler; without
    one, prints project info and (for the luzia project itself) starts an
    interactive dogfooding session.
    """
    if not args:
        print("Usage: luzia work on <project>")
        return 1
    project, *task_words = args
    task = " ".join(task_words) if task_words else None
    if project not in config["projects"]:
        print(f"Unknown project: {project}")
        return 1
    # Get project config
    project_config = config["projects"][project]
    if task:
        return route_project_task(config, [project, task], kwargs)
    # Interactive mode - show project info and start session
    color_code = Color.hex_to_ansi(project_config.get("color", "#808080"))
    print(Color.bold(f"Working on {project}", color_code))
    print(project_config.get("description", ""))
    # For luzia project (dogfooding), spawn interactive session
    if project == "luzia":
        return _spawn_luzia_session(project_config, None, kwargs)
    return 0
def _spawn_luzia_session(project_config: dict, task: str, kwargs: dict) -> int:
    """
    Dogfooding: Spawn a Claude session for working on Luzia itself.

    Args:
        project_config: Project entry from the luzia config (reads "path").
        task: Optional task description. When given, Claude runs in prompt
            mode (-p) with a context-rich prompt; otherwise an interactive
            session is started.
        kwargs: CLI keyword options (currently unused).

    Returns:
        Claude's exit code; 0 on Ctrl-C; 1 if the process failed to spawn.
    """
    import subprocess
    project_path = Path(project_config.get("path", "/opt/server-agents/orchestrator"))
    # (Removed unused local `docs_dir`; doc paths are embedded in the prompt.)
    print()
    print(Color.bold("🔧 Dogfooding mode", Color.hex_to_ansi("#FF6B6B")))
    print(f" Directory: {project_path}")
    if task:
        print(f" Task: {task}")
        print()
        # Build the prompt with context
        prompt = f"""You are working on Luzia - the unified orchestration CLI.
IMPORTANT CONTEXT:
- Main script: /opt/server-agents/orchestrator/bin/luzia (Python, ~3800 lines)
- Architecture doc: /opt/server-agents/docs/UNIFIED-ORCHESTRATION-ARCHITECTURE.md
- User guide: /opt/server-agents/docs/AI-AGENT-LUZIA-GUIDE.md
- This is infrastructure code - be careful with changes
- Always verify syntax with: python3 -m py_compile bin/luzia
TASK: {task}
Start by reading the relevant sections of the luzia script to understand the current implementation."""
        cmd = ["claude", "--dangerously-skip-permissions", "-p", prompt]
    else:
        print("Starting interactive session...")
        print()
        cmd = ["claude", "--dangerously-skip-permissions"]
    try:
        # Inherit the environment, tagging the session with the project name.
        result = subprocess.run(
            cmd,
            cwd=str(project_path),
            env={**os.environ, "CLAUDE_PROJECT": "luzia"}
        )
        return result.returncode
    except KeyboardInterrupt:
        print("\nSession ended")
        return 0
    except Exception as e:
        print(f"Error: {e}")
        return 1
def route_think_deep(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia think deep <topic>

    Enhanced Deep Logic Framework combining:
    - First Principles Thinking: Break down to fundamental truths
    - Refraction Thinking: Multi-lens perspective analysis
    - Structured synthesis and validation

    Prints instructions for the calling Claude session to use Zen MCP with
    Gemini 3 — nothing is executed locally by this handler.
    """
    if not args:
        print("Usage: luzia think deep <topic>")
        print(" luzia think deep --code <code problem>")
        print(" luzia think deep --bug <bug description>")
        print(" luzia think deep --arch <architecture question>")
        return 1
    # Detect mode from flags (only the first argument is checked)
    mode = "general"
    topic_args = args
    if args[0] == "--code":
        mode = "code"
        topic_args = args[1:]
    elif args[0] == "--bug":
        mode = "bug"
        topic_args = args[1:]
    elif args[0] == "--arch":
        mode = "architecture"
        topic_args = args[1:]
    elif args[0] == "--qa":
        # Supported but not listed in the usage text above.
        mode = "qa"
        topic_args = args[1:]
    topic = " ".join(topic_args)
    # Mode-specific lens emphasis (interpolated into the directive below)
    lens_emphasis = {
        "general": "all lenses equally",
        "code": "Technical, Scale, Integration lenses primarily",
        "bug": "Technical, Failure, Temporal lenses primarily",
        "architecture": "Technical, Scale, Integration, Temporal lenses primarily",
        "qa": "User, Failure, Scale, Integration lenses primarily"
    }
    # Output structured instruction with Deep Logic Framework
    print(f"""LUZIA_DIRECTIVE: DEEP_LOGIC_FRAMEWORK
TOPIC: {topic}
MODE: {mode}
MODEL: gemini-3-pro-preview
THINKING_MODE: max
═══════════════════════════════════════════════════════════════════
DEEP LOGIC FRAMEWORK ANALYSIS
═══════════════════════════════════════════════════════════════════
Execute this 4-stage analysis using mcp__zen__thinkdeep:
## STAGE 1: DECOMPOSITION (First Principles)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Break down "{topic}" to fundamental truths:
1. State the problem/question precisely
2. List ALL assumptions (explicit and implicit)
3. Challenge each assumption: Is this actually true?
4. Identify irreducible components (axioms)
5. Ask "Why?" repeatedly until you reach bedrock truths
Key Questions:
- What do we KNOW to be true vs. ASSUME to be true?
- If we were starting from scratch, what would we do?
- What would this look like if it were easy?
## STAGE 2: MULTI-LENS ANALYSIS (Refraction Thinking)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Examine through seven lenses (emphasize: {lens_emphasis[mode]}):
| Lens | Key Questions |
|------|---------------|
| 🔧 Technical | What are the technical constraints and implications? |
| 👤 User | Who is affected and how? What are their goals? |
| 💼 Business | What is the cost, value, and risk? |
| ⏰ Temporal | What happened before? What happens long-term? |
| 📈 Scale | How does this behave at 10x scale? |
| ⚠️ Failure | What can go wrong? How do we detect and recover? |
| 🔗 Integration | What systems/dependencies are involved? |
## STAGE 3: SYNTHESIS
━━━━━━━━━━━━━━━━━━━━
Combine insights from Stages 1 and 2:
1. Identify patterns across lenses
2. Resolve contradictions
3. Reconstruct solution from first principles only
4. Generate 2-3 solution options with trade-offs
5. Provide recommendation with confidence level (low/medium/high/very high)
## STAGE 4: VALIDATION CHECKLIST
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
□ Solution addresses root cause (not symptoms)
□ All relevant lenses considered
□ Assumptions documented and challenged
□ Trade-offs are explicit
□ Failure modes identified
□ Test/validation strategy defined
□ Rollback plan exists (if applicable)
═══════════════════════════════════════════════════════════════════
Execute with mcp__zen__thinkdeep:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "max",
"step": "Deep Logic Framework analysis of: {topic}. Execute all 4 stages: (1) First Principles Decomposition - break to fundamental truths, challenge assumptions, (2) Refraction Analysis through 7 lenses with emphasis on {lens_emphasis[mode]}, (3) Synthesis - combine insights, resolve contradictions, generate solutions, (4) Validation checklist.",
"step_number": 1,
"total_steps": 2,
"next_step_required": true,
"findings": "",
"focus_areas": ["first principles", "refraction analysis", "synthesis", "validation"],
"problem_context": "Deep Logic Framework analysis for: {topic}"
}}
After analysis, provide output in this format:
## Deep Logic Analysis: {topic}
### Stage 1: First Principles Decomposition
[Problem statement, challenged assumptions, fundamental truths]
### Stage 2: Lens Analysis
[Table of observations from each lens]
### Stage 3: Synthesis
[Root cause, solution options, recommendation]
### Stage 4: Validation
[Checklist results, test strategy, next steps]""")
    return 0
def route_research(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia research [project] <topic>

    3-Phase Research Flow:
    1. Context Expansion - Infer related concepts, expand search scope
    2. Branching Web Search - Multiple parallel searches on expanded concepts
    3. Final Synthesis - Consolidate findings into knowledge graph

    All research is stored in a project-specific knowledge graph. This
    handler only creates the session and prints a structured directive; the
    phases are executed by the calling Claude session (Zen MCP + Gemini 3),
    which reports back via `luzia research-update` / `luzia research-graph`.
    """
    if not args:
        print("Usage: luzia research [project] <topic>")
        print(" luzia deep research [project] <topic>")
        print(" luzia web research [project] <topic>")
        print("\nExamples:")
        print(" luzia research admin multi-agent orchestration")
        print(" luzia deep research musica audio synthesis libraries")
        return 1
    # Check if first arg is a project name
    projects = set(config.get("projects", {}).keys())
    if args[0] in projects:
        project = args[0]
        topic = " ".join(args[1:])
    else:
        project = "admin"  # Default project for general research
        topic = " ".join(args)
    if not topic:
        # Happens when the only argument was a project name.
        print("Error: No research topic provided")
        return 1
    # Create research session
    session_id = create_research_session(project, topic)
    # Get prior research context for this project (completed sessions only)
    prior_research = get_project_research_context(project, limit=3)
    prior_context = ""
    if prior_research:
        prior_topics = [r["topic"] for r in prior_research if r.get("final_synthesis")]
        if prior_topics:
            prior_context = f"\n\nPrior research in this project: {', '.join(prior_topics[:5])}"
    # Get existing knowledge graph nodes (first 20 names only)
    graph = get_research_graph(project)
    existing_concepts = [n["name"] for n in graph.get("nodes", [])][:20]
    graph_context = ""
    if existing_concepts:
        graph_context = f"\n\nExisting knowledge concepts: {', '.join(existing_concepts)}"
    # Output structured 3-phase research directive
    print(f"""LUZIA_DIRECTIVE: RESEARCH_FLOW
SESSION_ID: {session_id}
PROJECT: {project}
TOPIC: {topic}
MODEL: gemini-3-pro-preview
{prior_context}{graph_context}
Execute this 3-phase research flow using Zen MCP with Gemini 3:
═══════════════════════════════════════════════════════════════════
PHASE 1: CONTEXT EXPANSION (mcp__zen__thinkdeep)
═══════════════════════════════════════════════════════════════════
Goal: Expand the research topic into related concepts and search branches
Parameters:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "high",
"step": "Context expansion for research topic: {topic}. Identify: 1) Core concepts and terminology, 2) Related fields and disciplines, 3) Key questions to answer, 4) Potential search branches (5-8 specific queries), 5) Expected sources (academic, industry, open source)",
"step_number": 1,
"total_steps": 3,
"next_step_required": true,
"findings": "",
"focus_areas": ["concept mapping", "terminology", "related fields", "search strategy"],
"problem_context": "Research context expansion for: {topic}"
}}
After Phase 1, call: luzia research-update {session_id} context_expansion "<json_data>"
═══════════════════════════════════════════════════════════════════
PHASE 2: BRANCHING WEB SEARCH (mcp__zen__thinkdeep + WebSearch)
═══════════════════════════════════════════════════════════════════
Goal: Execute multiple parallel web searches on expanded concepts
For each search branch from Phase 1:
1. Use WebSearch tool with specific queries
2. Use mcp__zen__thinkdeep to analyze and extract key findings
3. Identify entities (people, companies, projects, concepts)
4. Note relationships between entities
Parameters for each branch analysis:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "medium",
"step": "Analyze search results for branch: <branch_topic>",
"step_number": 2,
"total_steps": 3,
"next_step_required": true,
"findings": "<search_results_summary>",
"focus_areas": ["key findings", "entities", "relationships", "sources"]
}}
After Phase 2, call: luzia research-update {session_id} search_branches "<json_data>"
═══════════════════════════════════════════════════════════════════
PHASE 3: FINAL SYNTHESIS (mcp__zen__thinkdeep)
═══════════════════════════════════════════════════════════════════
Goal: Consolidate all findings into coherent research output
Parameters:
{{
"model": "gemini-3-pro-preview",
"thinking_mode": "max",
"step": "Final synthesis of research on: {topic}. Consolidate all branch findings into: 1) Executive summary, 2) Key concepts and definitions, 3) Current state of the field, 4) Major players and projects, 5) Trends and future directions, 6) Recommendations, 7) Knowledge graph entities to store",
"step_number": 3,
"total_steps": 3,
"next_step_required": false,
"findings": "<consolidated_branch_findings>",
"focus_areas": ["synthesis", "recommendations", "knowledge extraction"]
}}
After Phase 3, call: luzia research-update {session_id} final_synthesis "<json_data>"
Then call: luzia research-graph {session_id} "<entities_and_relations_json>"
═══════════════════════════════════════════════════════════════════
OUTPUT FORMAT
═══════════════════════════════════════════════════════════════════
Final output should include:
1. Research summary (2-3 paragraphs)
2. Key findings (bulleted list)
3. Knowledge graph additions (entities and relationships)
4. Sources cited
5. Follow-up research suggestions""")
    return 0
def route_research_update(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia research-update <session_id> <phase> <json_data>

    Update a research session with the results of one phase.
    """
    if len(args) < 3:
        print("Usage: luzia research-update <session_id> <phase> <json_data>")
        print("Phases: context_expansion, search_branches, final_synthesis")
        return 1
    session_id, phase, *payload_parts = args
    payload = " ".join(payload_parts)
    # Non-JSON payloads are accepted and stored as raw text.
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        data = {"raw": payload}
    update_research_phase(session_id, phase, data)
    print(f"Updated session {session_id} phase: {phase}")
    return 0
def route_research_graph(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia research-graph <session_id> <entities_json>

    Add entities and relationships to the research knowledge graph.

    Expected JSON format:
    {
    "project": "admin",
    "entities": [
    {"name": "AutoGen", "type": "framework", "description": "..."},
    ...
    ],
    "relationships": [
    {"source": "AutoGen", "target": "Microsoft", "relation": "developed_by"},
    ...
    ]
    }
    """
    if len(args) < 2:
        print("Usage: luzia research-graph <session_id> <entities_json>")
        return 1
    session_id = args[0]
    try:
        data = json.loads(" ".join(args[1:]))
    except json.JSONDecodeError:
        print("Error: Invalid JSON data")
        return 1
    project = data.get("project", "admin")
    entities = data.get("entities", [])
    relationships = data.get("relationships", [])
    # Create a node per entity, remembering name -> id for edge wiring.
    name_to_id = {}
    for entity in entities:
        entity_name = entity.get("name")
        name_to_id[entity_name] = add_research_node(
            session_id=session_id,
            project=project,
            name=entity_name,
            node_type=entity.get("type", "concept"),
            description=entity.get("description")
        )
    # Wire up edges, creating placeholder concept nodes for any endpoint
    # that was not declared in the entities list.
    for rel in relationships:
        source_name = rel.get("source")
        target_name = rel.get("target")
        for endpoint in (source_name, target_name):
            if endpoint not in name_to_id:
                name_to_id[endpoint] = add_research_node(session_id, project, endpoint, "concept")
        add_research_edge(
            source_id=name_to_id[source_name],
            target_id=name_to_id[target_name],
            relation=rel.get("relation", "related_to"),
            context=rel.get("context")
        )
    print(f"Added {len(entities)} entities and {len(relationships)} relationships to {project} knowledge graph")
    return 0
def route_research_list(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia research-list [project]

    List recent research sessions for a project (default: admin).
    """
    project = args[0] if args else "admin"
    sessions = get_project_research_context(project, limit=20)
    if not sessions:
        print(f"No research sessions for project: {project}")
        return 0
    print(f"\nResearch sessions for {project}:")
    print("-" * 60)
    for session in sessions:
        # NOTE(review): both branches are empty strings — the status glyphs
        # may have been lost at some point; confirm the intended icons.
        marker = "" if session["status"] == "completed" else ""
        stamp = datetime.fromtimestamp(session["created_at"]).strftime("%Y-%m-%d %H:%M")
        print(f" [{marker}] {session['id']} | {stamp} | {session['topic'][:40]}")
        print(f" Phase: {session['phase']}")
    return 0
def route_research_show(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia research-show <session_id>

    Show the details of one research session, including any stored
    phase payloads (truncated for display).
    """
    if not args:
        print("Usage: luzia research-show <session_id>")
        return 1
    session_id = args[0]
    # Find session across all projects
    conn = _init_research_db()
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM research_sessions WHERE id = ?', (session_id,))
    record = cursor.fetchone()
    conn.close()
    if not record:
        print(f"Session not found: {session_id}")
        return 1
    # Column layout: 0=id, 1=project, 2=topic, 3=status, 4=created_at,
    # 6=phase, 7=context_expansion, 8=search_branches, 9=final_synthesis
    print(f"\nResearch Session: {record[0]}")
    print(f"Project: {record[1]}")
    print(f"Topic: {record[2]}")
    print(f"Status: {record[3]}")
    print(f"Phase: {record[6]}")
    print(f"Created: {datetime.fromtimestamp(record[4]).strftime('%Y-%m-%d %H:%M')}")
    for column, title, limit in ((7, "Context Expansion", 500),
                                 (8, "Search Branches", 500),
                                 (9, "Final Synthesis", 1000)):
        if record[column]:
            print(f"\n--- {title} ---")
            print(json.dumps(json.loads(record[column]), indent=2)[:limit])
    return 0
def route_research_knowledge(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia research-knowledge [project]

    Show the knowledge graph for a project (default: admin).
    """
    project = args[0] if args else "admin"
    graph = get_research_graph(project)
    nodes = graph["nodes"]
    if not nodes:
        print(f"No knowledge graph for project: {project}")
        return 0
    edges = graph["edges"]
    print(f"\nKnowledge Graph for {project}:")
    print(f"Nodes: {len(nodes)} | Edges: {len(edges)}")
    print("-" * 60)
    print("\nEntities:")
    for entity in nodes[:30]:
        summary = (entity.get("description") or "")[:50]
        print(f" [{entity['type']}] {entity['name']}: {summary}")
    if edges:
        print("\nRelationships:")
        # Map node ids back to display names; fall back to a truncated id.
        id_to_name = {n["id"]: n["name"] for n in nodes}
        for link in edges[:20]:
            src = id_to_name.get(link["source"], link["source"][:8])
            tgt = id_to_name.get(link["target"], link["target"][:8])
            print(f" {src} --[{link['relation']}]--> {tgt}")
    return 0
def route_fix(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia fix <issue>

    Match the described issue against the configured troubleshooting
    error-pattern lists and print the suggested fix.
    """
    if not args:
        print("Usage: luzia fix <issue>")
        return 1
    issue = " ".join(args)
    issue_lower = issue.lower()
    # First entry with any matching error pattern wins.
    for problem, details in config.get("troubleshooting", {}).items():
        patterns = details.get("error_patterns", [])
        if not any(p.lower() in issue_lower for p in patterns):
            continue
        print(f"Issue: {issue}")
        print(f"Problem: {problem}")
        print(f"Fix: {details.get('fix', 'N/A')}")
        if VERBOSE and details.get('source_script'):
            print(f"Script: {details.get('source_script')}")
        return 0
    print(f"Unknown issue: {issue}")
    print("Run 'luzia fix <keyword>' for troubleshooting.")
    print("Available categories: configuration, builds, containers")
    return 1
# =============================================================================
# =============================================================================
# STRUCTURAL ANALYSIS (Phase 5: Code Structure Intelligence)
# =============================================================================
def route_structure(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia structure [project] [path] [--json] [--no-kg] [--output FILE]

    Structural analysis of project code:
    luzia structure - Analyze current orchestrator
    luzia structure <project> - Analyze a project
    luzia structure . path/to/src - Analyze specific path
    luzia structure --json - Output analysis as JSON
    luzia structure --no-kg - Don't save to knowledge graph
    """
    import sys
    sys.path.insert(0, "/opt/server-agents/orchestrator/lib")
    from structural_analysis import StructuralAnalysisReport, analyze_project
    from pathlib import Path
    # Flag defaults; positional tokens may name a project and/or a sub-path.
    project = None
    sub_path = None
    output_file = None
    save_json = True
    save_kg = True
    json_output = False
    idx = 0
    while idx < len(args):
        token = args[idx]
        if token == "--json":
            json_output = True
        elif token == "--no-kg":
            save_kg = False
        elif token == "--output" and idx + 1 < len(args):
            output_file = args[idx + 1]
            idx += 1
        elif not token.startswith("-"):
            # Known project names bind to `project`; other bare tokens
            # become the sub-path first, then the project as a fallback.
            if project is None and token in config.get("projects", {}):
                project = token
            elif sub_path is None:
                sub_path = token
            elif project is None:
                project = token
        idx += 1
    # Resolve the root directory to analyze.
    if project:
        if project not in config.get("projects", {}):
            print(f"Project not found: {project}")
            return 1
        project_path = config["projects"][project].get("path", f"/home/{project}")
    else:
        project_path = "/opt/server-agents/orchestrator"
        project = "orchestrator"
    # If specific path provided, use it
    if sub_path:
        project_path = Path(project_path) / sub_path
    # Run analysis
    try:
        result = analyze_project(
            project_path,
            project_name=project,
            save_json=save_json and not json_output,
            save_kg=save_kg,
            verbose=not json_output
        )
        if json_output:
            print(json.dumps(result["report"], indent=2))
        # Show KG result if not JSON output
        if not json_output and result.get("kg_result"):
            kg = result["kg_result"]
            if "error" not in kg:
                print(f"\nKnowledge Graph:")
                print(f" Entities added: {kg.get('entities_added', 0)}")
                print(f" Relations added: {kg.get('relations_added', 0)}")
                if kg.get("errors"):
                    print(f" Errors: {len(kg['errors'])}")
        return 0
    except Exception as e:
        if json_output:
            print(json.dumps({"error": str(e)}))
        else:
            print(f"Error: {e}")
        return 1
# QUEUE COMMANDS (Phase 4: Task Queue Implementation)
# =============================================================================
def route_queue(config: dict, args: list, kwargs: dict) -> int:
    """
    Handler: luzia queue [project] [--clear|--stats]

    Shows project-based queue status with per-project sequencing.

    Usage:
    luzia queue # Global queue status
    luzia queue musica # Project-specific status
    luzia queue --stats # Statistics JSON
    luzia queue --clear # Clear all pending tasks
    """
    import sys
    sys.path.insert(0, "/opt/server-agents/orchestrator/lib")
    try:
        from project_queue_cli import ProjectQueueCLI, get_stats
    except ImportError:
        # Fallback to basic queue controller
        from queue_controller import QueueController
        controller = QueueController()
        if "--clear" in args:
            cleared = controller.clear_queue()
            print(f"Cleared {cleared} pending tasks from queue")
            return 0
        print("Project-based queue not available, using basic queue")
        return 1
    # Handle --stats flag
    if "--stats" in args:
        print(json.dumps(get_stats(), indent=2))
        return 0
    # Handle --clear flag
    if "--clear" in args:
        from queue_controller import QueueController
        cleared = QueueController().clear_queue()
        print(f"Cleared {cleared} pending tasks from queue")
        return 0
    # First non-flag token that names a known project selects that project.
    project = next(
        (a for a in args if not a.startswith("-") and a in config.get("projects", {})),
        None,
    )
    # Display queue status
    print(ProjectQueueCLI().get_queue_status(project))
    return 0
def route_dispatch(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia dispatch <project> <task> [--priority N] [--now]

    Queues the task (default) or, with --now, dispatches an agent
    immediately, bypassing the queue.
    """
    priority = 5
    immediate = "--now" in args
    positional = []
    pos = 0
    while pos < len(args):
        token = args[pos]
        if token in ("--priority", "-p") and pos + 1 < len(args):
            try:
                # Clamp to the valid 1..10 range.
                priority = max(1, min(10, int(args[pos + 1])))
            except ValueError:
                pass  # keep the default on a non-numeric value
            pos += 2
        elif token == "--now":
            pos += 1
        else:
            positional.append(token)
            pos += 1
    if len(positional) < 2:
        print("Usage: luzia dispatch <project> <task> [--priority N] [--now]")
        return 1
    project = positional[0]
    task = " ".join(positional[1:])
    # Check permission
    require_project_permission(project)
    if immediate:
        job_id = spawn_claude_agent(project, task, "", config, skip_preflight=SKIP_PREFLIGHT)
        if job_id.startswith("BLOCKED:"):
            print(f"blocked:{project}:{job_id[8:]}")
            return 1
        print(f"agent:{project}:{job_id}")
        return 0
    import sys
    sys.path.insert(0, "/opt/server-agents/orchestrator/lib")
    from queue_controller import QueueController
    controller = QueueController()
    task_id, position = controller.enqueue(project=project, prompt=task, priority=priority)
    tier = "high" if priority <= 3 else "normal"
    print(f"Queued task {task_id} (priority {priority}/{tier}, position {position})")
    return 0
def route_logs(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia logs <project|job_id> [--no-header]

    Shows logs with timing header for job IDs.
    Use --no-header to skip the timing header.
    """
    suppress_header = "--no-header" in args
    positional = [a for a in args if not a.startswith("--")]
    if not positional:
        print("Usage: luzia logs <project|job_id>")
        return 1
    target = positional[0]
    # Job IDs resolve to a directory under JOBS_DIR.
    job_dir = JOBS_DIR / target
    if job_dir.exists():
        output_file = job_dir / "output.log"
        # Show timing header if available
        if not suppress_header and TIME_METRICS_AVAILABLE:
            try:
                job = get_job_status(target, update_completion=True)
                print(format_logs_header(job))
            except Exception as e:
                _log(f" [Time] Warning: Could not format header: {e}", verbose_only=True)
        if output_file.exists():
            print(output_file.read_text())
        else:
            print("Job running, no output yet")
        return 0
    # Otherwise treat as a project name; show the newest matching log.
    log_files = sorted(LOG_DIR.glob(f"{target}-*.log"), reverse=True)
    if not log_files:
        print(f"No logs for {target}")
        return 0
    with open(log_files[0]) as fh:
        print(fh.read())
    return 0
def route_jobs(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia jobs [job_id] [--timing]

    With a job_id argument: print a detail view (project, task, status,
    exit code, and recorded time metrics when present).
    With no arguments: list all known jobs, one line per job.
    Use --timing to switch the list view to a tabular time-metrics format.
    Returns 0 on success, 1 when a requested job reports an error.
    """
    show_timing = "--timing" in args
    # Drop all flag-style arguments; what remains is an optional job id.
    args = [a for a in args if not a.startswith("--")]
    if args:
        # Show specific job
        job = get_job_status(args[0])
        if "error" in job:
            print(job["error"])
            return 1
        print(f"Job: {job['id']}")
        print(f"Project: {job['project']}")
        print(f"Task: {job.get('task', job.get('command', ''))}")
        print(f"Status: {job['status']}")
        if "exit_code" in job:
            print(f"Exit: {job['exit_code']}")
        # Show time metrics if available
        time_metrics = job.get("time_metrics", {})
        if time_metrics:
            dispatch = time_metrics.get("dispatch", {})
            completion = time_metrics.get("completion", {})
            print("\nTiming:")
            if dispatch.get("utc_time"):
                print(f" Dispatched: {dispatch['utc_time']}")
            if dispatch.get("system_load"):
                # system_load is indexed [0..2] — presumably the 1/5/15-minute
                # load averages; verify against the dispatch recorder.
                load = dispatch["system_load"]
                print(f" System Load: {load[0]:.2f}, {load[1]:.2f}, {load[2]:.2f}")
            if dispatch.get("memory_percent"):
                print(f" Memory: {dispatch['memory_percent']}%")
            if completion.get("utc_time"):
                print(f" Completed: {completion['utc_time']}")
                print(f" Duration: {completion.get('duration_formatted', '--:--:--')}")
        elif job.get("elapsed"):
            # Fallback when the job has no recorded time_metrics.
            print(f" Elapsed: {job['elapsed']}")
        return 0
    # List all jobs
    jobs = list_jobs()
    if not jobs:
        print("No jobs")
        return 0
    # Header for timing view
    if show_timing and TIME_METRICS_AVAILABLE:
        print(f"\n{'Job ID':<18} {'Project':<10} {'Status':<10} {'Dispatch':<10} {'Duration':<10} {'CPU':>6}")
        print("-" * 70)
    else:
        pass  # Compact format needs no header
    for job in jobs:
        # NOTE(review): both branches of this conditional are empty strings,
        # so the status icon is always blank — completed/other glyphs appear
        # to have been lost at some point; confirm the intended characters.
        status_icon = "" if job.get("status") == "completed" else ""
        exit_code = job.get("exit_code", "")
        # Explicit != "" so an exit code of 0 is still displayed.
        exit_str = f" ({exit_code})" if exit_code != "" else ""
        job_type = job.get("type", "docker")
        type_indicator = "🤖" if job_type == "agent" else "📦"
        if show_timing and TIME_METRICS_AVAILABLE:
            # Enhanced timing display
            time_metrics = job.get("time_metrics", {})
            dispatch = time_metrics.get("dispatch", {})
            completion = time_metrics.get("completion", {})
            dispatch_time = dispatch.get("utc_time", job.get("started", ""))
            if dispatch_time:
                # Slice [11:19] of an ISO-8601 timestamp is HH:MM:SS.
                dispatch_display = dispatch_time[11:19] if len(dispatch_time) > 19 else "--:--:--"
            else:
                dispatch_display = "--:--:--"
            if completion.get("duration_formatted"):
                duration = completion["duration_formatted"]
            elif job.get("elapsed"):
                duration = job["elapsed"]
            else:
                duration = "--:--:--"
            load = dispatch.get("system_load", [0])
            load_display = f"{load[0]:.2f}" if isinstance(load, list) and load else "-.--"
            status_text = job.get("status", "unknown")[:10]
            print(f"{job['id']:<18} {job['project']:<10} {status_text:<10} {dispatch_display:<10} {duration:<10} {load_display:>6}")
        else:
            # Original compact format; task/command truncated to 35 chars.
            desc = job.get("task", job.get("command", ""))[:35]
            # Add elapsed time for running jobs
            elapsed = ""
            if job.get("status") == "running" and job.get("elapsed"):
                elapsed = f" [{job['elapsed']}]"
            print(f" [{status_icon}] {type_indicator} {job['id']} {job['project']} {desc}{exit_str}{elapsed}")
    if show_timing:
        print("\n (--timing shows detailed time metrics)")
    return 0
def route_kill(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia kill <job_id> — terminate a running agent job."""
    if not args:
        print("Usage: luzia kill <job_id>")
        return 1
    job_id = args[0]
    outcome = kill_agent(job_id)
    if "error" in outcome:
        print(outcome["error"])
        return 1
    print(f"Killed: {job_id}")
    return 0
def route_notify(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia notify [limit]

    Prints the most recent notifications (default limit: 10).
    Returns 1 when the limit argument is not a valid integer.
    """
    limit = 10
    if args:
        try:
            limit = int(args[0])
        except ValueError:
            # Fix: a non-numeric limit previously raised an uncaught
            # ValueError; report it as a usage error instead.
            print(f"Invalid limit: {args[0]}")
            return 1
    notifications = get_notifications(limit)
    if not notifications:
        print("No notifications")
        return 0
    for n in notifications:
        print(n)
    return 0
def route_history(config: dict, args: list, kwargs: dict) -> int:
    """Handler: luzia history <project> [limit]

    Show recent changes/activity for a project from the knowledge graph.
    Returns 1 on usage errors (missing arguments, bad limit, unknown
    project), 0 otherwise — including when no changes are recorded.
    """
    if not args:
        print("Usage: luzia history <project> [limit]")
        print("Example: luzia history musica 20")
        return 1
    project = args[0]
    limit = 10
    if len(args) > 1:
        try:
            limit = int(args[1])
        except ValueError:
            # Fix: a non-numeric limit previously raised an uncaught
            # ValueError; report it as a usage error instead.
            print(f"Invalid limit: {args[1]}")
            return 1
    # Verify project exists
    if project not in config.get("projects", {}):
        print(f"Unknown project: {project}")
        print(f"Available: {', '.join(config.get('projects', {}).keys())}")
        return 1
    project_config = config["projects"][project]
    color = Color.hex_to_ansi(project_config.get("color", "#888888"))
    changes = get_project_changes(project, limit)
    if not changes:
        print(f"No recorded changes for {Color.bold(project, color)}")
        return 0
    print(f"\n{Color.bold(f'Recent changes for {project}:', color)}")
    print("-" * 60)
    for change in changes:
        ctx = change.get("context", {})
        ts = ctx.get("timestamp", "unknown")
        desc = ctx.get("description", change.get("event", ""))
        relation = change.get("relation", "").replace("has_", "")
        # Format timestamp; fall back to a truncated raw value.
        try:
            dt = datetime.fromisoformat(ts)
            ts_fmt = dt.strftime("%Y-%m-%d %H:%M")
        except (ValueError, TypeError):
            # Narrowed from a bare except: fromisoformat raises ValueError
            # for malformed strings and TypeError for non-string input.
            ts_fmt = ts[:16] if len(ts) > 16 else ts
        print(f" [{ts_fmt}] {Color.bold(relation, color)}: {desc}")
    print()
    return 0
def cmd_exec_raw(config: dict, project: str, command: str):
    """Execute a raw command in the project's container (subagent helper)."""
    cfg = config["projects"].get(project)
    if not cfg:
        return {"error": f"Unknown project: {project}"}
    return DockerBridge(
        project=project,
        host_path=cfg.get("path", f"/home/{project}"),
        extra_mounts=cfg.get("extra_mounts", []),
    ).execute(command)
def cmd_write_file(config: dict, project: str, path: str, content: str):
    """Write a file inside the project's container (subagent helper)."""
    cfg = config["projects"].get(project)
    if not cfg:
        return {"error": f"Unknown project: {project}"}
    return DockerBridge(
        project=project,
        host_path=cfg.get("path", f"/home/{project}"),
        extra_mounts=cfg.get("extra_mounts", []),
    ).write_file(path, content)
def cmd_read_file(config: dict, project: str, path: str):
    """Read a file out of the project's container (subagent helper)."""
    cfg = config["projects"].get(project)
    if not cfg:
        return {"error": f"Unknown project: {project}"}
    return DockerBridge(
        project=project,
        host_path=cfg.get("path", f"/home/{project}"),
        extra_mounts=cfg.get("extra_mounts", []),
    ).read_file(path)
# ============================================================================
# HEALTH & MAINTENANCE ROUTES (Phase 5 Integration)
# ============================================================================
def route_health(config: dict, args: list, kwargs: dict) -> int:
    """
    luzia health [component] [--full|--deep|--fix|--flag]

    Health check for system components:
      luzia health            Overall health score
      luzia health kg         KG health check
      luzia health conductor  Conductor task health
      luzia health context    Context system health
      luzia health scripts    Script validation
      luzia health routines   Maintenance routines health

    Flags:
      --full  Comprehensive health check
      --deep  Detailed component analysis
      --fix   Auto-fix stalled tasks (conductor only)
      --flag  Mark incomplete research for review (kg only)

    Returns 0 on success, 1 on unknown component or any error.
    Checker modules are imported lazily from ../lib per component.
    """
    # Set up Python path for lib imports
    import sys
    lib_path = Path(__file__).parent.parent / "lib"
    if str(lib_path) not in sys.path:
        sys.path.insert(0, str(lib_path))
    if not args:
        # Overall health score (no component given)
        try:
            from system_health_orchestrator import SystemHealthOrchestrator
            from health_report_generator import HealthReportGenerator
            orchestrator = SystemHealthOrchestrator()
            health = orchestrator.generate_unified_health_score()
            generator = HealthReportGenerator()
            report = generator.generate_dashboard_report(health)
            print(report)
            return 0
        except Exception as e:
            print(f"Error: {e}")
            return 1
    component = args[0]
    # Remaining args are flags for the chosen component.
    sub_args = args[1:] if len(args) > 1 else []
    try:
        if component == "kg":
            from kg_health_checker import KGHealthChecker
            checker = KGHealthChecker()
            if "--deep" in sub_args:
                # Deep analysis: dump the 30-day research-pattern check as JSON.
                result = checker.check_research_patterns(time_scope_days=30)
                print(json.dumps(result, indent=2, default=str))
            elif "--flag" in sub_args:
                # NOTE(review): this branch only adds the incomplete-session
                # count over the default output; no flagging side effect is
                # visible here — confirm against KGHealthChecker.
                audit = checker.check_kg_completeness()
                print(f"KG Status: {audit['status']}")
                print(f"Completeness: {audit['completeness_pct']}%")
                print(f"Incomplete sessions: {audit['incomplete_count']}")
            else:
                audit = checker.check_kg_completeness()
                print(f"KG Status: {audit['status']}")
                print(f"Completeness: {audit['completeness_pct']}%")
            return 0
        elif component == "conductor":
            from conductor_health_checker import ConductorHealthChecker
            from conductor_recovery import ConductorRecovery
            checker = ConductorHealthChecker()
            if "--fix" in sub_args:
                # Actively recover stalled tasks rather than just reporting.
                recovery = ConductorRecovery()
                result = recovery.recover_all_stalled_tasks(dry_run=False)
                print(f"Recovered: {result['recovered']}")
                print(f"Moved to failed: {result['moved_to_failed']}")
            else:
                health = checker.generate_conductor_health_score()
                print(f"Conductor Score: {health['overall_score']}/100 ({health['status']})")
                print(f"Stalled tasks: {health['stalled_tasks']}")
            return 0
        elif component == "context":
            from context_health_checker import ContextHealthChecker
            checker = ContextHealthChecker()
            health = checker.generate_context_health_score()
            print(f"Context Score: {health['overall_score']}/100 ({health['status']})")
            return 0
        elif component == "scripts":
            from script_health_checker import ScriptHealthChecker
            checker = ScriptHealthChecker()
            report = checker.generate_script_health_report()
            print(f"Script Health: {report['health_score']}/100 ({report['status']})")
            return 0
        elif component == "routines":
            from routine_validator import RoutineValidator
            validator = RoutineValidator()
            report = validator.generate_routine_validation_report()
            print(f"Routines Health: {report['health_score']}/100 ({report['status']})")
            return 0
        else:
            print(f"Unknown component: {component}")
            return 1
    except ImportError as e:
        # A component's checker module is missing from ../lib.
        print(f"Error: Missing health module: {e}")
        return 1
    except Exception as e:
        print(f"Error: {e}")
        return 1
def route_maintain(config: dict, args: list, kwargs: dict) -> int:
    """
    luzia maintain [target] [--dry-run|--all]

    System maintenance operations:
      luzia maintain            Maintenance status
      luzia maintain kg         KG optimization (dedup, indexes)
      luzia maintain conductor  Task archival and cleanup
      luzia maintain context    Context tuning
      luzia maintain --all      Full system maintenance

    Flags:
      --dry-run  Preview without making changes
      --dedupe   Auto-merge KG duplicates
      --fix      Auto-fix stalled conductor tasks
      --archive  Archive old tasks
    """
    import sys

    # Maintainer modules live in ../lib; make them importable once.
    lib_dir = Path(__file__).parent.parent / "lib"
    if str(lib_dir) not in sys.path:
        sys.path.insert(0, str(lib_dir))
    if not args or args[0] == "--status":
        print("Maintenance operations available:")
        print(" luzia maintain kg - Knowledge graph optimization")
        print(" luzia maintain conductor - Task archival and cleanup")
        print(" luzia maintain context - Context system tuning")
        print(" luzia maintain --all - Full system maintenance")
        return 0
    target, rest = args[0], args[1:]
    dry = "--dry-run" in rest
    mode = "DRY RUN" if dry else "APPLIED"
    try:
        if target == "kg":
            from kg_maintainer import KGMaintainer
            outcome = KGMaintainer().run_full_kg_maintenance(dry_run=dry)
            print(f"KG Maintenance ({mode}):")
            print(f" Duplicates found: {outcome['duplicates_found']}")
            print(f" Duplicates merged: {outcome['duplicates_merged']}")
            print(f" Indexes optimized: {outcome['indexes_optimized']}")
            return 0
        if target == "conductor":
            from conductor_maintainer import ConductorMaintainer
            outcome = ConductorMaintainer().run_full_conductor_maintenance(dry_run=dry)
            print(f"Conductor Maintenance ({mode}):")
            print(f" Tasks archived: {outcome['summary']['tasks_archived']}")
            print(f" Space freed: {outcome['summary']['space_freed_mb']:.1f}MB")
            print(f" Locks removed: {outcome['summary']['locks_removed']}")
            return 0
        if target == "context":
            from context_maintainer import ContextMaintainer
            outcome = ContextMaintainer().run_full_context_maintenance(dry_run=dry)
            print(f"Context Maintenance ({mode}):")
            for action in outcome['actions_completed']:
                print(f" - {action}")
            return 0
        if target == "--all":
            from maintenance_orchestrator import MaintenanceOrchestrator
            orch = MaintenanceOrchestrator()
            print(orch.generate_maintenance_report(orch.run_full_system_maintenance(dry_run=dry)))
            return 0
        print(f"Unknown maintenance target: {target}")
        return 1
    except ImportError as e:
        print(f"Error: Missing maintenance module: {e}")
        return 1
    except Exception as e:
        print(f"Error: {e}")
        return 1
def route_chat(config: dict, args: list, kwargs: dict) -> int:
    """
    luzia chat [query] [--interactive|--stats|--help-commands|--kg|--local|--bash|--think]

    Interactive query interface for knowledge graph, project memory, and
    system commands:
      luzia chat "search term"        Single query
      luzia chat --interactive        Interactive mode
      luzia chat --kg "search term"   KG search only
      luzia chat --local "term"       Project memory only
      luzia chat --bash "command"     System command only
      luzia chat --think "topic"      Deep reasoning (deferred)
      luzia chat --stats              Show system statistics
      luzia chat --help-commands      Show available commands

    Response targets: <500ms total execution time
      - KG queries: <200ms
      - Memory queries: <150ms
      - Bash execution: <300ms
    """
    import sys

    # Chat modules live in ../lib; make them importable once.
    lib_dir = Path(__file__).parent.parent / "lib"
    if str(lib_dir) not in sys.path:
        sys.path.insert(0, str(lib_dir))
    try:
        from chat_orchestrator import ChatOrchestrator
        orchestrator = ChatOrchestrator()
        if "--help-commands" in args:
            from chat_response_formatter import ChatResponseFormatter
            print(ChatResponseFormatter().format_help())
            return 0
        if "--stats" in args:
            import json
            print(json.dumps(orchestrator.get_statistics(), indent=2))
            return 0
        # Interactive session, explicitly requested or when no query given.
        if "--interactive" in args or "-i" in args or not args:
            orchestrator.start_interactive_session()
            return 0
        result = orchestrator.process_query(" ".join(args))
        print()
        print(result['response'])
        print()
        print(f"*{result.get('response_time_indicator', 'processed')}*")
        return 0 if result.get('status') == 'success' else 1
    except ImportError as e:
        print(f"Error: Missing chat module: {e}")
        return 1
    except Exception as e:
        print(f"Error: {e}")
        return 1
def print_help():
    """Write the module docstring (the CLI usage text) to stdout."""
    sys.stdout.write(f"{__doc__}\n")
class Router:
    """Pattern-based routing dispatcher.

    Each entry in self.routes pairs a matcher with a handler. A matcher
    receives the raw argv tail and returns the argument list to hand to
    the handler, or None to let the next route try. List order matters:
    specific commands are tried before the catch-all project-task route.
    """
    def __init__(self, config: dict):
        self.config = config
        # Known project names, consumed by _match_project_task.
        self.projects = set(config.get("projects", {}).keys())
        # Define routes: (pattern_fn, handler_fn, description)
        self.routes = [
            (self._match_list, route_list, "List projects"),
            (self._match_status, route_status, "Show status"),
            (self._match_stop, route_stop, "Stop container"),
            (self._match_logs, route_logs, "View logs"),
            (self._match_cleanup, route_cleanup, "Cleanup/maintenance"),
            (self._match_maintenance, route_maintenance, "Maintenance status"),
            (self._match_health, route_health, "System health check"),
            (self._match_maintain, route_maintain, "System maintenance"),
            (self._match_chat, route_chat, "Interactive chat"),
            (self._match_metrics, route_metrics, "Task metrics"),
            (self._match_jobs, route_jobs, "Job management"),
            (self._match_kill, route_kill, "Kill agent"),
            (self._match_failures, route_failures, "List/retry failures"),
            (self._match_retry, route_retry, "Retry failed job"),
            (self._match_qa, route_qa, "QA validation"),
            (self._match_docs, route_docs, "Documentation KG"),
            (self._match_knowledge, route_knowledge, "Project knowledge RAG"),
            (self._match_notify, route_notify, "View notifications"),
            (self._match_history, route_history, "Project history"),
            (self._match_work_on, route_work_on, "Interactive work"),
            (self._match_think_deep, route_think_deep, "Deep reasoning"),
            # Research commands (order matters - specific before general)
            (self._match_research_update, route_research_update, "Update research phase"),
            (self._match_research_graph, route_research_graph, "Add to knowledge graph"),
            (self._match_research_list, route_research_list, "List research sessions"),
            (self._match_research_show, route_research_show, "Show research session"),
            (self._match_research_knowledge, route_research_knowledge, "Show knowledge graph"),
            (self._match_research, route_research, "Research (3-phase flow)"),
            (self._match_fix, route_fix, "Troubleshooting"),
            # Queue commands (Phase 4)
            (self._match_structure, route_structure, "Code structure analysis"),
            (self._match_queue, route_queue, "Queue status"),
            (self._match_dispatch, route_dispatch, "Queue dispatch"),
            # Cockpit commands (Human-in-the-loop)
            (self._match_cockpit, self._route_cockpit, "Cockpit management"),
            # Telegram integration
            (self._match_telegram, self._route_telegram, "Telegram notifications"),
            # Service management (for cockpits)
            (self._match_service, self._route_service, "Service management"),
            # Port management
            (self._match_port, self._route_port, "Port allocation"),
            # Watchdog (Task monitoring)
            (self._match_watchdog, self._route_watchdog, "Task watchdog"),
            (self._match_project_task, route_project_task, "Project task"),
            # Internal (JSON output)
            (self._match_exec, self._route_exec, "Raw execution"),
            (self._match_write, self._route_write, "File write"),
            (self._match_read, self._route_read, "File read"),
            (self._match_context, self._route_context, "Get context"),
        ]
    # ------------------------------------------------------------------
    # Matchers: return the handler's argument list on a match, else None.
    # ------------------------------------------------------------------
    def _match_list(self, args: list) -> Optional[list]:
        if args and args[0] == "list":
            return []
        return None
    def _match_status(self, args: list) -> Optional[list]:
        if args and args[0] == "status":
            return args[1:]
        return None
    def _match_stop(self, args: list) -> Optional[list]:
        if args and args[0] == "stop":
            return args[1:]
        return None
    def _match_cleanup(self, args: list) -> Optional[list]:
        if args and args[0] == "cleanup":
            return args[1:]  # Pass subcommands (jobs, containers, all, --dry-run)
        return None
    def _match_maintenance(self, args: list) -> Optional[list]:
        if args and args[0] == "maintenance":
            return args[1:]
        return None
    def _match_metrics(self, args: list) -> Optional[list]:
        if args and args[0] == "metrics":
            return args[1:]
        return None
    def _match_logs(self, args: list) -> Optional[list]:
        if args and args[0] == "logs":
            return args[1:]
        return None
    def _match_jobs(self, args: list) -> Optional[list]:
        if args and args[0] == "jobs":
            return args[1:]
        return None
    def _match_kill(self, args: list) -> Optional[list]:
        if args and args[0] == "kill":
            return args[1:]
        return None
    def _match_failures(self, args: list) -> Optional[list]:
        if args and args[0] == "failures":
            return args[1:]
        return None
    def _match_retry(self, args: list) -> Optional[list]:
        if args and args[0] == "retry":
            return args[1:]
        return None
    def _match_qa(self, args: list) -> Optional[list]:
        if args and args[0] == "qa":
            return args[1:]
        return None
    def _match_docs(self, args: list) -> Optional[list]:
        if args and args[0] == "docs":
            return args[1:]
        return None
    def _match_knowledge(self, args: list) -> Optional[list]:
        if args and args[0] == "knowledge":
            return args[1:]
        return None
    def _match_notify(self, args: list) -> Optional[list]:
        # Accepts both "notify" and "notifications" as aliases.
        if args and args[0] in ["notify", "notifications"]:
            return args[1:]
        return None
    def _match_history(self, args: list) -> Optional[list]:
        if args and args[0] == "history":
            return args[1:]
        return None
    def _match_work_on(self, args: list) -> Optional[list]:
        # Matches the two-word prefix "work on <project> ...".
        if len(args) >= 3 and args[0] == "work" and args[1] == "on":
            return args[2:]
        return None
    def _match_think_deep(self, args: list) -> Optional[list]:
        if len(args) >= 3 and args[0] == "think" and args[1] == "deep":
            return args[2:]
        return None
    def _match_research(self, args: list) -> Optional[list]:
        # Match: research <topic>
        if args and args[0] == "research":
            return args[1:]
        # Match: deep research <topic>
        if len(args) >= 2 and args[0] == "deep" and args[1] == "research":
            return args[2:]
        # Match: web research <topic>
        if len(args) >= 2 and args[0] == "web" and args[1] == "research":
            return args[2:]
        return None
    def _match_research_update(self, args: list) -> Optional[list]:
        if args and args[0] == "research-update":
            return args[1:]
        return None
    def _match_research_graph(self, args: list) -> Optional[list]:
        if args and args[0] == "research-graph":
            return args[1:]
        return None
    def _match_research_list(self, args: list) -> Optional[list]:
        if args and args[0] == "research-list":
            return args[1:]
        return None
    def _match_research_show(self, args: list) -> Optional[list]:
        if args and args[0] == "research-show":
            return args[1:]
        return None
    def _match_research_knowledge(self, args: list) -> Optional[list]:
        if args and args[0] == "research-knowledge":
            return args[1:]
        return None
    def _match_fix(self, args: list) -> Optional[list]:
        if args and args[0] == "fix":
            return args[1:]
        return None
    def _match_project_task(self, args: list) -> Optional[list]:
        # Catch-all: first token is a configured project name.
        if args and args[0] in self.projects:
            return args  # [project, task, ...]
        return None
    def _match_exec(self, args: list) -> Optional[list]:
        if args and args[0] == "--exec":
            return args[1:]
        return None
    def _match_write(self, args: list) -> Optional[list]:
        if args and args[0] == "--write":
            return args[1:]
        return None
    def _match_read(self, args: list) -> Optional[list]:
        if args and args[0] == "--read":
            return args[1:]
        return None
    def _match_context(self, args: list) -> Optional[list]:
        if args and args[0] == "--context":
            return args[1:]
        return None
    def _match_structure(self, args: list) -> Optional[list]:
        if args and args[0] == "structure":
            return args[1:]
        return None
    def _match_queue(self, args: list) -> Optional[list]:
        if args and args[0] == "queue":
            return args[1:]
        return None
    def _match_dispatch(self, args: list) -> Optional[list]:
        if args and args[0] == "dispatch":
            return args[1:]
        return None
    def _match_cockpit(self, args: list) -> Optional[list]:
        if args and args[0] == "cockpit":
            return args[1:]
        return None
    def _route_cockpit(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia cockpit <subcommand> [args]

        Thin guard around the module-level route_cockpit; fails fast when
        the optional cockpit module did not import.
        """
        if not COCKPIT_AVAILABLE:
            print("Error: Cockpit module not available")
            return 1
        return route_cockpit(config, args, kwargs)
    def _match_watchdog(self, args: list) -> Optional[list]:
        if args and args[0] == "watchdog":
            return args[1:]
        return None
    def _route_watchdog(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia watchdog <subcommand>

        Subcommands: check (default) | status | stuck | clean | daemon [interval].
        Requires the optional watchdog module.
        """
        if not WATCHDOG_AVAILABLE:
            print("Error: Watchdog module not available")
            return 1
        watchdog = TaskWatchdog()
        if not args or args[0] == "check":
            # Single check
            summary = watchdog.run_check()
            print(json.dumps(summary, indent=2))
            return 0
        if args[0] == "status":
            # Project queue status
            status = watchdog.get_project_queue_status()
            print("Project Queue Status:")
            print(f"{'PROJECT':<15} {'PENDING':<10} {'RUNNING':<10} {'AWAITING'}")
            print("-" * 50)
            for project, counts in status.items():
                print(f"{project:<15} {counts['pending']:<10} {counts['running']:<10} {counts['awaiting_human']}")
            return 0
        if args[0] == "stuck":
            # List stuck tasks
            stuck = watchdog.check_heartbeats()
            if stuck:
                print(f"Found {len(stuck)} stuck tasks:")
                for t in stuck:
                    print(f" - {t['task_id']}: {t['reason']}")
            else:
                print("No stuck tasks found")
            return 0
        if args[0] == "clean":
            # Clean up orphaned tasks
            cleaned = watchdog.cleanup_orphaned_tasks()
            released = watchdog.release_stale_locks()
            print(f"Cleaned {len(cleaned)} orphaned tasks")
            print(f"Released {len(released)} stale locks")
            return 0
        if args[0] == "daemon":
            # Run continuous monitoring (default interval: 60s)
            interval = int(args[1]) if len(args) > 1 else 60
            watchdog.run_loop(interval_seconds=interval)
            return 0
        print("Usage: luzia watchdog [check|status|stuck|clean|daemon [interval]]")
        return 1
    def _match_health(self, args: list) -> Optional[list]:
        if args and args[0] == "health":
            return args[1:]
        return None
    def _match_maintain(self, args: list) -> Optional[list]:
        if args and args[0] == "maintain":
            return args[1:]
        return None
    def _match_chat(self, args: list) -> Optional[list]:
        if args and args[0] == "chat":
            return args[1:]
        return None
    def _match_telegram(self, args: list) -> Optional[list]:
        if args and args[0] == "telegram":
            return args[1:]
        return None
    def _route_telegram(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia telegram [notify|ask|approve|pending|status]

        Sends notifications/questions/approval requests to Bruno via the
        TelegramBridge module in ../lib. With no subcommand prints help.
        """
        try:
            sys.path.insert(0, str(Path(__file__).parent.parent / "lib"))
            from telegram_bridge import TelegramBridge
        except ImportError as e:
            print(f"Error: Telegram bridge not available: {e}")
            return 1
        bridge = TelegramBridge()
        if not args:
            print("Telegram Commands:")
            print(" luzia telegram notify <message> - Send notification to Bruno")
            print(" luzia telegram ask <question> - Ask Bruno a question")
            print(" luzia telegram approve <action> - Request approval")
            print(" luzia telegram pending - Show pending requests")
            print(" luzia telegram status - Check connection")
            print(" luzia telegram check <request_id> - Check response")
            return 0
        cmd = args[0]
        if cmd == "notify":
            message = " ".join(args[1:]) if len(args) > 1 else "Test notification from Luzia"
            success = bridge.send_notification(message, "luzia")
            print(f"Notification {'sent' if success else 'failed'}")
            return 0 if success else 1
        elif cmd == "ask":
            if len(args) < 2:
                print("Usage: luzia telegram ask <question> [--options opt1,opt2,...]")
                return 1
            # Parse options if provided
            options = None
            question_parts = []
            for i, arg in enumerate(args[1:]):
                # args[1:][i] is args[i+1], so the value after "--options"
                # lives at args[i+2] in the original list.
                if arg == "--options" and i + 1 < len(args[1:]):
                    options = args[i + 2].split(",")
                    break
                question_parts.append(arg)
            question = " ".join(question_parts)
            req_id, success = bridge.ask_question(question, "luzia", options=options)
            print(f"Question sent: {success}")
            print(f"Request ID: {req_id}")
            return 0 if success else 1
        elif cmd == "approve":
            if len(args) < 2:
                print("Usage: luzia telegram approve <action>")
                return 1
            action = " ".join(args[1:])
            req_id, success = bridge.request_approval(action, "luzia")
            print(f"Approval request sent: {success}")
            print(f"Request ID: {req_id}")
            return 0 if success else 1
        elif cmd == "pending":
            requests = bridge.get_pending_requests()
            if not requests:
                print("No pending requests")
                return 0
            print(f"Pending Requests ({len(requests)}):")
            for req in requests:
                print(f" [{req.request_type}] {req.request_id}")
                print(f" {req.message[:60]}...")
                print(f" Created: {req.created_at}")
            return 0
        elif cmd == "status":
            print(f"Bot token: {'configured' if bridge.bot_token else 'missing'}")
            print(f"Chat ID: {bridge.bruno_chat_id or 'missing'}")
            print(f"Connected: {bridge.connected}")
            return 0
        elif cmd == "check":
            if len(args) < 2:
                print("Usage: luzia telegram check <request_id>")
                return 1
            req = bridge.check_response(args[1])
            if not req:
                print("Request not found")
                return 1
            print(f"Status: {req.status}")
            print(f"Response: {req.response or 'No response yet'}")
            return 0
        else:
            print(f"Unknown telegram command: {cmd}")
            return 1
    def _match_service(self, args: list) -> Optional[list]:
        if args and args[0] == "service":
            return args[1:]
        return None
    def _route_service(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia service [start|stop|status|list] <project> [service]
        Allows cockpits to manage project services outside their sandbox.
        """
        try:
            sys.path.insert(0, str(Path(__file__).parent.parent / "lib"))
            from service_manager import ServiceManager, cmd_start, cmd_stop, cmd_status, cmd_list
        except ImportError as e:
            print(f"Error: Service manager not available: {e}")
            return 1
        if not args:
            print("Service Management Commands:")
            print(" luzia service start <project> <service> - Start a service")
            print(" luzia service stop <project> <service> - Stop a service")
            print(" luzia service status [project] - Show running services")
            print(" luzia service list <project> - List available services")
            print()
            print("This allows cockpits to manage services without direct network access.")
            print("Services run as the project user outside the sandbox container.")
            return 0
        cmd = args[0]
        if cmd == "start":
            if len(args) < 3:
                print("Usage: luzia service start <project> <service>")
                return 1
            print(cmd_start(args[1], args[2]))
            return 0
        elif cmd == "stop":
            if len(args) < 3:
                print("Usage: luzia service stop <project> <service>")
                return 1
            print(cmd_stop(args[1], args[2]))
            return 0
        elif cmd == "status":
            # Project argument is optional for status.
            project = args[1] if len(args) > 1 else None
            print(cmd_status(project))
            return 0
        elif cmd == "list":
            if len(args) < 2:
                print("Usage: luzia service list <project>")
                return 1
            print(cmd_list(args[1]))
            return 0
        else:
            print(f"Unknown service command: {cmd}")
            return 1
    def _match_port(self, args: list) -> Optional[list]:
        if args and args[0] == "port":
            return args[1:]
        return None
    def _route_port(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia port [list|check|allocate|release|suggest]

        Port allocation helpers backed by ../lib/port_manager.
        """
        try:
            sys.path.insert(0, str(Path(__file__).parent.parent / "lib"))
            from port_manager import (
                list_allocations, check_port, allocate_port,
                release_port, suggest_port, PORT_RANGES
            )
        except ImportError as e:
            print(f"Error: Port manager not available: {e}")
            return 1
        if not args:
            print("Port Management Commands:")
            print(" luzia port list - List all port allocations")
            print(" luzia port check <port> - Check what's using a port")
            print(" luzia port allocate <service> [-p PORT] - Allocate port to service")
            print(" luzia port release <service> - Release port allocation")
            print(" luzia port suggest <type> - Suggest available port")
            print(f" Types: {', '.join(PORT_RANGES.keys())}")
            return 0
        cmd = args[0]
        if cmd == "list":
            print(list_allocations())
            return 0
        elif cmd == "check":
            if len(args) < 2:
                print("Usage: luzia port check <port>")
                return 1
            try:
                port = int(args[1])
                print(check_port(port))
                return 0
            except ValueError:
                print(f"Invalid port number: {args[1]}")
                return 1
        elif cmd == "allocate":
            if len(args) < 2:
                print("Usage: luzia port allocate <service> [--port PORT]")
                return 1
            service = args[1]
            preferred_port = None
            # Optional "-p PORT" / "--port PORT" pair after the service name.
            if len(args) >= 4 and args[2] in ("-p", "--port"):
                try:
                    preferred_port = int(args[3])
                except ValueError:
                    print(f"Invalid port number: {args[3]}")
                    return 1
            port, msg = allocate_port(service, preferred_port)
            print(msg)
            return 0 if port > 0 else 1
        elif cmd == "release":
            if len(args) < 2:
                print("Usage: luzia port release <service>")
                return 1
            print(release_port(args[1]))
            return 0
        elif cmd == "suggest":
            if len(args) < 2:
                print(f"Usage: luzia port suggest <type>")
                print(f"Types: {', '.join(PORT_RANGES.keys())}")
                return 1
            port = suggest_port(args[1])
            if port > 0:
                print(f"Suggested port for {args[1]}: {port}")
                return 0
            else:
                print(f"No available ports in {args[1]} range")
                return 1
        else:
            print(f"Unknown port command: {cmd}")
            return 1
    def _route_exec(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia --exec <project> <command>

        Internal entry point: emits a JSON result for subagent consumption.
        """
        if len(args) < 2:
            print(json.dumps({"error": "Usage: luzia --exec <project> <command>"}))
            return 1
        result = cmd_exec_raw(config, args[0], " ".join(args[1:]))
        print(json.dumps(result))
        return 0 if result.get("success") else 1
    def _route_write(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia --write <project> <path> <content>

        Internal entry point (JSON output). A content argument of "-" reads
        the file body from stdin instead.
        """
        if len(args) < 3:
            print(json.dumps({"error": "Usage: luzia --write <project> <path> <content>"}))
            return 1
        if args[2] == "-":
            content = sys.stdin.read()
        else:
            content = " ".join(args[2:])
        result = cmd_write_file(config, args[0], args[1], content)
        print(json.dumps(result))
        return 0 if result.get("success") else 1
    def _route_read(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia --read <project> <path> (internal, JSON output)"""
        if len(args) < 2:
            print(json.dumps({"error": "Usage: luzia --read <project> <path>"}))
            return 1
        result = cmd_read_file(config, args[0], args[1])
        print(json.dumps(result))
        return 0 if result.get("success") else 1
    def _route_context(self, config: dict, args: list, kwargs: dict) -> int:
        """Handler: luzia --context <project> (internal, JSON output)"""
        if not args:
            print(json.dumps({"error": "Usage: luzia --context <project>"}))
            return 1
        context = get_project_context(args[0], config)
        print(json.dumps({"context": context}))
        return 0
    def dispatch(self, args: list) -> int:
        """Route and dispatch to appropriate handler.

        Tries matchers in route-list order; the first non-None match wins.
        Returns the handler's exit code, or 1 when nothing matches.
        """
        for pattern_fn, handler_fn, desc in self.routes:
            matched_args = pattern_fn(args)
            if matched_args is not None:
                return handler_fn(self.config, matched_args, {})
        # No match found
        if args:
            print(f"Unknown: {args[0]}")
        print("Run 'luzia --help' for usage")
        return 1
# Global skip preflight flag
# Set to True by main() when --skip-preflight is passed; consumed by
# handlers that spawn agents (e.g. dispatch's spawn_claude_agent call).
SKIP_PREFLIGHT = False
def main():
    """CLI entry point: strip global flags, then route the remaining args."""
    global VERBOSE, BACKGROUND, SKIP_PREFLIGHT
    raw = sys.argv[1:]
    # Global flags may appear anywhere in argv; every occurrence is stripped.
    if "--verbose" in raw:
        VERBOSE = True
    if "--fg" in raw:
        BACKGROUND = False
    if "--skip-preflight" in raw:
        SKIP_PREFLIGHT = True
        _log(" [Warning] Preflight checks skipped by --skip-preflight flag", verbose_only=False)
    args = [a for a in raw if a not in ("--verbose", "--fg", "--skip-preflight")]
    if not args or args[0] in ("-h", "--help", "help"):
        print_help()
        return 0
    # SECURITY: Check guest restrictions before routing
    if is_guest_user() and args:
        require_guest_permission(args[0], args[1:])
    return Router(load_config()).dispatch(args)
if __name__ == "__main__":
    # Exit with the handler's return code; a None return counts as success (0).
    sys.exit(main() or 0)