#!/usr/bin/env python3
"""
Documentation Sync - Migrate .md files to Knowledge Graphs

Parses markdown files and creates KG entities:
- Headers become entity names
- Content becomes entity content
- Links become relations
- Code blocks stored in metadata

Archives original .md files after migration.
"""

import json
import re
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional

sys.path.insert(0, str(Path(__file__).parent))
from knowledge_graph import KnowledgeGraph, ENTITY_TYPES
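
# NOTE: knowledge_graph is a local sibling module whose API is not shown here.
# From the call sites below it is assumed to expose roughly:
#   KnowledgeGraph(domain)                                    # open/create a domain graph
#   kg.add_entity(name, entity_type, content, metadata, source)
#   kg.add_relation(source, target, relation)
#   ENTITY_TYPES: Dict[str, List[str]]                        # valid entity types per domain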

# Source directories
DOCS_DIR = Path("/opt/server-agents/docs")
ARCHIVE_DIR = Path("/opt/server-agents/archive/docs-migrated")
PROJECT_HOMES = Path("/home")


class MarkdownParser:
    """Parse markdown files into structured entities."""

    def __init__(self, filepath: Path):
        self.filepath = filepath
        self.content = filepath.read_text() if filepath.exists() else ""
        self.entities: List[Dict] = []
        self.relations: List[Tuple[str, str, str]] = []

    def parse(self) -> Dict:
        """Parse the markdown file."""
        if not self.content:
            return {"entities": [], "relations": []}

        # Extract title from first H1 or filename
        title_match = re.search(r'^#\s+(.+)$', self.content, re.MULTILINE)
        title = title_match.group(1) if title_match else self.filepath.stem

        # Create main entity
        main_entity = {
            "name": self._sanitize_name(title),
            "type": self._infer_type(title, self.content),
            "content": self.content,
            "metadata": {
                "source_file": str(self.filepath),
                "title": title,
                "sections": self._extract_sections(),
                "code_blocks": self._extract_code_blocks(),
            }
        }
        self.entities.append(main_entity)

        # Extract internal links as relations
        self._extract_links(main_entity["name"])

        return {
            "entities": self.entities,
            "relations": self.relations,
        }

    def _sanitize_name(self, name: str) -> str:
        """Convert name to KG-safe format."""
        # Remove special chars, lowercase, replace spaces with underscores
        name = re.sub(r'[^\w\s-]', '', name)
        name = re.sub(r'\s+', '_', name)
        return name.lower()[:100]

    def _infer_type(self, title: str, content: str) -> str:
        """Infer entity type from title/content."""
        title_lower = title.lower()

        # Check for specific patterns
        if any(x in title_lower for x in ["command", "cli", "usage"]):
            return "command"
        if any(x in title_lower for x in ["service", "daemon"]):
            return "service"
        if any(x in title_lower for x in ["config", "settings", "setup"]):
            return "config"
        if any(x in title_lower for x in ["troubleshoot", "debug", "fix"]):
            return "troubleshooting"
        if any(x in title_lower for x in ["architecture", "design", "system"]):
            return "architecture"
        if any(x in title_lower for x in ["guide", "how", "tutorial"]):
            return "procedure"
        if any(x in title_lower for x in ["user", "account", "permission"]):
            return "guide"

        # Default when no title pattern matches (content is currently unused,
        # but kept in the signature for callers)
        return "procedure"

    def _extract_sections(self) -> List[Dict]:
        """Extract sections (H2, H3 headers)."""
        sections = []
        pattern = r'^(#{2,3})\s+(.+)$'

        for match in re.finditer(pattern, self.content, re.MULTILINE):
            level = len(match.group(1))
            title = match.group(2)
            sections.append({
                "level": level,
                "title": title,
                "position": match.start(),
            })

        return sections

    def _extract_code_blocks(self) -> List[Dict]:
        """Extract code blocks with language."""
        blocks = []
        pattern = r'```(\w*)\n(.*?)```'

        for match in re.finditer(pattern, self.content, re.DOTALL):
            lang = match.group(1) or "text"
            code = match.group(2).strip()
            blocks.append({
                "language": lang,
                "code": code[:500],  # Truncate long blocks
                "position": match.start(),
            })

        return blocks

    def _extract_links(self, source_name: str):
        """Extract markdown links as relations."""
        # [text](url) pattern
        pattern = r'\[([^\]]+)\]\(([^)]+)\)'
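        # Example: "[setup guide](server-setup.md)" becomes the relation
        # (source_name, "server-setup", "references"); external URLs are skipped.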

        for match in re.finditer(pattern, self.content):
            url = match.group(2)

            # Internal .md links become relations
            if url.endswith('.md') and not url.startswith('http'):
                target = self._sanitize_name(Path(url).stem)
                self.relations.append((source_name, target, "references"))


class DocSync:
    """Sync documentation files to knowledge graphs."""

    def __init__(self):
        self.stats = {
            "files_processed": 0,
            "entities_created": 0,
            "relations_created": 0,
            "errors": [],
        }

    def migrate_docs_dir(self, domain: str = "sysadmin", dry_run: bool = True) -> Dict:
        """Migrate /opt/server-agents/docs/*.md to KG."""
        if not DOCS_DIR.exists():
            return {"error": f"Docs directory not found: {DOCS_DIR}"}

        try:
            kg = KnowledgeGraph(domain)
        except Exception as e:
            return {"error": f"Could not open KG: {e}"}

        md_files = list(DOCS_DIR.glob("*.md"))
        self.stats["files_processed"] = len(md_files)

        for md_file in md_files:
            try:
                self._process_md_file(md_file, kg, domain, dry_run)
            except Exception as e:
                self.stats["errors"].append(f"{md_file.name}: {e}")

        # Archive if not dry run
        if not dry_run and not self.stats["errors"]:
            self._archive_files(md_files)

        return self.stats

    def migrate_project_docs(self, dry_run: bool = True) -> Dict:
        """Migrate /home/*/CLAUDE.md to projects KG."""
        try:
            kg = KnowledgeGraph("projects")
        except Exception as e:
            return {"error": f"Could not open KG: {e}"}

        claude_files = list(PROJECT_HOMES.glob("*/CLAUDE.md"))
        self.stats["files_processed"] = len(claude_files)

        for claude_file in claude_files:
            try:
                project = claude_file.parent.name
                self._process_claude_md(claude_file, project, kg, dry_run)
            except Exception as e:
                self.stats["errors"].append(f"{claude_file}: {e}")

        return self.stats

    def migrate_research_dir(self, research_dir: str = "/home/admin/research",
                             archive: bool = False, dry_run: bool = True) -> Dict:
        """Migrate research .md files to research KG.

        Args:
            research_dir: Directory containing research .md files
            archive: If True, move files to archive after migration
            dry_run: If True, preview without making changes
        """
        research_path = Path(research_dir)
        if not research_path.exists():
            return {"error": f"Research directory not found: {research_dir}"}

        try:
            kg = KnowledgeGraph("research")
        except Exception as e:
            return {"error": f"Could not open research KG: {e}"}

        md_files = list(research_path.glob("*.md"))
        self.stats["files_processed"] = len(md_files)

        for md_file in md_files:
            try:
                self._process_research_md(md_file, kg, dry_run)
            except Exception as e:
                self.stats["errors"].append(f"{md_file.name}: {e}")

        # Archive if requested and not dry run
        if archive and not dry_run and not self.stats["errors"]:
            archive_dir = research_path / "archived"
            archive_dir.mkdir(exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            for f in md_files:
                dest = archive_dir / f"{timestamp}_{f.name}"
                shutil.move(str(f), str(dest))

        return self.stats

    def _process_research_md(self, filepath: Path, kg: KnowledgeGraph, dry_run: bool):
        """Process a research .md file into KG entities."""
        content = filepath.read_text()

        # Extract title from first H1
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        title = title_match.group(1) if title_match else filepath.stem

        # Extract session ID if present
        session_match = re.search(r'Session\s+([a-f0-9-]+)', content)
        session_id = session_match.group(1) if session_match else filepath.stem

        # Extract key findings
        findings = []
        findings_section = re.search(r'(?:Key Findings|Executive Summary)(.*?)(?=##|\Z)',
                                     content, re.DOTALL | re.IGNORECASE)
        if findings_section:
            # Extract numbered bold items
            for match in re.finditer(r'\d+\.\s+\*\*([^*]+)\*\*[:\s]*(.+?)(?=\d+\.\s+\*\*|\Z)',
                                     findings_section.group(1), re.DOTALL):
                findings.append({
                    "title": match.group(1).strip(),
                    "detail": match.group(2).strip()[:500]
                })

        # Create main research entity
        entity_name = self._sanitize_name(title)

        if not dry_run:
            # Add main research document entity (use 'synthesis' as the valid type)
            kg.add_entity(
                name=entity_name,
                entity_type="synthesis",
                content=content,
                metadata={
                    "source_file": str(filepath),
                    "session_id": session_id,
                    "title": title,
                    "findings_count": len(findings),
                    "word_count": len(content.split()),
                },
                source=str(filepath)
            )

            # Add findings as separate entities with relations
            for i, finding in enumerate(findings):
                finding_name = self._sanitize_name(f"{session_id}_finding_{i+1}")
                kg.add_entity(
                    name=finding_name,
                    entity_type="finding",
                    content=f"**{finding['title']}**\n\n{finding['detail']}",
                    metadata={"research_session": session_id, "index": i+1},
                    source=str(filepath)
                )
                kg.add_relation(entity_name, finding_name, "contains")

        self.stats["entities_created"] += 1 + len(findings)
        self.stats["relations_created"] += len(findings)

    def _sanitize_name(self, name: str) -> str:
        """Convert name to KG-safe format (duplicate of MarkdownParser._sanitize_name)."""
        name = re.sub(r'[^\w\s-]', '', name)
        name = re.sub(r'\s+', '_', name)
        return name.lower()[:100]

    def _process_md_file(self, filepath: Path, kg: KnowledgeGraph, domain: str, dry_run: bool):
        """Process a single .md file."""
        parser = MarkdownParser(filepath)
        data = parser.parse()

        for entity in data["entities"]:
            # Validate entity type for domain; fall back to the domain's first valid type
            valid_types = ENTITY_TYPES.get(domain, [])
            if entity["type"] not in valid_types:
                entity["type"] = valid_types[0] if valid_types else "procedure"

            if not dry_run:
                kg.add_entity(
                    name=entity["name"],
                    entity_type=entity["type"],
                    content=entity["content"],
                    metadata=entity["metadata"],
                    source=str(filepath)
                )
            self.stats["entities_created"] += 1

        for source, target, relation in data["relations"]:
            if not dry_run:
                kg.add_relation(source, target, relation)
            self.stats["relations_created"] += 1

    def _process_claude_md(self, filepath: Path, project: str, kg: KnowledgeGraph, dry_run: bool):
        """Process a project CLAUDE.md file."""
        content = filepath.read_text()

        # Extract key sections
        sections = {}
        current_section = "overview"
        current_content = []

        for line in content.split("\n"):
            if line.startswith("## "):
                if current_content:
                    sections[current_section] = "\n".join(current_content)
                current_section = line[3:].strip().lower().replace(" ", "_")
                current_content = []
            else:
                current_content.append(line)

        if current_content:
            sections[current_section] = "\n".join(current_content)

        # Create/update project entity
        if not dry_run:
            kg.add_entity(
                name=project,
                entity_type="project",
                content=content,
                metadata={
                    "source_file": str(filepath),
                    "sections": list(sections.keys()),
                    "has_build_commands": "build" in content.lower(),
                    "has_test_commands": "test" in content.lower(),
                },
                source=str(filepath)
            )
        self.stats["entities_created"] += 1

    def _archive_files(self, files: List[Path]):
        """Archive migrated files."""
        ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_subdir = ARCHIVE_DIR / timestamp
        archive_subdir.mkdir(exist_ok=True)

        for f in files:
            shutil.move(str(f), str(archive_subdir / f.name))

    def categorize_md_file(self, filepath: Path) -> str:
        """Determine which KG domain a file belongs to."""
        content = filepath.read_text().lower()
        name = filepath.stem.lower()

        # Check filename patterns
        if any(x in name for x in ["user", "account", "permission", "webuser"]):
            return "users"
        if any(x in name for x in ["research", "finding", "synthesis"]):
            return "research"
        if any(x in name for x in ["project", "overbits", "musica", "dss"]):
            return "projects"

        # Check content patterns
        if "user management" in content or "create user" in content:
            return "users"
        if "research" in content and "methodology" in content:
            return "research"

        # Default to sysadmin
        return "sysadmin"


def run_migration(dry_run: bool = True, verbose: bool = False) -> int:
    """Run full documentation migration."""
    print(f"\n=== Documentation Migration {'(DRY RUN)' if dry_run else ''} ===\n")

    sync = DocSync()

    # Categorize files first
    if DOCS_DIR.exists():
        md_files = list(DOCS_DIR.glob("*.md"))
        categories = {}

        for f in md_files:
            domain = sync.categorize_md_file(f)
            if domain not in categories:
                categories[domain] = []
            categories[domain].append(f.name)

        print("File categorization:")
        for domain, files in categories.items():
            print(f"  {domain}: {len(files)} files")
            if verbose:
                for f in files[:5]:
                    print(f"    - {f}")
                if len(files) > 5:
                    print(f"    ... and {len(files) - 5} more")

    # Migrate docs
    print("\nMigrating /opt/server-agents/docs/...")
    result = sync.migrate_docs_dir("sysadmin", dry_run)
    if "error" in result:
        print(f"  Error: {result['error']}")
    else:
        print(f"  Files: {result['files_processed']}")
        print(f"  Entities: {result['entities_created']}")
        print(f"  Relations: {result['relations_created']}")
        if result["errors"]:
            print(f"  Errors: {len(result['errors'])}")

    # Migrate project CLAUDE.md files (fresh DocSync so stats don't mix)
    sync2 = DocSync()
    print("\nMigrating project CLAUDE.md files...")
    result2 = sync2.migrate_project_docs(dry_run)
    if "error" in result2:
        print(f"  Error: {result2['error']}")
    else:
        print(f"  Files: {result2['files_processed']}")
        print(f"  Entities: {result2['entities_created']}")

    if dry_run:
        print("\n[DRY RUN] No changes made. Run with --execute to apply.")

    return 0


# --- CLI ---
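# Typical invocations (a sketch; assumes this file is saved as doc_sync.py):
#   python3 doc_sync.py                  # dry-run preview
#   python3 doc_sync.py --categorize     # print per-file KG domain only
#   python3 doc_sync.py --execute -v     # apply the migration, verbose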

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Documentation Migration")
    parser.add_argument("--execute", action="store_true", help="Actually perform migration")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--categorize", action="store_true", help="Only show file categorization")

    args = parser.parse_args()

    if args.categorize:
        sync = DocSync()
        if DOCS_DIR.exists():
            for f in sorted(DOCS_DIR.glob("*.md")):
                domain = sync.categorize_md_file(f)
                print(f"  {domain:12} {f.name}")
    else:
        sys.exit(run_migration(dry_run=not args.execute, verbose=args.verbose))