#!/usr/bin/env python3
"""
Synchronous wrapper for Luzia Status Publishing

Provides synchronous entry points that work with the async status system.
Use this module in synchronous code that needs to publish status events.
"""

import asyncio
import logging
from typing import Any, Optional, List, Set

logger = logging.getLogger(__name__)


class SyncStatusPublisher:
    """Synchronous wrapper around the async status system.

    Every ``publish_*`` method is best-effort: it returns ``None``, does
    nothing when the status system is unavailable or disabled, and logs
    (rather than raises) any exception hit while publishing.
    """

    def __init__(self):
        self.system = None
        # Strong references to tasks scheduled on an already-running loop.
        # The event loop itself only keeps weak references to tasks, so a
        # task nobody holds on to may be garbage-collected mid-execution.
        self._pending_tasks: Set["asyncio.Task"] = set()
        self._initialize()

    def _initialize(self):
        """Look up the status system; leave ``self.system`` as None on failure."""
        try:
            # Imported lazily so that a missing or broken integration module
            # disables publishing instead of breaking the import of this file.
            from luzia_status_integration import get_status_system

            self.system = get_status_system()
            if self.system.is_enabled():
                logger.info("Sync status publisher initialized")
        except Exception as e:
            logger.error(f"Failed to initialize sync publisher: {e}")
            # Do not keep a half-initialized system around: is_enabled() is
            # called outside any try block by the publish_* methods.
            self.system = None

    def is_enabled(self) -> bool:
        """Check if status system is enabled"""
        return self.system is not None and self.system.is_enabled()

    def _on_task_done(self, task) -> None:
        """Release a finished background task and log its failure, if any."""
        self._pending_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Status publish task failed: %s", exc)

    def _run_async(self, coro) -> Any:
        """Run ``coro`` from synchronous code.

        When an event loop is already running in this thread, the coroutine
        is scheduled on it and the resulting task is returned immediately
        (it has not finished yet). Otherwise the coroutine is run to
        completion with ``asyncio.run`` and its result is returned.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread: drive the coroutine ourselves.
            return asyncio.run(coro)

        task = loop.create_task(coro)
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _publish(self, method_name: str, label: str, **fields) -> None:
        """Call ``self.system.<method_name>(**fields)`` and run the coroutine.

        ``label`` is the event name used in the error log message.
        """
        if not self.is_enabled():
            return

        try:
            coro = getattr(self.system, method_name)(**fields)
            self._run_async(coro)
        except Exception as e:
            logger.error(f"Failed to publish {label}: {e}")

    def publish_task_started(
        self,
        task_id: str,
        project: str,
        description: str,
        estimated_duration_seconds: int = 300
    ) -> None:
        """Publish task started (synchronous)"""
        self._publish(
            "publish_task_started",
            "task started",
            task_id=task_id,
            project=project,
            description=description,
            estimated_duration_seconds=estimated_duration_seconds,
        )

    def publish_progress(
        self,
        task_id: str,
        progress_percent: int,
        current_step: int,
        total_steps: int,
        current_step_name: str,
        elapsed_seconds: int,
        estimated_remaining_seconds: int
    ) -> None:
        """Publish progress update (synchronous)"""
        self._publish(
            "publish_progress",
            "progress",
            task_id=task_id,
            progress_percent=progress_percent,
            current_step=current_step,
            total_steps=total_steps,
            current_step_name=current_step_name,
            elapsed_seconds=elapsed_seconds,
            estimated_remaining_seconds=estimated_remaining_seconds,
        )

    def publish_task_completed(
        self,
        task_id: str,
        elapsed_seconds: int,
        findings_count: int = 0,
        recommendations_count: int = 0,
        status: str = "APPROVED"
    ) -> None:
        """Publish task completed (synchronous)"""
        self._publish(
            "publish_task_completed",
            "task completed",
            task_id=task_id,
            elapsed_seconds=elapsed_seconds,
            findings_count=findings_count,
            recommendations_count=recommendations_count,
            status=status,
        )

    def publish_task_queued(
        self,
        task_id: str,
        project: str,
        description: str,
        reason: str,
        queue_position: int,
        queue_ahead: List[str],
        estimated_wait_seconds: int
    ) -> None:
        """Publish task queued (synchronous)"""
        self._publish(
            "publish_task_queued",
            "task queued",
            task_id=task_id,
            project=project,
            description=description,
            reason=reason,
            queue_position=queue_position,
            queue_ahead=queue_ahead,
            estimated_wait_seconds=estimated_wait_seconds,
        )

    def publish_warning(
        self,
        task_id: str,
        warning_type: str,
        message: str,
        current_step: int,
        total_steps: int,
        current_step_name: str,
        elapsed_seconds: int,
        progress_percent: int,
        recommendation: Optional[str] = None
    ) -> None:
        """Publish warning (synchronous)"""
        self._publish(
            "publish_warning",
            "warning",
            task_id=task_id,
            warning_type=warning_type,
            message=message,
            current_step=current_step,
            total_steps=total_steps,
            current_step_name=current_step_name,
            elapsed_seconds=elapsed_seconds,
            progress_percent=progress_percent,
            recommendation=recommendation,
        )

    def publish_task_failed(
        self,
        task_id: str,
        error: str,
        elapsed_seconds: int,
        retry_count: int = 0,
        retriable: bool = False
    ) -> None:
        """Publish task failed (synchronous)"""
        self._publish(
            "publish_task_failed",
            "task failed",
            task_id=task_id,
            error=error,
            elapsed_seconds=elapsed_seconds,
            retry_count=retry_count,
            retriable=retriable,
        )

    def publish_system_alert(
        self,
        alert_type: str,
        message: str,
        recommendation: str,
        severity: str = "warning"
    ) -> None:
        """Publish system alert (synchronous)"""
        self._publish(
            "publish_system_alert",
            "system alert",
            alert_type=alert_type,
            message=message,
            recommendation=recommendation,
            severity=severity,
        )


# Global instance
_sync_publisher = None


def get_sync_publisher() -> SyncStatusPublisher:
    """Get the global synchronous status publisher, creating it on first use."""
    global _sync_publisher
    if _sync_publisher is None:
        _sync_publisher = SyncStatusPublisher()
    return _sync_publisher