#!/usr/bin/env python3 """ Skill and Knowledge Learning System for Luzia Orchestrator Automatically extracts learnings from completed tasks and QA passes, storing them in the knowledge graph for future skill recommendations and decision-making improvements. Architecture: 1. TaskAnalyzer: Extracts patterns from task execution 2. SkillExtractor: Identifies skills used and outcomes 3. LearningEngine: Processes learnings and stores in KG 4. SkillRecommender: Suggests skills for future tasks """ import json import re from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timedelta from dataclasses import dataclass import hashlib # Import our modules import sys sys.path.insert(0, str(Path(__file__).parent)) from knowledge_graph import KnowledgeGraph, KG_PATHS @dataclass class TaskExecution: """Records a single task execution.""" task_id: str prompt: str project: str status: str # "success", "failed", "partial" tools_used: List[str] duration: float result_summary: str qa_passed: bool timestamp: datetime @dataclass class ExtractedSkill: """A skill extracted from task execution.""" name: str category: str # "tool_usage", "pattern", "decision", "architecture" confidence: float # 0.0-1.0 context: Dict[str, Any] source_task_id: str evidence: str @dataclass class Learning: """A learning extracted from successful task completion.""" title: str description: str skill_names: List[str] pattern: str applicability: List[str] # Project types, task patterns it applies to confidence: float source_qa_results: Dict[str, Any] related_learnings: List[str] class TaskAnalyzer: """Analyzes task execution to extract patterns and metadata.""" def __init__(self): self.execution_history: List[TaskExecution] = [] def analyze_task(self, task_data: Dict[str, Any]) -> Optional[TaskExecution]: """ Analyze a single task execution. Expected task_data structure: { "task_id": str, "prompt": str, "project": str, "status": "success|failed|partial", "tools_used": [str], "duration": float, "result_summary": str, "qa_passed": bool, "timestamp": str (ISO format) } """ try: execution = TaskExecution( task_id=task_data.get("task_id", self._generate_task_id()), prompt=task_data.get("prompt", ""), project=task_data.get("project", "general"), status=task_data.get("status", "unknown"), tools_used=task_data.get("tools_used", []), duration=task_data.get("duration", 0.0), result_summary=task_data.get("result_summary", ""), qa_passed=task_data.get("qa_passed", False), timestamp=datetime.fromisoformat(task_data.get("timestamp", datetime.now().isoformat())) ) self.execution_history.append(execution) return execution except Exception as e: print(f"Error analyzing task: {e}") return None def extract_patterns(self, executions: List[TaskExecution]) -> Dict[str, Any]: """Extract patterns from multiple task executions.""" if not executions: return {} patterns = { "success_rate": self._calculate_success_rate(executions), "average_duration": sum(e.duration for e in executions) / len(executions), "common_tools": self._extract_common_tools(executions), "project_distribution": self._extract_project_distribution(executions), "time_range": { "oldest": min(e.timestamp for e in executions).isoformat(), "newest": max(e.timestamp for e in executions).isoformat(), } } return patterns def _calculate_success_rate(self, executions: List[TaskExecution]) -> float: """Calculate success rate of task executions.""" if not executions: return 0.0 successful = sum(1 for e in executions if e.status == "success") return successful / len(executions) def _extract_common_tools(self, executions: List[TaskExecution]) -> Dict[str, int]: """Extract most commonly used tools.""" tool_counts = {} for execution in executions: for tool in execution.tools_used: tool_counts[tool] = tool_counts.get(tool, 0) + 1 return dict(sorted(tool_counts.items(), key=lambda x: x[1], reverse=True)) def _extract_project_distribution(self, executions: List[TaskExecution]) -> Dict[str, int]: """Extract project distribution of tasks.""" projects = {} for execution in executions: projects[execution.project] = projects.get(execution.project, 0) + 1 return dict(sorted(projects.items(), key=lambda x: x[1], reverse=True)) def _generate_task_id(self) -> str: """Generate unique task ID.""" return hashlib.md5( f"{datetime.now().isoformat()}".encode() ).hexdigest()[:12] class SkillExtractor: """Extracts skills from task executions and QA results.""" def extract_from_task(self, execution: TaskExecution) -> List[ExtractedSkill]: """Extract skills from a single task execution.""" skills = [] # Extract tool usage skills for tool in execution.tools_used: skills.append(ExtractedSkill( name=f"tool_{tool.lower()}", category="tool_usage", confidence=0.8, context={ "tool": tool, "project": execution.project, "frequency": 1 }, source_task_id=execution.task_id, evidence=f"Tool '{tool}' used in task: {execution.prompt[:100]}" )) # Extract decision patterns from prompt decision_skills = self._extract_decision_patterns(execution.prompt) skills.extend(decision_skills) # Extract project-specific skills project_skill = ExtractedSkill( name=f"project_{execution.project}", category="architecture", confidence=0.7, context={"project": execution.project}, source_task_id=execution.task_id, evidence=f"Task executed for project: {execution.project}" ) skills.append(project_skill) return skills def extract_from_qa_results(self, qa_results: Dict[str, Any]) -> List[ExtractedSkill]: """Extract skills from QA validation results.""" skills = [] if not qa_results.get("passed", False): return skills # Success in validation categories for category, passed in qa_results.get("results", {}).items(): if passed: skills.append(ExtractedSkill( name=f"qa_pass_{category}", category="pattern", confidence=0.9, context={"qa_category": category}, source_task_id=qa_results.get("task_id", "unknown"), evidence=f"QA passed for category: {category}" )) return skills def _extract_decision_patterns(self, prompt: str) -> List[ExtractedSkill]: """Extract decision-making patterns from task prompt.""" skills = [] patterns = { "optimization": r"(optimiz|improves?|faster|efficient)", "debugging": r"(debug|troubleshoot|fix|error)", "documentation": r"(document|document|docstring|comment)", "testing": r"(test|validate|check|verify)", "refactoring": r"(refactor|clean|simplify|reorganize)", "integration": r"(integrat|connect|link|sync)", "automation": r"(automat|cron|schedule|batch)", } for pattern_name, pattern_regex in patterns.items(): if re.search(pattern_regex, prompt, re.IGNORECASE): skills.append(ExtractedSkill( name=f"pattern_{pattern_name}", category="decision", confidence=0.6, context={"pattern_type": pattern_name}, source_task_id="", evidence=f"Pattern '{pattern_name}' detected in prompt" )) return skills def aggregate_skills(self, skills: List[ExtractedSkill]) -> Dict[str, Dict[str, Any]]: """Aggregate multiple skill extractions.""" aggregated = {} for skill in skills: if skill.name not in aggregated: aggregated[skill.name] = { "name": skill.name, "category": skill.category, "occurrences": 0, "total_confidence": 0.0, "contexts": [], } aggregated[skill.name]["occurrences"] += 1 aggregated[skill.name]["total_confidence"] += skill.confidence aggregated[skill.name]["contexts"].append(skill.context) # Calculate average confidence for skill_name, data in aggregated.items(): if data["occurrences"] > 0: data["average_confidence"] = data["total_confidence"] / data["occurrences"] return aggregated class LearningEngine: """Processes and stores learnings in the knowledge graph.""" def __init__(self): self.kg = KnowledgeGraph("research", skip_permission_check=True) def extract_learning( self, execution: TaskExecution, skills: List[ExtractedSkill], qa_results: Dict[str, Any] ) -> Optional[Learning]: """Extract a learning from successful task completion.""" if execution.status != "success" or not qa_results.get("passed", False): return None # Build learning from components skill_names = [s.name for s in skills] learning = Learning( title=self._generate_title(execution), description=self._generate_description(execution, skills), skill_names=skill_names, pattern=self._extract_pattern(execution), applicability=self._determine_applicability(execution, skills), confidence=self._calculate_confidence(skills, qa_results), source_qa_results=qa_results, related_learnings=[] ) return learning def store_learning(self, learning: Learning) -> str: """Store learning in knowledge graph.""" # Create learning entity learning_name = f"learning_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{learning.title[:30]}" content = f"""Title: {learning.title} Description: {learning.description} Skills Used: {', '.join(learning.skill_names)} Pattern: {learning.pattern} Applicability: {chr(10).join(f' - {a}' for a in learning.applicability)} Confidence: {learning.confidence:.2%} QA Results Summary: {json.dumps(learning.source_qa_results.get('summary', {}), indent=2)} """ metadata = { "skills": learning.skill_names, "pattern": learning.pattern, "confidence": learning.confidence, "applicability": learning.applicability, "extraction_time": datetime.now().isoformat(), } entity_id = self.kg.add_entity( name=learning_name, entity_type="finding", content=content, metadata=metadata, source="skill_learning_engine" ) # Store each skill relationship for skill_name in learning.skill_names: try: self.kg.add_relation( learning_name, skill_name, "references", # Changed from "uses" to valid relation type f"Learning demonstrates use of {skill_name}" ) except Exception as e: # Skills might not exist as entities, skip relation pass return entity_id def create_skill_entity(self, skill: ExtractedSkill) -> str: """Create or update skill entity in KG.""" content = f"""Category: {skill.category} Confidence: {skill.confidence:.2%} Context: {json.dumps(skill.context, indent=2)} Evidence: {skill.evidence} """ metadata = { "category": skill.category, "confidence": skill.confidence, "source_task": skill.source_task_id, } return self.kg.add_entity( name=skill.name, entity_type="finding", content=content, metadata=metadata, source="skill_extractor" ) def _generate_title(self, execution: TaskExecution) -> str: """Generate a learning title from task execution.""" # Extract key concepts from prompt words = execution.prompt.split()[:5] return " ".join(words).title() def _generate_description(self, execution: TaskExecution, skills: List[ExtractedSkill]) -> str: """Generate learning description.""" skill_summary = ", ".join([s.name for s in skills[:3]]) return f"""Task: {execution.prompt[:150]}... Project: {execution.project} Status: {execution.status} Tools: {', '.join(execution.tools_used[:3])} Key Skills: {skill_summary} """ def _extract_pattern(self, execution: TaskExecution) -> str: """Extract the core pattern from task execution.""" # Simplified pattern extraction if "debug" in execution.prompt.lower(): return "debugging_pattern" elif "refactor" in execution.prompt.lower(): return "refactoring_pattern" elif "integrat" in execution.prompt.lower(): return "integration_pattern" else: return "general_task_pattern" def _determine_applicability(self, execution: TaskExecution, skills: List[ExtractedSkill]) -> List[str]: """Determine which contexts this learning applies to.""" applicability = [ execution.project, f"tool_{execution.tools_used[0].lower()}" if execution.tools_used else "general", ] # Add skill categories categories = set(s.category for s in skills) applicability.extend(list(categories)) return list(set(applicability)) def _calculate_confidence(self, skills: List[ExtractedSkill], qa_results: Dict[str, Any]) -> float: """Calculate overall learning confidence.""" # Average skill confidence skill_confidence = sum(s.confidence for s in skills) / len(skills) if skills else 0.5 # QA pass rate qa_confidence = 0.9 if qa_results.get("passed", False) else 0.3 # Weighted average return (skill_confidence * 0.6) + (qa_confidence * 0.4) class SkillRecommender: """Recommends skills for future tasks based on learnings.""" def __init__(self): self.kg = KnowledgeGraph("research", skip_permission_check=True) def recommend_for_task(self, task_prompt: str, project: str = "general") -> List[Dict[str, Any]]: """ Recommend skills for a given task. Returns list of recommended skills with confidence scores. """ recommendations = [] # Search for relevant learnings query_terms = " ".join(task_prompt.split()[:5]) learnings = self.kg.search(query_terms, limit=10) for learning in learnings: if learning.get("error"): continue metadata = learning.get("metadata", {}) # Handle metadata as either dict or JSON string if isinstance(metadata, str): try: import json metadata = json.loads(metadata) except: metadata = {} if metadata.get("applicability") and project not in metadata.get("applicability", []): continue # Extract skills from learning skills = metadata.get("skills", []) confidence = metadata.get("confidence", 0.5) for skill in skills: recommendations.append({ "skill": skill, "source_learning": learning.get("name"), "confidence": confidence, "applicability": metadata.get("applicability", []), }) # Sort by confidence recommendations.sort(key=lambda x: x["confidence"], reverse=True) return recommendations[:10] # Top 10 recommendations def get_skill_profile(self) -> Dict[str, Any]: """Get overall profile of learned skills.""" skills = self.kg.list_entities(entity_type="finding") profile = { "total_learnings": len(skills), "by_category": {}, "top_skills": [], "extraction_time": datetime.now().isoformat(), } # Categorize for skill in skills: metadata = skill.get("metadata", {}) # Handle metadata as either dict or JSON string if isinstance(metadata, str): try: import json metadata = json.loads(metadata) except: metadata = {} category = metadata.get("category", "unknown") if category not in profile["by_category"]: profile["by_category"][category] = 0 profile["by_category"][category] += 1 # Top skills by frequency skill_counts = {} for skill in skills: metadata = skill.get("metadata", {}) # Handle metadata as either dict or JSON string if isinstance(metadata, str): try: import json metadata = json.loads(metadata) except: metadata = {} for skill_name in metadata.get("skills", []): skill_counts[skill_name] = skill_counts.get(skill_name, 0) + 1 profile["top_skills"] = sorted( skill_counts.items(), key=lambda x: x[1], reverse=True )[:10] return profile class SkillLearningSystem: """ Unified system for skill learning and knowledge extraction. Orchestrates the full pipeline: task execution → analysis → learning extraction → knowledge graph storage → recommendations. """ def __init__(self): self.analyzer = TaskAnalyzer() self.extractor = SkillExtractor() self.learning_engine = LearningEngine() self.recommender = SkillRecommender() def process_task_completion( self, task_data: Dict[str, Any], qa_results: Dict[str, Any] ) -> Dict[str, Any]: """ Full pipeline: process a completed task and extract learnings. Args: task_data: Task execution data qa_results: QA validation results Returns: Dict with extraction results and learning IDs """ # 1. Analyze task execution = self.analyzer.analyze_task(task_data) if not execution: return {"error": "Failed to analyze task"} # 2. Extract skills task_skills = self.extractor.extract_from_task(execution) qa_skills = self.extractor.extract_from_qa_results(qa_results) all_skills = task_skills + qa_skills # 3. Store skills in KG skill_ids = [] for skill in all_skills: try: skill_id = self.learning_engine.create_skill_entity(skill) skill_ids.append(skill_id) except Exception as e: print(f"Failed to store skill: {e}") # 4. Extract learning learning = self.learning_engine.extract_learning(execution, all_skills, qa_results) learning_id = None if learning: try: learning_id = self.learning_engine.store_learning(learning) except Exception as e: print(f"Failed to store learning: {e}") return { "success": True, "task_id": execution.task_id, "skills_extracted": len(all_skills), "skills_stored": len(skill_ids), "learning_created": learning_id is not None, "learning_id": learning_id, "skill_ids": skill_ids, "timestamp": datetime.now().isoformat(), } def get_recommendations(self, task_prompt: str, project: str = "general") -> List[Dict[str, Any]]: """Get skill recommendations for a task.""" return self.recommender.recommend_for_task(task_prompt, project) def get_learning_summary(self) -> Dict[str, Any]: """Get summary of all learnings and skill profile.""" return self.recommender.get_skill_profile() # --- CLI --- def main(): import argparse parser = argparse.ArgumentParser(description="Skill Learning Engine") parser.add_argument("command", choices=["process", "recommend", "summary", "test"]) parser.add_argument("--task-data", help="JSON file with task data") parser.add_argument("--qa-results", help="JSON file with QA results") parser.add_argument("--task-prompt", help="Task prompt for recommendations") parser.add_argument("--project", default="general", help="Project name") args = parser.parse_args() system = SkillLearningSystem() if args.command == "process": if not args.task_data or not args.qa_results: print("Error: --task-data and --qa-results required") exit(1) task_data = json.loads(Path(args.task_data).read_text()) qa_results = json.loads(Path(args.qa_results).read_text()) result = system.process_task_completion(task_data, qa_results) print(json.dumps(result, indent=2)) elif args.command == "recommend": if not args.task_prompt: print("Error: --task-prompt required") exit(1) recommendations = system.get_recommendations(args.task_prompt, args.project) print(json.dumps(recommendations, indent=2)) elif args.command == "summary": summary = system.get_learning_summary() print(json.dumps(summary, indent=2)) elif args.command == "test": print("=== Testing Skill Learning System ===\n") # Test task data test_task = { "task_id": "test_001", "prompt": "Refactor and optimize the database schema for better performance", "project": "overbits", "status": "success", "tools_used": ["Bash", "Read", "Edit"], "duration": 45.2, "result_summary": "Successfully refactored schema with 40% query improvement", "qa_passed": True, "timestamp": datetime.now().isoformat() } test_qa = { "passed": True, "results": { "syntax": True, "routes": True, "documentation": True, }, "summary": { "errors": 0, "warnings": 0, "info": 3, }, "timestamp": datetime.now().isoformat() } print("Processing test task...") result = system.process_task_completion(test_task, test_qa) print(json.dumps(result, indent=2)) print("\nGetting recommendations...") recommendations = system.get_recommendations( "Optimize database performance", "overbits" ) print(json.dumps(recommendations, indent=2)) print("\nLearning summary...") summary = system.get_learning_summary() print(json.dumps(summary, indent=2)) if __name__ == "__main__": main()