#!/usr/bin/env python3
"""
Luzia Status Integration Module
Integrates the status publishing system into the Luzia orchestrator

Provides:
- Status publisher initialization
- Publishing point decorators
- Configuration loading
- CLI command handler for status
"""

import json
import asyncio
import logging
from pathlib import Path
from typing import Optional, Dict, Any, Callable
from datetime import datetime

logger = logging.getLogger(__name__)

# The TOML parser is optional: a complete default configuration exists, so a
# missing third-party package must not prevent the orchestrator from importing
# this module (same policy as for the status modules below).
try:
    import toml
except ImportError:
    toml = None

# Import the status modules
try:
    from luzia_status_publisher_impl import (
        LuziaStatusPublisher,
        StatusMessage,
        StatusMessageType,
        Severity
    )
    from luzia_claude_bridge_impl import LuziaClaudeBridge, CLIStatusHelper
    STATUS_MODULES_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Status modules not available: {e}")
    STATUS_MODULES_AVAILABLE = False


class LuziaStatusConfig:
    """Configuration loader for status system.

    Reads ``CONFIG_PATH`` on construction.  When the file is missing or
    cannot be parsed, the built-in defaults from ``_default_config`` are
    used instead.
    """

    CONFIG_PATH = Path("/etc/luzia/status_config.toml")

    def __init__(self):
        self.config = {}
        self.load_config()

    @staticmethod
    def _read_toml(path: Path) -> Dict[str, Any]:
        """Parse *path* as TOML and return the resulting dict.

        Uses the ``toml`` package when it is installed, otherwise falls back
        to the standard-library ``tomllib`` (Python 3.11+).

        Raises:
            ImportError: if neither parser is available.
            Exception: whatever the parser raises for unreadable/invalid files.
        """
        if toml is not None:
            return toml.load(path)
        import tomllib  # stdlib fallback; requires a binary file object
        with path.open("rb") as fh:
            return tomllib.load(fh)

    def load_config(self):
        """Load configuration from TOML file.

        Never raises for a missing or broken file: failures are logged and
        the default configuration is installed instead.
        """
        if not self.CONFIG_PATH.exists():
            logger.info(f"Config file not found: {self.CONFIG_PATH}, using defaults")
            self.config = self._default_config()
            return

        try:
            self.config = self._read_toml(self.CONFIG_PATH)
            logger.info(f"Loaded status config from {self.CONFIG_PATH}")
        except Exception as e:
            # Best-effort: status reporting must not break on a bad config.
            logger.error(f"Failed to load config: {e}")
            self.config = self._default_config()

    def _default_config(self) -> Dict[str, Any]:
        """Get default configuration (a fresh dict on every call)."""
        return {
            "status_updates": {
                "verbosity": "normal",
                "show_task_started": True,
                "show_progress_updates": True,
                "show_completed": True,
                "show_queued": True,
                "show_warnings": True,
                "show_failures": True,
                "show_system_alerts": True,
                "progress_update_threshold_percent": 25,
                "progress_update_min_interval_seconds": 30,
            },
            "display": {
                "use_colors": True,
                "use_emojis": True,
                "compact_format": True,
            },
            "logging": {
                "enabled": True,
                "log_file": "/var/log/luzia/status.log",
                "log_level": "INFO",
            }
        }

    def get(self, key: str, default: Any = None) -> Any:
        """Get config value with dot notation (e.g., 'status_updates.verbosity').

        Args:
            key: Dot-separated path into the nested config dict.
            default: Value returned when the path cannot be resolved.

        Returns:
            The stored value, or *default* when any path segment is missing,
            when an intermediate value is not a dict, or when the stored
            value is ``None``.  Falsy values such as ``False`` or ``0`` are
            returned as-is.
        """
        value = self.config
        for part in key.split('.'):
            if not isinstance(value, dict):
                return default
            value = value.get(part)
        return default if value is None else value


class LuziaStatusSystem:
    """Main status system - coordinates publishing and CLI.

    Process-wide singleton: every ``LuziaStatusSystem()`` call returns the
    same instance and components are initialised only once.  When the status
    modules could not be imported, or component construction fails, the
    system stays disabled and every publish method is a silent no-op.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every LuziaStatusSystem() call; only the first
        # one may set up state.
        if self._initialized:
            return

        self.config = LuziaStatusConfig()
        self.publisher = None
        self.bridge = None
        self.cli_helper = None
        self.streaming_task = None
        self._event_loop = None
        # Strong references to fire-and-forget tasks created by the *_sync
        # publishing path; the event loop itself only keeps weak references.
        self._background_tasks = set()
        self._initialized = True

        self._initialize_system()

    def _initialize_system(self):
        """Initialize status system components.

        Creates the publisher, the bridge and the CLI helper.  Any failure is
        logged and leaves the system in whatever partially-initialised state
        was reached; ``is_enabled`` then decides whether it is usable.
        """
        if not STATUS_MODULES_AVAILABLE:
            logger.warning("Status modules not available, system disabled")
            return

        try:
            # Create publisher
            self.publisher = LuziaStatusPublisher()
            verbosity = self.config.get("status_updates.verbosity", "normal")
            self.publisher.set_verbosity(verbosity)

            # Create bridge
            self.bridge = LuziaClaudeBridge(self.publisher)

            # Create CLI helper
            self.cli_helper = CLIStatusHelper(self.bridge)

            logger.info("Status system initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize status system: {e}")

    def is_enabled(self) -> bool:
        """Check if status system is enabled (publisher and bridge exist)."""
        return self.publisher is not None and self.bridge is not None

    async def start_streaming(self):
        """Start background streaming task.

        No-op when the system is disabled or a streaming task is still
        running.  Must be awaited from inside a running event loop because
        it schedules ``bridge.stream_status_updates()`` as a task.
        """
        if not self.is_enabled():
            return

        try:
            if self.streaming_task is None or self.streaming_task.done():
                self.streaming_task = asyncio.create_task(
                    self.bridge.stream_status_updates()
                )
                logger.info("Status streaming started")
        except Exception as e:
            logger.error(f"Failed to start streaming: {e}")

    def stop_streaming(self):
        """Stop background streaming task (cancels it if still running)."""
        if self.streaming_task and not self.streaming_task.done():
            self.streaming_task.cancel()
            logger.info("Status streaming stopped")

    def _track_background_task(self, task):
        """Keep *task* alive until it finishes and arrange for cleanup."""
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)

    def _on_background_task_done(self, task):
        """Drop a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"Failed to publish task started: {exc}")

    def publish_task_started_sync(
        self,
        task_id: str,
        project: str,
        description: str,
        estimated_duration_seconds: int = 300
    ):
        """Publish task started event (synchronous).

        When called from inside a running event loop the publish coroutine is
        scheduled as a background task (fire-and-forget); otherwise it is run
        to completion with ``asyncio.run``.  Errors are logged, never raised.

        Args:
            task_id: Forwarded to ``publisher.publish_task_started``.
            project: Forwarded to ``publisher.publish_task_started``.
            description: Forwarded to ``publisher.publish_task_started``.
            estimated_duration_seconds: Forwarded to the publisher.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            coro = self.publisher.publish_task_started(
                task_id=task_id,
                project=project,
                description=description,
                estimated_duration_seconds=estimated_duration_seconds
            )
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop is active in this thread: run synchronously.
                asyncio.run(coro)
            else:
                # Hold a reference so the task cannot be garbage-collected
                # before it completes, and so failures get logged.
                self._track_background_task(loop.create_task(coro))
        except Exception as e:
            logger.error(f"Failed to publish task started: {e}")

    async def publish_task_started(
        self,
        task_id: str,
        project: str,
        description: str,
        estimated_duration_seconds: int = 300
    ):
        """Publish task started event (async).

        Forwards all arguments to ``publisher.publish_task_started``.
        No-op when the system is disabled; errors are logged, never raised.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            await self.publisher.publish_task_started(
                task_id=task_id,
                project=project,
                description=description,
                estimated_duration_seconds=estimated_duration_seconds
            )
        except Exception as e:
            logger.error(f"Failed to publish task started: {e}")

    async def publish_progress(
        self,
        task_id: str,
        progress_percent: int,
        current_step: int,
        total_steps: int,
        current_step_name: str,
        elapsed_seconds: int,
        estimated_remaining_seconds: int
    ):
        """Publish progress update.

        Forwards all arguments to ``publisher.publish_progress``.
        No-op when the system is disabled; errors are logged, never raised.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            await self.publisher.publish_progress(
                task_id=task_id,
                progress_percent=progress_percent,
                current_step=current_step,
                total_steps=total_steps,
                current_step_name=current_step_name,
                elapsed_seconds=elapsed_seconds,
                estimated_remaining_seconds=estimated_remaining_seconds
            )
        except Exception as e:
            logger.error(f"Failed to publish progress: {e}")

    async def publish_task_completed(
        self,
        task_id: str,
        elapsed_seconds: int,
        findings_count: int = 0,
        recommendations_count: int = 0,
        status: str = "APPROVED"
    ):
        """Publish task completed event.

        Forwards all arguments to ``publisher.publish_task_completed``.
        No-op when the system is disabled; errors are logged, never raised.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            await self.publisher.publish_task_completed(
                task_id=task_id,
                elapsed_seconds=elapsed_seconds,
                findings_count=findings_count,
                recommendations_count=recommendations_count,
                status=status
            )
        except Exception as e:
            logger.error(f"Failed to publish task completed: {e}")

    async def publish_task_queued(
        self,
        task_id: str,
        project: str,
        description: str,
        reason: str,
        queue_position: int,
        queue_ahead: list,
        estimated_wait_seconds: int
    ):
        """Publish task queued event.

        Forwards all arguments to ``publisher.publish_task_queued``.
        No-op when the system is disabled; errors are logged, never raised.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            await self.publisher.publish_task_queued(
                task_id=task_id,
                project=project,
                description=description,
                reason=reason,
                queue_position=queue_position,
                queue_ahead=queue_ahead,
                estimated_wait_seconds=estimated_wait_seconds
            )
        except Exception as e:
            logger.error(f"Failed to publish task queued: {e}")

    async def publish_warning(
        self,
        task_id: str,
        warning_type: str,
        message: str,
        current_step: int,
        total_steps: int,
        current_step_name: str,
        elapsed_seconds: int,
        progress_percent: int,
        recommendation: Optional[str] = None
    ):
        """Publish task warning event.

        Forwards all arguments to ``publisher.publish_warning``.
        No-op when the system is disabled; errors are logged, never raised.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            await self.publisher.publish_warning(
                task_id=task_id,
                warning_type=warning_type,
                message=message,
                current_step=current_step,
                total_steps=total_steps,
                current_step_name=current_step_name,
                elapsed_seconds=elapsed_seconds,
                progress_percent=progress_percent,
                recommendation=recommendation
            )
        except Exception as e:
            logger.error(f"Failed to publish warning: {e}")

    async def publish_task_failed(
        self,
        task_id: str,
        error: str,
        elapsed_seconds: int,
        retry_count: int = 0,
        retriable: bool = False
    ):
        """Publish task failed event.

        Forwards all arguments to ``publisher.publish_task_failed``.
        No-op when the system is disabled; errors are logged, never raised.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            await self.publisher.publish_task_failed(
                task_id=task_id,
                error=error,
                elapsed_seconds=elapsed_seconds,
                retry_count=retry_count,
                retriable=retriable
            )
        except Exception as e:
            logger.error(f"Failed to publish task failed: {e}")

    async def publish_system_alert(
        self,
        alert_type: str,
        message: str,
        recommendation: str,
        severity: str = "warning"
    ):
        """Publish system alert.

        Args:
            alert_type: Forwarded to ``publisher.publish_system_alert``.
            message: Forwarded to the publisher.
            recommendation: Forwarded to the publisher.
            severity: Name of a ``Severity`` member, case-insensitive.
                Unknown names fall back to ``Severity.WARNING``.

        No-op when the system is disabled; errors are logged, never raised.
        """
        if not self.is_enabled() or not self.publisher:
            return

        try:
            severity_obj = getattr(Severity, severity.upper(), Severity.WARNING)
            await self.publisher.publish_system_alert(
                alert_type=alert_type,
                message=message,
                recommendation=recommendation,
                severity=severity_obj
            )
        except Exception as e:
            logger.error(f"Failed to publish system alert: {e}")

    async def handle_status_command(self, command: str, args: list) -> str:
        """Handle luzia status command.

        Returns:
            The CLI helper's output, ``"Status system not available"`` when
            the system is disabled, or ``"Error: <exception>"`` when the
            helper raises.
        """
        if not self.is_enabled() or not self.cli_helper:
            return "Status system not available"

        try:
            return await self.cli_helper.handle_command(command, args)
        except Exception as e:
            logger.error(f"Failed to handle status command: {e}")
            return f"Error: {e}"

    def get_dashboard(self) -> str:
        """Get dashboard output from the bridge.

        Returns ``"Status system not available"`` when the system is
        disabled.  Exceptions raised by the bridge are not caught here.
        """
        if not self.is_enabled() or not self.bridge:
            return "Status system not available"
        return self.bridge.get_dashboard()

    def get_recent_updates(self, limit: int = 10) -> str:
        """Get recent updates from the bridge.

        Args:
            limit: Forwarded to ``bridge.get_recent_updates``.

        Returns ``"Status system not available"`` when the system is
        disabled.  Exceptions raised by the bridge are not caught here.
        """
        if not self.is_enabled() or not self.bridge:
            return "Status system not available"
        return self.bridge.get_recent_updates(limit)


# Global instance accessor
def get_status_system() -> LuziaStatusSystem:
    """Get the global status system instance"""
    return LuziaStatusSystem()