#!/usr/bin/env python3 """ Learning Context Patch - Integrates AutonomousLearningIntegration with SubAgentContextManager This module provides initialization code to connect the autonomous learning system with the sub-agent context manager in the orchestrator startup sequence. Usage in orchestrator startup: from sub_agent_context import SubAgentContextManager from autonomous_learning_integration import AutonomousLearningIntegration from learning_context_patch import initialize_learning_for_orchestrator # After creating SubAgentContextManager: context_manager = SubAgentContextManager() learning = initialize_learning_for_orchestrator(context_manager) """ import logging from typing import Optional, Dict, Any from pathlib import Path logger = logging.getLogger(__name__) def initialize_learning_for_orchestrator( context_manager, learning_config_path: Optional[Path] = None ) -> 'AutonomousLearningIntegration': """ Initialize and connect learning system to sub-agent orchestrator. Args: context_manager: SubAgentContextManager instance learning_config_path: Optional path to learning_config.json Returns: Initialized AutonomousLearningIntegration instance """ try: from autonomous_learning_integration import AutonomousLearningIntegration # Initialize learning system if learning_config_path is None: learning_config_path = Path("/etc/luzia/learning_config.json") learning = AutonomousLearningIntegration(config_path=learning_config_path) # Connect context manager learning.set_context_manager(context_manager) # Register metrics provider callback metrics_provider = _create_metrics_provider(context_manager) learning.set_metrics_provider(metrics_provider) logger.info("Autonomous learning system initialized and connected") return learning except Exception as e: logger.error(f"Failed to initialize learning system: {e}") raise def create_active_task_stream_listener( context_manager, learning_system: 'AutonomousLearningIntegration' ) -> callable: """ Create listener for active task stream. Connects to task execution stream and records tasks for learning analysis. Args: context_manager: SubAgentContextManager instance learning_system: AutonomousLearningIntegration instance Returns: Callback function for task stream events """ def on_task_event(event: Dict[str, Any]) -> None: """Handle task stream event""" try: # Extract task info task_info = { "task_id": event.get("task_id"), "parent_task_id": event.get("parent_task_id"), "status": event.get("status"), # success, failed, pending "latency": event.get("latency", 0), # milliseconds "sub_agents_used": event.get("sub_agents_used", 0), "timestamp": event.get("timestamp"), "phase": event.get("phase"), "success": event.get("status") == "success" } # Record for learning learning_system.record_task(task_info) logger.debug(f"Task recorded: {task_info['task_id']} ({task_info['status']})") except Exception as e: logger.error(f"Error processing task event: {e}") return on_task_event def _create_metrics_provider(context_manager) -> callable: """ Create metrics provider callback for learning system. Returns context about current sub-agent coordination state. Args: context_manager: SubAgentContextManager instance Returns: Callback that returns coordination metrics """ def get_metrics() -> Dict[str, Any]: """Get current coordination metrics""" try: import psutil # Get system metrics cpu_percent = psutil.cpu_percent(interval=0.1) memory = psutil.virtual_memory() # Get sub-agent count total_agents = len(context_manager.active_contexts) metrics = { "cpu_percent": cpu_percent, "memory_mb": memory.available / 1024 / 1024, "sub_agent_count": total_agents, "active_agents": sum( 1 for ctx in context_manager.active_contexts.values() if ctx and ctx.phase_progression ), "parallel_slots": max(1, total_agents), "timestamp": str(__import__('datetime').datetime.utcnow().isoformat()) } return metrics except Exception as e: logger.error(f"Error getting metrics: {e}") return { "cpu_percent": 0, "memory_mb": 0, "sub_agent_count": 0, "active_agents": 0, "parallel_slots": 1 } return get_metrics def patch_sub_agent_context_manager(context_manager) -> None: """ Add learning-specific methods to SubAgentContextManager. Modifies SubAgentContextManager instance to include learning-aware features: - Track task execution for learning - Record coordination metrics - Callback on phase transitions Args: context_manager: SubAgentContextManager instance to patch """ original_update_phase = context_manager.update_phase def update_phase_with_learning(sub_agent_id: str, phase_name: str, status: str, **kwargs): """Wrapped update_phase that records to learning system""" result = original_update_phase(sub_agent_id, phase_name, status, **kwargs) # Log phase transition for learning if hasattr(context_manager, '_learning_system'): learning = context_manager._learning_system task_info = { "sub_agent_id": sub_agent_id, "phase": phase_name, "phase_status": status, "timestamp": str(__import__('datetime').datetime.utcnow().isoformat()) } learning.record_task(task_info) return result context_manager.update_phase = update_phase_with_learning logger.debug("Patched SubAgentContextManager with learning awareness") def start_learning_on_orchestrator_startup( context_manager, learning_system: 'AutonomousLearningIntegration' ) -> None: """ Start autonomous learning system during orchestrator startup. Call this during orchestrator initialization to activate the learning cycle. Args: context_manager: SubAgentContextManager instance learning_system: AutonomousLearningIntegration instance """ try: # Attach learning system to context manager for callbacks context_manager._learning_system = learning_system # Patch context manager for learning awareness patch_sub_agent_context_manager(context_manager) # Start learning cycle learning_system.start_learning() logger.info("Autonomous learning cycle started on orchestrator startup") except Exception as e: logger.error(f"Failed to start learning: {e}") raise def create_orchestrator_startup_hook(): """ Create a startup hook for use in orchestrator initialization. Returns: Async function suitable for orchestrator.on_startup() """ async def startup_hook(orchestrator_state: Dict[str, Any]) -> None: """Initialize learning system during orchestrator startup""" try: from autonomous_learning_integration import AutonomousLearningIntegration # Get or create context manager if 'context_manager' not in orchestrator_state: from sub_agent_context import SubAgentContextManager orchestrator_state['context_manager'] = SubAgentContextManager() context_manager = orchestrator_state['context_manager'] # Initialize learning learning = initialize_learning_for_orchestrator(context_manager) orchestrator_state['learning_system'] = learning # Start learning cycle start_learning_on_orchestrator_startup(context_manager, learning) logger.info("Learning system initialized on orchestrator startup") except Exception as e: logger.error(f"Startup hook failed: {e}") raise return startup_hook