#!/usr/bin/env python3
"""
Learning Test Workload Generator

Generates synthetic task workloads to test the autonomous learning system.

Features:
- Generate 100 realistic sub-agent tasks
- Vary latencies, success rates, and resource usage
- Monitor delta proposals and scoring
- Verify learning system responds appropriately
- Measure performance improvements

Usage:
    python learning_test_workload.py --tasks 100 --observe
"""

import sys
import time
import random
import json
import argparse
import logging
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Any
import threading

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class SyntheticWorkloadGenerator:
    """Generates synthetic task workloads for learning system testing"""

    def __init__(self, learning_system=None):
        """Initialize workload generator

        Args:
            learning_system: Optional AutonomousLearningIntegration instance
        """
        self.learning_system = learning_system
        self.tasks_generated = 0
        self.start_time = None

    def generate_tasks(self, count: int = 100, interval_ms: int = 100) -> List[Dict[str, Any]]:
        """
        Generate synthetic task workload.

        Args:
            count: Number of tasks to generate
            interval_ms: Milliseconds between task generation

        Returns:
            List of generated task dictionaries
        """
        logger.info(f"Generating {count} synthetic tasks...")

        tasks = []
        self.start_time = time.time()

        for i in range(count):
            # Generate task with realistic variations
            task = self._create_synthetic_task(i)
            tasks.append(task)

            # Record task in learning system if available
            if self.learning_system:
                self.learning_system.record_task(task)

            self.tasks_generated += 1

            # Print progress
            if (i + 1) % 10 == 0:
                logger.info(f"Generated {i + 1}/{count} tasks")

            # Interval between task generation (no sleep after the last task)
            if i < count - 1:
                time.sleep(interval_ms / 1000.0)

        logger.info(f"Completed generating {count} tasks")
        return tasks

    def _create_synthetic_task(self, task_index: int) -> Dict[str, Any]:
        """Create a single synthetic task"""
        # Vary characteristics across cycles
        cycle = task_index // 30

        # Base success rate improves with learning
        base_success = 0.85 + (cycle * 0.02)
        success = random.random() < base_success

        # Latency improves with learning (delta application)
        base_latency = 80 - (cycle * 5)  # Improves by ~5ms per 30-task cycle
        latency = max(20, int(random.gauss(base_latency, 20)))

        # Sub-agents used
        sub_agents = random.randint(2, 16)

        task = {
            "task_id": f"task-synthetic-{task_index}",
            "parent_task_id": f"parent-{task_index // 10}",
            "status": "success" if success else "failed",
            "latency": latency,
            "sub_agents_used": sub_agents,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "phase": random.choice([
                "ANALYZING", "EXECUTING", "LEARNING", "STRATEGIZING"
            ]),
            "success": success,
            "cycle": cycle
        }

        return task
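    # Worked example of the cycle model above, derived purely from the
    # arithmetic in _create_synthetic_task: tasks 0-29 fall in cycle 0
    # (base_success 0.85, base_latency ~80 ms), tasks 30-59 in cycle 1
    # (0.87, ~75 ms), and so on, so later tasks look progressively
    # "learned". Note that for cycle >= 8 (tasks 240+) the success
    # probability exceeds 1.0 and saturates, so very long runs stop
    # showing an improvement trend.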
    def simulate_workload_with_monitoring(
        self,
        task_count: int = 100,
        interval_ms: int = 100,
        observe_duration_s: int = 120
    ) -> Dict[str, Any]:
        """
        Generate synthetic workload and observe learning system response.

        Args:
            task_count: Number of tasks to generate
            interval_ms: Milliseconds between tasks
            observe_duration_s: Seconds to observe after generation

        Returns:
            Monitoring results
        """
        logger.info(f"Starting workload simulation ({task_count} tasks, {observe_duration_s}s observation)")

        # Generate tasks in a background thread (non-daemon, so the
        # interpreter waits for it even if join() below times out)
        generation_thread = threading.Thread(
            target=self.generate_tasks,
            args=(task_count, interval_ms),
            daemon=False
        )
        generation_thread.start()

        # Start time for observation
        obs_start = time.time()
        observations = []

        # Observe while tasks are being generated and after
        while time.time() - obs_start < observe_duration_s:
            if self.learning_system:
                # Capture learning system state once per sample
                status = self.learning_system.get_status()
                observation = {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "elapsed_seconds": time.time() - obs_start,
                    "status": status,
                    "delta_status": self.learning_system.get_delta_status()
                }
                observations.append(observation)

                # Log current state
                logger.info(
                    f"[{observation['elapsed_seconds']:.1f}s] "
                    f"Tasks: {status['total_tasks_recorded']}, "
                    f"Deltas: {status['total_deltas_proposed']} proposed, "
                    f"{status['total_deltas_applied']} applied, "
                    f"Cycles: {status['total_cycles']}"
                )

            time.sleep(5)  # Observe every 5 seconds

        # Wait for generation thread to complete
        generation_thread.join(timeout=30)

        # Prepare results
        results = {
            "simulation_complete": True,
            "duration_seconds": time.time() - obs_start,
            "tasks_generated": self.tasks_generated,
            "observation_count": len(observations),
            "observations": observations
        }

        if self.learning_system:
            final_status = self.learning_system.get_status()
            results["final_learning_status"] = final_status

        return results

    def analyze_results(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze workload simulation results"""
        observations = results.get("observations", [])

        if not observations:
            return {"error": "No observations recorded"}

        # Extract metrics
        deltas_proposed = [o.get("status", {}).get("total_deltas_proposed", 0)
                           for o in observations]
        deltas_applied = [o.get("status", {}).get("total_deltas_applied", 0)
                          for o in observations]

        analysis = {
            "workload_stats": {
                "total_tasks_generated": results.get("tasks_generated", 0),
                "total_observations": results.get("observation_count", 0),
                "simulation_duration_seconds": results.get("duration_seconds", 0)
            },
            "delta_proposal_stats": {
                "total_proposed": max(deltas_proposed) if deltas_proposed else 0,
                "total_applied": max(deltas_applied) if deltas_applied else 0,
                "average_proposed_per_cycle": sum(deltas_proposed) / len(deltas_proposed) if deltas_proposed else 0,
                "proposal_trend": "increasing" if len(deltas_proposed) > 1 and deltas_proposed[-1] > deltas_proposed[0] else "stable"
            },
            "learning_effectiveness": {
                "cycles_executed": results.get("final_learning_status", {}).get("total_cycles", 0),
                "recommended_deltas": results.get("final_learning_status", {}).get("recommended_deltas", 0),
                "application_rate": (
                    max(deltas_applied) / max(deltas_proposed)
                    if deltas_proposed and max(deltas_proposed) > 0
                    else 0
                )
            },
            "delta_breakdown": {
                "by_type": results.get("final_learning_status", {}).get("delta_status", {}).get("by_type", {})
            }
        }

        return analysis
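# A minimal stand-in for the real learning integration, useful for
# smoke-testing the monitoring path when autonomous_learning_integration is
# not importable. This is a hypothetical sketch: it implements only the
# methods this script actually calls (record_task, get_status,
# get_delta_status), with return shapes assumed from the call sites above
# rather than from the real AutonomousLearningIntegration API.
class StubLearningSystem:
    """Hypothetical stub exposing the interface this script relies on."""

    def __init__(self):
        self.tasks: List[Dict[str, Any]] = []

    def record_task(self, task: Dict[str, Any]) -> None:
        self.tasks.append(task)

    def get_status(self) -> Dict[str, Any]:
        # Fabricate one proposed delta per 25 recorded tasks so the
        # observation loop and analyze_results() see non-zero metrics.
        proposed = len(self.tasks) // 25
        return {
            "total_tasks_recorded": len(self.tasks),
            "total_deltas_proposed": proposed,
            "total_deltas_applied": proposed // 2,
            "total_cycles": len(self.tasks) // 30,
            "recommended_deltas": proposed,
            "delta_status": {"by_type": {"latency_tuning": proposed}},
        }

    def get_delta_status(self) -> Dict[str, Any]:
        return self.get_status()["delta_status"]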
def main():
    """Main test execution"""
    parser = argparse.ArgumentParser(
        description="Autonomous Learning System Test Workload"
    )
    parser.add_argument("--tasks", type=int, default=100,
                        help="Number of tasks to generate")
    parser.add_argument("--interval", type=int, default=100,
                        help="Milliseconds between tasks")
    parser.add_argument("--observe", type=int, default=120,
                        help="Seconds to observe after generation")
    parser.add_argument("--output", type=str, default=None,
                        help="Output file for results (JSON)")

    args = parser.parse_args()

    # Create workload generator
    generator = SyntheticWorkloadGenerator()

    # Check if learning system is available
    try:
        from autonomous_learning_integration import AutonomousLearningIntegration
        learning = AutonomousLearningIntegration()
        generator.learning_system = learning
        logger.info("Autonomous learning system connected")
    except ImportError:
        logger.warning("Learning system not available, running in standalone mode")

    # Run simulation
    logger.info(f"Starting test workload: {args.tasks} tasks, {args.observe}s observation")
    results = generator.simulate_workload_with_monitoring(
        task_count=args.tasks,
        interval_ms=args.interval,
        observe_duration_s=args.observe
    )

    # Analyze results
    analysis = generator.analyze_results(results)

    # Print summary
    print("\n" + "=" * 70)
    print("Learning System Test Results")
    print("=" * 70 + "\n")

    print("Workload Statistics:")
    for key, value in analysis.get("workload_stats", {}).items():
        print(f"  {key:.<40} {value}")

    print("\nDelta Proposal Statistics:")
    for key, value in analysis.get("delta_proposal_stats", {}).items():
        print(f"  {key:.<40} {value}")

    print("\nLearning Effectiveness:")
    for key, value in analysis.get("learning_effectiveness", {}).items():
        if isinstance(value, float):
            print(f"  {key:.<40} {value:.1%}")
        else:
            print(f"  {key:.<40} {value}")

    print("\nDelta Types Distribution:")
    for dtype, count in analysis.get("delta_breakdown", {}).get("by_type", {}).items():
        print(f"  {dtype:.<40} {count}")

    print("\n" + "=" * 70 + "\n")

    # Save results if requested
    if args.output:
        output_path = Path(args.output)
        with open(output_path, 'w') as f:
            json.dump({
                "results": results,
                "analysis": analysis,
                "timestamp": datetime.now(timezone.utc).isoformat()
            }, f, indent=2, default=str)
        logger.info(f"Results saved to {output_path}")

    return 0


if __name__ == "__main__":
    sys.exit(main())
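# Programmatic usage sketch (hypothetical, mirrors main() above): drive the
# generator directly with the StubLearningSystem defined earlier, so no real
# learning integration is required.
#
#   generator = SyntheticWorkloadGenerator(learning_system=StubLearningSystem())
#   results = generator.simulate_workload_with_monitoring(
#       task_count=20, interval_ms=50, observe_duration_s=15)
#   print(json.dumps(generator.analyze_results(results), indent=2))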