#!/usr/bin/env python3 """ Known Issues Detector - Pattern-based bug detection and auto-fix Features: 1. Detect common error patterns in output 2. Match against known issues database 3. Suggest or auto-apply fixes 4. Learn from fixes applied 5. Report patterns to knowledge graph """ import json import re from pathlib import Path from typing import Dict, List, Optional, Tuple, Any from datetime import datetime from dataclasses import dataclass, asdict @dataclass class IssuePattern: """Pattern for detecting a known issue""" name: str description: str error_patterns: List[str] # Regex patterns to match fix: str # Description of fix auto_fixable: bool = False fix_command: Optional[str] = None # Shell command to auto-fix project: Optional[str] = None # Project-specific issue severity: str = "warning" # warning, error, critical @dataclass class DetectedIssue: """An issue detected in output""" pattern_name: str description: str severity: str message: str suggested_fix: str auto_fixable: bool detected_at: str class KnownIssuesDetector: """Detects and suggests fixes for known issues""" def __init__(self, issues_db_path: Optional[Path] = None): """Initialize detector with known issues database Args: issues_db_path: Path to JSON file with issue patterns """ self.db_path = issues_db_path or Path("/opt/server-agents/orchestrator/config/known_issues.json") self.patterns: List[IssuePattern] = [] self.detected_history: List[DetectedIssue] = [] self.fixes_applied: List[Dict[str, Any]] = [] self.load_patterns() self._initialize_default_patterns() def load_patterns(self) -> None: """Load issue patterns from database""" if self.db_path.exists(): try: data = json.loads(self.db_path.read_text()) for pattern_data in data.get("patterns", []): self.patterns.append(IssuePattern(**pattern_data)) except Exception as e: print(f"[Warning] Failed to load issue patterns: {e}") def _initialize_default_patterns(self) -> None: """Initialize built-in common issue patterns""" default_patterns = [ # Docker/Container issues IssuePattern( name="container_not_found", description="Docker container not found or exited", error_patterns=[ r"container .* not found", r"docker: error response", r"connection refused.*docker", r"no such container" ], fix="Restart container: luzia stop && luzia ", auto_fixable=True, severity="error" ), # Permission issues IssuePattern( name="permission_denied", description="Permission denied accessing file or command", error_patterns=[ r"permission denied", r"access denied", r"not allowed to access" ], fix="Check file permissions or use appropriate sudo/user context", auto_fixable=False, severity="error" ), # Dependency/Package issues IssuePattern( name="module_not_found", description="Python or Node module not found", error_patterns=[ r"ModuleNotFoundError", r"ImportError", r"cannot find module", r"npm ERR! code", r"not installed" ], fix="Install dependencies: npm install (Node) or pip install (Python)", auto_fixable=True, fix_command="npm install || pip install -r requirements.txt", severity="error" ), # Build/Compilation issues IssuePattern( name="build_failed", description="Build or compilation failed", error_patterns=[ r"build failed", r"compilation error", r"cannot find symbol", r"SyntaxError", r"type error" ], fix="Check build output, fix source code, retry build", auto_fixable=False, severity="error" ), # Configuration issues IssuePattern( name="config_corrupted", description="Configuration file is corrupted or invalid", error_patterns=[ r"invalid json", r"malformed yaml", r"configuration corrupted", r"parse error.*config" ], fix="Restore config from backup or regenerate", auto_fixable=True, fix_command="~/restore-claude-config.sh", severity="critical" ), # Network/Connection issues IssuePattern( name="connection_failed", description="Network connection failed", error_patterns=[ r"connection refused", r"network unreachable", r"timeout", r"ECONNREFUSED", r"cannot reach" ], fix="Check network/service status: ping, netstat, systemctl", auto_fixable=False, severity="warning" ), # Memory/Resource issues IssuePattern( name="out_of_memory", description="Out of memory or resource limit exceeded", error_patterns=[ r"out of memory", r"OOM", r"memory limit exceeded", r"no space left" ], fix="Cleanup old files/containers: luzia cleanup", auto_fixable=True, severity="critical" ), # Type/Validation issues IssuePattern( name="type_mismatch", description="Type checking or validation failure", error_patterns=[ r"type.*error", r"type check", r"expected .* got", r"validation error" ], fix="Review types and validate inputs, run type checker", auto_fixable=False, severity="warning" ), # File not found IssuePattern( name="file_not_found", description="Required file not found", error_patterns=[ r"no such file", r"ENOENT", r"cannot open", r"file not found" ], fix="Check file path and ensure it exists", auto_fixable=False, severity="error" ), ] # Add only if not already in patterns existing_names = {p.name for p in self.patterns} for pattern in default_patterns: if pattern.name not in existing_names: self.patterns.append(pattern) def detect_issues(self, output: str, error: str = "", project: Optional[str] = None) -> List[DetectedIssue]: """Detect known issues in output Args: output: Task output text error: Error message text project: Optional project name for project-specific patterns Returns: List of detected issues """ detected = [] combined = f"{output}\n{error}".lower() for pattern in self.patterns: # Skip project-specific patterns if not applicable if pattern.project and pattern.project != project: continue # Check if any error pattern matches for error_pattern in pattern.error_patterns: if re.search(error_pattern, combined, re.IGNORECASE): issue = DetectedIssue( pattern_name=pattern.name, description=pattern.description, severity=pattern.severity, message=f"Detected: {pattern.description}", suggested_fix=pattern.fix, auto_fixable=pattern.auto_fixable, detected_at=datetime.now().isoformat() ) detected.append(issue) self.detected_history.append(issue) break # Don't match same pattern twice return detected def suggest_fix(self, issue: DetectedIssue) -> str: """Get detailed fix suggestion for an issue Args: issue: Detected issue Returns: Detailed fix suggestion """ pattern = next((p for p in self.patterns if p.name == issue.pattern_name), None) if not pattern: return issue.suggested_fix if pattern.fix_command: return f"Run: {pattern.fix_command}\n\nDetails: {pattern.fix}" else: return pattern.fix def can_auto_fix(self, issue: DetectedIssue) -> bool: """Check if an issue can be auto-fixed Args: issue: Detected issue Returns: True if auto-fixable """ pattern = next((p for p in self.patterns if p.name == issue.pattern_name), None) return pattern and pattern.auto_fixable if pattern else False def get_fix_command(self, issue: DetectedIssue) -> Optional[str]: """Get the command to fix an issue Args: issue: Detected issue Returns: Shell command to execute, or None """ pattern = next((p for p in self.patterns if p.name == issue.pattern_name), None) return pattern.fix_command if pattern else None def record_fix_applied(self, issue: DetectedIssue, success: bool, fix_details: Optional[str] = None) -> None: """Record that a fix was attempted Args: issue: Issue that was fixed success: Whether fix was successful fix_details: Optional details about fix """ self.fixes_applied.append({ "pattern_name": issue.pattern_name, "applied_at": datetime.now().isoformat(), "success": success, "details": fix_details }) def get_recent_issues(self, limit: int = 10) -> List[Dict[str, Any]]: """Get recently detected issues Args: limit: Max issues to return Returns: List of recent issues """ return [asdict(issue) for issue in self.detected_history[-limit:]] def get_issue_statistics(self) -> Dict[str, Any]: """Get statistics about detected issues Returns: Statistics dict """ by_name = {} by_severity = {"warning": 0, "error": 0, "critical": 0} for issue in self.detected_history: by_name[issue.pattern_name] = by_name.get(issue.pattern_name, 0) + 1 by_severity[issue.severity] = by_severity.get(issue.severity, 0) + 1 fix_success = sum(1 for f in self.fixes_applied if f["success"]) fix_attempts = len(self.fixes_applied) return { "total_detected": len(self.detected_history), "by_pattern": by_name, "by_severity": by_severity, "fixes_attempted": fix_attempts, "fixes_successful": fix_success, "fix_success_rate": fix_success / fix_attempts if fix_attempts > 0 else 0 } def export_patterns(self, output_path: Path) -> None: """Export patterns to JSON file Args: output_path: Path to write patterns to """ data = { "patterns": [asdict(p) for p in self.patterns], "exported_at": datetime.now().isoformat() } output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(data, indent=2)) def add_pattern(self, pattern: IssuePattern) -> None: """Add a new issue pattern Args: pattern: Issue pattern to add """ # Check for duplicates if not any(p.name == pattern.name for p in self.patterns): self.patterns.append(pattern) # Save to database if self.db_path.exists(): self.export_patterns(self.db_path) def format_issue_report(self, issues: List[DetectedIssue]) -> str: """Format detected issues as readable report Args: issues: List of detected issues Returns: Formatted report text """ if not issues: return "No issues detected." report = ["# Issue Detection Report\n"] report.append(f"**Detection Time:** {datetime.now().isoformat()}\n") # Group by severity by_severity = {} for issue in issues: if issue.severity not in by_severity: by_severity[issue.severity] = [] by_severity[issue.severity].append(issue) # Write critical first, then error, then warning for severity in ["critical", "error", "warning"]: if severity in by_severity: report.append(f"\n## {severity.upper()} ({len(by_severity[severity])})\n") for issue in by_severity[severity]: report.append(f"### {issue.pattern_name}") report.append(f"**Description:** {issue.description}") report.append(f"**Message:** {issue.message}") report.append(f"**Suggested Fix:** {issue.suggested_fix}") if issue.auto_fixable: report.append("✅ **Auto-fixable:** Yes") report.append("") return "\n".join(report)