Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
351
lib/script_health_checker.py
Normal file
351
lib/script_health_checker.py
Normal file
@@ -0,0 +1,351 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Script Health Checker
|
||||
|
||||
Validates Python script quality across the orchestrator library:
|
||||
- Syntax validation
|
||||
- Import/dependency checking
|
||||
- Type hint completeness
|
||||
- Error handling patterns
|
||||
- Docstring coverage
|
||||
"""
|
||||
|
||||
import ast
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Tuple
|
||||
|
||||
|
||||
class ScriptHealthChecker:
|
||||
"""Check health of orchestrator Python scripts."""
|
||||
|
||||
LIB_DIR = Path('/opt/server-agents/orchestrator/lib')
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize script health checker."""
|
||||
self.issues = []
|
||||
|
||||
def validate_all_scripts(self) -> Dict:
|
||||
"""
|
||||
Validate all Python scripts in orchestrator lib.
|
||||
|
||||
Returns:
|
||||
Dict with validation results
|
||||
"""
|
||||
if not self.LIB_DIR.exists():
|
||||
return {
|
||||
'status': 'error',
|
||||
'message': f'Lib directory not found: {self.LIB_DIR}',
|
||||
'scripts': []
|
||||
}
|
||||
|
||||
scripts = list(self.LIB_DIR.glob('*.py'))
|
||||
results = {
|
||||
'total_scripts': len(scripts),
|
||||
'valid_scripts': 0,
|
||||
'scripts': [],
|
||||
'overall_health': 0
|
||||
}
|
||||
|
||||
for script_path in scripts:
|
||||
if script_path.name.startswith('_'):
|
||||
continue
|
||||
|
||||
result = self.validate_script(script_path)
|
||||
results['scripts'].append(result)
|
||||
|
||||
if result['status'] == 'valid':
|
||||
results['valid_scripts'] += 1
|
||||
|
||||
# Calculate overall health
|
||||
if results['total_scripts'] > 0:
|
||||
results['overall_health'] = (results['valid_scripts'] / results['total_scripts']) * 100
|
||||
|
||||
return results
|
||||
|
||||
def validate_script(self, script_path: Path) -> Dict:
|
||||
"""
|
||||
Validate a single Python script.
|
||||
|
||||
Args:
|
||||
script_path: Path to Python file
|
||||
|
||||
Returns:
|
||||
Dict with validation results
|
||||
"""
|
||||
result = {
|
||||
'script': script_path.name,
|
||||
'path': str(script_path),
|
||||
'status': 'unknown',
|
||||
'issues': [],
|
||||
'metrics': {}
|
||||
}
|
||||
|
||||
try:
|
||||
content = script_path.read_text(encoding='utf-8')
|
||||
except Exception as e:
|
||||
result['status'] = 'error'
|
||||
result['issues'].append(f"Cannot read file: {e}")
|
||||
return result
|
||||
|
||||
# 1. Syntax validation
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
result['metrics']['lines'] = len(content.split('\n'))
|
||||
except SyntaxError as e:
|
||||
result['status'] = 'syntax_error'
|
||||
result['issues'].append(f"Syntax error at line {e.lineno}: {e.msg}")
|
||||
return result
|
||||
|
||||
# 2. Import validation
|
||||
import_issues = self._check_imports(tree, script_path)
|
||||
result['issues'].extend(import_issues)
|
||||
|
||||
# 3. Type hint coverage
|
||||
type_coverage = self._check_type_hints(tree)
|
||||
result['metrics']['type_hint_coverage'] = type_coverage
|
||||
|
||||
# 4. Docstring coverage
|
||||
docstring_coverage = self._check_docstrings(tree)
|
||||
result['metrics']['docstring_coverage'] = docstring_coverage
|
||||
|
||||
# 5. Error handling patterns
|
||||
error_handling = self._check_error_handling(tree)
|
||||
result['metrics']['error_handling_score'] = error_handling
|
||||
|
||||
# 6. Class and function count
|
||||
result['metrics']['classes'] = len([n for n in ast.walk(tree) if isinstance(n, ast.ClassDef)])
|
||||
result['metrics']['functions'] = len([n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)])
|
||||
|
||||
# Determine overall status
|
||||
if not result['issues']:
|
||||
result['status'] = 'valid'
|
||||
elif len(result['issues']) <= 2:
|
||||
result['status'] = 'warnings'
|
||||
else:
|
||||
result['status'] = 'issues'
|
||||
|
||||
return result
|
||||
|
||||
def _check_imports(self, tree: ast.AST, script_path: Path) -> List[str]:
|
||||
"""Check for import issues."""
|
||||
issues = []
|
||||
imports = []
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
imports.append(alias.name)
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
if node.module:
|
||||
imports.append(node.module)
|
||||
|
||||
# Check for unused imports
|
||||
imported_names = set(imports)
|
||||
content = script_path.read_text()
|
||||
|
||||
for imported in imported_names:
|
||||
short_name = imported.split('.')[0]
|
||||
# Simple heuristic: if imported name doesn't appear in code
|
||||
if content.count(short_name) <= 1: # Only in import statement
|
||||
if not short_name.startswith('_'):
|
||||
issues.append(f"Possible unused import: {imported}")
|
||||
|
||||
# Check for missing imports (stdlib coverage)
|
||||
required_stdlib = {'json', 'time', 'pathlib', 'typing'}
|
||||
stdlib_used = imported_names & required_stdlib
|
||||
if stdlib_used != required_stdlib:
|
||||
missing = required_stdlib - stdlib_used
|
||||
for module in missing:
|
||||
if module in content:
|
||||
issues.append(f"Missing import: {module}")
|
||||
|
||||
return issues
|
||||
|
||||
def _check_type_hints(self, tree: ast.AST) -> float:
|
||||
"""Calculate type hint coverage percentage."""
|
||||
functions = [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]
|
||||
if not functions:
|
||||
return 100.0
|
||||
|
||||
functions_with_hints = 0
|
||||
for func in functions:
|
||||
# Check if function has return type hint
|
||||
if func.returns:
|
||||
# Check if parameters have type hints
|
||||
params_with_hints = 0
|
||||
for arg in func.args.args:
|
||||
if arg.annotation:
|
||||
params_with_hints += 1
|
||||
|
||||
# Consider function well-typed if most params are annotated
|
||||
if params_with_hints >= len(func.args.args) * 0.5:
|
||||
functions_with_hints += 1
|
||||
|
||||
return (functions_with_hints / len(functions)) * 100
|
||||
|
||||
def _check_docstrings(self, tree: ast.AST) -> float:
|
||||
"""Calculate docstring coverage percentage."""
|
||||
documented = 0
|
||||
total = 0
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
|
||||
total += 1
|
||||
if ast.get_docstring(node):
|
||||
documented += 1
|
||||
|
||||
if total == 0:
|
||||
return 100.0
|
||||
|
||||
return (documented / total) * 100
|
||||
|
||||
def _check_error_handling(self, tree: ast.AST) -> float:
|
||||
"""Score error handling patterns (try/except coverage)."""
|
||||
try_blocks = 0
|
||||
functions = 0
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef):
|
||||
functions += 1
|
||||
for child in ast.walk(node):
|
||||
if isinstance(child, ast.Try):
|
||||
try_blocks += 1
|
||||
|
||||
if functions == 0:
|
||||
return 100.0
|
||||
|
||||
# Score based on try/except ratio
|
||||
error_handling_ratio = (try_blocks / functions) * 100
|
||||
return min(100, error_handling_ratio)
|
||||
|
||||
def get_module_dependencies(self) -> Dict:
|
||||
"""Get all external module dependencies."""
|
||||
dependencies = set()
|
||||
|
||||
for script_path in self.LIB_DIR.glob('*.py'):
|
||||
if script_path.name.startswith('_'):
|
||||
continue
|
||||
|
||||
try:
|
||||
content = script_path.read_text()
|
||||
tree = ast.parse(content)
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
module = alias.name.split('.')[0]
|
||||
if not self._is_stdlib(module):
|
||||
dependencies.add(module)
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
if node.module:
|
||||
module = node.module.split('.')[0]
|
||||
if not self._is_stdlib(module):
|
||||
dependencies.add(module)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
'external_dependencies': sorted(list(dependencies)),
|
||||
'stdlib_usage': True,
|
||||
'total_dependencies': len(dependencies)
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _is_stdlib(module_name: str) -> bool:
|
||||
"""Check if module is Python standard library."""
|
||||
stdlib_modules = {
|
||||
'json', 'time', 'pathlib', 'typing', 'os', 'sys', 'sqlite3',
|
||||
'datetime', 'shutil', 'signal', 'ast', 're', 'subprocess',
|
||||
'threading', 'multiprocessing', 'logging', 'argparse'
|
||||
}
|
||||
return module_name in stdlib_modules
|
||||
|
||||
def generate_script_health_report(self) -> Dict:
|
||||
"""Generate comprehensive script health report."""
|
||||
validation = self.validate_all_scripts()
|
||||
dependencies = self.get_module_dependencies()
|
||||
|
||||
# Calculate overall health score
|
||||
health_score = 0
|
||||
if validation['total_scripts'] > 0:
|
||||
health_score = validation['overall_health']
|
||||
|
||||
# Deduct for issues
|
||||
for script in validation['scripts']:
|
||||
if script['status'] == 'syntax_error':
|
||||
health_score -= 25
|
||||
elif script['status'] == 'issues':
|
||||
health_score -= 5
|
||||
elif script['status'] == 'warnings':
|
||||
health_score -= 2
|
||||
|
||||
health_score = max(0, min(100, health_score))
|
||||
|
||||
return {
|
||||
'health_score': round(health_score, 1),
|
||||
'status': 'healthy' if health_score >= 80 else 'degraded' if health_score >= 60 else 'critical',
|
||||
'total_scripts': validation['total_scripts'],
|
||||
'valid_scripts': validation['valid_scripts'],
|
||||
'scripts': validation['scripts'],
|
||||
'dependencies': dependencies,
|
||||
'recommendations': self._generate_recommendations(validation, health_score),
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
def _generate_recommendations(self, validation: Dict, health_score: float) -> List[str]:
|
||||
"""Generate recommendations based on validation results."""
|
||||
recommendations = []
|
||||
|
||||
if health_score < 80:
|
||||
recommendations.append("[ATTENTION] Script health degraded: fix validation issues")
|
||||
|
||||
problematic_scripts = [s for s in validation['scripts'] if s['status'] in ['syntax_error', 'issues']]
|
||||
if problematic_scripts:
|
||||
recommendations.append(f"Fix {len(problematic_scripts)} script(s) with issues")
|
||||
|
||||
# Check docstring coverage
|
||||
low_doc_scripts = [
|
||||
s for s in validation['scripts']
|
||||
if s['metrics'].get('docstring_coverage', 100) < 50
|
||||
]
|
||||
if low_doc_scripts:
|
||||
recommendations.append("Improve docstring coverage in modules")
|
||||
|
||||
# Check type hints
|
||||
low_type_scripts = [
|
||||
s for s in validation['scripts']
|
||||
if s['metrics'].get('type_hint_coverage', 100) < 50
|
||||
]
|
||||
if low_type_scripts:
|
||||
recommendations.append("Add type hints to function signatures")
|
||||
|
||||
if not recommendations:
|
||||
recommendations.append("Script health excellent - no immediate action needed")
|
||||
|
||||
return recommendations
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import time
|
||||
checker = ScriptHealthChecker()
|
||||
|
||||
print("=" * 70)
|
||||
print("SCRIPT HEALTH CHECK")
|
||||
print("=" * 70)
|
||||
report = checker.generate_script_health_report()
|
||||
|
||||
print(f"Health Score: {report['health_score']}/100 ({report['status'].upper()})")
|
||||
print(f"Valid scripts: {report['valid_scripts']}/{report['total_scripts']}")
|
||||
print(f"External dependencies: {report['dependencies']['total_dependencies']}")
|
||||
|
||||
print("\nProblematic scripts:")
|
||||
for script in report['scripts']:
|
||||
if script['status'] != 'valid':
|
||||
print(f" {script['script']}: {script['status']}")
|
||||
for issue in script['issues'][:2]:
|
||||
print(f" - {issue}")
|
||||
|
||||
print("\nRecommendations:")
|
||||
for rec in report['recommendations']:
|
||||
print(f" - {rec}")
|
||||
Reference in New Issue
Block a user