#!/usr/bin/env python3
"""
Context System Health Checker

Validates the health of the modernized 4-bucket context system:
- Vector store integrity (ChromaDB)
- Hybrid retriever (FTS5 + vector search)
- Semantic router (domain classification)
- Four-bucket context assembly (Identity, Grounding, Intelligence, Task)
"""

import json
import sqlite3
import time
from pathlib import Path
from typing import Dict, List


class ContextHealthChecker:
    """Check health of the 4-bucket context system."""

    VECTOR_STORE_PATH = Path('/opt/server-agents/orchestrator/state/vector_store')
    KG_DB_PATHS = [
        '/etc/luz-knowledge/sysadmin.db',
        '/etc/luz-knowledge/users.db',
        '/etc/luz-knowledge/projects.db',
        '/etc/luz-knowledge/research.db',
    ]

    def __init__(self):
        """Initialize context health checker."""
        self.vector_store_path = self.VECTOR_STORE_PATH

    def check_vector_store(self) -> Dict:
        """
        Validate ChromaDB vector store integrity.

        Returns:
            Dict with 'checks' (detail flags plus 'embedding_count' and
            'embedding_dim'), 'health_score' (0-100), and 'status'
            (healthy | degraded | critical).
        """
        checks = {
            'exists': False,
            'readable': False,
            'has_collections': False,
            'embedding_count': 0,
            'embedding_dim': 0,
            'issues': []
        }

        # Check if vector store exists
        if not self.vector_store_path.exists():
            checks['issues'].append("Vector store directory not found")
            return self._package_health_result(checks, 0)
        checks['exists'] = True

        # Check ChromaDB files. Older ChromaDB releases persist collections as
        # parquet files; newer ones write a chroma.sqlite3 database. Accept either.
        try:
            parquet_files = list(self.vector_store_path.rglob('*.parquet'))
            sqlite_files = list(self.vector_store_path.rglob('chroma.sqlite3'))
            if parquet_files or sqlite_files:
                checks['has_collections'] = True
                checks['readable'] = True
        except Exception as e:
            checks['issues'].append(f"Error reading vector store: {e}")

        # Estimate embedding count from metadata
        try:
            metadata_file = self.vector_store_path / 'metadata.json'
            if metadata_file.exists():
                metadata = json.loads(metadata_file.read_text())
                checks['embedding_count'] = metadata.get('total_embeddings', 0)
                checks['embedding_dim'] = metadata.get('embedding_dim', 384)

                # Validate counts
                if checks['embedding_count'] < 100:
                    checks['issues'].append(
                        f"Low embedding count ({checks['embedding_count']})")
                if checks['embedding_dim'] != 384:
                    checks['issues'].append(
                        f"Unexpected embedding dimension ({checks['embedding_dim']})")
        except Exception as e:
            checks['issues'].append(f"Cannot read vector store metadata: {e}")

        # Calculate score
        score = 100
        if not checks['exists']:
            score = 0
        elif not checks['readable']:
            score = 25
        elif not checks['has_collections']:
            score = 50
        elif checks['embedding_count'] < 100:
            score = 60

        return self._package_health_result(checks, score)
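    # Illustrative sketch of the metadata.json layout check_vector_store()
    # expects. This shape is an assumption about the local store's sidecar
    # file, not a documented ChromaDB format; adjust the keys to match
    # whatever your indexer actually writes.
    _EXAMPLE_METADATA = {
        'total_embeddings': 1250,  # hypothetical count
        'embedding_dim': 384,      # e.g. all-MiniLM-L6-v2 output size
    }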
    def check_hybrid_retriever(self) -> Dict:
        """
        Validate hybrid FTS5+vector retriever.

        Returns:
            Dict with retriever health metrics
        """
        checks = {
            'fts5_accessible': True,
            'vector_retrieval_working': True,
            'merge_correct': True,
            'deduplication_working': True,
            'issues': []
        }

        # Confirm each KG database is queryable. COUNT(*) on 'entities' is a
        # cheap proxy for FTS5 health; see the _probe_fts5() sketch below for
        # what a true FTS5 MATCH probe would look like.
        try:
            test_queries_run = 0
            for db_path in self.KG_DB_PATHS:
                if not Path(db_path).exists():
                    continue
                try:
                    with sqlite3.connect(db_path) as conn:
                        cursor = conn.cursor()
                        cursor.execute("SELECT COUNT(*) FROM entities")
                        test_queries_run += 1
                except Exception as e:
                    checks['fts5_accessible'] = False
                    checks['issues'].append(f"FTS5 query failed for {db_path}: {e}")

            if test_queries_run == 0:
                checks['issues'].append("No FTS5 databases accessible")
        except Exception as e:
            checks['fts5_accessible'] = False
            checks['issues'].append(f"FTS5 check error: {e}")

        # Check for hybrid merge logic
        try:
            retriever_file = Path(
                '/opt/server-agents/orchestrator/lib/langchain_kg_retriever.py')
            if retriever_file.exists():
                content = retriever_file.read_text()
                if 'hybrid' not in content.lower() or 'merge' not in content.lower():
                    checks['merge_correct'] = False
                    checks['issues'].append("Hybrid merge logic not found in retriever")
            else:
                checks['issues'].append("Retriever implementation file not found")
        except Exception as e:
            checks['issues'].append(f"Cannot verify retriever: {e}")

        # Calculate score
        score = 100
        if not checks['fts5_accessible']:
            score -= 25
        if not checks['vector_retrieval_working']:
            score -= 25
        if not checks['merge_correct']:
            score -= 25
        if not checks['deduplication_working']:
            score -= 10

        return self._package_health_result(checks, max(0, score))

    def check_semantic_router(self) -> Dict:
        """
        Validate semantic router domain classification.

        Returns:
            Dict with router health metrics
        """
        checks = {
            'router_exists': False,
            'domains_configured': 0,
            'classification_accuracy': 0,
            'issues': []
        }

        # Check if semantic router exists
        try:
            router_file = Path('/opt/server-agents/orchestrator/lib/semantic_router.py')
            if not router_file.exists():
                checks['issues'].append("Semantic router not found")
                return self._package_health_result(checks, 0)
            checks['router_exists'] = True

            # Parse router configuration
            content = router_file.read_text()

            # Count domain configurations
            domains = ['sysadmin', 'users', 'projects', 'research']
            for domain in domains:
                if domain.lower() in content.lower():
                    checks['domains_configured'] += 1

            if checks['domains_configured'] < 4:
                checks['issues'].append(
                    f"Only {checks['domains_configured']}/4 domains configured")

            # Heuristic accuracy estimate: assume 95% when all domains are wired up
            checks['classification_accuracy'] = (
                95 if checks['domains_configured'] >= 4 else 60)
        except Exception as e:
            checks['issues'].append(f"Cannot verify semantic router: {e}")

        # Calculate score
        score = (checks['domains_configured'] / 4) * 95
        if checks['classification_accuracy'] < 90:
            score = min(score, 70)

        return self._package_health_result(checks, score)
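    def _probe_fts5(self, db_path: str, term: str) -> bool:
        """Illustrative true-FTS5 probe (sketch, not used by the score).

        check_hybrid_retriever() above only runs COUNT(*) against 'entities';
        a genuine FTS5 check would MATCH against the FTS virtual table. The
        table name 'entities_fts' is an assumption about the KG schema, not a
        verified name -- adjust it to match your databases.
        """
        try:
            with sqlite3.connect(db_path) as conn:
                cur = conn.execute(
                    "SELECT COUNT(*) FROM entities_fts WHERE entities_fts MATCH ?",
                    (term,),
                )
                return cur.fetchone()[0] > 0
        except sqlite3.Error:
            return False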
    def check_four_bucket_assembly(self) -> Dict:
        """
        Validate 4-bucket context assembly.

        Returns:
            Dict with context assembly health
        """
        checks = {
            'assembly_file_exists': False,
            'all_buckets_present': True,
            'token_budget_respected': True,
            'bucket_quality': {},
            'issues': []
        }

        # Check if context assembler exists
        try:
            context_file = Path(
                '/opt/server-agents/orchestrator/lib/four_bucket_context.py')
            if not context_file.exists():
                checks['issues'].append("Context assembler not found")
                return self._package_health_result(checks, 0)
            checks['assembly_file_exists'] = True

            content = context_file.read_text()

            # Verify all 4 buckets are implemented
            buckets = ['identity', 'grounding', 'intelligence', 'task']
            for bucket in buckets:
                if bucket.lower() not in content.lower():
                    checks['all_buckets_present'] = False
                    checks['issues'].append(f"Bucket '{bucket}' not found")
                else:
                    checks['bucket_quality'][bucket] = 90  # Assume good if present

            # Check token budget logic
            if 'token' not in content.lower() or 'budget' not in content.lower():
                checks['token_budget_respected'] = False
                checks['issues'].append("Token budget logic not found")
        except Exception as e:
            checks['issues'].append(f"Cannot verify context assembly: {e}")

        # Calculate score
        score = 100
        if not checks['assembly_file_exists']:
            score = 0
        elif not checks['all_buckets_present']:
            score = 60
        if not checks['token_budget_respected']:
            score -= 20

        return self._package_health_result(checks, max(0, score))

    def check_kg_retrieval_accuracy(self) -> Dict:
        """
        Test KG retrieval accuracy with sample queries.

        Returns:
            Dict with retrieval accuracy metrics
        """
        test_results = {
            'tests_run': 0,
            'tests_passed': 0,
            'avg_precision': 0,
            'avg_recall': 0,
            'issues': []
        }

        # Sample test queries
        test_queries = [
            ('research', 'research sessions'),
            ('project', 'project management'),
            ('user', 'user permissions'),
            ('system', 'system administration'),
        ]

        for query_term, query_desc in test_queries:
            test_results['tests_run'] += 1
            passed = False

            # A query passes if any KG database returns at least one hit;
            # count each query at most once, even when several databases match.
            for db_path in self.KG_DB_PATHS:
                if not Path(db_path).exists():
                    continue
                try:
                    with sqlite3.connect(db_path) as conn:
                        cursor = conn.cursor()
                        cursor.execute(
                            "SELECT COUNT(*) FROM entities "
                            "WHERE name LIKE ? OR content LIKE ?",
                            (f'%{query_term}%', f'%{query_term}%')
                        )
                        if cursor.fetchone()[0] > 0:
                            passed = True
                            break
                except Exception as e:
                    test_results['issues'].append(f"Query error on {db_path}: {e}")

            if passed:
                test_results['tests_passed'] += 1

        # Calculate accuracy: the pass rate stands in for precision, and recall
        # is assumed to track precision (a rough heuristic, not a measurement)
        if test_results['tests_run'] > 0:
            test_results['avg_precision'] = (
                test_results['tests_passed'] / test_results['tests_run']) * 100
            test_results['avg_recall'] = test_results['avg_precision']

        return test_results
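    # Worked example of the weighted score computed in
    # generate_context_health_score() below. The weights (0.25, 0.25, 0.20,
    # 0.20, 0.10, summing to 1.0) come from the code; the component values
    # here are hypothetical:
    #   0.25*100 (vector) + 0.25*85 (retriever) + 0.20*95 (router)
    #   + 0.20*100 (buckets) + 0.10*75 (precision)
    #   = 92.75, reported as 92.8 -> 'healthy' (>= 80)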
    def generate_context_health_score(self) -> Dict:
        """
        Generate comprehensive context system health score.

        Returns:
            Dict with overall context health
        """
        vector_store = self.check_vector_store()
        hybrid_retriever = self.check_hybrid_retriever()
        semantic_router = self.check_semantic_router()
        four_bucket = self.check_four_bucket_assembly()
        retrieval_accuracy = self.check_kg_retrieval_accuracy()

        # Weighted health score (weights sum to 1.0)
        overall_score = (
            vector_store['health_score'] * 0.25 +
            hybrid_retriever['health_score'] * 0.25 +
            semantic_router['health_score'] * 0.20 +
            four_bucket['health_score'] * 0.20 +
            retrieval_accuracy.get('avg_precision', 70) * 0.10
        )

        all_issues = []
        all_issues.extend(vector_store['checks']['issues'])
        all_issues.extend(hybrid_retriever['checks']['issues'])
        all_issues.extend(semantic_router['checks']['issues'])
        all_issues.extend(four_bucket['checks']['issues'])
        all_issues.extend(retrieval_accuracy['issues'])

        return {
            'overall_score': round(overall_score, 1),
            'status': ('healthy' if overall_score >= 80
                       else 'degraded' if overall_score >= 60
                       else 'critical'),
            'component_scores': {
                'vector_store': vector_store['health_score'],
                'hybrid_retriever': hybrid_retriever['health_score'],
                'semantic_router': semantic_router['health_score'],
                'four_bucket_assembly': four_bucket['health_score'],
                'retrieval_accuracy': retrieval_accuracy.get('avg_precision', 0)
            },
            'vector_store_embeddings': vector_store['checks'].get('embedding_count', 0),
            'retrieval_tests_passed': retrieval_accuracy['tests_passed'],
            'issues': all_issues,
            'recommendations': self._generate_context_recommendations(
                overall_score, all_issues),
            'timestamp': time.time()
        }

    def _package_health_result(self, checks: Dict, score: float) -> Dict:
        """Package health check results."""
        return {
            'checks': checks,
            'health_score': round(score, 1),
            'status': ('healthy' if score >= 80
                       else 'degraded' if score >= 60
                       else 'critical')
        }

    def _generate_context_recommendations(self, overall_score: float,
                                          issues: List[str]) -> List[str]:
        """Generate recommendations based on context health."""
        recommendations = []

        if overall_score < 80:
            recommendations.append(
                "[ATTENTION] Context system degraded: verify component integrity")

        if issues:
            recommendations.append(f"Address {len(issues)} detected issue(s)")
            recommendations.append(
                "Run full context health check with --deep flag for component analysis")
            recommendations.append(
                "Test context injection with sample queries to verify retrieval quality")

        return recommendations


if __name__ == '__main__':
    checker = ContextHealthChecker()

    print("=" * 70)
    print("CONTEXT SYSTEM HEALTH")
    print("=" * 70)

    health = checker.generate_context_health_score()
    print(f"Overall score: {health['overall_score']}/100 ({health['status'].upper()})")

    print("\nComponent scores:")
    for component, score in health['component_scores'].items():
        print(f"  {component}: {score}/100")

    print(f"\nIssues found: {len(health['issues'])}")
    if health['issues']:
        for issue in health['issues'][:5]:
            print(f"  - {issue}")
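# Minimal programmatic usage sketch, beyond the CLI entry point above. The
# module name 'context_health_checker' is an assumption for illustration;
# rename the import to match this file's actual name:
#
#     from context_health_checker import ContextHealthChecker
#
#     health = ContextHealthChecker().generate_context_health_score()
#     print(json.dumps(health, indent=2))  # full report, incl. recommendations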