#!/usr/bin/env python3
"""
Documentation Sync - Migrate .md files to Knowledge Graphs

Parses markdown files and creates KG entities:
- Headers become entity names
- Content becomes entity content
- Links become relations
- Code blocks stored in metadata

Archives original .md files after migration.
"""

import json
import re
import shutil
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import sys

# Make the sibling knowledge_graph module importable when run as a script.
sys.path.insert(0, str(Path(__file__).parent))
from knowledge_graph import KnowledgeGraph, ENTITY_TYPES

# Source directories
DOCS_DIR = Path("/opt/server-agents/docs")
ARCHIVE_DIR = Path("/opt/server-agents/archive/docs-migrated")
PROJECT_HOMES = Path("/home")


def _sanitize_entity_name(name: str) -> str:
    """Convert a free-form title into a KG-safe entity name.

    Strips punctuation (keeps word chars, whitespace, hyphens), collapses
    whitespace runs to single underscores, lowercases, and truncates to
    100 characters.
    """
    name = re.sub(r'[^\w\s-]', '', name)
    name = re.sub(r'\s+', '_', name)
    return name.lower()[:100]


class MarkdownParser:
    """Parse markdown files into structured entities."""

    def __init__(self, filepath: Path):
        self.filepath = filepath
        # Tolerate a missing file: parse() then returns an empty result.
        self.content = filepath.read_text() if filepath.exists() else ""
        self.entities: List[Dict] = []
        self.relations: List[Tuple[str, str, str]] = []

    def parse(self) -> Dict:
        """Parse the markdown file.

        Returns:
            Dict with "entities" (list of entity dicts) and "relations"
            (list of (source, target, relation) tuples).
        """
        if not self.content:
            return {"entities": [], "relations": []}

        # Extract title from first H1 or filename
        title_match = re.search(r'^#\s+(.+)$', self.content, re.MULTILINE)
        title = title_match.group(1) if title_match else self.filepath.stem

        # One main entity per file; sections and code blocks go to metadata.
        main_entity = {
            "name": self._sanitize_name(title),
            "type": self._infer_type(title, self.content),
            "content": self.content,
            "metadata": {
                "source_file": str(self.filepath),
                "title": title,
                "sections": self._extract_sections(),
                "code_blocks": self._extract_code_blocks(),
            },
        }
        self.entities.append(main_entity)

        # Extract internal links as relations
        self._extract_links(main_entity["name"])

        return {
            "entities": self.entities,
            "relations": self.relations,
        }

    def _sanitize_name(self, name: str) -> str:
        """Convert name to KG-safe format."""
        return _sanitize_entity_name(name)

    def _infer_type(self, title: str, content: str) -> str:
        """Infer entity type from title keywords; defaults to "procedure".

        Note: `content` is accepted for interface compatibility but not
        consulted — the original code-presence check returned "procedure"
        on both branches, so it was redundant and has been removed.
        """
        title_lower = title.lower()

        # Keyword heuristics, checked in priority order.
        if any(x in title_lower for x in ["command", "cli", "usage"]):
            return "command"
        if any(x in title_lower for x in ["service", "daemon"]):
            return "service"
        if any(x in title_lower for x in ["config", "settings", "setup"]):
            return "config"
        if any(x in title_lower for x in ["troubleshoot", "debug", "fix"]):
            return "troubleshooting"
        if any(x in title_lower for x in ["architecture", "design", "system"]):
            return "architecture"
        if any(x in title_lower for x in ["guide", "how", "tutorial"]):
            return "procedure"
        if any(x in title_lower for x in ["user", "account", "permission"]):
            return "guide"
        return "procedure"

    def _extract_sections(self) -> List[Dict]:
        """Extract sections (H2, H3 headers) with their byte positions."""
        sections = []
        pattern = r'^(#{2,3})\s+(.+)$'
        for match in re.finditer(pattern, self.content, re.MULTILINE):
            sections.append({
                "level": len(match.group(1)),
                "title": match.group(2),
                "position": match.start(),
            })
        return sections

    def _extract_code_blocks(self) -> List[Dict]:
        """Extract fenced code blocks with language tag; code is capped at 500 chars."""
        blocks = []
        pattern = r'```(\w*)\n(.*?)```'
        for match in re.finditer(pattern, self.content, re.DOTALL):
            code = match.group(2).strip()
            blocks.append({
                "language": match.group(1) or "text",
                "code": code[:500],  # Truncate long blocks
                "position": match.start(),
            })
        return blocks

    def _extract_links(self, source_name: str):
        """Extract markdown links to local .md files as "references" relations."""
        # [text](url) pattern
        pattern = r'\[([^\]]+)\]\(([^)]+)\)'
        for match in re.finditer(pattern, self.content):
            url = match.group(2)
            # Internal .md links become relations
            if url.endswith('.md') and not url.startswith('http'):
                target = self._sanitize_name(Path(url).stem)
                self.relations.append((source_name, target, "references"))


class DocSync:
    """Sync documentation files to knowledge graphs."""

    def __init__(self):
        # NOTE: each migrate_* call overwrites "files_processed" but keeps
        # accumulating the other counters — use one DocSync per migration.
        self.stats = {
            "files_processed": 0,
            "entities_created": 0,
            "relations_created": 0,
            "errors": [],
        }

    def migrate_docs_dir(self, domain: str = "sysadmin", dry_run: bool = True) -> Dict:
        """Migrate /opt/server-agents/docs/*.md to KG.

        Returns the stats dict, or {"error": ...} if setup failed.
        """
        if not DOCS_DIR.exists():
            return {"error": f"Docs directory not found: {DOCS_DIR}"}

        try:
            kg = KnowledgeGraph(domain)
        except Exception as e:
            return {"error": f"Could not open KG: {e}"}

        md_files = list(DOCS_DIR.glob("*.md"))
        self.stats["files_processed"] = len(md_files)

        for md_file in md_files:
            try:
                self._process_md_file(md_file, kg, domain, dry_run)
            except Exception as e:
                self.stats["errors"].append(f"{md_file.name}: {e}")

        # Archive originals only after a fully clean, real (non-dry) run.
        if not dry_run and not self.stats["errors"]:
            self._archive_files(md_files)

        return self.stats

    def migrate_project_docs(self, dry_run: bool = True) -> Dict:
        """Migrate /home/*/CLAUDE.md to projects KG."""
        try:
            kg = KnowledgeGraph("projects")
        except Exception as e:
            return {"error": f"Could not open KG: {e}"}

        claude_files = list(PROJECT_HOMES.glob("*/CLAUDE.md"))
        self.stats["files_processed"] = len(claude_files)

        for claude_file in claude_files:
            try:
                # The home directory name doubles as the project name.
                project = claude_file.parent.name
                self._process_claude_md(claude_file, project, kg, dry_run)
            except Exception as e:
                self.stats["errors"].append(f"{claude_file}: {e}")

        return self.stats

    def migrate_research_dir(self, research_dir: str = "/home/admin/research",
                             archive: bool = False, dry_run: bool = True) -> Dict:
        """Migrate research .md files to research KG.

        Args:
            research_dir: Directory containing research .md files
            archive: If True, move files to archive after migration
            dry_run: If True, preview without making changes
        """
        research_path = Path(research_dir)
        if not research_path.exists():
            return {"error": f"Research directory not found: {research_dir}"}

        try:
            kg = KnowledgeGraph("research")
        except Exception as e:
            return {"error": f"Could not open research KG: {e}"}

        md_files = list(research_path.glob("*.md"))
        self.stats["files_processed"] = len(md_files)

        for md_file in md_files:
            try:
                self._process_research_md(md_file, kg, dry_run)
            except Exception as e:
                self.stats["errors"].append(f"{md_file.name}: {e}")

        # Archive if requested and not dry run
        if archive and not dry_run and not self.stats["errors"]:
            archive_dir = research_path / "archived"
            archive_dir.mkdir(exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            for f in md_files:
                # Timestamp prefix avoids clobbering earlier archived copies.
                dest = archive_dir / f"{timestamp}_{f.name}"
                shutil.move(str(f), str(dest))

        return self.stats

    def _process_research_md(self, filepath: Path, kg: KnowledgeGraph, dry_run: bool):
        """Process a research .md file into one synthesis entity plus finding entities."""
        content = filepath.read_text()

        # Extract title from first H1
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        title = title_match.group(1) if title_match else filepath.stem

        # Extract session ID if present; fall back to the filename stem.
        session_match = re.search(r'Session\s+([a-f0-9-]+)', content)
        session_id = session_match.group(1) if session_match else filepath.stem

        # Extract key findings from a "Key Findings"/"Executive Summary" section.
        findings = []
        findings_section = re.search(
            r'(?:Key Findings|Executive Summary)(.*?)(?=##|\Z)',
            content, re.DOTALL | re.IGNORECASE)
        if findings_section:
            # Numbered, bold-titled items: "1. **Title**: detail..."
            for match in re.finditer(
                    r'\d+\.\s+\*\*([^*]+)\*\*[:\s]*(.+?)(?=\d+\.\s+\*\*|\Z)',
                    findings_section.group(1), re.DOTALL):
                findings.append({
                    "title": match.group(1).strip(),
                    "detail": match.group(2).strip()[:500],
                })

        entity_name = self._sanitize_name(title)

        if not dry_run:
            # Add main research document entity (use 'synthesis' as the valid type)
            kg.add_entity(
                name=entity_name,
                entity_type="synthesis",
                content=content,
                metadata={
                    "source_file": str(filepath),
                    "session_id": session_id,
                    "title": title,
                    "findings_count": len(findings),
                    "word_count": len(content.split()),
                },
                source=str(filepath)
            )

            # Add findings as separate entities linked back to the document.
            for i, finding in enumerate(findings):
                finding_name = self._sanitize_name(f"{session_id}_finding_{i+1}")
                kg.add_entity(
                    name=finding_name,
                    entity_type="finding",
                    content=f"**{finding['title']}**\n\n{finding['detail']}",
                    metadata={"research_session": session_id, "index": i+1},
                    source=str(filepath)
                )
                kg.add_relation(entity_name, finding_name, "contains")

        # Count even on dry runs so previews report totals.
        self.stats["entities_created"] += 1 + len(findings)
        self.stats["relations_created"] += len(findings)

    def _sanitize_name(self, name: str) -> str:
        """Convert name to KG-safe format."""
        return _sanitize_entity_name(name)

    def _process_md_file(self, filepath: Path, kg: KnowledgeGraph,
                         domain: str, dry_run: bool):
        """Process a single .md file into the given domain's KG."""
        parser = MarkdownParser(filepath)
        data = parser.parse()

        for entity in data["entities"]:
            # Validate entity type for domain; fall back to the domain's
            # first valid type (or "procedure" if none are defined).
            valid_types = ENTITY_TYPES.get(domain, [])
            if entity["type"] not in valid_types:
                entity["type"] = valid_types[0] if valid_types else "procedure"

            if not dry_run:
                kg.add_entity(
                    name=entity["name"],
                    entity_type=entity["type"],
                    content=entity["content"],
                    metadata=entity["metadata"],
                    source=str(filepath)
                )
            self.stats["entities_created"] += 1

        for source, target, relation in data["relations"]:
            if not dry_run:
                kg.add_relation(source, target, relation)
            self.stats["relations_created"] += 1

    def _process_claude_md(self, filepath: Path, project: str,
                           kg: KnowledgeGraph, dry_run: bool):
        """Process a project CLAUDE.md file into a single project entity."""
        content = filepath.read_text()

        # Split content on H2 headers; text before the first H2 is "overview".
        sections = {}
        current_section = "overview"
        current_content = []

        for line in content.split("\n"):
            if line.startswith("## "):
                if current_content:
                    sections[current_section] = "\n".join(current_content)
                current_section = line[3:].strip().lower().replace(" ", "_")
                current_content = []
            else:
                current_content.append(line)

        if current_content:
            sections[current_section] = "\n".join(current_content)

        # Create/update project entity
        if not dry_run:
            kg.add_entity(
                name=project,
                entity_type="project",
                content=content,
                metadata={
                    "source_file": str(filepath),
                    "sections": list(sections.keys()),
                    "has_build_commands": "build" in content.lower(),
                    "has_test_commands": "test" in content.lower(),
                },
                source=str(filepath)
            )
        self.stats["entities_created"] += 1

    def _archive_files(self, files: List[Path]):
        """Move migrated files into a timestamped archive subdirectory."""
        ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_subdir = ARCHIVE_DIR / timestamp
        archive_subdir.mkdir(exist_ok=True)
        for f in files:
            shutil.move(str(f), str(archive_subdir / f.name))

    def categorize_md_file(self, filepath: Path) -> str:
        """Determine which KG domain a file belongs to.

        Filename patterns win over content patterns; defaults to "sysadmin".
        """
        content = filepath.read_text().lower()
        name = filepath.stem.lower()

        # Check filename patterns
        if any(x in name for x in ["user", "account", "permission", "webuser"]):
            return "users"
        if any(x in name for x in ["research", "finding", "synthesis"]):
            return "research"
        if any(x in name for x in ["project", "overbits", "musica", "dss"]):
            return "projects"

        # Check content patterns
        if "user management" in content or "create user" in content:
            return "users"
        if "research" in content and "methodology" in content:
            return "research"

        # Default to sysadmin
        return "sysadmin"


def run_migration(dry_run: bool = True, verbose: bool = False) -> int:
    """Run full documentation migration.

    Returns:
        0 on success; 1 if any migration reported an error (previously this
        always returned 0, masking failures from the shell exit code).
    """
    print(f"\n=== Documentation Migration {'(DRY RUN)' if dry_run else ''} ===\n")

    sync = DocSync()

    # Categorize files first
    if DOCS_DIR.exists():
        md_files = list(DOCS_DIR.glob("*.md"))
        categories = {}
        for f in md_files:
            domain = sync.categorize_md_file(f)
            categories.setdefault(domain, []).append(f.name)

        print("File categorization:")
        for domain, files in categories.items():
            print(f" {domain}: {len(files)} files")
            if verbose:
                for f in files[:5]:
                    print(f" - {f}")
                if len(files) > 5:
                    print(f" ... and {len(files) - 5} more")

    # Migrate docs
    print("\nMigrating /opt/server-agents/docs/...")
    result = sync.migrate_docs_dir("sysadmin", dry_run)
    if "error" in result:
        print(f" Error: {result['error']}")
    else:
        print(f" Files: {result['files_processed']}")
        print(f" Entities: {result['entities_created']}")
        print(f" Relations: {result['relations_created']}")
        if result["errors"]:
            print(f" Errors: {len(result['errors'])}")

    # Migrate project CLAUDE.md files
    sync2 = DocSync()
    print("\nMigrating project CLAUDE.md files...")
    result2 = sync2.migrate_project_docs(dry_run)
    if "error" in result2:
        print(f" Error: {result2['error']}")
    else:
        print(f" Files: {result2['files_processed']}")
        print(f" Entities: {result2['entities_created']}")

    if dry_run:
        print("\n[DRY RUN] No changes made. Run with --execute to apply.")

    failed = bool(result.get("error") or result.get("errors") or
                  result2.get("error") or result2.get("errors"))
    return 1 if failed else 0


# --- CLI ---
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Documentation Migration")
    parser.add_argument("--execute", action="store_true",
                        help="Actually perform migration")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Verbose output")
    parser.add_argument("--categorize", action="store_true",
                        help="Only show file categorization")
    args = parser.parse_args()

    if args.categorize:
        sync = DocSync()
        if DOCS_DIR.exists():
            for f in sorted(DOCS_DIR.glob("*.md")):
                domain = sync.categorize_md_file(f)
                print(f" {domain:12} {f.name}")
    else:
        # sys.exit instead of the site builtin exit() for script use.
        sys.exit(run_migration(dry_run=not args.execute, verbose=args.verbose))