Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
204 lines
7.0 KiB
Python
204 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Enhanced status route for Luzia orchestrator
|
|
This module provides an upgraded route_status function that includes
|
|
the new status system capabilities while preserving backward compatibility.
|
|
|
|
Usage:
|
|
from luzia_enhanced_status_route import route_status_enhanced
|
|
# Use route_status_enhanced in place of route_status
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_conductor_status(project: Optional[str] = None) -> Dict[str, Any]:
|
|
"""Get conductor status from directory"""
|
|
conductor_data = {"active": []}
|
|
|
|
try:
|
|
conductor_dir = Path.home() / "conductor" / "active"
|
|
if not conductor_dir.exists():
|
|
return conductor_data
|
|
|
|
for task_dir in conductor_dir.iterdir():
|
|
if not task_dir.is_dir():
|
|
continue
|
|
|
|
try:
|
|
meta_file = task_dir / "meta.json"
|
|
if meta_file.exists():
|
|
with open(meta_file) as f:
|
|
meta = json.load(f)
|
|
|
|
# Filter by project if specified
|
|
if project and meta.get("project") != project:
|
|
continue
|
|
|
|
progress_file = task_dir / "progress.md"
|
|
progress = ""
|
|
if progress_file.exists():
|
|
with open(progress_file) as f:
|
|
progress = f.read()[:200]
|
|
|
|
conductor_data["active"].append({
|
|
"id": task_dir.name,
|
|
"project": meta.get("project"),
|
|
"status": "running" if (task_dir / "heartbeat.json").exists() else "stale",
|
|
"skill": meta.get("skill"),
|
|
"prompt": meta.get("prompt", ""),
|
|
"progress": progress[:60] + "..." if len(progress) > 60 else progress
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error reading task {task_dir.name}: {e}")
|
|
continue
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading conductor status: {e}")
|
|
|
|
return conductor_data
|
|
|
|
|
|
def route_status_enhanced(config: dict, args: list, kwargs: dict) -> int:
|
|
"""
|
|
Enhanced status handler with new status system integration
|
|
Preserves backward compatibility with existing behavior
|
|
|
|
Usage: luzia status [project] [options]
|
|
Options:
|
|
--conductor, -c Show conductor tasks only
|
|
--dashboard Show new status dashboard (default for no args)
|
|
--alerts Show only alerts
|
|
--recent N Show last N updates
|
|
--project <name> Show project summary
|
|
--export json Export to JSON
|
|
--export markdown Export to markdown
|
|
"""
|
|
|
|
# Parse arguments
|
|
project = None
|
|
conductor_only = False
|
|
use_new_system = False
|
|
new_args = []
|
|
|
|
for arg in args:
|
|
if arg in ("--conductor", "-c"):
|
|
conductor_only = True
|
|
elif arg == "--dashboard":
|
|
use_new_system = True
|
|
elif arg == "--alerts" or arg == "--recent" or arg == "--export" or arg == "--project":
|
|
use_new_system = True
|
|
new_args.append(arg)
|
|
elif not arg.startswith("-"):
|
|
project = arg
|
|
else:
|
|
new_args.append(arg)
|
|
|
|
# Try to use new status system if requested
|
|
if use_new_system:
|
|
try:
|
|
from luzia_status_handler import get_status_handler
|
|
handler = get_status_handler()
|
|
|
|
if handler.is_available():
|
|
# If new args exist, pass them; otherwise pass original args
|
|
cmd_args = new_args if new_args else args
|
|
result = handler.handle_command(cmd_args)
|
|
print(result)
|
|
return 0
|
|
except Exception as e:
|
|
logger.warning(f"Status system unavailable, falling back to classic mode: {e}")
|
|
|
|
# Fall back to classic status display
|
|
print("=" * 60)
|
|
print("LUZIA STATUS")
|
|
print("=" * 60)
|
|
|
|
# Show conductor state
|
|
try:
|
|
conductor = get_conductor_status(project)
|
|
active_tasks = conductor.get("active", [])
|
|
|
|
if active_tasks:
|
|
print("\nACTIVE TASKS (Conductor):")
|
|
for task in active_tasks:
|
|
status_icon = "running" if task.get("status") == "running" else "stale" if task.get("status") == "stale" else "pending"
|
|
skill = f"[{task.get('skill')}]" if task.get("skill") else ""
|
|
print(f" [{status_icon}] {task['project']}/{task['id'][:12]} {skill}")
|
|
print(f" {task.get('prompt', '')[:60]}...")
|
|
if task.get('progress'):
|
|
print(f" Progress: {task.get('progress')[:50]}...")
|
|
else:
|
|
print("\nNo active conductor tasks")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting conductor status: {e}")
|
|
print(f"Error reading conductor status: {e}")
|
|
|
|
# Show containers if not conductor-only
|
|
if not conductor_only:
|
|
try:
|
|
from docker_bridge import list_project_containers
|
|
|
|
containers = list_project_containers()
|
|
if containers:
|
|
print("\nCONTAINERS:")
|
|
for c in containers:
|
|
if project and f"luzia-{project}" != c.get("name"):
|
|
continue
|
|
status = c.get("status", "unknown")
|
|
print(f" {c.get('name')}: {status}")
|
|
else:
|
|
print("\nNo containers running")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting containers: {e}")
|
|
|
|
print("\n" + "=" * 60)
|
|
return 0
|
|
|
|
|
|
def route_status_with_stream(config: dict, args: list, kwargs: dict) -> int:
|
|
"""
|
|
Status route with streaming support
|
|
Combines new status system with conductor visibility
|
|
|
|
This is for future use when streaming is fully integrated
|
|
"""
|
|
try:
|
|
from luzia_status_integration import get_status_system
|
|
|
|
status_system = get_status_system()
|
|
|
|
if not status_system.is_enabled():
|
|
logger.info("Status system not enabled, using classic mode")
|
|
return route_status_enhanced(config, args, kwargs)
|
|
|
|
# Check if we should stream
|
|
if "--stream" in args:
|
|
print("Streaming status updates (press Ctrl+C to stop)...")
|
|
print(status_system.get_dashboard())
|
|
|
|
# In a real implementation, this would stream updates
|
|
# For now, just show the dashboard
|
|
import time
|
|
try:
|
|
while True:
|
|
time.sleep(5)
|
|
print("\n" + status_system.get_recent_updates(3))
|
|
except KeyboardInterrupt:
|
|
print("\nStatus streaming stopped")
|
|
return 0
|
|
|
|
# Otherwise, use enhanced status
|
|
return route_status_enhanced(config, args, kwargs)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in streaming status: {e}")
|
|
return route_status_enhanced(config, args, kwargs)
|