#!/usr/bin/env python3
"""
Knowledge Graph Pattern Detector

Identifies incomplete research sessions in both the KG database and file system.

Detects 4 pattern types:
1. Unresolved questions - content ends with user decision point
2. Minimal duration - sessions <5min with no findings
3. Claude indicators without conclusions - Assistant responses missing synthesis
4. Markdown files - incomplete research in file system
"""

import re
import time
import json
from sqlite3 import connect as sqlite_connect
from pathlib import Path
from typing import List, Dict, Tuple


class KGPatternDetector:
    """Detect incomplete research patterns in KG databases and file system."""

    # Question patterns indicating unresolved state
    UNRESOLVED_QUESTION_PATTERNS = [
        r'what\s+(?:do\s+you|should\s+we|would\s+you)',
        r'which\s+(?:approach|method|option)',
        r'should\s+we',
        r'please\s+choose',
        r'your\s+(?:thoughts|preference|opinion)',
        r'(?:any|what)\s+thoughts',
        r'(?:how|what)\s+would\s+you',
        r'would\s+you\s+(?:recommend|prefer)',
        r'what\'s\s+your',
        r'do\s+you\s+(?:think|agree)',
    ]

    # Claude writing indicators
    CLAUDE_INDICATORS = [
        r'^assistant:',
        r'^i\'ll\s+(?:analyze|review|help|create|implement)',
        r'^let\s+me\s+(?:analyze|review|help)',
        r'^here\'s',
        r'^based\s+on',
        r'^to\s+summarize',
        r'^in\s+summary',
        r'^the\s+key\s+(?:findings|points)',
    ]

    # Conclusion/synthesis indicators
    CONCLUSION_PATTERNS = [
        r'conclusion:',
        r'findings?:',
        r'recommendation:',
        r'summary:',
        r'synthesis:',
        r'takeaway:',
        r'next\s+steps?:',
        r'action\s+items?:',
    ]
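
    # Illustrative strings each pattern group is intended to match
    # (hypothetical examples, not drawn from real session content):
    #   unresolved question  -> "Which approach should we take here?"
    #   Claude indicator     -> "I'll analyze the existing schema first."
    #   conclusion/synthesis -> "Summary: the migration is safe to run."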

    def __init__(self):
        """Initialize the pattern detector."""
        self.findings: List[Dict] = []

    def find_incomplete_research_kg(self, db_path: str, time_scope_days: int = 30) -> List[Dict]:
        """
        Find incomplete research sessions in KG database.

        Args:
            db_path: Path to research.db SQLite database
            time_scope_days: Only examine sessions modified in last N days

        Returns:
            List of finding dicts with source, id, name, pattern, severity, example
        """
        findings = []
        cutoff_time = time.time() - (time_scope_days * 86400)

        if not Path(db_path).exists():
            return findings

        try:
            with sqlite_connect(db_path) as conn:
                cursor = conn.cursor()

                # Query research session entities updated within the time scope
                cursor.execute("""
                    SELECT id, name, content, created_at, updated_at
                    FROM entities
                    WHERE type = 'session' AND domain = 'research'
                    AND updated_at > ?
                    ORDER BY updated_at DESC
                """, (cutoff_time,))

                for row in cursor.fetchall():
                    entity_id, name, content, created, updated = row

                    # Skip empty content
                    if not content or not isinstance(content, str):
                        continue

                    duration_secs = int(updated - created)

                    # Pattern 1: Unresolved questions
                    if self._has_unresolved_question(content):
                        findings.append({
                            'source': 'kg_database',
                            'id': entity_id,
                            'name': name,
                            'pattern': 'unresolved_question',
                            'duration_secs': duration_secs,
                            'severity': 'high',
                            'example': self._extract_ending(content, 300),
                            'timestamp': updated,
                            'db_path': db_path
                        })

                    # Pattern 2: Minimal duration with no findings
                    if duration_secs < 300 and not self._has_findings(content):
                        findings.append({
                            'source': 'kg_database',
                            'id': entity_id,
                            'name': name,
                            'pattern': 'incomplete_duration',
                            'duration_secs': duration_secs,
                            'severity': 'medium',
                            'example': content[:300],
                            'timestamp': updated,
                            'db_path': db_path
                        })

                    # Pattern 3: Claude indicators without conclusions
                    if self._has_claude_indicators(content) and not self._has_conclusions(content):
                        findings.append({
                            'source': 'kg_database',
                            'id': entity_id,
                            'name': name,
                            'pattern': 'claude_no_conclusion',
                            'duration_secs': duration_secs,
                            'severity': 'high',
                            'example': self._extract_ending(content, 300),
                            'timestamp': updated,
                            'db_path': db_path
                        })

        except Exception as e:
            print(f"Error querying KG database {db_path}: {e}")

        return findings

    def find_incomplete_research_files(self, time_scope_days: int = 30) -> List[Dict]:
        """
        Find incomplete research in markdown and JSON files.

        Args:
            time_scope_days: Only examine files modified in last N days

        Returns:
            List of finding dicts with source, path, pattern, severity, example
        """
        findings = []
        cutoff_time = time.time() - (time_scope_days * 86400)

        # Search in relevant directories
        search_dirs = [
            Path('/home/admin'),
            Path('/home/admin/conductor'),
            Path('/opt/server-agents/state'),
        ]

        # File patterns that might contain research
        file_patterns = [
            '**/*research*.md',
            '**/*findings*.md',
            '**/*analysis*.md',
            '**/*research*.json',
            '**/*incomplete*.md',
            '**/*session*.json',
        ]
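
        # Note: `**` makes these globs recursive, so scanning large home
        # directories can be slow; the directories and patterns above are
        # deployment-specific defaults.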

        for search_dir in search_dirs:
            if not search_dir.exists():
                continue

            for file_pattern in file_patterns:
                try:
                    for file_path in search_dir.glob(file_pattern):
                        # Skip if too old
                        mtime = file_path.stat().st_mtime
                        if mtime < cutoff_time:
                            continue

                        try:
                            content = file_path.read_text(errors='ignore')

                            # Skip very small files (likely noise)
                            if len(content) < 100:
                                continue

                            # Pattern 1: Unresolved questions
                            if self._has_unresolved_question(content):
                                findings.append({
                                    'source': 'file',
                                    'path': str(file_path),
                                    'pattern': 'unresolved_question',
                                    'severity': 'high',
                                    'example': self._extract_ending(content, 300),
                                    'timestamp': mtime
                                })

                            # Pattern 3: Claude indicators without conclusions
                            if self._has_claude_indicators(content) and not self._has_conclusions(content):
                                findings.append({
                                    'source': 'file',
                                    'path': str(file_path),
                                    'pattern': 'claude_no_conclusion',
                                    'severity': 'high',
                                    'example': self._extract_ending(content, 300),
                                    'timestamp': mtime
                                })

                        except Exception:
                            # Silently skip files that can't be read
                            pass

                except Exception:
                    pass

        return findings

    def find_all_incomplete_research(self,
                                     kg_db_paths: List[str] = None,
                                     time_scope_days: int = 30) -> Dict:
        """
        Comprehensive incomplete research discovery across KG and files.

        Args:
            kg_db_paths: List of KG database paths to check. Defaults to standard locations.
            time_scope_days: Time scope for search in days

        Returns:
            Dict with:
            - 'findings': List of all findings
            - 'summary': Dict with counts by pattern and severity
            - 'timestamp': When scan was performed
        """
        if kg_db_paths is None:
            kg_db_paths = [
                '/etc/luz-knowledge/research.db',
                '/etc/zen-swarm/memory/research.db',
            ]

        all_findings = []

        # Scan KG databases
        for db_path in kg_db_paths:
            kg_findings = self.find_incomplete_research_kg(db_path, time_scope_days)
            all_findings.extend(kg_findings)

        # Scan file system
        file_findings = self.find_incomplete_research_files(time_scope_days)
        all_findings.extend(file_findings)

        # Generate summary statistics
        summary = self._generate_summary(all_findings)

        return {
            'findings': all_findings,
            'summary': summary,
            'timestamp': time.time(),
            'time_scope_days': time_scope_days
        }

    def _has_unresolved_question(self, content: str) -> bool:
        """Check if content contains unresolved user decision points."""
        if not content:
            return False

        # Check if ends with question-like pattern
        for pattern in self.UNRESOLVED_QUESTION_PATTERNS:
            if re.search(pattern, content, re.IGNORECASE | re.MULTILINE):
                # Make sure there's no resolution after the question
                if not re.search(r'(conclusion|resolution|decision made|will do|here\'s|approved)',
                                 content[-200:], re.IGNORECASE):
                    return True

        return False

    def _has_findings(self, content: str) -> bool:
        """Check if content contains synthesis/findings/analysis section."""
        if not content:
            return False

        return re.search(
            r'(finding|synthesis|analysis|conclusion|recommendation)',
            content,
            re.IGNORECASE
        ) is not None

    def _has_claude_indicators(self, content: str) -> bool:
        """Check if content contains Claude-style writing indicators."""
        if not content:
            return False

        for pattern in self.CLAUDE_INDICATORS:
            # Indicators are written lowercase, so match case-insensitively
            # (consistent with the other pattern checks).
            if re.search(pattern, content, re.IGNORECASE | re.MULTILINE):
                return True

        return False

    def _has_conclusions(self, content: str) -> bool:
        """Check if content contains conclusion/synthesis indicators."""
        if not content:
            return False

        return re.search(
            r'|'.join(self.CONCLUSION_PATTERNS),
            content,
            re.IGNORECASE
        ) is not None

    def _extract_ending(self, content: str, length: int) -> str:
        """Extract last N characters of content (the most relevant part)."""
        if not content:
            return ""
        return content[-length:] if len(content) > length else content

    def _generate_summary(self, findings: List[Dict]) -> Dict:
        """Generate statistics about findings."""
        summary = {
            'total': len(findings),
            'by_pattern': {},
            'by_severity': {},
            'by_source': {}
        }

        for finding in findings:
            pattern = finding.get('pattern', 'unknown')
            severity = finding.get('severity', 'unknown')
            source = finding.get('source', 'unknown')

            summary['by_pattern'][pattern] = summary['by_pattern'].get(pattern, 0) + 1
            summary['by_severity'][severity] = summary['by_severity'].get(severity, 0) + 1
            summary['by_source'][source] = summary['by_source'].get(source, 0) + 1

        return summary


if __name__ == '__main__':
    # Quick test
    detector = KGPatternDetector()
    result = detector.find_all_incomplete_research(time_scope_days=30)

    print(f"Found {result['summary']['total']} incomplete research sessions")
    print(f"Summary by pattern: {result['summary']['by_pattern']}")
    print(f"Summary by severity: {result['summary']['by_severity']}")

    # Show first few findings
    for finding in result['findings'][:5]:
        print(f"\n[{finding['severity'].upper()}] {finding['pattern']}")
        print(f" Source: {finding['source']}")
        if 'name' in finding:
            print(f" Name: {finding['name']}")
        if 'path' in finding:
            print(f" Path: {finding['path']}")
        print(f" Example: {finding['example'][:100]}...")
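
    # Optionally persist the scan for later review. The result dict contains
    # only JSON-serializable values, so something like the following would
    # work (the output path is just an example):
    #
    #     Path('/tmp/kg_incomplete_research.json').write_text(
    #         json.dumps(result, indent=2))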