#!/usr/bin/env python3
"""
Skill & Documentation Usage Analyzer for Luzia

Provides comprehensive analysis of:
1. Which skills are being used during task dispatch
2. Documentation file access patterns
3. Usage trends and statistics
4. Skill-to-documentation relationships
5. Project-specific skill usage

This tool reads from:
- Queue entries: /var/lib/luzia/queue/pending/
- Job metadata: /var/log/luz-orchestrator/jobs/
- Knowledge graph databases: /etc/luz-knowledge/
"""

import json
import sqlite3
import os
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from collections import defaultdict, Counter
import re


class SkillUsageAnalyzer:
    """Analyze skill and documentation usage patterns."""

    QUEUE_BASE = Path("/var/lib/luzia/queue")
    JOB_LOG_BASE = Path("/var/log/luz-orchestrator/jobs")
    KG_BASE = Path("/etc/luz-knowledge")

    CLAUDE_DEV_KEYWORDS = {
        'skill': 'claude_dev',
        'plugin': 'claude_dev',
        'command': 'claude_dev',
        'mcp': 'claude_dev',
        'hook': 'claude_dev',
        'slash': 'claude_dev',
        'claude code': 'claude_dev',
        'agent': 'agent_framework',
        'tool': 'tool_framework',
        'integration': 'integration',
        'custom command': 'claude_dev',
        '.claude': 'claude_config',
        'slash command': 'claude_dev',
        'skill file': 'claude_dev',
        'skill library': 'claude_dev',
        'tool specification': 'tool_spec',
        'mcp server': 'mcp',
        'mcp config': 'mcp',
        'anthropic': 'anthropic_api',
        'claude-code': 'claude_dev',
    }

    def __init__(self):
        self.skills_detected = defaultdict(int)
        self.doc_references = defaultdict(int)
        self.project_skill_distribution = defaultdict(lambda: defaultdict(int))
        self.job_metadata = []
        self.queue_entries = []

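    # Illustrative queue-entry shape, inferred from the fields this method reads;
    # the real schema may carry additional keys and different values:
    #   {
    #     "id": "task-0001",
    #     "project": "orchestrator",
    #     "prompt": "Add an mcp server hook ...",
    #     "skill_match": "claude_dev",
    #     "priority": "high",
    #     "enqueued_at": "2025-01-01T12:00:00"
    #   }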
    def analyze_queue_entries(self) -> Dict[str, Any]:
        """Analyze pending queue entries for skill_match fields."""
        result = {
            "total_tasks": 0,
            "tasks_with_skill": 0,
            "skills_found": {},
            "by_project": {},
            "by_priority": {"high": 0, "normal": 0},
            "entries": [],
        }

        for tier_dir in [self.QUEUE_BASE / "pending" / "high",
                         self.QUEUE_BASE / "pending" / "normal"]:
            if not tier_dir.exists():
                continue

            tier_name = tier_dir.name
            for entry_file in tier_dir.glob("*.json"):
                try:
                    entry = json.loads(entry_file.read_text())
                    result["total_tasks"] += 1
                    result["by_priority"][tier_name] += 1

                    project = entry.get("project", "unknown")
                    if project not in result["by_project"]:
                        result["by_project"][project] = {"total": 0, "with_skill": 0}
                    result["by_project"][project]["total"] += 1

                    skill = entry.get("skill_match")
                    if skill:
                        result["tasks_with_skill"] += 1
                        result["by_project"][project]["with_skill"] += 1
                        result["skills_found"][skill] = result["skills_found"].get(skill, 0) + 1
                        self.skills_detected[skill] += 1

                    result["entries"].append({
                        "id": entry.get("id"),
                        "project": project,
                        "skill": skill,
                        "priority": entry.get("priority"),
                        "enqueued_at": entry.get("enqueued_at"),
                    })

                except (json.JSONDecodeError, IOError):
                    pass

        return result

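    # Illustrative meta.json shape, inferred from the fields this method reads;
    # actual job metadata may include more keys:
    #   {
    #     "id": "job-0001",
    #     "project": "orchestrator",
    #     "task": "Refactor queue handling",
    #     "skill": "claude_dev",
    #     "started": "2025-01-01T12:00:00",
    #     "status": "done",
    #     "debug": false
    #   }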
    def analyze_job_metadata(self, hours: int = 24) -> Dict[str, Any]:
        """Analyze job metadata for skill usage patterns."""
        result = {
            "time_window": f"Last {hours} hours",
            "total_jobs": 0,
            "jobs_with_skill": 0,
            "skills_used": {},
            "debug_mode_tasks": 0,
            "by_project": {},
            "jobs": [],
        }

        since = datetime.now() - timedelta(hours=hours)

        if not self.JOB_LOG_BASE.exists():
            return result

        for job_dir in self.JOB_LOG_BASE.glob("*/meta.json"):
            try:
                meta = json.loads(job_dir.read_text())
                started = datetime.fromisoformat(meta.get("started", ""))

                if started < since:
                    continue

                result["total_jobs"] += 1
                project = meta.get("project", "unknown")

                if project not in result["by_project"]:
                    result["by_project"][project] = {
                        "total": 0,
                        "with_skill": 0,
                        "debug_mode": 0,
                    }
                result["by_project"][project]["total"] += 1

                skill = meta.get("skill")
                if skill:
                    result["jobs_with_skill"] += 1
                    result["by_project"][project]["with_skill"] += 1
                    result["skills_used"][skill] = result["skills_used"].get(skill, 0) + 1
                    self.skills_detected[skill] += 1

                # Check for debug mode (indicates Claude dev task)
                if meta.get("debug"):
                    result["debug_mode_tasks"] += 1
                    result["by_project"][project]["debug_mode"] += 1

                result["jobs"].append({
                    "id": meta.get("id"),
                    "project": project,
                    "task": meta.get("task", "")[:100],
                    "skill": skill,
                    "started": meta.get("started"),
                    "status": meta.get("status"),
                    "debug": meta.get("debug", False),
                })

            except (json.JSONDecodeError, IOError, ValueError):
                pass

        return result

    def detect_skills_in_tasks(self) -> Dict[str, List[Dict]]:
        """Detect skills from task prompts using keyword analysis."""
        result = defaultdict(list)

        # Analyze queue entries: pending/<tier>/*.json, matching the layout
        # used by analyze_queue_entries (the original "*/*/*.json" glob was one
        # directory level too deep and would never match).
        if self.QUEUE_BASE.exists():
            for entry_file in (self.QUEUE_BASE / "pending").glob("*/*.json"):
                try:
                    entry = json.loads(entry_file.read_text())
                    prompt = entry.get("prompt", "").lower()
                    task_id = entry.get("id", "unknown")
                    project = entry.get("project", "unknown")

                    detected = self._detect_keywords(prompt)
                    if detected:
                        for skill_type in set(detected.values()):
                            result[skill_type].append({
                                "task_id": task_id,
                                "project": project,
                                "prompt": entry.get("prompt", "")[:100],
                            })

                except (json.JSONDecodeError, IOError):
                    pass

        return result

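    # Plain substring matching against CLAUDE_DEV_KEYWORDS; multi-word keys only
    # hit when the full phrase appears. Illustrative example, traced from the
    # mapping above:
    #   _detect_keywords("add an mcp server hook")
    #   -> {'mcp': 'claude_dev', 'hook': 'claude_dev', 'mcp server': 'mcp'}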
    def _detect_keywords(self, text: str) -> Dict[str, str]:
        """Detect skill keywords in text."""
        detected = {}
        for keyword, skill_type in self.CLAUDE_DEV_KEYWORDS.items():
            if keyword in text:
                detected[keyword] = skill_type
        return detected

    def analyze_documentation_usage(self) -> Dict[str, Any]:
        """Analyze documentation file usage patterns."""
        result = {
            "doc_files": {},
            "doc_references": {},
            "sync_patterns": {},
        }

        # Check for .md files in project directories
        for doc_file in Path("/opt/server-agents/orchestrator").glob("*.md"):
            stat = doc_file.stat()
            result["doc_files"][doc_file.name] = {
                "size_bytes": stat.st_size,
                "last_modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            }

        # Analyze job logs for doc references
        for job_dir in self.JOB_LOG_BASE.glob("*/dialogue/*/"):
            try:
                dialogue_file = job_dir / "agent.md"
                if dialogue_file.exists():
                    content = dialogue_file.read_text()
                    # Look for doc references
                    doc_refs = self._find_doc_references(content)
                    for ref in doc_refs:
                        result["doc_references"][ref] = result["doc_references"].get(ref, 0) + 1
            except (IOError, OSError):
                pass

        return result

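    # Regex-based reference extraction; matching is case-insensitive, so
    # lower-case .md names are picked up as well. Illustrative example, traced
    # from the patterns below:
    #   _find_doc_references("see [QUEUE.md], ROUTING.md and run luzia docs queue")
    #   -> ['QUEUE.md', 'ROUTING.md', 'queue']  (deduplicated; order not guaranteed)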
    def _find_doc_references(self, text: str) -> List[str]:
        """Find references to documentation files in text."""
        refs = []
        # Match patterns like [doc_name], .md file references, etc.
        patterns = [
            r'\[([A-Z_\-]+\.md)\]',
            r'([A-Z_\-]+\.md)',
            r'luzia docs (\S+)',
        ]
        for pattern in patterns:
            refs.extend(re.findall(pattern, text, re.IGNORECASE))
        return list(set(refs))

    def get_skill_distribution(self) -> Dict[str, int]:
        """Get distribution of skills across all tasks."""
        return dict(self.skills_detected)

    def get_project_skill_usage(self) -> Dict[str, Dict[str, int]]:
        """Get skill usage breakdown by project."""
        result = {}

        # Analyze job logs
        for job_dir in self.JOB_LOG_BASE.glob("*/meta.json"):
            try:
                meta = json.loads(job_dir.read_text())
                project = meta.get("project", "unknown")
                skill = meta.get("skill")

                if skill:
                    if project not in result:
                        result[project] = {}
                    result[project][skill] = result[project].get(skill, 0) + 1

            except (json.JSONDecodeError, IOError):
                pass

        return result

    def generate_report(self) -> Dict[str, Any]:
        """Generate comprehensive usage report."""
        return {
            "timestamp": datetime.now().isoformat(),
            "queue_analysis": self.analyze_queue_entries(),
            "job_analysis": self.analyze_job_metadata(),
            "skill_detection": self.detect_skills_in_tasks(),
            "doc_analysis": self.analyze_documentation_usage(),
            "skill_distribution": self.get_skill_distribution(),
            "project_skill_usage": self.get_project_skill_usage(),
            "summary": {
                "total_unique_skills": len(self.skills_detected),
                "most_used_skill": max(self.skills_detected, key=self.skills_detected.get)
                if self.skills_detected else None,
                "skill_usage_stats": dict(self.skills_detected),
            }
        }

    def save_report(self, filepath: str) -> None:
        """Save report to file."""
        report = self.generate_report()
        with open(filepath, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"Report saved to {filepath}")

    def print_summary(self) -> None:
        """Print summary of findings."""
        queue_analysis = self.analyze_queue_entries()
        job_analysis = self.analyze_job_metadata()
        skill_dist = self.get_skill_distribution()
        project_usage = self.get_project_skill_usage()

        print("\n" + "="*70)
        print("LUZIA SKILL & DOCUMENTATION USAGE REPORT")
        print("="*70)

        print("\n📋 QUEUE ANALYSIS")
        print(f" Total pending tasks: {queue_analysis['total_tasks']}")
        print(f" Tasks with skill match: {queue_analysis['tasks_with_skill']}")
        print(f" High priority: {queue_analysis['by_priority'].get('high', 0)}")
        print(f" Normal priority: {queue_analysis['by_priority'].get('normal', 0)}")

        if queue_analysis['skills_found']:
            print("\n Skills in queue:")
            for skill, count in queue_analysis['skills_found'].items():
                print(f" - {skill}: {count}")

        print("\n📊 JOB EXECUTION ANALYSIS (Last 24h)")
        print(f" Total jobs: {job_analysis['total_jobs']}")
        print(f" Jobs with skill: {job_analysis['jobs_with_skill']}")
        print(f" Debug mode tasks: {job_analysis['debug_mode_tasks']}")

        if job_analysis['skills_used']:
            print("\n Skills executed:")
            for skill, count in job_analysis['skills_used'].items():
                print(f" - {skill}: {count}")

        print("\n📈 PROJECT SKILL DISTRIBUTION")
        for project, skills in project_usage.items():
            print(f" {project}:")
            for skill, count in skills.items():
                print(f" - {skill}: {count}")

        if skill_dist:
            print("\n🎯 SKILL USAGE STATISTICS")
            total = sum(skill_dist.values())
            for skill, count in sorted(skill_dist.items(), key=lambda x: x[1], reverse=True):
                pct = (count / total * 100) if total > 0 else 0
                print(f" {skill}: {count} ({pct:.1f}%)")

        print("\n" + "="*70 + "\n")

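# Command-line usage, as handled by main() below (the script name and output
# path are illustrative; any path accepted by open() works):
#   skill_usage_analyzer.py                        # print human-readable summary
#   skill_usage_analyzer.py json                   # dump the full report as JSON
#   skill_usage_analyzer.py save /tmp/report.json  # write the report to a file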
def main():
    """Main entry point."""
    import sys

    analyzer = SkillUsageAnalyzer()

    if len(sys.argv) > 1:
        if sys.argv[1] == "json":
            report = analyzer.generate_report()
            print(json.dumps(report, indent=2))
        elif sys.argv[1] == "save" and len(sys.argv) > 2:
            analyzer.save_report(sys.argv[2])
        else:
            analyzer.print_summary()
    else:
        analyzer.print_summary()


if __name__ == "__main__":
    main()