#!/usr/bin/env python3
"""
Autonomous Learning Integration Module

Integrates the ACE Framework (Generator-Reflector-Curator) autonomous learning
system with the sub-agent orchestration system.

Features:
- Initializes AutonomousLearningOrchestrator on startup
- Connects to active task stream for metrics collection
- Implements 30-second learning cycle
- Tracks delta history and application results
- Logs learning metrics to /var/log/luzia/learning.log
"""

import json
import time
import threading
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
import traceback

# Configure logging
log_dir = Path("/var/log/luzia")
log_dir.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_dir / "learning.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

@dataclass
class DeltaUpdate:
    """Delta update for autonomous learning"""
    id: str
    timestamp: int
    type: str  # 'strategy', 'coordination', 'resource', 'metric'
    operation: str  # 'modify', 'add', 'remove', 'adjust'
    target: str
    oldValue: Any
    newValue: Any
    reasoning: str
    confidence: float  # 0-1
    impact: str  # 'positive', 'negative', 'neutral'
    appliedAt: Optional[int] = None

@dataclass
class DeltaEvaluation:
    """Evaluation of a delta proposal"""
    deltaId: str
    overallScore: float  # 0-100
    recommended: bool
    reasoning: str
    riskLevel: str  # 'low', 'medium', 'high'
    estimatedBenefit: str

class AutonomousLearningIntegration:
    """
    Integrates ACE Framework learning with sub-agent orchestration.

    Manages the 30-second learning cycle:
    1. GENERATION: Analyze last 30 tasks, propose deltas
    2. REFLECTION: Score proposals with confidence and impact
    3. CURATION: Apply deltas with score >= 65/100
    """

    def __init__(self, config_path: Path = Path("/etc/luzia/learning_config.json")):
        """Initialize learning integration"""
        self.config_path = config_path
        self.config = self._load_config()

        # Learning state
        self.active = False
        self.learning_thread: Optional[threading.Thread] = None
        # Event used to interrupt the inter-cycle wait, so stop_learning()
        # does not have to sit out a full cycle interval.
        self._stop_event = threading.Event()
        self.cycle_interval = self.config.get("cycle", {}).get("interval_seconds", 30)

        # Metrics and history
        self.task_history: List[Dict[str, Any]] = []
        self.delta_history: List[DeltaUpdate] = []
        self.evaluation_history: List[DeltaEvaluation] = []
        self.learning_cycles: List[Dict[str, Any]] = []

        # Metrics provider callback
        self.metrics_provider: Optional[Callable] = None

        # Sub-agent context manager
        self.context_manager = None

        logger.info("AutonomousLearningIntegration initialized")
        logger.info(f"Cycle interval: {self.cycle_interval}s")
        logger.info(f"Min confidence: {self.config.get('reflection', {}).get('min_confidence', 0.5)}")
        logger.info(f"Min score: {self.config.get('reflection', {}).get('min_score', 65)}/100")
    def _load_config(self) -> Dict[str, Any]:
        """Load learning configuration"""
        try:
            if self.config_path.exists():
                return json.loads(self.config_path.read_text())
        except Exception as e:
            logger.error(f"Failed to load config from {self.config_path}: {e}")

        # Return default config
        return {
            "cycle": {"interval_seconds": 30},
            "reflection": {"min_confidence": 0.5, "min_score": 65},
            "monitoring": {"log_file": "/var/log/luzia/learning.log"}
        }
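    # For reference, a learning_config.json that exercises every key this
    # module reads might look like the following (illustrative sketch; the
    # "version" string is only echoed back by get_status()):
    #
    #   {
    #     "version": "1.0",
    #     "cycle": {"interval_seconds": 30},
    #     "reflection": {"min_confidence": 0.5, "min_score": 65},
    #     "monitoring": {"log_file": "/var/log/luzia/learning.log"}
    #   }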
    def set_metrics_provider(self, provider: Callable[[], Dict[str, Any]]) -> None:
        """Set callback function to provide coordination metrics"""
        self.metrics_provider = provider
        logger.debug("Metrics provider registered")
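    # Illustrative registration (hypothetical values): the generation phase
    # only reads "sub_agent_count" and "cpu_percent" from the returned dict.
    #
    #   integration.set_metrics_provider(
    #       lambda: {"sub_agent_count": 4, "cpu_percent": 37})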
    def set_context_manager(self, manager) -> None:
        """Set sub-agent context manager for coordination"""
        self.context_manager = manager
        logger.debug("Context manager registered")
    def record_task(self, task: Dict[str, Any]) -> None:
        """Record task execution for learning analysis"""
        task_with_timestamp = {
            **task,
            "recorded_at": datetime.now(timezone.utc).isoformat()
        }
        self.task_history.append(task_with_timestamp)

        # Keep only the most recent 100 tasks
        if len(self.task_history) > 100:
            self.task_history = self.task_history[-100:]
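    # Example task record (illustrative; extra keys are preserved as-is):
    # the analysis in _generate_deltas only looks at "status" (counted as a
    # success when equal to "success") and "latency" in milliseconds.
    #
    #   integration.record_task({"status": "success", "latency": 42})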
    def start_learning(self) -> None:
        """Start the autonomous learning cycle"""
        if self.active:
            logger.warning("Learning cycle already active")
            return

        self.active = True
        self._stop_event.clear()
        self.learning_thread = threading.Thread(
            target=self._learning_cycle_worker,
            daemon=False
        )
        self.learning_thread.start()
        logger.info("Autonomous learning cycle started")

    def stop_learning(self) -> None:
        """Stop the autonomous learning cycle"""
        self.active = False
        # Wake the worker immediately instead of letting it sleep out the
        # remainder of the current cycle interval.
        self._stop_event.set()
        if self.learning_thread:
            self.learning_thread.join(timeout=5)
        logger.info("Autonomous learning cycle stopped")
    def _learning_cycle_worker(self) -> None:
        """Main learning cycle worker thread"""
        cycle_count = 0

        while self.active:
            try:
                cycle_count += 1
                cycle_id = f"cycle-{cycle_count}-{int(time.time())}"

                logger.info(f"Starting learning cycle {cycle_count}")

                # PHASE 1: GENERATION
                generated_deltas = self._generate_deltas()
                logger.info(f"Generated {len(generated_deltas)} delta proposals")

                # PHASE 2: REFLECTION
                if generated_deltas:
                    evaluations = self._evaluate_deltas(generated_deltas)
                    recommended = [e for e in evaluations if e.recommended]
                    logger.info(f"Evaluated deltas: {len(recommended)} recommended out of {len(evaluations)}")

                    # PHASE 3: CURATION
                    if recommended:
                        applied = self._apply_recommended_deltas(
                            [d for d in generated_deltas if any(
                                e.deltaId == d.id and e.recommended for e in evaluations
                            )],
                            evaluations
                        )
                        logger.info(f"Applied {applied} deltas in cycle {cycle_count}")
                else:
                    logger.debug("No delta proposals generated in this cycle")

                # Record cycle metrics
                self._record_cycle_metrics(cycle_id, generated_deltas)

                # Wait for the next cycle; the event wait returns early when
                # stop_learning() is called.
                self._stop_event.wait(self.cycle_interval)

            except Exception as e:
                logger.error(f"Error in learning cycle: {e}")
                logger.error(traceback.format_exc())
                self._stop_event.wait(5)  # Backoff on error
    def _generate_deltas(self) -> List[DeltaUpdate]:
        """
        GENERATION PHASE: Analyze task history and generate delta proposals
        """
        deltas: List[DeltaUpdate] = []

        if len(self.task_history) < 30:
            logger.debug(f"Not enough tasks for analysis ({len(self.task_history)} < 30)")
            return deltas

        # Analyze last 30 tasks
        recent_tasks = self.task_history[-30:]

        # Calculate metrics
        avg_latency = sum(
            t.get("latency", 0) for t in recent_tasks
        ) / len(recent_tasks) if recent_tasks else 0

        success_count = sum(1 for t in recent_tasks if t.get("status") == "success")
        success_rate = success_count / len(recent_tasks) if recent_tasks else 0

        # Get coordination context
        metrics = self.metrics_provider() if self.metrics_provider else {}

        logger.debug(
            f"Task analysis: avg_latency={avg_latency:.1f}ms, "
            f"success_rate={success_rate:.1%}, "
            f"sub_agents={metrics.get('sub_agent_count', 0)}"
        )

        # Delta 1: Coordination strategy adjustment
        if metrics.get('sub_agent_count', 0) > 8 and avg_latency > 100:
            deltas.append(DeltaUpdate(
                id=f"delta-{int(time.time())}-1",
                timestamp=int(time.time() * 1000),
                type="coordination",
                operation="modify",
                target="primary_coordination_strategy",
                oldValue="sequential",
                newValue="adaptive",
                reasoning=f"High agent count ({metrics.get('sub_agent_count', 0)}) with "
                          f"elevated latency ({avg_latency:.0f}ms)",
                confidence=0.75,
                impact="positive"
            ))

        # Delta 2: Success rate threshold
        if success_rate < 0.85:
            deltas.append(DeltaUpdate(
                id=f"delta-{int(time.time())}-2",
                timestamp=int(time.time() * 1000),
                type="strategy",
                operation="adjust",
                target="fallback_strategy_threshold",
                oldValue=0.8,
                newValue=0.75,
                reasoning=f"Success rate {success_rate:.1%} below target",
                confidence=0.6,
                impact="positive"
            ))

        # Delta 3: Resource pressure
        cpu_percent = metrics.get('cpu_percent', 0)
        if cpu_percent > 85:
            deltas.append(DeltaUpdate(
                id=f"delta-{int(time.time())}-3",
                timestamp=int(time.time() * 1000),
                type="resource",
                operation="adjust",
                target="max_cpu_per_agent",
                oldValue=cpu_percent,
                newValue=int(cpu_percent * 0.6),
                reasoning=f"CPU utilization at {cpu_percent}%, approaching limit",
                confidence=0.85,
                impact="positive"
            ))

        self.delta_history.extend(deltas)
        return deltas
    def _evaluate_deltas(self, deltas: List[DeltaUpdate]) -> List[DeltaEvaluation]:
        """
        REFLECTION PHASE: Evaluate delta proposals with scoring
        """
        evaluations: List[DeltaEvaluation] = []

        for delta in deltas:
            score = 0.0
            reasoning_parts: List[str] = []

            # Factor 1: Confidence (40%)
            confidence_score = delta.confidence * 40
            score += confidence_score
            reasoning_parts.append(f"Confidence: {delta.confidence*100:.0f}% = {confidence_score:.0f}pts")

            # Factor 2: Reasoning quality (30%)
            reasoning_quality = self._assess_reasoning_quality(delta.reasoning)
            reasoning_score = reasoning_quality * 30
            score += reasoning_score
            reasoning_parts.append(f"Reasoning: {reasoning_quality:.1f} = {reasoning_score:.0f}pts")

            # Factor 3: Impact (20%)
            impact_score = 0.0
            if delta.impact == "positive":
                impact_score = 20.0
            elif delta.impact == "negative":
                impact_score = 0.0
                score = 0.0  # Veto negative
            else:
                impact_score = 10.0
            score += impact_score
            reasoning_parts.append(f"Impact: {delta.impact} = {impact_score:.0f}pts")

            # Factor 4: Risk (10%): high risk scores 0, medium 5, low 10
            risk_level = self._assess_risk(delta)
            risk_score = {"high": 0.0, "medium": 5.0, "low": 10.0}[risk_level]
            score += risk_score
            reasoning_parts.append(f"Risk: {risk_level} = {risk_score:.0f}pts")

            score = min(100, max(0, score))

            # Recommendation threshold: 65/100
            min_score = self.config.get("reflection", {}).get("min_score", 65)
            recommended = score >= min_score

            evaluation = DeltaEvaluation(
                deltaId=delta.id,
                overallScore=score,
                recommended=recommended,
                reasoning="; ".join(reasoning_parts),
                riskLevel=risk_level,
                estimatedBenefit=self._estimate_benefit(delta)
            )
            evaluations.append(evaluation)

            logger.debug(
                f"Delta {delta.id}: score={score:.0f}, "
                f"recommended={recommended}, risk={risk_level}"
            )

        self.evaluation_history.extend(evaluations)
        return evaluations
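    # Worked example of the scoring above, using the "coordination" delta
    # produced by _generate_deltas: confidence 0.75 -> 30 pts; its reasoning
    # string matches none of the quality keywords, so quality stays at the
    # 0.5 base -> 15 pts; impact "positive" -> 20 pts; operation "modify" is
    # medium risk -> 5 pts. Total: 70/100, which clears the default 65
    # threshold, so the delta is recommended.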
    def _apply_recommended_deltas(
        self,
        deltas: List[DeltaUpdate],
        evaluations: List[DeltaEvaluation]
    ) -> int:
        """
        CURATION PHASE: Apply recommended deltas with score >= 65
        """
        applied_count = 0

        for delta in deltas:
            evaluation = next((e for e in evaluations if e.deltaId == delta.id), None)
            if not evaluation:
                continue

            if evaluation.recommended and evaluation.riskLevel != "high":
                # Apply the delta: record the application time, which the
                # status endpoints use to count applied deltas.
                delta.appliedAt = int(time.time() * 1000)
                applied_count += 1

                logger.info(
                    f"Applied delta {delta.id}: "
                    f"{delta.target} {delta.operation} "
                    f"{delta.oldValue} -> {delta.newValue} "
                    f"(score={evaluation.overallScore:.0f})"
                )

        return applied_count
    def _assess_reasoning_quality(self, reasoning: str) -> float:
        """Assess quality of delta reasoning (0-1)"""
        score = 0.5  # Base score

        if "observed" in reasoning or "%" in reasoning:
            score += 0.2
        if "system" in reasoning or "performance" in reasoning:
            score += 0.15
        if "because" in reasoning or "therefore" in reasoning:
            score += 0.15

        return min(1.0, score)
    def _assess_risk(self, delta: DeltaUpdate) -> str:
        """Assess risk level of delta"""
        if delta.operation == "remove":
            return "high"
        elif delta.operation == "modify":
            return "medium"
        else:
            return "low"
    def _estimate_benefit(self, delta: DeltaUpdate) -> str:
        """Estimate potential benefit of delta"""
        if delta.type == "coordination":
            return "Potential latency improvement: ~10-15%"
        elif delta.type == "resource":
            return "Better resource utilization, reduced contention"
        elif delta.type == "metric":
            return "More realistic performance targets"
        return "Unknown benefit"
    def _record_cycle_metrics(self, cycle_id: str, deltas: List[DeltaUpdate]) -> None:
        """Record learning cycle metrics"""
        cycle_metrics = {
            "cycle_id": cycle_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "deltas_proposed": len(deltas),
            "deltas_applied": sum(1 for d in deltas if d.appliedAt),
            "total_deltas_history": len(self.delta_history),
            "total_evaluations": len(self.evaluation_history)
        }
        self.learning_cycles.append(cycle_metrics)

        logger.info(
            f"Learning cycle metrics: "
            f"proposed={len(deltas)}, "
            f"history_size={len(self.delta_history)}"
        )
    def get_status(self) -> Dict[str, Any]:
        """Get current learning system status"""
        return {
            "active": self.active,
            "cycle_interval_seconds": self.cycle_interval,
            "total_tasks_recorded": len(self.task_history),
            "total_deltas_proposed": len(self.delta_history),
            "total_deltas_applied": sum(1 for d in self.delta_history if d.appliedAt),
            "total_evaluations": len(self.evaluation_history),
            "total_cycles": len(self.learning_cycles),
            "recommended_deltas": sum(
                1 for e in self.evaluation_history if e.recommended
            ),
            "config_version": self.config.get("version", "unknown")
        }
    def get_learning_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent learning cycles"""
        return self.learning_cycles[-limit:]
    def get_delta_status(self) -> Dict[str, Any]:
        """Get delta proposal and application status"""
        applied = sum(1 for d in self.delta_history if d.appliedAt)
        return {
            "total_proposed": len(self.delta_history),
            "total_applied": applied,
            "pending_or_rejected": len(self.delta_history) - applied,
            "by_type": {
                delta_type: sum(
                    1 for d in self.delta_history if d.type == delta_type
                )
                for delta_type in ["coordination", "resource", "metric", "strategy"]
            }
        }
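

# Minimal usage sketch (illustrative, not part of the orchestrator wiring):
# the metrics provider and task records below are hypothetical stand-ins for
# what the sub-agent orchestration system would supply at runtime.
if __name__ == "__main__":
    integration = AutonomousLearningIntegration()

    # Hypothetical static metrics; real callers register a live provider.
    integration.set_metrics_provider(
        lambda: {"sub_agent_count": 9, "cpu_percent": 90}
    )

    # Seed 30 fake task records so the first generation phase has data:
    # 24/30 successes (80% success rate) at 120 ms average latency, which
    # triggers all three delta heuristics above.
    for i in range(30):
        integration.record_task({"status": "success" if i % 5 else "failed",
                                 "latency": 120})

    integration.start_learning()
    try:
        time.sleep(35)  # Let roughly one learning cycle run
    finally:
        integration.stop_learning()

    print(json.dumps(integration.get_status(), indent=2))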