Based on claude-code-tools TmuxCLIController, this refactor:
- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
375 lines
14 KiB
Python
375 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
Knowledge Graph Health Checker

Provides comprehensive KG health assessment including:
- Pattern detection for incomplete research
- Health score generation (0-100)
- Issue categorization and severity assessment
- Recommendations for fixing issues
"""
|
|
|
|
import json
import logging
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from kg_pattern_detector import KGPatternDetector
|
|
|
|
|
|
class KGHealthChecker:
    """Check and report on Knowledge Graph health status.

    Detection itself is delegated to ``KGPatternDetector``; this class turns
    the detector's ``findings`` / ``summary`` output into completeness
    figures, per-pattern breakdowns, a weighted health score and
    review-queue markers.

    Attributes:
        detector: The ``KGPatternDetector`` used to find incomplete research.
        kg_db_paths: SQLite database files to scan. Paths that do not exist
            are skipped when counting sessions.
    """

    # Patterns that always get an entry in ``pattern_analysis`` (with a count
    # of 0 when absent). Any other pattern present in the findings is
    # appended after these.
    KNOWN_PATTERNS = (
        'unresolved_question',
        'incomplete_duration',
        'claude_no_conclusion',
    )

    # Directory that receives one ``<id>_review.json`` marker per flagged
    # session when ``mark_incomplete_for_review(auto_mark=True)`` is used.
    REVIEW_QUEUE_DIR = '/home/admin/conductor/review'

    def __init__(self):
        """Initialize the health checker."""
        self.detector = KGPatternDetector()
        self.kg_db_paths = [
            '/etc/luz-knowledge/research.db',
            '/etc/zen-swarm/memory/research.db',
        ]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def check_kg_completeness(self, time_scope_days: int = 30, verbose: bool = False) -> Dict:
        """
        Quick KG completeness audit over the last ``time_scope_days`` days.

        Args:
            time_scope_days: Size of the look-back window in days.
            verbose: Currently unused; kept for interface compatibility.

        Returns:
            Dict with:
            - 'status': 'healthy' | 'degraded' | 'critical'
            - 'incomplete_count': Number of incomplete research sessions
            - 'total_sessions': Total sessions in time scope
            - 'completeness_pct': Percentage of complete research (0-100)
            - 'findings': List of specific issues
            - 'summary': Pattern breakdown
            - 'timestamp': Time the report was built (``time.time()``)
            - 'time_scope_days': The window that was audited
        """
        result = self._detect_incomplete(time_scope_days)
        return self._completeness_report(result, time_scope_days)

    def check_research_patterns(self, time_scope_days: int = 30, verbose: bool = False) -> Dict:
        """
        Detailed pattern analysis of incomplete research.

        Every pattern in ``KNOWN_PATTERNS`` is reported, plus any additional
        pattern that appears in the detector's findings.

        Args:
            time_scope_days: Size of the look-back window in days.
            verbose: Currently unused; kept for interface compatibility.

        Returns:
            Dict with:
            - 'pattern_analysis': Breakdown by each pattern type
            - 'severity_breakdown': Counts keyed by severity
            - 'total_findings': Number of findings
            - 'health_score': 0-100 KG health rating
            - 'health_status': 'healthy' | 'degraded' | 'critical'
            - 'recommendations': Ranked list of fixes
            - 'timestamp': Time the report was built (``time.time()``)
        """
        result = self._detect_incomplete(time_scope_days)
        return self._pattern_report(result)

    def mark_incomplete_for_review(self, findings: Optional[List[Dict]] = None,
                                   time_scope_days: int = 30,
                                   auto_mark: bool = False) -> Dict:
        """
        Mark incomplete research sessions for review (does NOT auto-fix, only flags).

        Only findings whose ``source`` is ``'kg_database'`` are considered.
        In preview mode (``auto_mark=False``) nothing is written to disk and
        the review directory is not created.

        Args:
            findings: List of findings to mark. If None, will detect first.
            time_scope_days: Time scope for detection
            auto_mark: If True, actually mark. If False, return preview.

        Returns:
            Dict with:
            - 'marked_count': Number of sessions marked (or that would be
              marked, in preview mode)
            - 'review_queue_path': Path to review queue
            - 'actions': List of marking actions
            - 'auto_mark': Echo of the ``auto_mark`` argument
            - 'timestamp': Time the result was built (``time.time()``)

        Raises:
            KeyError: If a ``kg_database`` finding lacks 'id', 'pattern',
                'severity' or 'example'.
        """
        if findings is None:
            findings = self._detect_incomplete(time_scope_days)['findings']

        review_queue_path = Path(self.REVIEW_QUEUE_DIR)
        if auto_mark:
            # A preview must stay free of filesystem side effects, so the
            # directory is only created when markers will really be written.
            review_queue_path.mkdir(parents=True, exist_ok=True)

        action_status = 'marked' if auto_mark else 'preview'
        actions = []

        for finding in findings:
            if finding.get('source') != 'kg_database':
                continue

            review_id = f"{finding['id']}_review"
            review_file = review_queue_path / f"{review_id}.json"

            # Built in preview mode too, so a preview fails on the same
            # malformed findings that a real marking run would fail on.
            review_data = {
                'entity_id': finding['id'],
                'entity_name': finding.get('name', 'unknown'),
                'pattern': finding['pattern'],
                'severity': finding['severity'],
                'example': finding['example'],
                'marked_at': datetime.now().isoformat(),
                'reason': 'Incomplete research: needs user follow-up',
                'action_required': 'Review and complete research session',
            }

            if auto_mark:
                review_file.write_text(json.dumps(review_data, indent=2), encoding='utf-8')

            actions.append({
                'entity_id': finding['id'],
                'review_file': str(review_file),
                'status': action_status,
            })

        return {
            'marked_count': len(actions),
            'review_queue_path': str(review_queue_path),
            'actions': actions,
            'auto_mark': auto_mark,
            'timestamp': time.time(),
        }

    def generate_health_score(self, time_scope_days: int = 30) -> Dict:
        """
        Generate comprehensive KG health score.

        The detector is run once and both the completeness audit and the
        pattern analysis are derived from that single result, so the
        component scores always describe the same set of findings.

        Args:
            time_scope_days: Size of the look-back window in days.

        Returns:
            Dict with:
            - 'overall_score': 0-100 health rating
            - 'component_scores': Breakdown by metric
            - 'status': 'healthy' | 'degraded' | 'critical'
            - 'total_findings': Number of findings
            - 'findings_by_severity': Counts keyed by severity
            - 'actionable_fixes': Recommended actions
            - 'timestamp': Time the report was built (``time.time()``)
        """
        result = self._detect_incomplete(time_scope_days)
        audit = self._completeness_report(result, time_scope_days)
        patterns = self._pattern_report(result)

        findings = audit['findings']

        # Component scoring (each 0-100)
        component_scores = {
            'completeness': audit['completeness_pct'],
            'pattern_quality': patterns['health_score'],
            'recency': self._calculate_recency_score(findings, time_scope_days),
            'metadata_integrity': self._calculate_metadata_integrity(findings),
        }

        # Weighted overall score; the weights sum to 1.0.
        overall_score = (
            component_scores['completeness'] * 0.35 +
            component_scores['pattern_quality'] * 0.35 +
            component_scores['recency'] * 0.20 +
            component_scores['metadata_integrity'] * 0.10
        )

        return {
            'overall_score': round(overall_score, 1),
            'component_scores': {k: round(v, 1) for k, v in component_scores.items()},
            'status': self._status_for_score(overall_score),
            'total_findings': len(findings),
            'findings_by_severity': audit['summary']['by_severity'],
            'actionable_fixes': patterns['recommendations'],
            'timestamp': time.time(),
        }

    # ------------------------------------------------------------------
    # Report builders
    # ------------------------------------------------------------------

    def _detect_incomplete(self, time_scope_days: int) -> Dict:
        """Run the detector over ``kg_db_paths`` and return its raw result."""
        return self.detector.find_all_incomplete_research(
            kg_db_paths=self.kg_db_paths,
            time_scope_days=time_scope_days
        )

    def _completeness_report(self, result: Dict, time_scope_days: int) -> Dict:
        """Build the ``check_kg_completeness`` report from a detector result."""
        findings = result['findings']
        summary = result['summary']

        incomplete_count = len(findings)
        total_sessions = self._count_total_sessions(time_scope_days)

        # Findings can outnumber the sessions counted in the databases (for
        # example when no database is readable), which would push the raw
        # percentage below zero; clamp it to the documented 0-100 range.
        raw_pct = 100 - (incomplete_count / max(total_sessions, 1) * 100)
        completeness_pct = max(0.0, min(100.0, raw_pct))

        if completeness_pct >= 95:
            status = 'healthy'
        elif completeness_pct >= 80:
            status = 'degraded'
        else:
            status = 'critical'

        return {
            'status': status,
            'incomplete_count': incomplete_count,
            'total_sessions': total_sessions,
            'completeness_pct': round(completeness_pct, 1),
            'findings': findings,
            'summary': summary,
            'timestamp': time.time(),
            'time_scope_days': time_scope_days,
        }

    def _pattern_report(self, result: Dict) -> Dict:
        """Build the ``check_research_patterns`` report from a detector result."""
        findings = result['findings']
        summary = result['summary']

        # Known patterns first (always reported), then any other pattern the
        # detector emitted, in first-seen order, so no finding is dropped.
        patterns = list(self.KNOWN_PATTERNS)
        for finding in findings:
            pattern = finding.get('pattern')
            if pattern is not None and pattern not in patterns:
                patterns.append(pattern)

        pattern_analysis = {}
        for pattern in patterns:
            matches = [f for f in findings if f.get('pattern') == pattern]
            pattern_analysis[pattern] = {
                'count': len(matches),
                'examples': matches[:3],  # First 3 examples
                'recommendation': self._get_pattern_recommendation(pattern),
            }

        health_score = self._calculate_kg_health_score(summary, len(findings))

        return {
            'pattern_analysis': pattern_analysis,
            'severity_breakdown': summary['by_severity'],
            'total_findings': len(findings),
            'health_score': health_score,
            'health_status': self._status_for_score(health_score),
            'recommendations': self._generate_recommendations(summary),
            'timestamp': time.time(),
        }

    # ------------------------------------------------------------------
    # Scoring helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _status_for_score(score: float) -> str:
        """Map a 0-100 score to 'healthy' (>= 80), 'degraded' (>= 60) or 'critical'."""
        if score >= 80:
            return 'healthy'
        if score >= 60:
            return 'degraded'
        return 'critical'

    def _count_total_sessions(self, time_scope_days: int) -> int:
        """Count total research sessions in time scope.

        Best-effort: a database that is missing or cannot be queried is
        logged and skipped, and the remaining databases are still counted.
        """
        # NOTE(review): ``updated_at`` is compared against a Unix-epoch float;
        # confirm the column stores epoch seconds.
        cutoff_time = time.time() - (time_scope_days * 86400)
        total = 0

        for db_path in self.kg_db_paths:
            try:
                if not Path(db_path).exists():
                    continue

                # ``closing`` guarantees the connection is closed; using the
                # connection itself as a context manager only manages the
                # transaction.
                with closing(sqlite3.connect(db_path)) as conn:
                    row = conn.execute("""
                        SELECT COUNT(*)
                        FROM entities
                        WHERE type = 'session' AND domain = 'research'
                        AND updated_at > ?
                    """, (cutoff_time,)).fetchone()
            except (sqlite3.Error, OSError) as exc:
                logging.getLogger(__name__).warning(
                    "Could not count sessions in %s: %s", db_path, exc
                )
                continue

            total += row[0]

        return total

    def _calculate_kg_health_score(self, summary: Dict, finding_count: int) -> float:
        """Calculate KG health score based on issue summary.

        Starts at 100 and deducts 5 per high-severity and 2 per
        medium-severity issue; other severities carry no deduction.
        ``finding_count`` is currently unused.
        """
        severity_counts = summary['by_severity']
        score = 100.0
        score -= severity_counts.get('high', 0) * 5
        score -= severity_counts.get('medium', 0) * 2
        return max(0, min(100, score))

    def _calculate_recency_score(self, findings: List[Dict], time_scope_days: int) -> float:
        """Score based on age of incomplete research (older = worse).

        Findings without a numeric 'timestamp' are ignored here (they are
        already penalised by ``_calculate_metadata_integrity``). When there
        is nothing to average, the score is 100. ``time_scope_days`` is
        currently unused.
        """
        now = time.time()
        ages = [
            now - f['timestamp']
            for f in findings
            if isinstance(f.get('timestamp'), (int, float))
        ]
        if not ages:
            return 100.0

        avg_age_days = (sum(ages) / len(ages)) / 86400

        # (upper bound on average age in days, score); first match wins.
        for max_age_days, score in ((3, 90.0), (7, 75.0), (14, 60.0)):
            if avg_age_days <= max_age_days:
                return score
        return 40.0

    def _calculate_metadata_integrity(self, findings: List[Dict]) -> float:
        """Score based on completeness of finding metadata.

        Returns the percentage of findings that carry every required field,
        or 100 when there are no findings.
        """
        if not findings:
            return 100.0

        required_fields = {'source', 'pattern', 'severity', 'example', 'timestamp'}
        valid_count = sum(1 for finding in findings if required_fields <= finding.keys())
        return (valid_count / len(findings)) * 100

    # ------------------------------------------------------------------
    # Recommendations
    # ------------------------------------------------------------------

    def _get_pattern_recommendation(self, pattern: str) -> str:
        """Get specific recommendation for a pattern (generic text if unknown)."""
        recommendations = {
            'unresolved_question': 'Resume research session with user input; complete analysis and synthesis',
            'incomplete_duration': 'Research ended prematurely; needs deeper investigation or additional findings',
            'claude_no_conclusion': 'Assistant analysis present but missing final conclusions; add summary section',
        }
        return recommendations.get(pattern, 'Review and complete research session')

    def _generate_recommendations(self, summary: Dict) -> List[str]:
        """Generate ranked recommendations based on findings.

        The urgent high-severity item (if any) comes first, followed by one
        item per pattern that occurred, and finally a standing validation
        recommendation that is always included.
        """
        recommendations = []

        high_count = summary['by_severity'].get('high', 0)
        if high_count > 0:
            recommendations.append(
                f"[URGENT] Address {high_count} high-severity incomplete research sessions"
            )

        pattern_counts = summary['by_pattern']
        if pattern_counts.get('unresolved_question', 0) > 0:
            recommendations.append(
                "Resume incomplete research with user follow-up and complete analysis"
            )

        if pattern_counts.get('claude_no_conclusion', 0) > 0:
            recommendations.append(
                "Add missing conclusion/synthesis sections to Claude analysis"
            )

        if pattern_counts.get('incomplete_duration', 0) > 0:
            recommendations.append(
                "Investigate incomplete sessions with minimal duration; may need deeper research"
            )

        recommendations.append(
            "Implement validation: block research completion if unresolved questions remain"
        )

        return recommendations
|
|
|
|
|
|
if __name__ == '__main__':
    # Demo run: print the completeness audit, the overall health score with
    # its recommendations, and a preview of what would be queued for review.
    divider = "=" * 70
    health_checker = KGHealthChecker()

    # --- Completeness audit -------------------------------------------
    print("\n".join([divider, "KG COMPLETENESS AUDIT", divider]))
    completeness = health_checker.check_kg_completeness()
    ratio = f"{completeness['incomplete_count']}/{completeness['total_sessions']}"
    print(f"Status: {completeness['status'].upper()}")
    print(f"Completeness: {completeness['completeness_pct']}% ({ratio})")
    print(f"Issues by pattern: {completeness['summary']['by_pattern']}")

    # --- Health score -------------------------------------------------
    print("\n".join(["", divider, "KG HEALTH SCORE", divider]))
    score_report = health_checker.generate_health_score()
    print(f"Overall Score: {score_report['overall_score']}/100 ({score_report['status'].upper()})")
    print(f"Component Scores: {score_report['component_scores']}")
    print("\nRecommendations:")
    for number, fix in enumerate(score_report['actionable_fixes'], start=1):
        print(f" {number}. {fix}")

    # --- Review marking (preview only; nothing is marked) -------------
    print("\n".join(["", divider, "REVIEW MARKING (PREVIEW)", divider]))
    preview = health_checker.mark_incomplete_for_review(auto_mark=False)
    print(f"Sessions to mark for review: {preview['marked_count']}")
    print(f"Review queue path: {preview['review_queue_path']}")
|