Files
luzia/lib/autonomous_learning_integration.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

463 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Autonomous Learning Integration Module
Integrates the ACE Framework (Generator-Reflector-Curator) autonomous learning
system with the sub-agent orchestration system.
Features:
- Initializes AutonomousLearningOrchestrator on startup
- Connects to active task stream for metrics collection
- Implements 30-second learning cycle
- Tracks delta history and application results
- Logs learning metrics to /var/log/luzia/learning.log
"""
import json
import logging
import threading
import time
import traceback
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
# Configure logging: file + console when possible, console-only otherwise.
# Creating /var/log/luzia (or opening the log file) at import time raises
# OSError/PermissionError for non-privileged users; that must not make the
# whole module unimportable, so fall back gracefully.
log_dir = Path("/var/log/luzia")
_handlers: List[logging.Handler] = []
try:
    log_dir.mkdir(parents=True, exist_ok=True)
    _handlers.append(logging.FileHandler(log_dir / "learning.log"))
except OSError:
    # Non-fatal: learning events will still reach the console handler below.
    pass
_handlers.append(logging.StreamHandler())
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_handlers
)
logger = logging.getLogger(__name__)
@dataclass
class DeltaUpdate:
    """A single proposed (and possibly applied) change to orchestration state.

    Produced during the GENERATION phase, scored during REFLECTION, and
    stamped with `appliedAt` during CURATION.  Field names are camelCase —
    presumably mirroring an external JSON schema; confirm before renaming.
    """
    id: str  # unique proposal id, e.g. "delta-<epoch-seconds>-<n>"
    timestamp: int  # creation time, epoch milliseconds
    type: str # 'strategy', 'coordination', 'resource', 'metric'
    operation: str # 'modify', 'add', 'remove', 'adjust'
    target: str  # name of the setting/parameter the delta changes
    oldValue: Any  # value before the change
    newValue: Any  # proposed replacement value
    reasoning: str  # human-readable justification for the proposal
    confidence: float # 0-1
    impact: str # 'positive', 'negative', 'neutral'
    appliedAt: Optional[int] = None  # epoch ms when applied; None if never applied
@dataclass
class DeltaEvaluation:
    """REFLECTION-phase verdict for a single delta proposal.

    The score is a 0-100 weighted sum of confidence, reasoning quality,
    impact and risk factors; `recommended` means it cleared the configured
    minimum-score threshold (default 65).
    """
    deltaId: str  # id of the DeltaUpdate this evaluation refers to
    overallScore: float # 0-100
    recommended: bool  # True when overallScore >= configured min_score
    reasoning: str  # per-factor scoring breakdown, joined with "; "
    riskLevel: str # 'low', 'medium', 'high'
    estimatedBenefit: str  # human-readable benefit estimate
class AutonomousLearningIntegration:
    """
    Integrates ACE Framework learning with sub-agent orchestration.

    Manages the 30-second learning cycle:
    1. GENERATION: Analyze last 30 tasks, propose deltas
    2. REFLECTION: Score proposals with confidence and impact
    3. CURATION: Apply deltas with score >= 65/100
    """

    def __init__(self, config_path: Path = Path("/etc/luzia/learning_config.json")):
        """Initialize learning integration.

        Args:
            config_path: JSON learning configuration file; built-in defaults
                are used when the file is missing or unreadable.
        """
        self.config_path = config_path
        self.config = self._load_config()
        # Learning state
        self.active = False
        self.learning_thread: Optional[threading.Thread] = None
        # Doubles as stop signal and interruptible sleep, so stop_learning()
        # does not have to wait out a full cycle interval before joining.
        self._stop_event = threading.Event()
        self.cycle_interval = self.config.get("cycle", {}).get("interval_seconds", 30)
        # Metrics and history (task_history is bounded to the last 100 entries)
        self.task_history: List[Dict[str, Any]] = []
        self.delta_history: List[DeltaUpdate] = []
        self.evaluation_history: List[DeltaEvaluation] = []
        self.learning_cycles: List[Dict[str, Any]] = []
        # Callback returning current coordination metrics (set_metrics_provider)
        self.metrics_provider: Optional[Callable] = None
        # Sub-agent context manager (opaque handle; only stored here)
        self.context_manager = None
        logger.info("AutonomousLearningIntegration initialized")
        logger.info(f"Cycle interval: {self.cycle_interval}s")
        logger.info(f"Min confidence: {self.config.get('reflection', {}).get('min_confidence', 0.5)}")
        logger.info(f"Min score: {self.config.get('reflection', {}).get('min_score', 65)}/100")

    def _load_config(self) -> Dict[str, Any]:
        """Load the learning configuration, falling back to defaults on any error."""
        try:
            if self.config_path.exists():
                return json.loads(self.config_path.read_text())
        except Exception as e:
            logger.error(f"Failed to load config from {self.config_path}: {e}")
        # Return default config
        return {
            "cycle": {"interval_seconds": 30},
            "reflection": {"min_confidence": 0.5, "min_score": 65},
            "monitoring": {"log_file": "/var/log/luzia/learning.log"}
        }

    def set_metrics_provider(self, provider: Callable[[], Dict[str, Any]]) -> None:
        """Set callback function to provide coordination metrics."""
        self.metrics_provider = provider
        logger.debug("Metrics provider registered")

    def set_context_manager(self, manager) -> None:
        """Set sub-agent context manager for coordination."""
        self.context_manager = manager
        logger.debug("Context manager registered")

    def record_task(self, task: Dict[str, Any]) -> None:
        """Record a task execution for learning analysis.

        The task dict is copied, stamped with a naive-UTC ISO timestamp, and
        appended to the bounded history (most recent 100 tasks kept).
        """
        task_with_timestamp = {
            **task,
            # Naive-UTC isoformat, byte-identical to the old datetime.utcnow()
            # output (utcnow() is deprecated since Python 3.12).
            "recorded_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        }
        self.task_history.append(task_with_timestamp)
        # Keep only recent 100 tasks
        if len(self.task_history) > 100:
            self.task_history = self.task_history[-100:]

    def start_learning(self) -> None:
        """Start the autonomous learning cycle in a background thread."""
        if self.active:
            logger.warning("Learning cycle already active")
            return
        self.active = True
        self._stop_event.clear()  # allow start after a previous stop
        self.learning_thread = threading.Thread(
            target=self._learning_cycle_worker,
            # Daemon so an unclean shutdown (stop_learning never called) cannot
            # keep the interpreter alive; a non-daemon thread sleeping 30s per
            # cycle previously hung process exit.
            daemon=True
        )
        self.learning_thread.start()
        logger.info("Autonomous learning cycle started")

    def stop_learning(self) -> None:
        """Stop the learning cycle and wait briefly for the worker to exit."""
        self.active = False
        # Wake the worker immediately instead of letting it finish its sleep,
        # so the join below reliably completes within the timeout.
        self._stop_event.set()
        if self.learning_thread:
            self.learning_thread.join(timeout=5)
        logger.info("Autonomous learning cycle stopped")

    def _learning_cycle_worker(self) -> None:
        """Worker thread: run generation -> reflection -> curation each cycle."""
        cycle_count = 0
        while self.active:
            try:
                cycle_count += 1
                cycle_id = f"cycle-{cycle_count}-{int(time.time())}"
                logger.info(f"Starting learning cycle {cycle_count}")
                # PHASE 1: GENERATION
                generated_deltas = self._generate_deltas()
                logger.info(f"Generated {len(generated_deltas)} delta proposals")
                # PHASE 2: REFLECTION
                if generated_deltas:
                    evaluations = self._evaluate_deltas(generated_deltas)
                    recommended = [e for e in evaluations if e.recommended]
                    logger.info(f"Evaluated deltas: {len(recommended)} recommended out of {len(evaluations)}")
                    # PHASE 3: CURATION
                    if recommended:
                        applied = self._apply_recommended_deltas(
                            [d for d in generated_deltas if any(
                                e.deltaId == d.id and e.recommended for e in evaluations
                            )],
                            evaluations
                        )
                        logger.info(f"Applied {applied} deltas in cycle {cycle_count}")
                else:
                    logger.debug("No delta proposals generated in this cycle")
                # Record cycle metrics
                self._record_cycle_metrics(cycle_id, generated_deltas)
                # Wait for next cycle; returns early when stop_learning() fires.
                self._stop_event.wait(self.cycle_interval)
            except Exception as e:
                logger.error(f"Error in learning cycle: {e}")
                logger.error(traceback.format_exc())
                self._stop_event.wait(5)  # Backoff on error (interruptible)

    def _generate_deltas(self) -> List[DeltaUpdate]:
        """
        GENERATION PHASE: Analyze task history and generate delta proposals.

        Requires at least 30 recorded tasks. Proposals are appended to
        delta_history before being returned.
        """
        deltas: List[DeltaUpdate] = []
        if len(self.task_history) < 30:
            logger.debug(f"Not enough tasks for analysis ({len(self.task_history)} < 30)")
            return deltas
        # Analyze last 30 tasks
        recent_tasks = self.task_history[-30:]
        # Calculate metrics (recent_tasks is guaranteed non-empty here; the
        # guards are kept for defensive symmetry)
        avg_latency = sum(
            t.get("latency", 0) for t in recent_tasks
        ) / len(recent_tasks) if recent_tasks else 0
        success_count = sum(1 for t in recent_tasks if t.get("status") == "success")
        success_rate = success_count / len(recent_tasks) if recent_tasks else 0
        # Get coordination context
        metrics = self.metrics_provider() if self.metrics_provider else {}
        logger.debug(
            f"Task analysis: avg_latency={avg_latency:.1f}ms, "
            f"success_rate={success_rate:.1%}, "
            f"sub_agents={metrics.get('sub_agent_count', 0)}"
        )
        # Delta 1: Coordination strategy adjustment (many agents + slow tasks)
        if metrics.get('sub_agent_count', 0) > 8 and avg_latency > 100:
            deltas.append(DeltaUpdate(
                id=f"delta-{int(time.time())}-1",
                timestamp=int(time.time() * 1000),
                type="coordination",
                operation="modify",
                target="primary_coordination_strategy",
                oldValue="sequential",
                newValue="adaptive",
                reasoning=f"High agent count ({metrics.get('sub_agent_count', 0)}) with "
                          f"elevated latency ({avg_latency:.0f}ms)",
                confidence=0.75,
                impact="positive"
            ))
        # Delta 2: Success rate threshold (below 85% target)
        if success_rate < 0.85:
            deltas.append(DeltaUpdate(
                id=f"delta-{int(time.time())}-2",
                timestamp=int(time.time() * 1000),
                type="strategy",
                operation="adjust",
                target="fallback_strategy_threshold",
                oldValue=0.8,
                newValue=0.75,
                reasoning=f"Success rate {success_rate:.1%} below target",
                confidence=0.6,
                impact="positive"
            ))
        # Delta 3: Resource pressure (CPU above 85%)
        cpu_percent = metrics.get('cpu_percent', 0)
        if cpu_percent > 85:
            deltas.append(DeltaUpdate(
                id=f"delta-{int(time.time())}-3",
                timestamp=int(time.time() * 1000),
                type="resource",
                operation="adjust",
                target="max_cpu_per_agent",
                oldValue=cpu_percent,
                newValue=int(cpu_percent * 0.6),
                reasoning=f"CPU utilization at {cpu_percent}%, approaching limit",
                confidence=0.85,
                impact="positive"
            ))
        self.delta_history.extend(deltas)
        return deltas

    def _evaluate_deltas(self, deltas: List[DeltaUpdate]) -> List[DeltaEvaluation]:
        """
        REFLECTION PHASE: Evaluate delta proposals with scoring.

        Weighted factors: confidence 40%, reasoning quality 30%, impact 20%,
        risk 10%. A negative-impact delta is vetoed: its final score is
        forced to 0 regardless of the other factors.
        """
        evaluations: List[DeltaEvaluation] = []
        for delta in deltas:
            score = 0.0
            vetoed = False
            reasoning_parts: List[str] = []
            # Factor 1: Confidence (40%)
            confidence_score = delta.confidence * 40
            score += confidence_score
            reasoning_parts.append(f"Confidence: {delta.confidence*100:.0f}% = {confidence_score:.0f}pts")
            # Factor 2: Reasoning quality (30%)
            reasoning_quality = self._assess_reasoning_quality(delta.reasoning)
            reasoning_score = reasoning_quality * 30
            score += reasoning_score
            reasoning_parts.append(f"Reasoning: {reasoning_quality:.1f} = {reasoning_score:.0f}pts")
            # Factor 3: Impact (20%)
            if delta.impact == "positive":
                impact_score = 20.0
            elif delta.impact == "negative":
                impact_score = 0.0
                vetoed = True  # Veto negative
            else:
                impact_score = 10.0
            score += impact_score
            reasoning_parts.append(f"Impact: {delta.impact} = {impact_score:.0f}pts")
            # Factor 4: Risk (10%): low=10pts, medium=5pts, high=0pts
            risk_level = self._assess_risk(delta)
            risk_penalty = {"high": 1.0, "medium": 0.5}.get(risk_level, 0.0)
            risk_score = (1.0 - risk_penalty) * 10
            score += risk_score
            reasoning_parts.append(f"Risk: {risk_level} = {risk_score:.0f}pts")
            score = min(100, max(0, score))
            if vetoed:
                # Fix: the original zeroed the running total *before* adding
                # the risk factor, so a vetoed delta could still score up to
                # 10. Force the veto after all factors are summed.
                score = 0.0
            # Recommendation threshold: 65/100
            min_score = self.config.get("reflection", {}).get("min_score", 65)
            recommended = score >= min_score
            evaluation = DeltaEvaluation(
                deltaId=delta.id,
                overallScore=score,
                recommended=recommended,
                reasoning="; ".join(reasoning_parts),
                riskLevel=risk_level,
                estimatedBenefit=self._estimate_benefit(delta)
            )
            evaluations.append(evaluation)
            logger.debug(
                f"Delta {delta.id}: score={score:.0f}, "
                f"recommended={recommended}, risk={risk_level}"
            )
        self.evaluation_history.extend(evaluations)
        return evaluations

    def _apply_recommended_deltas(
        self,
        deltas: List[DeltaUpdate],
        evaluations: List[DeltaEvaluation]
    ) -> int:
        """
        CURATION PHASE: Apply recommended deltas with score >= 65.

        High-risk deltas are skipped even when recommended. Application is
        recorded by stamping `appliedAt` (epoch ms); returns the count applied.
        """
        applied_count = 0
        for delta in deltas:
            evaluation = next((e for e in evaluations if e.deltaId == delta.id), None)
            if not evaluation:
                continue
            if evaluation.recommended and evaluation.riskLevel != "high":
                # Apply the delta
                delta.appliedAt = int(time.time() * 1000)
                applied_count += 1
                logger.info(
                    f"Applied delta {delta.id}: "
                    f"{delta.target} {delta.operation} "
                    f"{delta.oldValue} -> {delta.newValue} "
                    f"(score={evaluation.overallScore:.0f})"
                )
        return applied_count

    def _assess_reasoning_quality(self, reasoning: str) -> float:
        """Score delta reasoning text 0-1 via a simple keyword heuristic.

        Evidence markers ("observed", "%"), domain terms ("system",
        "performance") and causal connectives ("because", "therefore")
        each add to a 0.5 base score, capped at 1.0.
        """
        score = 0.5  # Base score
        if "observed" in reasoning or "%" in reasoning:
            score += 0.2
        if "system" in reasoning or "performance" in reasoning:
            score += 0.15
        if "because" in reasoning or "therefore" in reasoning:
            score += 0.15
        return min(1.0, score)

    def _assess_risk(self, delta: DeltaUpdate) -> str:
        """Map a delta's operation to a risk level (remove > modify > rest)."""
        if delta.operation == "remove":
            return "high"
        elif delta.operation == "modify":
            return "medium"
        else:
            return "low"

    def _estimate_benefit(self, delta: DeltaUpdate) -> str:
        """Return a human-readable benefit estimate for a delta's type."""
        if delta.type == "coordination":
            return "Potential latency improvement: ~10-15%"
        elif delta.type == "resource":
            return "Better resource utilization, reduced contention"
        elif delta.type == "metric":
            return "More realistic performance targets"
        return "Unknown benefit"

    def _record_cycle_metrics(self, cycle_id: str, deltas: List[DeltaUpdate]) -> None:
        """Append a summary of the finished cycle to learning_cycles."""
        cycle_metrics = {
            "cycle_id": cycle_id,
            # Naive-UTC isoformat (matches the pre-3.12 utcnow() output).
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "deltas_proposed": len(deltas),
            "deltas_applied": sum(1 for d in deltas if d.appliedAt),
            "total_deltas_history": len(self.delta_history),
            "total_evaluations": len(self.evaluation_history)
        }
        self.learning_cycles.append(cycle_metrics)
        logger.info(
            f"Learning cycle metrics: "
            f"proposed={len(deltas)}, "
            f"history_size={len(self.delta_history)}"
        )

    def get_status(self) -> Dict[str, Any]:
        """Get current learning system status as a JSON-serializable dict."""
        return {
            "active": self.active,
            "cycle_interval_seconds": self.cycle_interval,
            "total_tasks_recorded": len(self.task_history),
            "total_deltas_proposed": len(self.delta_history),
            "total_deltas_applied": sum(1 for d in self.delta_history if d.appliedAt),
            "total_evaluations": len(self.evaluation_history),
            "total_cycles": len(self.learning_cycles),
            "recommended_deltas": sum(
                1 for e in self.evaluation_history if e.recommended
            ),
            "config_version": self.config.get("version", "unknown")
        }

    def get_learning_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent `limit` learning-cycle summaries."""
        return self.learning_cycles[-limit:]

    def get_delta_status(self) -> Dict[str, Any]:
        """Get delta proposal and application counts, overall and by type."""
        applied = sum(1 for d in self.delta_history if d.appliedAt)
        return {
            "total_proposed": len(self.delta_history),
            "total_applied": applied,
            "pending_or_rejected": len(self.delta_history) - applied,
            "by_type": {
                delta_type: sum(
                    1 for d in self.delta_history if d.type == delta_type
                )
                for delta_type in ["coordination", "resource", "metric", "strategy"]
            }
        }