Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
352 lines
12 KiB
Python
352 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Script Health Checker
|
|
|
|
Validates Python script quality across the orchestrator library:
|
|
- Syntax validation
|
|
- Import/dependency checking
|
|
- Type hint completeness
|
|
- Error handling patterns
|
|
- Docstring coverage
|
|
"""
|
|
|
|
import ast
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import List, Dict, Tuple
|
|
|
|
|
|
class ScriptHealthChecker:
|
|
"""Check health of orchestrator Python scripts."""
|
|
|
|
LIB_DIR = Path('/opt/server-agents/orchestrator/lib')
|
|
|
|
def __init__(self):
|
|
"""Initialize script health checker."""
|
|
self.issues = []
|
|
|
|
def validate_all_scripts(self) -> Dict:
|
|
"""
|
|
Validate all Python scripts in orchestrator lib.
|
|
|
|
Returns:
|
|
Dict with validation results
|
|
"""
|
|
if not self.LIB_DIR.exists():
|
|
return {
|
|
'status': 'error',
|
|
'message': f'Lib directory not found: {self.LIB_DIR}',
|
|
'scripts': []
|
|
}
|
|
|
|
scripts = list(self.LIB_DIR.glob('*.py'))
|
|
results = {
|
|
'total_scripts': len(scripts),
|
|
'valid_scripts': 0,
|
|
'scripts': [],
|
|
'overall_health': 0
|
|
}
|
|
|
|
for script_path in scripts:
|
|
if script_path.name.startswith('_'):
|
|
continue
|
|
|
|
result = self.validate_script(script_path)
|
|
results['scripts'].append(result)
|
|
|
|
if result['status'] == 'valid':
|
|
results['valid_scripts'] += 1
|
|
|
|
# Calculate overall health
|
|
if results['total_scripts'] > 0:
|
|
results['overall_health'] = (results['valid_scripts'] / results['total_scripts']) * 100
|
|
|
|
return results
|
|
|
|
def validate_script(self, script_path: Path) -> Dict:
|
|
"""
|
|
Validate a single Python script.
|
|
|
|
Args:
|
|
script_path: Path to Python file
|
|
|
|
Returns:
|
|
Dict with validation results
|
|
"""
|
|
result = {
|
|
'script': script_path.name,
|
|
'path': str(script_path),
|
|
'status': 'unknown',
|
|
'issues': [],
|
|
'metrics': {}
|
|
}
|
|
|
|
try:
|
|
content = script_path.read_text(encoding='utf-8')
|
|
except Exception as e:
|
|
result['status'] = 'error'
|
|
result['issues'].append(f"Cannot read file: {e}")
|
|
return result
|
|
|
|
# 1. Syntax validation
|
|
try:
|
|
tree = ast.parse(content)
|
|
result['metrics']['lines'] = len(content.split('\n'))
|
|
except SyntaxError as e:
|
|
result['status'] = 'syntax_error'
|
|
result['issues'].append(f"Syntax error at line {e.lineno}: {e.msg}")
|
|
return result
|
|
|
|
# 2. Import validation
|
|
import_issues = self._check_imports(tree, script_path)
|
|
result['issues'].extend(import_issues)
|
|
|
|
# 3. Type hint coverage
|
|
type_coverage = self._check_type_hints(tree)
|
|
result['metrics']['type_hint_coverage'] = type_coverage
|
|
|
|
# 4. Docstring coverage
|
|
docstring_coverage = self._check_docstrings(tree)
|
|
result['metrics']['docstring_coverage'] = docstring_coverage
|
|
|
|
# 5. Error handling patterns
|
|
error_handling = self._check_error_handling(tree)
|
|
result['metrics']['error_handling_score'] = error_handling
|
|
|
|
# 6. Class and function count
|
|
result['metrics']['classes'] = len([n for n in ast.walk(tree) if isinstance(n, ast.ClassDef)])
|
|
result['metrics']['functions'] = len([n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)])
|
|
|
|
# Determine overall status
|
|
if not result['issues']:
|
|
result['status'] = 'valid'
|
|
elif len(result['issues']) <= 2:
|
|
result['status'] = 'warnings'
|
|
else:
|
|
result['status'] = 'issues'
|
|
|
|
return result
|
|
|
|
def _check_imports(self, tree: ast.AST, script_path: Path) -> List[str]:
|
|
"""Check for import issues."""
|
|
issues = []
|
|
imports = []
|
|
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.Import):
|
|
for alias in node.names:
|
|
imports.append(alias.name)
|
|
elif isinstance(node, ast.ImportFrom):
|
|
if node.module:
|
|
imports.append(node.module)
|
|
|
|
# Check for unused imports
|
|
imported_names = set(imports)
|
|
content = script_path.read_text()
|
|
|
|
for imported in imported_names:
|
|
short_name = imported.split('.')[0]
|
|
# Simple heuristic: if imported name doesn't appear in code
|
|
if content.count(short_name) <= 1: # Only in import statement
|
|
if not short_name.startswith('_'):
|
|
issues.append(f"Possible unused import: {imported}")
|
|
|
|
# Check for missing imports (stdlib coverage)
|
|
required_stdlib = {'json', 'time', 'pathlib', 'typing'}
|
|
stdlib_used = imported_names & required_stdlib
|
|
if stdlib_used != required_stdlib:
|
|
missing = required_stdlib - stdlib_used
|
|
for module in missing:
|
|
if module in content:
|
|
issues.append(f"Missing import: {module}")
|
|
|
|
return issues
|
|
|
|
def _check_type_hints(self, tree: ast.AST) -> float:
|
|
"""Calculate type hint coverage percentage."""
|
|
functions = [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]
|
|
if not functions:
|
|
return 100.0
|
|
|
|
functions_with_hints = 0
|
|
for func in functions:
|
|
# Check if function has return type hint
|
|
if func.returns:
|
|
# Check if parameters have type hints
|
|
params_with_hints = 0
|
|
for arg in func.args.args:
|
|
if arg.annotation:
|
|
params_with_hints += 1
|
|
|
|
# Consider function well-typed if most params are annotated
|
|
if params_with_hints >= len(func.args.args) * 0.5:
|
|
functions_with_hints += 1
|
|
|
|
return (functions_with_hints / len(functions)) * 100
|
|
|
|
def _check_docstrings(self, tree: ast.AST) -> float:
|
|
"""Calculate docstring coverage percentage."""
|
|
documented = 0
|
|
total = 0
|
|
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
|
|
total += 1
|
|
if ast.get_docstring(node):
|
|
documented += 1
|
|
|
|
if total == 0:
|
|
return 100.0
|
|
|
|
return (documented / total) * 100
|
|
|
|
def _check_error_handling(self, tree: ast.AST) -> float:
|
|
"""Score error handling patterns (try/except coverage)."""
|
|
try_blocks = 0
|
|
functions = 0
|
|
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.FunctionDef):
|
|
functions += 1
|
|
for child in ast.walk(node):
|
|
if isinstance(child, ast.Try):
|
|
try_blocks += 1
|
|
|
|
if functions == 0:
|
|
return 100.0
|
|
|
|
# Score based on try/except ratio
|
|
error_handling_ratio = (try_blocks / functions) * 100
|
|
return min(100, error_handling_ratio)
|
|
|
|
def get_module_dependencies(self) -> Dict:
|
|
"""Get all external module dependencies."""
|
|
dependencies = set()
|
|
|
|
for script_path in self.LIB_DIR.glob('*.py'):
|
|
if script_path.name.startswith('_'):
|
|
continue
|
|
|
|
try:
|
|
content = script_path.read_text()
|
|
tree = ast.parse(content)
|
|
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.Import):
|
|
for alias in node.names:
|
|
module = alias.name.split('.')[0]
|
|
if not self._is_stdlib(module):
|
|
dependencies.add(module)
|
|
elif isinstance(node, ast.ImportFrom):
|
|
if node.module:
|
|
module = node.module.split('.')[0]
|
|
if not self._is_stdlib(module):
|
|
dependencies.add(module)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
'external_dependencies': sorted(list(dependencies)),
|
|
'stdlib_usage': True,
|
|
'total_dependencies': len(dependencies)
|
|
}
|
|
|
|
@staticmethod
|
|
def _is_stdlib(module_name: str) -> bool:
|
|
"""Check if module is Python standard library."""
|
|
stdlib_modules = {
|
|
'json', 'time', 'pathlib', 'typing', 'os', 'sys', 'sqlite3',
|
|
'datetime', 'shutil', 'signal', 'ast', 're', 'subprocess',
|
|
'threading', 'multiprocessing', 'logging', 'argparse'
|
|
}
|
|
return module_name in stdlib_modules
|
|
|
|
def generate_script_health_report(self) -> Dict:
|
|
"""Generate comprehensive script health report."""
|
|
validation = self.validate_all_scripts()
|
|
dependencies = self.get_module_dependencies()
|
|
|
|
# Calculate overall health score
|
|
health_score = 0
|
|
if validation['total_scripts'] > 0:
|
|
health_score = validation['overall_health']
|
|
|
|
# Deduct for issues
|
|
for script in validation['scripts']:
|
|
if script['status'] == 'syntax_error':
|
|
health_score -= 25
|
|
elif script['status'] == 'issues':
|
|
health_score -= 5
|
|
elif script['status'] == 'warnings':
|
|
health_score -= 2
|
|
|
|
health_score = max(0, min(100, health_score))
|
|
|
|
return {
|
|
'health_score': round(health_score, 1),
|
|
'status': 'healthy' if health_score >= 80 else 'degraded' if health_score >= 60 else 'critical',
|
|
'total_scripts': validation['total_scripts'],
|
|
'valid_scripts': validation['valid_scripts'],
|
|
'scripts': validation['scripts'],
|
|
'dependencies': dependencies,
|
|
'recommendations': self._generate_recommendations(validation, health_score),
|
|
'timestamp': time.time()
|
|
}
|
|
|
|
def _generate_recommendations(self, validation: Dict, health_score: float) -> List[str]:
|
|
"""Generate recommendations based on validation results."""
|
|
recommendations = []
|
|
|
|
if health_score < 80:
|
|
recommendations.append("[ATTENTION] Script health degraded: fix validation issues")
|
|
|
|
problematic_scripts = [s for s in validation['scripts'] if s['status'] in ['syntax_error', 'issues']]
|
|
if problematic_scripts:
|
|
recommendations.append(f"Fix {len(problematic_scripts)} script(s) with issues")
|
|
|
|
# Check docstring coverage
|
|
low_doc_scripts = [
|
|
s for s in validation['scripts']
|
|
if s['metrics'].get('docstring_coverage', 100) < 50
|
|
]
|
|
if low_doc_scripts:
|
|
recommendations.append("Improve docstring coverage in modules")
|
|
|
|
# Check type hints
|
|
low_type_scripts = [
|
|
s for s in validation['scripts']
|
|
if s['metrics'].get('type_hint_coverage', 100) < 50
|
|
]
|
|
if low_type_scripts:
|
|
recommendations.append("Add type hints to function signatures")
|
|
|
|
if not recommendations:
|
|
recommendations.append("Script health excellent - no immediate action needed")
|
|
|
|
return recommendations
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import time
|
|
checker = ScriptHealthChecker()
|
|
|
|
print("=" * 70)
|
|
print("SCRIPT HEALTH CHECK")
|
|
print("=" * 70)
|
|
report = checker.generate_script_health_report()
|
|
|
|
print(f"Health Score: {report['health_score']}/100 ({report['status'].upper()})")
|
|
print(f"Valid scripts: {report['valid_scripts']}/{report['total_scripts']}")
|
|
print(f"External dependencies: {report['dependencies']['total_dependencies']}")
|
|
|
|
print("\nProblematic scripts:")
|
|
for script in report['scripts']:
|
|
if script['status'] != 'valid':
|
|
print(f" {script['script']}: {script['status']}")
|
|
for issue in script['issues'][:2]:
|
|
print(f" - {issue}")
|
|
|
|
print("\nRecommendations:")
|
|
for rec in report['recommendations']:
|
|
print(f" - {rec}")
|