Files
luzia/lib/structural_analysis.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

621 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Structural Analysis Tool for Luzia Project
Scans project code structures, generates analysis reports, and saves structure
data to the shared knowledge graph for cross-project learning.
Features:
- Python AST-based code structure analysis
- Dependency graph visualization
- Module complexity metrics
- Code pattern detection
- JSON-based analysis reports
- Knowledge graph integration
"""
import ast
import json
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
# Make sibling modules in this directory importable when the script is run
# directly (not installed as a package).
sys.path.insert(0, str(Path(__file__).parent))

# Optional dependency: the shared knowledge graph may not be available in
# every environment, so degrade gracefully when the module is missing.
try:
    from knowledge_graph import KnowledgeGraph, RELATION_TYPES
except ImportError:
    # BUG FIX: both names must exist after a failed import. The original
    # only set KnowledgeGraph, leaving RELATION_TYPES undefined and any
    # later reference to it a NameError instead of a clean None check.
    KnowledgeGraph = None
    RELATION_TYPES = None
@dataclass
class CodeMetrics:
    """Code complexity metrics for a single file or an aggregated directory.

    Instances are summed field-by-field when building directory totals
    (see CodeStructureAnalyzer.analyze_directory).
    """

    total_lines: int = 0    # every line in the file, including blanks
    code_lines: int = 0     # total minus blank and comment lines
    comment_lines: int = 0  # lines whose stripped text starts with '#'
    blank_lines: int = 0    # whitespace-only lines
    functions: int = 0      # function definitions found (sync and async)
    classes: int = 0        # class definitions found
    imports: int = 0        # imported names (one per import/from entry)
    cyclomatic_complexity: int = 0  # accumulated decision-point complexity
@dataclass
class ComponentInfo:
    """Information about a single code component (a class or a function).

    Fields mirror what ASTAnalyzer extracts: name, kind ("class" or
    "function"), source path, definition line, optional docstring and
    metrics, and dependency/child name lists.
    """

    name: str
    type: str
    path: str
    line_number: int = 0
    docstring: Optional[str] = None
    metrics: Optional["CodeMetrics"] = None
    # default_factory gives each instance its own fresh list, replacing the
    # None-sentinel-plus-__post_init__ workaround for mutable defaults.
    dependencies: List[str] = field(default_factory=list)
    children: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Backward compatibility: callers that explicitly pass None for
        # either list still end up with an empty list.
        if self.dependencies is None:
            self.dependencies = []
        if self.children is None:
            self.children = []

    def to_dict(self) -> Dict:
        """Convert to a plain dict for JSON serialization.

        dataclasses.asdict recurses into nested dataclasses, so a
        CodeMetrics value in `metrics` is converted automatically; the
        original's extra asdict(self.metrics) pass was redundant.
        """
        return asdict(self)
class CodeStructureAnalyzer:
    """Analyzes Python code structure using AST.

    Produces per-file metrics (line counts, function/class/import counts,
    cyclomatic complexity) plus component, import, and pattern listings,
    and can aggregate them across a whole directory tree.
    """

    def __init__(self, project_path: Path):
        self.project_path = Path(project_path)
        # Component name -> ComponentInfo collected during analysis.
        self.components: Dict[str, ComponentInfo] = {}
        # Module name -> set of modules it depends on.
        self.dependencies: Dict[str, Set[str]] = {}
        # Module name -> list of (imported name, from-module) pairs;
        # from-module is "" for plain `import x` statements.
        self.imports: Dict[str, List[Tuple[str, str]]] = {}
        # Pattern name -> list of occurrences.
        self.patterns: Dict[str, List[str]] = {}

    def analyze_file(self, file_path: Path) -> Dict[str, Any]:
        """Analyze a single Python file.

        Returns a dict with keys path/metrics/components/imports/patterns,
        or an {"error": ...} dict when the file is missing, unreadable, or
        fails to parse.
        """
        if not file_path.exists():
            return {"error": f"File not found: {file_path}"}
        try:
            content = file_path.read_text()
        except Exception as e:
            return {"error": f"Could not read file: {e}"}

        # Line-oriented metrics: a comment line is one whose stripped text
        # starts with '#'; code lines are whatever is neither blank nor a
        # comment (lines with trailing inline comments count as code).
        lines = content.split('\n')
        total_lines = len(lines)
        blank_lines = sum(1 for line in lines if not line.strip())
        comment_lines = sum(1 for line in lines if line.strip().startswith('#'))
        code_lines = total_lines - blank_lines - comment_lines
        metrics = CodeMetrics(
            total_lines=total_lines,
            code_lines=code_lines,
            comment_lines=comment_lines,
            blank_lines=blank_lines
        )

        try:
            tree = ast.parse(content, str(file_path))
        except SyntaxError as e:
            return {"error": f"Syntax error: {e}"}

        result = {
            "path": str(file_path),
            "metrics": asdict(metrics),
            "components": [],
            "imports": [],
            "patterns": []
        }

        # Single AST walk; copy the visitor's findings into the result.
        visitor = ASTAnalyzer(file_path)
        visitor.visit(tree)
        result["components"] = [comp.to_dict() for comp in visitor.components.values()]
        result["imports"] = visitor.imports
        result["patterns"] = visitor.patterns
        result["metrics"]["functions"] = len(visitor.functions)
        result["metrics"]["classes"] = len(visitor.classes)
        result["metrics"]["imports"] = len(visitor.imports)
        result["metrics"]["cyclomatic_complexity"] = visitor.cyclomatic_complexity
        return result

    def analyze_directory(self, directory: Optional[Path] = None) -> Dict[str, Any]:
        """Analyze all Python files under a directory (recursively).

        Defaults to the analyzer's project path. Maps each file to its
        analyze_file() result and adds an aggregated "summary" of the
        per-file metrics.
        """
        if directory is None:
            directory = self.project_path
        if not directory.exists():
            return {"error": f"Directory not found: {directory}"}
        py_files = list(directory.rglob("*.py"))
        if not py_files:
            return {"error": "No Python files found"}

        results = {
            "directory": str(directory),
            "file_count": len(py_files),
            "files": {},
            "summary": {}
        }
        total_metrics = CodeMetrics()
        for py_file in py_files:
            try:
                file_result = self.analyze_file(py_file)
                results["files"][str(py_file)] = file_result
                # Error results carry no "metrics" key and are skipped.
                if "metrics" in file_result:
                    m = file_result["metrics"]
                    total_metrics.total_lines += m.get("total_lines", 0)
                    total_metrics.code_lines += m.get("code_lines", 0)
                    total_metrics.comment_lines += m.get("comment_lines", 0)
                    total_metrics.blank_lines += m.get("blank_lines", 0)
                    total_metrics.functions += m.get("functions", 0)
                    total_metrics.classes += m.get("classes", 0)
                    total_metrics.imports += m.get("imports", 0)
                    # BUG FIX: cyclomatic complexity was never aggregated,
                    # so the directory summary always reported 0 and every
                    # downstream complexity assessment came out "low".
                    total_metrics.cyclomatic_complexity += m.get("cyclomatic_complexity", 0)
            except Exception as e:
                results["files"][str(py_file)] = {"error": str(e)}
        results["summary"] = asdict(total_metrics)
        return results

    def build_dependency_graph(self) -> Dict[str, List[str]]:
        """Build a module dependency graph from collected imports.

        `from X import y` contributes X; `import a.b` contributes the
        top-level package `a`. Duplicates are removed (order is not
        preserved, matching the original set-based dedup).
        """
        graph = {}
        for module, imports in self.imports.items():
            deps = []
            for imp_name, imp_from in imports:
                if imp_from:
                    deps.append(imp_from)
                else:
                    deps.append(imp_name.split('.')[0])
            graph[module] = list(set(deps))
        return graph

    def detect_patterns(self) -> Dict[str, List[str]]:
        """Return the (empty) project-level pattern catalog.

        NOTE(review): this is a stub — per-class pattern detection happens
        in ASTAnalyzer; these buckets are never filled here.
        """
        patterns = {
            "singleton": [],
            "factory": [],
            "observer": [],
            "adapter": [],
            "decorator": [],
            "context_manager": [],
            "dataclass": [],
        }
        return patterns
class ASTAnalyzer(ast.NodeVisitor):
    """AST visitor that collects structure info for a single file."""

    def __init__(self, file_path: Path):
        self.file_path = file_path
        self.components: Dict[str, ComponentInfo] = {}
        # (imported name, from-module) pairs; from-module is "" for plain
        # `import x` statements.
        self.imports: List[Tuple[str, str]] = []
        self.patterns: List[Dict] = []
        self.functions: List[str] = []
        self.classes: List[str] = []
        # File-level complexity: starts at the base of 1, then accumulates
        # each function's decision points (its complexity minus 1).
        self.cyclomatic_complexity: int = 1
        # Name of the class currently being visited, for "Class.method" keys.
        self.current_class: Optional[str] = None

    def visit_Import(self, node: ast.Import):
        """Record `import x[, y]` statements."""
        for alias in node.names:
            self.imports.append((alias.name, ""))
        self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        """Record `from m import x` statements (m is "" for bare relative imports)."""
        module = node.module or ""
        for alias in node.names:
            self.imports.append((alias.name, module))
        self.generic_visit(node)

    def visit_ClassDef(self, node: ast.ClassDef):
        """Record a class definition and scan it for design patterns."""
        self.classes.append(node.name)
        docstring = ast.get_docstring(node)
        self._detect_class_patterns(node)
        component = ComponentInfo(
            name=node.name,
            type="class",
            path=str(self.file_path),
            line_number=node.lineno,
            docstring=docstring,
        )
        self.components[f"{node.name}"] = component
        # Save/restore the enclosing class so nested classes don't clobber
        # the outer context.
        old_class = self.current_class
        self.current_class = node.name
        self.generic_visit(node)
        self.current_class = old_class

    def visit_FunctionDef(self, node: ast.FunctionDef):
        """Record a function/method definition and accumulate its complexity."""
        self.functions.append(node.name)
        docstring = ast.get_docstring(node)
        complexity = self._calculate_complexity(node)
        # Add only the decision points (complexity - 1); the file-level
        # counter already carries the base value of 1.
        self.cyclomatic_complexity += complexity - 1
        if self.current_class:
            comp_name = f"{self.current_class}.{node.name}"
        else:
            comp_name = node.name
        component = ComponentInfo(
            name=node.name,
            type="function",
            path=str(self.file_path),
            line_number=node.lineno,
            docstring=docstring,
        )
        self.components[comp_name] = component
        self.generic_visit(node)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef):
        """Treat async functions exactly like sync ones."""
        self.visit_FunctionDef(node)

    def _detect_class_patterns(self, node: ast.ClassDef):
        """Detect design patterns (context manager, dataclass) in a class."""
        # BUG FIX: also count async methods when looking for the
        # __enter__/__exit__ pair (async context managers define them too
        # only in the __aenter__/__aexit__ form, but sync pairs may still be
        # async-defined helpers).
        methods = {
            m.name for m in node.body
            if isinstance(m, (ast.FunctionDef, ast.AsyncFunctionDef))
        }
        if "__enter__" in methods and "__exit__" in methods:
            self.patterns.append({
                "name": "context_manager",
                "class": node.name,
                "line": node.lineno
            })
        for decorator in node.decorator_list:
            # BUG FIX: recognize every spelling of the decorator —
            # @dataclass, @dataclasses.dataclass, and called forms such as
            # @dataclass(frozen=True). The original matched only the bare
            # ast.Name form.
            target = decorator.func if isinstance(decorator, ast.Call) else decorator
            if isinstance(target, ast.Name):
                deco_name = target.id
            elif isinstance(target, ast.Attribute):
                deco_name = target.attr
            else:
                deco_name = None
            if deco_name == "dataclass":
                self.patterns.append({
                    "name": "dataclass",
                    "class": node.name,
                    "line": node.lineno
                })

    def _calculate_complexity(self, node: ast.FunctionDef) -> int:
        """Cyclomatic complexity: 1 + branch nodes (if/while/for/except)
        plus one per extra boolean operand (each `and`/`or` adds a path)."""
        complexity = 1
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                complexity += len(child.values) - 1
        return complexity
class StructuralAnalysisReport:
    """Generates and manages structural analysis reports.

    Wraps CodeStructureAnalyzer: runs the analysis, derives insights
    (complexity, quality, hotspots, recommendations), and can persist the
    result as a JSON file or into the shared knowledge graph.
    """

    def __init__(self, project_path: Path, project_name: str = None):
        self.project_path = Path(project_path)
        # Fall back to the directory name when no explicit name is given.
        self.project_name = project_name or self.project_path.name
        self.analyzer = CodeStructureAnalyzer(self.project_path)
        self.report: Dict[str, Any] = {}

    def generate_report(self) -> Dict[str, Any]:
        """Run the full analysis and build the report dict."""
        print(f"Analyzing project: {self.project_name}")
        print(f"Project path: {self.project_path}")
        analysis = self.analyzer.analyze_directory()
        self.report = {
            "project": self.project_name,
            "path": str(self.project_path),
            "timestamp": datetime.now().isoformat(),
            "analysis": analysis,
            "dependency_graph": self.analyzer.build_dependency_graph(),
            "patterns": self.analyzer.detect_patterns(),
            "insights": self._generate_insights(analysis)
        }
        return self.report

    def _generate_insights(self, analysis: Dict) -> Dict[str, Any]:
        """Derive all insight sections from the raw analysis data."""
        summary = analysis.get("summary", {})
        insights = {
            "complexity_assessment": self._assess_complexity(summary),
            "code_quality_metrics": self._calculate_quality_metrics(summary),
            "hotspots": self._identify_hotspots(analysis),
            "recommendations": self._generate_recommendations(summary, analysis)
        }
        return insights

    def _assess_complexity(self, summary: Dict) -> Dict:
        """Classify average per-function complexity as low/moderate/high."""
        cyclomatic = summary.get("cyclomatic_complexity", 0)
        functions = summary.get("functions", 1)
        avg_complexity = cyclomatic / functions if functions > 0 else 0
        # Thresholds: < 5 low, < 10 moderate, otherwise high.
        if avg_complexity < 5:
            level = "low"
        elif avg_complexity < 10:
            level = "moderate"
        else:
            level = "high"
        return {
            "level": level,
            "cyclomatic_complexity": cyclomatic,
            "functions": functions,
            "average_complexity_per_function": round(avg_complexity, 2),
            "assessment": f"Average cyclomatic complexity of {round(avg_complexity, 2)} per function"
        }

    def _calculate_quality_metrics(self, summary: Dict) -> Dict:
        """Compute code/comment/blank line ratios (as percentages)."""
        total = summary.get("total_lines", 1)
        code = summary.get("code_lines", 0)
        comments = summary.get("comment_lines", 0)
        blank = summary.get("blank_lines", 0)
        # Comment ratio is relative to code lines; the others to total lines.
        comment_ratio = (comments / code * 100) if code > 0 else 0
        blank_ratio = (blank / total * 100) if total > 0 else 0
        code_ratio = (code / total * 100) if total > 0 else 0
        return {
            "code_ratio": round(code_ratio, 2),
            "comment_ratio": round(comment_ratio, 2),
            "blank_ratio": round(blank_ratio, 2),
            "total_lines": total,
            "assessment": "Good" if comment_ratio > 10 else "Needs more documentation"
        }

    def _identify_hotspots(self, analysis: Dict) -> List[Dict]:
        """Return up to 10 files whose average per-function complexity
        exceeds 8, sorted by total complexity (highest first)."""
        hotspots = []
        files = analysis.get("files", {})
        for file_path, file_data in files.items():
            if isinstance(file_data, dict) and "metrics" in file_data:
                metrics = file_data["metrics"]
                complexity = metrics.get("cyclomatic_complexity", 0)
                functions = metrics.get("functions", 0)
                if functions > 0 and complexity / functions > 8:
                    hotspots.append({
                        "file": file_path,
                        "complexity": complexity,
                        "functions": functions,
                        "avg_complexity_per_function": round(complexity / functions, 2)
                    })
        hotspots.sort(key=lambda x: x["complexity"], reverse=True)
        return hotspots[:10]

    def _generate_recommendations(self, summary: Dict, analysis: Dict) -> List[str]:
        """Generate improvement recommendations from summary metrics."""
        recommendations = []
        cyclomatic = summary.get("cyclomatic_complexity", 0)
        functions = summary.get("functions", 1)
        comments = summary.get("comment_lines", 0)
        code = summary.get("code_lines", 1)
        # IDIOM FIX: the original guards were written as
        # `if expr > N if divisor > 0 else False:` — a conditional
        # expression inside an if, which reads ambiguously. Rewritten as
        # plain boolean guards with identical semantics (short-circuit
        # prevents division by zero).
        if functions > 0 and cyclomatic / functions > 10:
            recommendations.append("Consider refactoring functions with high cyclomatic complexity")
        if code > 0 and comments / code * 100 < 10:
            recommendations.append("Increase code documentation - aim for 10%+ comment ratio")
        hotspots = self._identify_hotspots(analysis)
        if hotspots:
            recommendations.append(f"Focus refactoring on {len(hotspots)} high-complexity modules")
        return recommendations

    def save_report(self, output_path: Path = None) -> Path:
        """Save the report as JSON; defaults to a timestamped file in the
        project directory. Returns the path written."""
        if output_path is None:
            output_path = self.project_path / f"structure-analysis-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(self.report, indent=2))
        print(f"Report saved to: {output_path}")
        return output_path

    def save_to_knowledge_graph(self) -> Dict[str, Any]:
        """Persist the analysis into the shared knowledge graph.

        Best-effort: individual entity/relation failures are recorded in
        the returned dict's "errors" list rather than raised.
        """
        if KnowledgeGraph is None:
            return {"error": "Knowledge graph not available"}
        try:
            kg = KnowledgeGraph("projects")
        except Exception as e:
            return {"error": f"Could not open knowledge graph: {e}"}
        result = {"entities_added": 0, "relations_added": 0, "errors": []}
        try:
            analysis = self.report.get("analysis", {})
            summary = analysis.get("summary", {})
            # The embedded lines are intentionally unindented: they are the
            # literal text stored in the graph.
            content = f"""Structural Analysis Report
Project: {self.project_name}
Path: {self.project_path}
Metrics:
- Total Lines: {summary.get('total_lines', 0)}
- Code Lines: {summary.get('code_lines', 0)}
- Functions: {summary.get('functions', 0)}
- Classes: {summary.get('classes', 0)}
- Cyclomatic Complexity: {summary.get('cyclomatic_complexity', 0)}
Generated: {datetime.now().isoformat()}
"""
            entity_name = f"{self.project_name}-structure-analysis"
            kg.add_entity(
                name=entity_name,
                entity_type="architecture",
                content=content,
                metadata={
                    "project": self.project_name,
                    "report_type": "structural_analysis",
                    "metrics": summary,
                    "insights": self.report.get("insights", {})
                },
                source="structural_analysis"
            )
            result["entities_added"] += 1
            # One observation per insight section.
            insights = self.report.get("insights", {})
            for insight_type, insight_data in insights.items():
                obs_content = json.dumps(insight_data, indent=2)
                kg.add_observation(
                    entity_name=entity_name,
                    content=f"{insight_type}: {obs_content}",
                    observer="structural_analysis"
                )
            # One component entity per extracted class/function, linked to
            # the report entity via a "contains" relation.
            files = analysis.get("files", {})
            for file_path, file_data in files.items():
                if isinstance(file_data, dict) and "components" in file_data:
                    for comp in file_data["components"]:
                        comp_name = f"{self.project_name}-{comp['name']}"
                        try:
                            kg.add_entity(
                                name=comp_name,
                                entity_type="component",
                                content=f"File: {file_path}\nType: {comp['type']}\n{comp.get('docstring', '')}",
                                metadata={
                                    "file": file_path,
                                    "type": comp["type"],
                                    "line": comp.get("line_number", 0)
                                }
                            )
                            result["entities_added"] += 1
                            try:
                                kg.add_relation(
                                    source_name=entity_name,
                                    target_name=comp_name,
                                    relation="contains"
                                )
                                result["relations_added"] += 1
                            except Exception as e:
                                result["errors"].append(f"Relation error: {e}")
                        except Exception as e:
                            # Truncate: component errors can embed long reprs.
                            result["errors"].append(f"Component error: {str(e)[:100]}")
        except Exception as e:
            result["errors"].append(f"Main error: {str(e)}")
        return result

    def print_summary(self):
        """Print a human-readable summary of the generated report."""
        if not self.report:
            print("No report generated. Call generate_report() first.")
            return
        analysis = self.report.get("analysis", {})
        summary = analysis.get("summary", {})
        insights = self.report.get("insights", {})
        print(f"\n{'='*60}")
        print(f"Structural Analysis Report: {self.project_name}")
        print(f"{'='*60}\n")
        print("Code Metrics:")
        print(f" Total Lines: {summary.get('total_lines', 0)}")
        print(f" Code Lines: {summary.get('code_lines', 0)}")
        print(f" Comment Lines: {summary.get('comment_lines', 0)}")
        print(f" Functions: {summary.get('functions', 0)}")
        print(f" Classes: {summary.get('classes', 0)}")
        complexity = insights.get("complexity_assessment", {})
        print(f"\nComplexity Assessment: {complexity.get('level', 'N/A')}")
        print(f" Average Cyclomatic Complexity: {complexity.get('average_complexity_per_function', 0)}")
        quality = insights.get("code_quality_metrics", {})
        print(f"\nCode Quality:")
        print(f" Code Ratio: {quality.get('code_ratio', 0)}%")
        print(f" Comment Ratio: {quality.get('comment_ratio', 0)}%")
        print(f" Assessment: {quality.get('assessment', 'N/A')}")
        hotspots = insights.get("hotspots", [])
        if hotspots:
            print(f"\nTop Hotspots (Complex Modules):")
            for i, hotspot in enumerate(hotspots[:5], 1):
                print(f" {i}. {Path(hotspot['file']).name}")
                print(f" Avg Complexity: {hotspot['avg_complexity_per_function']}")
        recommendations = insights.get("recommendations", [])
        if recommendations:
            print(f"\nRecommendations:")
            for rec in recommendations:
                print(f"{rec}")
        print(f"\n{'='*60}\n")
def analyze_project(project_path: str, project_name: str = None,
                    save_json: bool = True, save_kg: bool = True,
                    verbose: bool = True) -> Dict[str, Any]:
    """Convenience function to analyze a project.

    Runs the full report pipeline, then optionally prints a summary,
    writes the JSON report, and stores results in the knowledge graph.
    Returns a dict with the generated "report" and the "kg_result" status
    (an empty dict when save_kg is False).
    """
    generator = StructuralAnalysisReport(Path(project_path), project_name)
    generator.generate_report()

    if verbose:
        generator.print_summary()
    if save_json:
        generator.save_report()

    kg_result = generator.save_to_knowledge_graph() if save_kg else {}
    return {"report": generator.report, "kg_result": kg_result}
if __name__ == "__main__":
    import argparse

    cli = argparse.ArgumentParser(
        description="Structural Analysis Tool for Code Projects"
    )
    cli.add_argument("path", help="Project path to analyze")
    cli.add_argument("--name", help="Project name (defaults to directory name)")
    cli.add_argument("--json", action="store_true", help="Output as JSON")
    cli.add_argument("--no-kg", action="store_true", help="Don't save to knowledge graph")
    cli.add_argument("--output", help="Output file path")
    opts = cli.parse_args()

    # In --json mode: suppress verbose printing and the report file, and
    # emit the report itself on stdout; otherwise emit the knowledge-graph
    # result after the human-readable summary has been printed.
    outcome = analyze_project(
        opts.path,
        opts.name,
        save_json=not opts.json,
        save_kg=not opts.no_kg,
        verbose=not opts.json,
    )
    if opts.json:
        print(json.dumps(outcome["report"], indent=2))
    else:
        print(json.dumps(outcome["kg_result"], indent=2))