#!/usr/bin/env python3
"""
Knowledge Graph Pattern Detector

Identifies incomplete research sessions in both the KG database and file system.

Detects 4 pattern types:
1. Unresolved questions - a user decision point with no resolution near the end
2. Minimal duration - sessions <5min with no findings
3. Claude indicators without conclusions - Assistant responses missing synthesis
4. Markdown/JSON files - incomplete research in file system
"""

import re
import sqlite3
import time
import json
from collections import Counter
from contextlib import closing
from sqlite3 import connect as sqlite_connect
from pathlib import Path
from typing import List, Dict, Optional, Set, Tuple, Union


class KGPatternDetector:
    """Detect incomplete research patterns in KG databases and file system."""

    # Question patterns indicating unresolved state
    UNRESOLVED_QUESTION_PATTERNS = [
        r'what\s+(?:do\s+you|should\s+we|would\s+you)',
        r'which\s+(?:approach|method|option)',
        r'should\s+we',
        r'please\s+choose',
        r'your\s+(?:thoughts|preference|opinion)',
        r'(?:any|what)\s+thoughts',
        r'(?:how|what)\s+would\s+you',
        r'would\s+you\s+(?:recommend|prefer)',
        r'what\'s\s+your',
        r'do\s+you\s+(?:think|agree)',
    ]

    # Claude writing indicators (anchored to the start of a line)
    CLAUDE_INDICATORS = [
        r'^assistant:',
        r'^i\'ll\s+(?:analyze|review|help|create|implement)',
        r'^let\s+me\s+(?:analyze|review|help)',
        r'^here\'s',
        r'^based\s+on',
        r'^to\s+summarize',
        r'^in\s+summary',
        r'^the\s+key\s+(?:findings|points)',
    ]

    # Conclusion/synthesis indicators
    CONCLUSION_PATTERNS = [
        r'conclusion:',
        r'findings?:',
        r'recommendation:',
        r'summary:',
        r'synthesis:',
        r'takeaway:',
        r'next\s+steps?:',
        r'action\s+items?:',
    ]

    # Words that, when found near the end of the content, mean a question
    # raised earlier has been answered.
    RESOLUTION_PATTERN = r'(conclusion|resolution|decision made|will do|here\'s|approved)'

    # Loose markers of a findings/analysis section (used by pattern 2).
    FINDINGS_PATTERN = r'(finding|synthesis|analysis|conclusion|recommendation)'

    # KG databases scanned when the caller does not supply any.
    DEFAULT_KG_DB_PATHS = (
        '/etc/luz-knowledge/research.db',
        '/etc/zen-swarm/memory/research.db',
    )

    # Directories scanned when the caller does not supply any.
    DEFAULT_SEARCH_DIRS = (
        '/home/admin',
        '/home/admin/conductor',
        '/opt/server-agents/state',
    )

    # File patterns that might contain research
    FILE_PATTERNS = (
        '**/*research*.md',
        '**/*findings*.md',
        '**/*analysis*.md',
        '**/*research*.json',
        '**/*incomplete*.md',
        '**/*session*.json',
    )

    MIN_DURATION_SECS = 300   # sessions shorter than this are "minimal"
    MIN_FILE_CHARS = 100      # smaller files are treated as noise
    EXAMPLE_LENGTH = 300      # characters of content kept as the example
    RESOLUTION_WINDOW = 200   # trailing characters searched for a resolution

    def __init__(self):
        """Initialize the pattern detector."""
        self.findings: List[Dict] = []

    def find_incomplete_research_kg(self, db_path: str, time_scope_days: int = 30) -> List[Dict]:
        """
        Find incomplete research sessions in KG database.

        Args:
            db_path: Path to research.db SQLite database
            time_scope_days: Only examine sessions modified in last N days

        Returns:
            List of finding dicts with source, id, name, pattern,
            duration_secs, severity, example, timestamp and db_path.
            Returns an empty list when the database file does not exist.
            A SQLite error is printed and whatever was collected so far
            is returned.
        """
        findings: List[Dict] = []
        cutoff_time = time.time() - (time_scope_days * 86400)

        if not Path(db_path).exists():
            return findings

        try:
            # sqlite3's own context manager only commits/rolls back; it does
            # not close the connection, so closing() is needed to release it.
            with closing(sqlite_connect(db_path)) as conn:
                # Query research sessions updated within the time scope
                cursor = conn.execute(
                    """
                    SELECT id, name, content, created_at, updated_at
                    FROM entities
                    WHERE type = 'session'
                      AND domain = 'research'
                      AND updated_at > ?
                    ORDER BY updated_at DESC
                    """,
                    (cutoff_time,),
                )
                for row in cursor.fetchall():
                    findings.extend(self._scan_kg_row(row, db_path))
        except sqlite3.Error as e:
            print(f"Error querying KG database {db_path}: {e}")

        return findings

    def _scan_kg_row(self, row: Tuple, db_path: str) -> List[Dict]:
        """Return the findings produced by one (id, name, content, created, updated) row."""
        entity_id, name, content, created, updated = row

        # Skip empty content
        if not content or not isinstance(content, str):
            return []

        # A NULL or non-numeric timestamp must not abort the whole scan:
        # the duration becomes unknown and only the duration pattern is skipped.
        try:
            duration_secs: Optional[int] = int(updated - created)
        except (TypeError, ValueError, OverflowError):
            duration_secs = None

        def finding(pattern: str, severity: str, example: str) -> Dict:
            return {
                'source': 'kg_database',
                'id': entity_id,
                'name': name,
                'pattern': pattern,
                'duration_secs': duration_secs,
                'severity': severity,
                'example': example,
                'timestamp': updated,
                'db_path': db_path,
            }

        results: List[Dict] = []

        # Pattern 1: Unresolved questions
        if self._has_unresolved_question(content):
            results.append(finding(
                'unresolved_question', 'high',
                self._extract_ending(content, self.EXAMPLE_LENGTH)))

        # Pattern 2: Minimal duration with no findings
        if (duration_secs is not None
                and duration_secs < self.MIN_DURATION_SECS
                and not self._has_findings(content)):
            results.append(finding(
                'incomplete_duration', 'medium',
                content[:self.EXAMPLE_LENGTH]))

        # Pattern 3: Claude indicators without conclusions
        if self._has_claude_indicators(content) and not self._has_conclusions(content):
            results.append(finding(
                'claude_no_conclusion', 'high',
                self._extract_ending(content, self.EXAMPLE_LENGTH)))

        return results

    def find_incomplete_research_files(
            self,
            time_scope_days: int = 30,
            search_dirs: Optional[List[Union[str, Path]]] = None) -> List[Dict]:
        """
        Find incomplete research in markdown and JSON files.

        Each file is reported at most once per pattern, even when it matches
        several glob patterns or is reachable from several search directories.

        Args:
            time_scope_days: Only examine files modified in last N days
            search_dirs: Directories to search recursively. Defaults to
                DEFAULT_SEARCH_DIRS.

        Returns:
            List of finding dicts with source, path, pattern, severity,
            example, timestamp
        """
        findings: List[Dict] = []
        cutoff_time = time.time() - (time_scope_days * 86400)
        roots = self.DEFAULT_SEARCH_DIRS if search_dirs is None else search_dirs

        # Resolved paths already examined; nested roots and overlapping
        # glob patterns would otherwise yield duplicate findings.
        seen: Set[Path] = set()

        for root in map(Path, roots):
            if not root.is_dir():
                continue

            for file_pattern in self.FILE_PATTERNS:
                try:
                    for file_path in root.glob(file_pattern):
                        findings.extend(self._scan_file(file_path, cutoff_time, seen))
                except OSError:
                    # Best effort: a directory that cannot be traversed ends
                    # this glob only, not the whole scan.
                    continue

        return findings

    def _scan_file(self, file_path: Path, cutoff_time: float, seen: Set[Path]) -> List[Dict]:
        """Return the findings for one candidate file, skipping it if already seen."""
        try:
            key = file_path.resolve()
            if key in seen or not file_path.is_file():
                return []
            seen.add(key)

            # Skip if too old
            mtime = file_path.stat().st_mtime
            if mtime < cutoff_time:
                return []

            # Explicit encoding keeps results independent of the locale;
            # undecodable bytes are dropped rather than failing the file.
            content = file_path.read_text(encoding='utf-8', errors='ignore')
        except (OSError, RuntimeError):
            # Unreadable file, broken symlink or symlink loop: skip this file.
            return []

        # Skip very small files (likely noise)
        if len(content) < self.MIN_FILE_CHARS:
            return []

        def finding(pattern: str) -> Dict:
            return {
                'source': 'file',
                'path': str(file_path),
                'pattern': pattern,
                'severity': 'high',
                'example': self._extract_ending(content, self.EXAMPLE_LENGTH),
                'timestamp': mtime,
            }

        results: List[Dict] = []

        # Pattern 1: Unresolved questions
        if self._has_unresolved_question(content):
            results.append(finding('unresolved_question'))

        # Pattern 3: Claude indicators without conclusions
        if self._has_claude_indicators(content) and not self._has_conclusions(content):
            results.append(finding('claude_no_conclusion'))

        return results

    def find_all_incomplete_research(
            self,
            kg_db_paths: Optional[List[str]] = None,
            time_scope_days: int = 30,
            search_dirs: Optional[List[Union[str, Path]]] = None) -> Dict:
        """
        Comprehensive incomplete research discovery across KG and files.

        Args:
            kg_db_paths: List of KG database paths to check.
                Defaults to DEFAULT_KG_DB_PATHS.
            time_scope_days: Time scope for search in days
            search_dirs: Directories for the file-system scan.
                Defaults to DEFAULT_SEARCH_DIRS.

        Returns:
            Dict with:
            - 'findings': List of all findings
            - 'summary': Dict with counts by pattern, severity and source
            - 'timestamp': When scan was performed
            - 'time_scope_days': The time scope that was used
        """
        if kg_db_paths is None:
            kg_db_paths = list(self.DEFAULT_KG_DB_PATHS)

        all_findings: List[Dict] = []

        # Scan KG databases
        for db_path in kg_db_paths:
            all_findings.extend(self.find_incomplete_research_kg(db_path, time_scope_days))

        # Scan file system
        all_findings.extend(self.find_incomplete_research_files(time_scope_days, search_dirs))

        return {
            'findings': all_findings,
            'summary': self._generate_summary(all_findings),
            'timestamp': time.time(),
            'time_scope_days': time_scope_days,
        }

    def _has_unresolved_question(self, content: str) -> bool:
        """
        Check if content contains unresolved user decision points.

        A question pattern may appear anywhere in the content; it counts as
        unresolved only when the last RESOLUTION_WINDOW characters contain no
        resolution wording.
        """
        if not content:
            return False

        # The resolution check does not depend on which question matched,
        # so it is evaluated once up front.
        tail = content[-self.RESOLUTION_WINDOW:]
        if re.search(self.RESOLUTION_PATTERN, tail, re.IGNORECASE):
            return False

        return any(
            re.search(pattern, content, re.IGNORECASE | re.MULTILINE)
            for pattern in self.UNRESOLVED_QUESTION_PATTERNS
        )

    def _has_findings(self, content: str) -> bool:
        """Check if content contains synthesis/findings/analysis section."""
        if not content:
            return False

        return re.search(self.FINDINGS_PATTERN, content, re.IGNORECASE) is not None

    def _has_claude_indicators(self, content: str) -> bool:
        """
        Check if content contains Claude-style writing indicators.

        Matching is case-insensitive: the patterns are written in lowercase
        while the phrases normally start a line capitalised
        ("Assistant:", "Here's", "Based on").
        """
        if not content:
            return False

        return any(
            re.search(pattern, content, re.IGNORECASE | re.MULTILINE)
            for pattern in self.CLAUDE_INDICATORS
        )

    def _has_conclusions(self, content: str) -> bool:
        """Check if content contains conclusion/synthesis indicators."""
        if not content:
            return False

        return re.search(
            r'|'.join(self.CONCLUSION_PATTERNS),
            content,
            re.IGNORECASE
        ) is not None

    def _extract_ending(self, content: str, length: int) -> str:
        """
        Extract last N characters of content (the most relevant part).

        Returns an empty string for empty content or a non-positive length.
        """
        if not content or length <= 0:
            # content[-0:] would be the whole string, so guard explicitly.
            return ""
        return content[-length:]

    def _generate_summary(self, findings: List[Dict]) -> Dict:
        """
        Generate statistics about findings.

        Returns a dict with 'total' plus per-pattern, per-severity and
        per-source counts; a finding missing one of those keys is counted
        under 'unknown'.
        """
        by_pattern = Counter(f.get('pattern', 'unknown') for f in findings)
        by_severity = Counter(f.get('severity', 'unknown') for f in findings)
        by_source = Counter(f.get('source', 'unknown') for f in findings)

        return {
            'total': len(findings),
            'by_pattern': dict(by_pattern),
            'by_severity': dict(by_severity),
            'by_source': dict(by_source),
        }


def main() -> None:
    """Run a quick scan with default locations and print a short report."""
    detector = KGPatternDetector()
    result = detector.find_all_incomplete_research(time_scope_days=30)

    print(f"Found {result['summary']['total']} incomplete research sessions")
    print(f"Summary by pattern: {result['summary']['by_pattern']}")
    print(f"Summary by severity: {result['summary']['by_severity']}")

    # Show first few findings
    for finding in result['findings'][:5]:
        print(f"\n[{finding['severity'].upper()}] {finding['pattern']}")
        print(f"  Source: {finding['source']}")
        if 'name' in finding:
            print(f"  Name: {finding['name']}")
        if 'path' in finding:
            print(f"  Path: {finding['path']}")
        print(f"  Example: {finding['example'][:100]}...")


if __name__ == '__main__':
    main()