#!/usr/bin/env python3 """ Autonomous Learning Integration Module Integrates the ACE Framework (Generator-Reflector-Curator) autonomous learning system with the sub-agent orchestration system. Features: - Initializes AutonomousLearningOrchestrator on startup - Connects to active task stream for metrics collection - Implements 30-second learning cycle - Tracks delta history and application results - Logs learning metrics to /var/log/luzia/learning.log """ import json import time import threading import logging from pathlib import Path from typing import Dict, List, Optional, Any, Callable from datetime import datetime from dataclasses import dataclass, asdict import traceback # Configure logging log_dir = Path("/var/log/luzia") log_dir.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_dir / "learning.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) @dataclass class DeltaUpdate: """Delta update for autonomous learning""" id: str timestamp: int type: str # 'strategy', 'coordination', 'resource', 'metric' operation: str # 'modify', 'add', 'remove', 'adjust' target: str oldValue: Any newValue: Any reasoning: str confidence: float # 0-1 impact: str # 'positive', 'negative', 'neutral' appliedAt: Optional[int] = None @dataclass class DeltaEvaluation: """Evaluation of a delta proposal""" deltaId: str overallScore: float # 0-100 recommended: bool reasoning: str riskLevel: str # 'low', 'medium', 'high' estimatedBenefit: str class AutonomousLearningIntegration: """ Integrates ACE Framework learning with sub-agent orchestration. Manages the 30-second learning cycle: 1. GENERATION: Analyze last 30 tasks, propose deltas 2. REFLECTION: Score proposals with confidence and impact 3. CURATION: Apply deltas with score >= 65/100 """ def __init__(self, config_path: Path = Path("/etc/luzia/learning_config.json")): """Initialize learning integration""" self.config_path = config_path self.config = self._load_config() # Learning state self.active = False self.learning_thread: Optional[threading.Thread] = None self.cycle_interval = self.config.get("cycle", {}).get("interval_seconds", 30) # Metrics and history self.task_history: List[Dict[str, Any]] = [] self.delta_history: List[DeltaUpdate] = [] self.evaluation_history: List[DeltaEvaluation] = [] self.learning_cycles: List[Dict[str, Any]] = [] # Metrics provider callback self.metrics_provider: Optional[Callable] = None # Sub-agent context manager self.context_manager = None logger.info("AutonomousLearningIntegration initialized") logger.info(f"Cycle interval: {self.cycle_interval}s") logger.info(f"Min confidence: {self.config.get('reflection', {}).get('min_confidence', 0.5)}") logger.info(f"Min score: {self.config.get('reflection', {}).get('min_score', 65)}/100") def _load_config(self) -> Dict[str, Any]: """Load learning configuration""" try: if self.config_path.exists(): return json.loads(self.config_path.read_text()) except Exception as e: logger.error(f"Failed to load config from {self.config_path}: {e}") # Return default config return { "cycle": {"interval_seconds": 30}, "reflection": {"min_confidence": 0.5, "min_score": 65}, "monitoring": {"log_file": "/var/log/luzia/learning.log"} } def set_metrics_provider(self, provider: Callable[[], Dict[str, Any]]) -> None: """Set callback function to provide coordination metrics""" self.metrics_provider = provider logger.debug("Metrics provider registered") def set_context_manager(self, manager) -> None: """Set sub-agent context manager for coordination""" self.context_manager = manager logger.debug("Context manager registered") def record_task(self, task: Dict[str, Any]) -> None: """Record task execution for learning analysis""" task_with_timestamp = { **task, "recorded_at": datetime.utcnow().isoformat() } self.task_history.append(task_with_timestamp) # Keep only recent 100 tasks if len(self.task_history) > 100: self.task_history = self.task_history[-100:] def start_learning(self) -> None: """Start the autonomous learning cycle""" if self.active: logger.warning("Learning cycle already active") return self.active = True self.learning_thread = threading.Thread( target=self._learning_cycle_worker, daemon=False ) self.learning_thread.start() logger.info("Autonomous learning cycle started") def stop_learning(self) -> None: """Stop the autonomous learning cycle""" self.active = False if self.learning_thread: self.learning_thread.join(timeout=5) logger.info("Autonomous learning cycle stopped") def _learning_cycle_worker(self) -> None: """Main learning cycle worker thread""" cycle_count = 0 while self.active: try: cycle_count += 1 cycle_id = f"cycle-{cycle_count}-{int(time.time())}" logger.info(f"Starting learning cycle {cycle_count}") # PHASE 1: GENERATION generated_deltas = self._generate_deltas() logger.info(f"Generated {len(generated_deltas)} delta proposals") # PHASE 2: REFLECTION if generated_deltas: evaluations = self._evaluate_deltas(generated_deltas) recommended = [e for e in evaluations if e.recommended] logger.info(f"Evaluated deltas: {len(recommended)} recommended out of {len(evaluations)}") # PHASE 3: CURATION if recommended: applied = self._apply_recommended_deltas( [d for d in generated_deltas if any( e.deltaId == d.id and e.recommended for e in evaluations )], evaluations ) logger.info(f"Applied {applied} deltas in cycle {cycle_count}") else: logger.debug("No delta proposals generated in this cycle") # Record cycle metrics self._record_cycle_metrics(cycle_id, generated_deltas) # Wait for next cycle time.sleep(self.cycle_interval) except Exception as e: logger.error(f"Error in learning cycle: {e}") logger.error(traceback.format_exc()) time.sleep(5) # Backoff on error def _generate_deltas(self) -> List[DeltaUpdate]: """ GENERATION PHASE: Analyze task history and generate delta proposals """ deltas: List[DeltaUpdate] = [] if len(self.task_history) < 30: logger.debug(f"Not enough tasks for analysis ({len(self.task_history)} < 30)") return deltas # Analyze last 30 tasks recent_tasks = self.task_history[-30:] # Calculate metrics avg_latency = sum( t.get("latency", 0) for t in recent_tasks ) / len(recent_tasks) if recent_tasks else 0 success_count = sum(1 for t in recent_tasks if t.get("status") == "success") success_rate = success_count / len(recent_tasks) if recent_tasks else 0 # Get coordination context metrics = self.metrics_provider() if self.metrics_provider else {} logger.debug( f"Task analysis: avg_latency={avg_latency:.1f}ms, " f"success_rate={success_rate:.1%}, " f"sub_agents={metrics.get('sub_agent_count', 0)}" ) # Delta 1: Coordination strategy adjustment if metrics.get('sub_agent_count', 0) > 8 and avg_latency > 100: deltas.append(DeltaUpdate( id=f"delta-{int(time.time())}-1", timestamp=int(time.time() * 1000), type="coordination", operation="modify", target="primary_coordination_strategy", oldValue="sequential", newValue="adaptive", reasoning=f"High agent count ({metrics.get('sub_agent_count', 0)}) with " f"elevated latency ({avg_latency:.0f}ms)", confidence=0.75, impact="positive" )) # Delta 2: Success rate threshold if success_rate < 0.85: deltas.append(DeltaUpdate( id=f"delta-{int(time.time())}-2", timestamp=int(time.time() * 1000), type="strategy", operation="adjust", target="fallback_strategy_threshold", oldValue=0.8, newValue=0.75, reasoning=f"Success rate {success_rate:.1%} below target", confidence=0.6, impact="positive" )) # Delta 3: Resource pressure cpu_percent = metrics.get('cpu_percent', 0) if cpu_percent > 85: deltas.append(DeltaUpdate( id=f"delta-{int(time.time())}-3", timestamp=int(time.time() * 1000), type="resource", operation="adjust", target="max_cpu_per_agent", oldValue=cpu_percent, newValue=int(cpu_percent * 0.6), reasoning=f"CPU utilization at {cpu_percent}%, approaching limit", confidence=0.85, impact="positive" )) self.delta_history.extend(deltas) return deltas def _evaluate_deltas(self, deltas: List[DeltaUpdate]) -> List[DeltaEvaluation]: """ REFLECTION PHASE: Evaluate delta proposals with scoring """ evaluations: List[DeltaEvaluation] = [] for delta in deltas: score = 0.0 reasoning_parts: List[str] = [] # Factor 1: Confidence (40%) confidence_score = delta.confidence * 40 score += confidence_score reasoning_parts.append(f"Confidence: {delta.confidence*100:.0f}% = {confidence_score:.0f}pts") # Factor 2: Reasoning quality (30%) reasoning_quality = self._assess_reasoning_quality(delta.reasoning) reasoning_score = reasoning_quality * 30 score += reasoning_score reasoning_parts.append(f"Reasoning: {reasoning_quality:.1f} = {reasoning_score:.0f}pts") # Factor 3: Impact (20%) impact_score = 0.0 if delta.impact == "positive": impact_score = 20.0 elif delta.impact == "negative": impact_score = 0.0 score = 0.0 # Veto negative else: impact_score = 10.0 score += impact_score reasoning_parts.append(f"Impact: {delta.impact} = {impact_score:.0f}pts") # Factor 4: Risk (10%) risk_level = self._assess_risk(delta) risk_score = (1.0 - (1.0 if risk_level == "high" else 0.5 if risk_level == "medium" else 0.0)) * 10 score += risk_score reasoning_parts.append(f"Risk: {risk_level} = {risk_score:.0f}pts") score = min(100, max(0, score)) # Recommendation threshold: 65/100 min_score = self.config.get("reflection", {}).get("min_score", 65) recommended = score >= min_score evaluation = DeltaEvaluation( deltaId=delta.id, overallScore=score, recommended=recommended, reasoning="; ".join(reasoning_parts), riskLevel=risk_level, estimatedBenefit=self._estimate_benefit(delta) ) evaluations.append(evaluation) logger.debug( f"Delta {delta.id}: score={score:.0f}, " f"recommended={recommended}, risk={risk_level}" ) self.evaluation_history.extend(evaluations) return evaluations def _apply_recommended_deltas( self, deltas: List[DeltaUpdate], evaluations: List[DeltaEvaluation] ) -> int: """ CURATION PHASE: Apply recommended deltas with score >= 65 """ applied_count = 0 for delta in deltas: evaluation = next((e for e in evaluations if e.deltaId == delta.id), None) if not evaluation: continue if evaluation.recommended and evaluation.riskLevel != "high": # Apply the delta delta.appliedAt = int(time.time() * 1000) applied_count += 1 logger.info( f"Applied delta {delta.id}: " f"{delta.target} {delta.operation} " f"{delta.oldValue} -> {delta.newValue} " f"(score={evaluation.overallScore:.0f})" ) return applied_count def _assess_reasoning_quality(self, reasoning: str) -> float: """Assess quality of delta reasoning (0-1)""" score = 0.5 # Base score if "observed" in reasoning or "%" in reasoning: score += 0.2 if "system" in reasoning or "performance" in reasoning: score += 0.15 if "because" in reasoning or "therefore" in reasoning: score += 0.15 return min(1.0, score) def _assess_risk(self, delta: DeltaUpdate) -> str: """Assess risk level of delta""" if delta.operation == "remove": return "high" elif delta.operation == "modify": return "medium" else: return "low" def _estimate_benefit(self, delta: DeltaUpdate) -> str: """Estimate potential benefit of delta""" if delta.type == "coordination": return "Potential latency improvement: ~10-15%" elif delta.type == "resource": return "Better resource utilization, reduced contention" elif delta.type == "metric": return "More realistic performance targets" return "Unknown benefit" def _record_cycle_metrics(self, cycle_id: str, deltas: List[DeltaUpdate]) -> None: """Record learning cycle metrics""" cycle_metrics = { "cycle_id": cycle_id, "timestamp": datetime.utcnow().isoformat(), "deltas_proposed": len(deltas), "deltas_applied": sum(1 for d in deltas if d.appliedAt), "total_deltas_history": len(self.delta_history), "total_evaluations": len(self.evaluation_history) } self.learning_cycles.append(cycle_metrics) logger.info( f"Learning cycle metrics: " f"proposed={len(deltas)}, " f"history_size={len(self.delta_history)}" ) def get_status(self) -> Dict[str, Any]: """Get current learning system status""" return { "active": self.active, "cycle_interval_seconds": self.cycle_interval, "total_tasks_recorded": len(self.task_history), "total_deltas_proposed": len(self.delta_history), "total_deltas_applied": sum(1 for d in self.delta_history if d.appliedAt), "total_evaluations": len(self.evaluation_history), "total_cycles": len(self.learning_cycles), "recommended_deltas": sum( 1 for e in self.evaluation_history if e.recommended ), "config_version": self.config.get("version", "unknown") } def get_learning_history(self, limit: int = 10) -> List[Dict[str, Any]]: """Get recent learning cycles""" return self.learning_cycles[-limit:] def get_delta_status(self) -> Dict[str, Any]: """Get delta proposal and application status""" applied = sum(1 for d in self.delta_history if d.appliedAt) return { "total_proposed": len(self.delta_history), "total_applied": applied, "pending_or_rejected": len(self.delta_history) - applied, "by_type": { delta_type: sum( 1 for d in self.delta_history if d.type == delta_type ) for delta_type in ["coordination", "resource", "metric", "strategy"] } }