#!/usr/bin/env python3
"""
Knowledge Graph Health Checker

Provides comprehensive KG health assessment including:
- Pattern detection for incomplete research
- Health score generation (0-100)
- Issue categorization and severity assessment
- Recommendations for fixing issues
"""

import json
import logging
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from kg_pattern_detector import KGPatternDetector

logger = logging.getLogger(__name__)


class KGHealthChecker:
    """Check and report on Knowledge Graph health status."""

    def __init__(self):
        """Initialize the health checker with a detector and the KG database paths."""
        self.detector = KGPatternDetector()
        self.kg_db_paths = [
            '/etc/luz-knowledge/research.db',
            '/etc/zen-swarm/memory/research.db',
        ]

    def check_kg_completeness(self, time_scope_days: int = 30, verbose: bool = False) -> Dict:
        """
        Quick KG completeness audit over the last ``time_scope_days`` days.

        Args:
            time_scope_days: Size of the look-back window in days.
            verbose: Accepted for interface compatibility; currently unused.

        Returns:
            Dict with:
            - 'status': 'healthy' | 'degraded' | 'critical'
            - 'incomplete_count': Number of incomplete-research findings
            - 'total_sessions': Total sessions counted in the time scope
            - 'completeness_pct': Percentage of complete research (0-100)
            - 'findings': List of specific issues reported by the detector
            - 'summary': Pattern breakdown reported by the detector
            - 'timestamp': time.time() at which the audit was produced
            - 'time_scope_days': The window that was audited
        """
        result = self.detector.find_all_incomplete_research(
            kg_db_paths=self.kg_db_paths,
            time_scope_days=time_scope_days,
        )
        findings = result['findings']
        summary = result['summary']

        incomplete_count = len(findings)
        total_sessions = self._count_total_sessions(time_scope_days)

        # max(..., 1) avoids division by zero when no sessions could be counted.
        completeness_pct = 100 - (incomplete_count / max(total_sessions, 1) * 100)
        # Findings can outnumber the counted sessions (e.g. when a database is
        # unreadable and the count is 0); clamp so the value stays a percentage.
        completeness_pct = max(0.0, min(100.0, completeness_pct))

        if completeness_pct >= 95:
            status = 'healthy'
        elif completeness_pct >= 80:
            status = 'degraded'
        else:
            status = 'critical'

        return {
            'status': status,
            'incomplete_count': incomplete_count,
            'total_sessions': total_sessions,
            'completeness_pct': round(completeness_pct, 1),
            'findings': findings,
            'summary': summary,
            'timestamp': time.time(),
            'time_scope_days': time_scope_days,
        }

    def check_research_patterns(self, time_scope_days: int = 30, verbose: bool = False) -> Dict:
        """
        Detailed pattern analysis for each pattern type enumerated below.

        Args:
            time_scope_days: Size of the look-back window in days.
            verbose: Accepted for interface compatibility; currently unused.

        Returns:
            Dict with:
            - 'pattern_analysis': Per-pattern count, up to 3 examples, and a recommendation
            - 'severity_breakdown': The detector summary's 'by_severity' mapping
            - 'total_findings': Number of findings across all patterns
            - 'health_score': 0-100 KG health rating
            - 'health_status': 'healthy' | 'degraded' | 'critical'
            - 'recommendations': Ranked list of suggested fixes
            - 'timestamp': time.time() at which the analysis was produced
        """
        result = self.detector.find_all_incomplete_research(
            kg_db_paths=self.kg_db_paths,
            time_scope_days=time_scope_days,
        )
        findings = result['findings']
        summary = result['summary']

        # NOTE(review): an earlier docstring mentioned four pattern types, but
        # only these three are enumerated. Findings carrying any other pattern
        # still count towards 'total_findings' but get no entry here - confirm
        # against KGPatternDetector.
        pattern_analysis = {}
        for pattern in ['unresolved_question', 'incomplete_duration', 'claude_no_conclusion']:
            pattern_findings = [f for f in findings if f['pattern'] == pattern]
            pattern_analysis[pattern] = {
                'count': len(pattern_findings),
                'examples': pattern_findings[:3],  # First 3 examples
                'recommendation': self._get_pattern_recommendation(pattern),
            }

        health_score = self._calculate_kg_health_score(summary, len(findings))

        return {
            'pattern_analysis': pattern_analysis,
            'severity_breakdown': summary['by_severity'],
            'total_findings': len(findings),
            'health_score': health_score,
            'health_status': self._status_from_score(health_score),
            'recommendations': self._generate_recommendations(summary),
            'timestamp': time.time(),
        }

    def mark_incomplete_for_review(self, findings: Optional[List[Dict]] = None,
                                   time_scope_days: int = 30,
                                   auto_mark: bool = False) -> Dict:
        """
        Mark incomplete research sessions for review (does NOT auto-fix, only flags).

        Only findings whose 'source' is 'kg_database' are considered. In preview
        mode (``auto_mark=False``) nothing is written to disk and the review
        directory is not created.

        Args:
            findings: List of findings to mark. If None, will detect first.
            time_scope_days: Time scope for detection
            auto_mark: If True, actually write review files. If False, return preview.

        Returns:
            Dict with:
            - 'marked_count': Number of sessions marked (or that would be marked in preview)
            - 'review_queue_path': Path to review queue
            - 'actions': List of marking actions, each with status 'marked' or 'preview'
            - 'auto_mark': The mode that was used
            - 'timestamp': time.time() at which the result was produced
        """
        if findings is None:
            result = self.detector.find_all_incomplete_research(
                kg_db_paths=self.kg_db_paths,
                time_scope_days=time_scope_days,
            )
            findings = result['findings']

        review_queue_path = Path('/home/admin/conductor/review')
        if auto_mark:
            # A preview must stay side-effect free, so the queue directory is
            # only created when files are really going to be written.
            review_queue_path.mkdir(parents=True, exist_ok=True)

        status = 'marked' if auto_mark else 'preview'
        actions = []
        for finding in findings:
            # .get(): findings without a 'source' are skipped rather than
            # aborting the whole run.
            if finding.get('source') != 'kg_database':
                continue

            review_file = review_queue_path / f"{finding['id']}_review.json"

            if auto_mark:
                review_data = {
                    'entity_id': finding['id'],
                    'entity_name': finding.get('name', 'unknown'),
                    'pattern': finding['pattern'],
                    'severity': finding['severity'],
                    'example': finding['example'],
                    'marked_at': datetime.now().isoformat(),
                    'reason': 'Incomplete research: needs user follow-up',
                    'action_required': 'Review and complete research session',
                }
                review_file.write_text(json.dumps(review_data, indent=2))

            actions.append({
                'entity_id': finding['id'],
                'review_file': str(review_file),
                'status': status,
            })

        return {
            'marked_count': len(actions),
            'review_queue_path': str(review_queue_path),
            'actions': actions,
            'auto_mark': auto_mark,
            'timestamp': time.time(),
        }

    def generate_health_score(self, time_scope_days: int = 30) -> Dict:
        """
        Generate comprehensive KG health score.

        Combines four component scores with fixed weights: completeness (0.35),
        pattern quality (0.35), recency (0.20) and metadata integrity (0.10).

        Args:
            time_scope_days: Size of the look-back window in days.

        Returns:
            Dict with:
            - 'overall_score': 0-100 health rating, rounded to one decimal
            - 'component_scores': Breakdown by metric, each rounded to one decimal
            - 'status': 'healthy' | 'degraded' | 'critical'
            - 'total_findings': Number of findings from the completeness audit
            - 'findings_by_severity': The audit summary's 'by_severity' mapping
            - 'actionable_fixes': Recommended actions
            - 'timestamp': time.time() at which the score was produced
        """
        audit = self.check_kg_completeness(time_scope_days)
        patterns = self.check_research_patterns(time_scope_days)
        findings = audit['findings']

        # Component scoring (each 0-100)
        component_scores = {
            'completeness': audit['completeness_pct'],
            'pattern_quality': patterns['health_score'],
            'recency': self._calculate_recency_score(findings, time_scope_days),
            'metadata_integrity': self._calculate_metadata_integrity(findings),
        }

        # Weighted overall score
        overall_score = (
            component_scores['completeness'] * 0.35
            + component_scores['pattern_quality'] * 0.35
            + component_scores['recency'] * 0.20
            + component_scores['metadata_integrity'] * 0.10
        )

        return {
            'overall_score': round(overall_score, 1),
            'component_scores': {k: round(v, 1) for k, v in component_scores.items()},
            'status': self._status_from_score(overall_score),
            'total_findings': len(findings),
            'findings_by_severity': audit['summary']['by_severity'],
            'actionable_fixes': patterns['recommendations'],
            'timestamp': time.time(),
        }

    @staticmethod
    def _status_from_score(score: float) -> str:
        """Map a 0-100 score to 'healthy' (>= 80), 'degraded' (>= 60) or 'critical'."""
        if score >= 80:
            return 'healthy'
        if score >= 60:
            return 'degraded'
        return 'critical'

    def _count_total_sessions(self, time_scope_days: int) -> int:
        """
        Count total research sessions in time scope.

        Best-effort: a database that is missing or cannot be queried contributes
        0 to the total and is logged, without preventing the remaining
        databases from being counted.
        """
        # The query compares 'updated_at' against an epoch-seconds cutoff.
        cutoff_time = time.time() - (time_scope_days * 86400)
        total = 0

        for db_path in self.kg_db_paths:
            try:
                if not Path(db_path).exists():
                    continue
                # sqlite3's own context manager only commits/rolls back; it does
                # not close the connection, hence the explicit closing().
                with closing(sqlite3.connect(db_path)) as conn:
                    row = conn.execute(
                        """
                        SELECT COUNT(*)
                        FROM entities
                        WHERE type = 'session'
                          AND domain = 'research'
                          AND updated_at > ?
                        """,
                        (cutoff_time,),
                    ).fetchone()
            except (sqlite3.Error, OSError) as exc:
                logger.warning("Could not count sessions in %s: %s", db_path, exc)
                continue
            total += row[0]

        return total

    def _calculate_kg_health_score(self, summary: Dict, finding_count: int) -> float:
        """
        Calculate KG health score based on issue summary.

        Starts at 100 and deducts 5 per high-severity and 2 per medium-severity
        issue; other severities are not deducted. The result is clamped to 0-100.

        Args:
            summary: Detector summary containing a 'by_severity' mapping.
            finding_count: Accepted for interface compatibility; currently unused.
        """
        by_severity = summary['by_severity']
        score = 100.0
        score -= by_severity.get('high', 0) * 5    # -5 per high severity issue
        score -= by_severity.get('medium', 0) * 2  # -2 per medium severity issue
        return max(0.0, min(100.0, score))

    def _calculate_recency_score(self, findings: List[Dict], time_scope_days: int) -> float:
        """
        Score based on age of incomplete research (older = worse).

        Finding timestamps are compared against time.time(), i.e. treated as
        epoch seconds. Findings without a 'timestamp' are skipped here (they are
        penalised by the metadata-integrity score instead); if no finding has a
        timestamp the score is 100.0, the same as for an empty list.

        Args:
            findings: Findings to evaluate.
            time_scope_days: Accepted for interface compatibility; currently unused.
        """
        now = time.time()
        ages = [
            now - finding['timestamp']
            for finding in findings
            if finding.get('timestamp') is not None
        ]
        if not ages:
            return 100.0

        avg_age_days = (sum(ages) / len(ages)) / 86400

        # Score decreases with age
        if avg_age_days <= 3:
            return 90.0
        if avg_age_days <= 7:
            return 75.0
        if avg_age_days <= 14:
            return 60.0
        return 40.0

    def _calculate_metadata_integrity(self, findings: List[Dict]) -> float:
        """
        Score based on completeness of finding metadata.

        Returns the percentage of findings that carry every required field, or
        100.0 when there are no findings.
        """
        if not findings:
            return 100.0

        required_fields = {'source', 'pattern', 'severity', 'example', 'timestamp'}
        valid_count = sum(1 for finding in findings if required_fields.issubset(finding))
        return (valid_count / len(findings)) * 100

    def _get_pattern_recommendation(self, pattern: str) -> str:
        """Get specific recommendation for a pattern, with a generic fallback."""
        recommendations = {
            'unresolved_question': 'Resume research session with user input; complete analysis and synthesis',
            'incomplete_duration': 'Research ended prematurely; needs deeper investigation or additional findings',
            'claude_no_conclusion': 'Assistant analysis present but missing final conclusions; add summary section',
        }
        return recommendations.get(pattern, 'Review and complete research session')

    def _generate_recommendations(self, summary: Dict) -> List[str]:
        """
        Generate ranked recommendations based on findings.

        Reads the 'by_severity' and 'by_pattern' mappings of the detector
        summary. The validation recommendation is always appended last.
        """
        recommendations = []

        high_count = summary['by_severity'].get('high', 0)
        if high_count > 0:
            recommendations.append(
                f"[URGENT] Address {high_count} high-severity incomplete research sessions"
            )

        pattern_counts = summary['by_pattern']
        if pattern_counts.get('unresolved_question', 0) > 0:
            recommendations.append(
                "Resume incomplete research with user follow-up and complete analysis"
            )
        if pattern_counts.get('claude_no_conclusion', 0) > 0:
            recommendations.append(
                "Add missing conclusion/synthesis sections to Claude analysis"
            )
        if pattern_counts.get('incomplete_duration', 0) > 0:
            recommendations.append(
                "Investigate incomplete sessions with minimal duration; may need deeper research"
            )

        recommendations.append(
            "Implement validation: block research completion if unresolved questions remain"
        )
        return recommendations


def main() -> None:
    """Print a completeness audit, the health score, and a review-marking preview."""
    checker = KGHealthChecker()

    print("=" * 70)
    print("KG COMPLETENESS AUDIT")
    print("=" * 70)
    audit = checker.check_kg_completeness()
    print(f"Status: {audit['status'].upper()}")
    print(f"Completeness: {audit['completeness_pct']}% ({audit['incomplete_count']}/{audit['total_sessions']})")
    print(f"Issues by pattern: {audit['summary']['by_pattern']}")

    print("\n" + "=" * 70)
    print("KG HEALTH SCORE")
    print("=" * 70)
    health = checker.generate_health_score()
    print(f"Overall Score: {health['overall_score']}/100 ({health['status'].upper()})")
    print(f"Component Scores: {health['component_scores']}")
    print("\nRecommendations:")
    for i, rec in enumerate(health['actionable_fixes'], 1):
        print(f"  {i}. {rec}")

    print("\n" + "=" * 70)
    print("REVIEW MARKING (PREVIEW)")
    print("=" * 70)
    review_result = checker.mark_incomplete_for_review(auto_mark=False)
    print(f"Sessions to mark for review: {review_result['marked_count']}")
    print(f"Review queue path: {review_result['review_queue_path']}")


if __name__ == '__main__':
    main()