#!/usr/bin/env python3
"""
Structural Analysis Tool for Luzia Project

Scans project code structures, generates analysis reports, and saves
structure data to the shared knowledge graph for cross-project learning.

Features:
- Python AST-based code structure analysis
- Dependency graph visualization
- Module complexity metrics
- Code pattern detection
- JSON-based analysis reports
- Knowledge graph integration
"""

import ast
import json
import re
from pathlib import Path
from typing import Dict, List, Set, Tuple, Any, Optional
from dataclasses import dataclass, asdict, field
from datetime import datetime
import sys

# Make sibling modules importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent))

try:
    from knowledge_graph import KnowledgeGraph, RELATION_TYPES
except ImportError:
    # Knowledge-graph integration is optional; degrade gracefully.
    # FIX: also define RELATION_TYPES so a later reference cannot NameError.
    KnowledgeGraph = None
    RELATION_TYPES = None


@dataclass
class CodeMetrics:
    """Code complexity metrics for a file or an aggregated project summary."""
    total_lines: int = 0
    code_lines: int = 0
    comment_lines: int = 0
    blank_lines: int = 0
    functions: int = 0
    classes: int = 0
    imports: int = 0
    cyclomatic_complexity: int = 0


@dataclass
class ComponentInfo:
    """Information about a code component (a class or function).

    FIX: use ``field(default_factory=list)`` instead of ``None`` defaults for
    the list fields (mutable-default anti-pattern); ``__post_init__`` is kept
    so callers that explicitly pass ``None`` still get an empty list.
    """
    name: str
    type: str          # "class" or "function"
    path: str          # source file the component was found in
    line_number: int = 0
    docstring: Optional[str] = None
    metrics: Optional[CodeMetrics] = None
    dependencies: List[str] = field(default_factory=list)
    children: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Normalize explicit None to an empty list for backward compatibility.
        if self.dependencies is None:
            self.dependencies = []
        if self.children is None:
            self.children = []

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization."""
        data = asdict(self)
        if self.metrics:
            data['metrics'] = asdict(self.metrics)
        return data


class CodeStructureAnalyzer:
    """Analyzes Python code structure using the ``ast`` module."""

    def __init__(self, project_path: Path):
        self.project_path = Path(project_path)
        self.components: Dict[str, ComponentInfo] = {}
        self.dependencies: Dict[str, Set[str]] = {}
        # file path -> list of (imported name, "from" module) tuples
        self.imports: Dict[str, List[Tuple[str, str]]] = {}
        # file path -> list of pattern-hit dicts recorded by ASTAnalyzer
        self.patterns: Dict[str, List[Dict]] = {}

    def analyze_file(self, file_path: Path) -> Dict[str, Any]:
        """Analyze a single Python file.

        Returns a dict with "path", "metrics", "components", "imports" and
        "patterns" keys, or a dict with a single "error" key on failure.
        Side effect: records the file's imports and pattern hits on the
        analyzer so build_dependency_graph()/detect_patterns() can use them.
        """
        if not file_path.exists():
            return {"error": f"File not found: {file_path}"}

        try:
            # FIX: read as UTF-8 explicitly instead of the locale default.
            content = file_path.read_text(encoding="utf-8")
        except Exception as e:
            return {"error": f"Could not read file: {e}"}

        # Line-based metrics: every line is blank, a comment, or code.
        lines = content.split('\n')
        total_lines = len(lines)
        blank_lines = sum(1 for line in lines if not line.strip())
        comment_lines = sum(1 for line in lines if line.strip().startswith('#'))
        code_lines = total_lines - blank_lines - comment_lines

        metrics = CodeMetrics(
            total_lines=total_lines,
            code_lines=code_lines,
            comment_lines=comment_lines,
            blank_lines=blank_lines
        )

        try:
            tree = ast.parse(content, str(file_path))
        except SyntaxError as e:
            return {"error": f"Syntax error: {e}"}

        result = {
            "path": str(file_path),
            "metrics": asdict(metrics),
            "components": [],
            "imports": [],
            "patterns": []
        }

        visitor = ASTAnalyzer(file_path)
        visitor.visit(tree)

        result["components"] = [comp.to_dict() for comp in visitor.components.values()]
        result["imports"] = visitor.imports
        result["patterns"] = visitor.patterns
        result["metrics"]["functions"] = len(visitor.functions)
        result["metrics"]["classes"] = len(visitor.classes)
        result["metrics"]["imports"] = len(visitor.imports)
        result["metrics"]["cyclomatic_complexity"] = visitor.cyclomatic_complexity

        # FIX: previously self.imports / self.patterns were never populated,
        # so build_dependency_graph() and detect_patterns() always returned
        # empty data even though generate_report() calls them.
        module_key = str(file_path)
        self.imports[module_key] = visitor.imports
        self.patterns[module_key] = visitor.patterns

        return result

    def analyze_directory(self, directory: Path = None) -> Dict[str, Any]:
        """Analyze all Python files under a directory (recursively).

        Returns per-file results under "files" and aggregated CodeMetrics
        under "summary", or a dict with an "error" key.
        """
        if directory is None:
            directory = self.project_path

        if not directory.exists():
            return {"error": f"Directory not found: {directory}"}

        py_files = list(directory.rglob("*.py"))
        if not py_files:
            return {"error": "No Python files found"}

        results = {
            "directory": str(directory),
            "file_count": len(py_files),
            "files": {},
            "summary": {}
        }

        total_metrics = CodeMetrics()
        for py_file in py_files:
            try:
                file_result = self.analyze_file(py_file)
                results["files"][str(py_file)] = file_result

                if "metrics" in file_result:
                    m = file_result["metrics"]
                    total_metrics.total_lines += m.get("total_lines", 0)
                    total_metrics.code_lines += m.get("code_lines", 0)
                    total_metrics.comment_lines += m.get("comment_lines", 0)
                    total_metrics.blank_lines += m.get("blank_lines", 0)
                    total_metrics.functions += m.get("functions", 0)
                    total_metrics.classes += m.get("classes", 0)
                    total_metrics.imports += m.get("imports", 0)
                    # FIX: cyclomatic complexity was never accumulated, so the
                    # summary always reported 0 and the complexity assessment
                    # in the report was always "low".
                    total_metrics.cyclomatic_complexity += m.get("cyclomatic_complexity", 0)
            except Exception as e:
                results["files"][str(py_file)] = {"error": str(e)}

        results["summary"] = asdict(total_metrics)
        return results

    def build_dependency_graph(self) -> Dict[str, List[str]]:
        """Build a module dependency graph from the recorded imports.

        Maps each analyzed file to the deduplicated list of top-level modules
        it imports. Requires analyze_file()/analyze_directory() to have run.
        """
        graph = {}
        for module, imports in self.imports.items():
            deps = []
            for imp_name, imp_from in imports:
                if imp_from:
                    # "from X import Y" -> depend on X
                    deps.append(imp_from)
                else:
                    # "import X.Y" -> depend on top-level package X
                    deps.append(imp_name.split('.')[0])
            graph[module] = list(set(deps))
        return graph

    def detect_patterns(self) -> Dict[str, List[str]]:
        """Aggregate detected code patterns across all analyzed files.

        Keys are pattern names; values are "ClassName (file:line)" entries.
        Patterns with no detection logic yet stay as empty lists.
        FIX: previously this always returned the empty scaffold; it now folds
        in the per-file hits recorded by analyze_file().
        """
        patterns: Dict[str, List[str]] = {
            "singleton": [],
            "factory": [],
            "observer": [],
            "adapter": [],
            "decorator": [],
            "context_manager": [],
            "dataclass": [],
        }
        for module, hits in self.patterns.items():
            for hit in hits:
                name = hit.get("name")
                if name in patterns:
                    patterns[name].append(
                        f"{hit.get('class')} ({module}:{hit.get('line')})"
                    )
        return patterns


class ASTAnalyzer(ast.NodeVisitor):
    """AST visitor that collects components, imports, patterns and complexity."""

    def __init__(self, file_path: Path):
        self.file_path = file_path
        self.components: Dict[str, ComponentInfo] = {}
        self.imports: List[Tuple[str, str]] = []
        self.patterns: List[Dict] = []
        self.functions: List[str] = []
        self.classes: List[str] = []
        # Module baseline complexity is 1; each function adds its own - 1.
        self.cyclomatic_complexity: int = 1
        self.current_class: Optional[str] = None

    def visit_Import(self, node: ast.Import):
        """Handle ``import X`` statements (recorded as (name, ""))."""
        for alias in node.names:
            self.imports.append((alias.name, ""))
        self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        """Handle ``from X import Y`` statements (recorded as (Y, X))."""
        module = node.module or ""
        for alias in node.names:
            self.imports.append((alias.name, module))
        self.generic_visit(node)

    def visit_ClassDef(self, node: ast.ClassDef):
        """Handle class definitions: record component, detect patterns."""
        self.classes.append(node.name)
        docstring = ast.get_docstring(node)
        self._detect_class_patterns(node)

        component = ComponentInfo(
            name=node.name,
            type="class",
            path=str(self.file_path),
            line_number=node.lineno,
            docstring=docstring,
        )
        self.components[f"{node.name}"] = component

        # Track the enclosing class so methods get qualified names.
        old_class = self.current_class
        self.current_class = node.name
        self.generic_visit(node)
        self.current_class = old_class

    def visit_FunctionDef(self, node: ast.FunctionDef):
        """Handle function/method definitions and accumulate complexity."""
        self.functions.append(node.name)
        docstring = ast.get_docstring(node)
        complexity = self._calculate_complexity(node)
        # Add the function's decision points (its complexity minus baseline 1).
        self.cyclomatic_complexity += complexity - 1

        if self.current_class:
            comp_name = f"{self.current_class}.{node.name}"
        else:
            comp_name = node.name

        component = ComponentInfo(
            name=node.name,
            type="function",
            path=str(self.file_path),
            line_number=node.lineno,
            docstring=docstring,
        )
        self.components[comp_name] = component
        self.generic_visit(node)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef):
        """Handle async function definitions (same treatment as sync ones)."""
        self.visit_FunctionDef(node)

    def _detect_class_patterns(self, node: ast.ClassDef):
        """Detect design patterns in a class (context manager, dataclass)."""
        # FIX: include async methods so async context managers are not missed.
        methods = {
            m.name for m in node.body
            if isinstance(m, (ast.FunctionDef, ast.AsyncFunctionDef))
        }

        if "__enter__" in methods and "__exit__" in methods:
            self.patterns.append({
                "name": "context_manager",
                "class": node.name,
                "line": node.lineno
            })

        for decorator in node.decorator_list:
            if isinstance(decorator, ast.Name) and decorator.id == "dataclass":
                self.patterns.append({
                    "name": "dataclass",
                    "class": node.name,
                    "line": node.lineno
                })

    def _calculate_complexity(self, node: ast.FunctionDef) -> int:
        """Calculate the cyclomatic complexity of one function.

        Counts branching constructs (if/while/for/except) plus the extra
        operands of boolean operators, starting from a baseline of 1.
        """
        complexity = 1
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                complexity += len(child.values) - 1
        return complexity


class StructuralAnalysisReport:
    """Generates, prints, saves, and publishes structural analysis reports."""

    def __init__(self, project_path: Path, project_name: str = None):
        self.project_path = Path(project_path)
        self.project_name = project_name or self.project_path.name
        self.analyzer = CodeStructureAnalyzer(self.project_path)
        self.report: Dict[str, Any] = {}

    def generate_report(self) -> Dict[str, Any]:
        """Generate the comprehensive structural analysis report."""
        print(f"Analyzing project: {self.project_name}")
        print(f"Project path: {self.project_path}")

        analysis = self.analyzer.analyze_directory()

        self.report = {
            "project": self.project_name,
            "path": str(self.project_path),
            "timestamp": datetime.now().isoformat(),
            "analysis": analysis,
            "dependency_graph": self.analyzer.build_dependency_graph(),
            "patterns": self.analyzer.detect_patterns(),
            "insights": self._generate_insights(analysis)
        }
        return self.report

    def _generate_insights(self, analysis: Dict) -> Dict[str, Any]:
        """Generate derived insights from the raw analysis data."""
        summary = analysis.get("summary", {})
        insights = {
            "complexity_assessment": self._assess_complexity(summary),
            "code_quality_metrics": self._calculate_quality_metrics(summary),
            "hotspots": self._identify_hotspots(analysis),
            "recommendations": self._generate_recommendations(summary, analysis)
        }
        return insights

    def _assess_complexity(self, summary: Dict) -> Dict:
        """Assess overall complexity from the aggregated summary metrics."""
        cyclomatic = summary.get("cyclomatic_complexity", 0)
        functions = summary.get("functions", 1)
        avg_complexity = cyclomatic / functions if functions > 0 else 0

        # Thresholds: <5 low, <10 moderate, otherwise high.
        if avg_complexity < 5:
            level = "low"
        elif avg_complexity < 10:
            level = "moderate"
        else:
            level = "high"

        return {
            "level": level,
            "cyclomatic_complexity": cyclomatic,
            "functions": functions,
            "average_complexity_per_function": round(avg_complexity, 2),
            "assessment": f"Average cyclomatic complexity of {round(avg_complexity, 2)} per function"
        }

    def _calculate_quality_metrics(self, summary: Dict) -> Dict:
        """Calculate code/comment/blank line ratios as quality indicators."""
        total = summary.get("total_lines", 1)
        code = summary.get("code_lines", 0)
        comments = summary.get("comment_lines", 0)
        blank = summary.get("blank_lines", 0)

        comment_ratio = (comments / code * 100) if code > 0 else 0
        blank_ratio = (blank / total * 100) if total > 0 else 0
        code_ratio = (code / total * 100) if total > 0 else 0

        return {
            "code_ratio": round(code_ratio, 2),
            "comment_ratio": round(comment_ratio, 2),
            "blank_ratio": round(blank_ratio, 2),
            "total_lines": total,
            "assessment": "Good" if comment_ratio > 10 else "Needs more documentation"
        }

    def _identify_hotspots(self, analysis: Dict) -> List[Dict]:
        """Identify up to 10 files whose average per-function complexity > 8."""
        hotspots = []
        files = analysis.get("files", {})

        for file_path, file_data in files.items():
            if isinstance(file_data, dict) and "metrics" in file_data:
                metrics = file_data["metrics"]
                complexity = metrics.get("cyclomatic_complexity", 0)
                functions = metrics.get("functions", 0)
                if functions > 0 and complexity / functions > 8:
                    hotspots.append({
                        "file": file_path,
                        "complexity": complexity,
                        "functions": functions,
                        "avg_complexity_per_function": round(complexity / functions, 2)
                    })

        hotspots.sort(key=lambda x: x["complexity"], reverse=True)
        return hotspots[:10]

    def _generate_recommendations(self, summary: Dict, analysis: Dict) -> List[str]:
        """Generate improvement recommendations from summary and hotspots."""
        recommendations = []

        cyclomatic = summary.get("cyclomatic_complexity", 0)
        functions = summary.get("functions", 1)
        comments = summary.get("comment_lines", 0)
        code = summary.get("code_lines", 1)

        # FIX: rewrote the original `if X if Y else False:` conditional
        # expressions as equivalent-but-readable guarded conditions.
        if functions > 0 and cyclomatic / functions > 10:
            recommendations.append("Consider refactoring functions with high cyclomatic complexity")

        if code > 0 and comments / code * 100 < 10:
            recommendations.append("Increase code documentation - aim for 10%+ comment ratio")

        hotspots = self._identify_hotspots(analysis)
        if hotspots:
            recommendations.append(f"Focus refactoring on {len(hotspots)} high-complexity modules")

        return recommendations

    def save_report(self, output_path: Path = None) -> Path:
        """Save the report as JSON; defaults to a timestamped file in the project."""
        if output_path is None:
            output_path = self.project_path / f"structure-analysis-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"

        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(self.report, indent=2))
        print(f"Report saved to: {output_path}")
        return output_path

    def save_to_knowledge_graph(self) -> Dict[str, Any]:
        """Save the analysis to the shared knowledge graph (best effort).

        Returns a result dict with entity/relation counts and any errors;
        individual failures are collected rather than raised.
        """
        if KnowledgeGraph is None:
            return {"error": "Knowledge graph not available"}

        try:
            kg = KnowledgeGraph("projects")
        except Exception as e:
            return {"error": f"Could not open knowledge graph: {e}"}

        result = {"entities_added": 0, "relations_added": 0, "errors": []}

        try:
            analysis = self.report.get("analysis", {})
            summary = analysis.get("summary", {})

            content = f"""Structural Analysis Report

Project: {self.project_name}
Path: {self.project_path}

Metrics:
- Total Lines: {summary.get('total_lines', 0)}
- Code Lines: {summary.get('code_lines', 0)}
- Functions: {summary.get('functions', 0)}
- Classes: {summary.get('classes', 0)}
- Cyclomatic Complexity: {summary.get('cyclomatic_complexity', 0)}

Generated: {datetime.now().isoformat()}
"""

            entity_name = f"{self.project_name}-structure-analysis"
            kg.add_entity(
                name=entity_name,
                entity_type="architecture",
                content=content,
                metadata={
                    "project": self.project_name,
                    "report_type": "structural_analysis",
                    "metrics": summary,
                    "insights": self.report.get("insights", {})
                },
                source="structural_analysis"
            )
            result["entities_added"] += 1

            # Attach each insight as an observation on the main entity.
            insights = self.report.get("insights", {})
            for insight_type, insight_data in insights.items():
                obs_content = json.dumps(insight_data, indent=2)
                kg.add_observation(
                    entity_name=entity_name,
                    content=f"{insight_type}: {obs_content}",
                    observer="structural_analysis"
                )

            # Add each discovered component and link it to the main entity.
            files = analysis.get("files", {})
            for file_path, file_data in files.items():
                if isinstance(file_data, dict) and "components" in file_data:
                    for comp in file_data["components"]:
                        comp_name = f"{self.project_name}-{comp['name']}"
                        try:
                            kg.add_entity(
                                name=comp_name,
                                entity_type="component",
                                content=f"File: {file_path}\nType: {comp['type']}\n{comp.get('docstring', '')}",
                                metadata={
                                    "file": file_path,
                                    "type": comp["type"],
                                    "line": comp.get("line_number", 0)
                                }
                            )
                            result["entities_added"] += 1

                            try:
                                kg.add_relation(
                                    source_name=entity_name,
                                    target_name=comp_name,
                                    relation="contains"
                                )
                                result["relations_added"] += 1
                            except Exception as e:
                                result["errors"].append(f"Relation error: {e}")
                        except Exception as e:
                            result["errors"].append(f"Component error: {str(e)[:100]}")
        except Exception as e:
            result["errors"].append(f"Main error: {str(e)}")

        return result

    def print_summary(self):
        """Print a human-readable summary of the generated report."""
        if not self.report:
            print("No report generated. Call generate_report() first.")
            return

        analysis = self.report.get("analysis", {})
        summary = analysis.get("summary", {})
        insights = self.report.get("insights", {})

        print(f"\n{'='*60}")
        print(f"Structural Analysis Report: {self.project_name}")
        print(f"{'='*60}\n")

        print("Code Metrics:")
        print(f"  Total Lines: {summary.get('total_lines', 0)}")
        print(f"  Code Lines: {summary.get('code_lines', 0)}")
        print(f"  Comment Lines: {summary.get('comment_lines', 0)}")
        print(f"  Functions: {summary.get('functions', 0)}")
        print(f"  Classes: {summary.get('classes', 0)}")

        complexity = insights.get("complexity_assessment", {})
        print(f"\nComplexity Assessment: {complexity.get('level', 'N/A')}")
        print(f"  Average Cyclomatic Complexity: {complexity.get('average_complexity_per_function', 0)}")

        quality = insights.get("code_quality_metrics", {})
        print(f"\nCode Quality:")
        print(f"  Code Ratio: {quality.get('code_ratio', 0)}%")
        print(f"  Comment Ratio: {quality.get('comment_ratio', 0)}%")
        print(f"  Assessment: {quality.get('assessment', 'N/A')}")

        hotspots = insights.get("hotspots", [])
        if hotspots:
            print(f"\nTop Hotspots (Complex Modules):")
            for i, hotspot in enumerate(hotspots[:5], 1):
                print(f"  {i}. {Path(hotspot['file']).name}")
                print(f"     Avg Complexity: {hotspot['avg_complexity_per_function']}")

        recommendations = insights.get("recommendations", [])
        if recommendations:
            print(f"\nRecommendations:")
            for rec in recommendations:
                print(f"  • {rec}")

        print(f"\n{'='*60}\n")


def analyze_project(project_path: str, project_name: str = None,
                    save_json: bool = True, save_kg: bool = True,
                    verbose: bool = True) -> Dict[str, Any]:
    """Convenience function: analyze a project and optionally persist results.

    Returns a dict with the full "report" and the "kg_result" (empty dict
    when save_kg is False).
    """
    report_gen = StructuralAnalysisReport(Path(project_path), project_name)
    report_gen.generate_report()

    if verbose:
        report_gen.print_summary()

    if save_json:
        report_gen.save_report()

    kg_result = {}
    if save_kg:
        kg_result = report_gen.save_to_knowledge_graph()

    return {
        "report": report_gen.report,
        "kg_result": kg_result
    }


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Structural Analysis Tool for Code Projects"
    )
    parser.add_argument("path", help="Project path to analyze")
    parser.add_argument("--name", help="Project name (defaults to directory name)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--no-kg", action="store_true", help="Don't save to knowledge graph")
    parser.add_argument("--output", help="Output file path")

    args = parser.parse_args()

    # In --json mode, suppress file saving and console summary; print the
    # report itself instead.
    result = analyze_project(
        args.path,
        args.name,
        save_json=not args.json,
        save_kg=not args.no_kg,
        verbose=not args.json
    )

    if args.json:
        print(json.dumps(result["report"], indent=2))
    else:
        print(json.dumps(result["kg_result"], indent=2))