Files
luzia/lib/chat_kg_lookup.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Adds a DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

256 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
Chat KG Lookup - Fast SQLite-based knowledge graph queries
Provides sub-200ms responses for common KG queries
"""
import sqlite3
import time
from pathlib import Path
from typing import List, Dict, Optional
import re
class ChatKGLookup:
    """Direct SQLite queries to KG databases for chat interface.

    Every knowledge-graph domain is stored in its own SQLite file (see
    ``KG_PATHS``). Domains whose database file does not exist are skipped.
    Database errors are reported inside the returned dict (or the failing
    domain is skipped) rather than raised to the caller.
    """

    # Domain name -> SQLite database file for that domain.
    KG_PATHS = {
        'sysadmin': Path('/etc/luz-knowledge/sysadmin.db'),
        'projects': Path('/etc/luz-knowledge/projects.db'),
        'users': Path('/etc/luz-knowledge/users.db'),
        'research': Path('/etc/luz-knowledge/research.db'),
    }

    def __init__(self, timeout_ms: int = 200):
        """Initialize with query timeout.

        Args:
            timeout_ms: Time budget in milliseconds. It is used both as the
                ``timeout`` passed to ``sqlite3.connect`` (converted to
                seconds) and as the overall budget checked between domains
                in ``search_all_domains``.
        """
        self.timeout_ms = timeout_ms
        self.timeout_seconds = timeout_ms / 1000.0

    def _connect(self, db_path: Path) -> sqlite3.Connection:
        """Open *db_path* with name-addressable rows; the caller must close it."""
        conn = sqlite3.connect(str(db_path), timeout=self.timeout_seconds)
        conn.row_factory = sqlite3.Row
        return conn

    def _domains_to_check(self, domain: Optional[str]) -> List[str]:
        """Return ``[domain]`` if it is a known domain, otherwise every domain."""
        if domain and domain in self.KG_PATHS:
            return [domain]
        return list(self.KG_PATHS.keys())

    def search_all_domains(self, query: str, limit: int = 10) -> Dict:
        """Search query across all KG domains.

        Args:
            query: Text to look for.
            limit: Maximum number of entities returned per domain.

        Returns:
            Dict with ``query``, ``domains`` (domain name -> per-domain result
            from ``_search_domain``), ``total_hits`` and ``execution_time_ms``.
            ``timeout`` is set to True when the time budget was exceeded and
            the remaining domains were not searched.
        """
        results = {
            'query': query,
            'domains': {},
            'total_hits': 0,
            'execution_time_ms': 0
        }
        start_time = time.time()

        for domain, db_path in self.KG_PATHS.items():
            if not db_path.exists():
                continue

            try:
                domain_results = self._search_domain(domain, db_path, query, limit)
                results['domains'][domain] = domain_results
                results['total_hits'] += len(domain_results.get('entities', []))
            except Exception as e:
                # _search_domain already reports errors in-band; this guard
                # keeps one misbehaving domain from aborting the whole search.
                results['domains'][domain] = {'error': str(e), 'entities': []}

            # The budget is checked only between domains; a query that is
            # already running is not interrupted.
            elapsed = (time.time() - start_time) * 1000
            if elapsed > self.timeout_ms:
                results['timeout'] = True
                break

        results['execution_time_ms'] = round((time.time() - start_time) * 1000, 2)
        return results

    def _search_domain(self, domain: str, db_path: Path, query: str, limit: int) -> Dict:
        """Search single KG domain.

        Tries a prefix match on the ``entities_fts`` table first and falls
        back to a ``LIKE`` substring match on ``entities.name`` /
        ``entities.description`` when that raises ``sqlite3.OperationalError``.
        ``%`` and ``_`` in *query* act as wildcards in the fallback.

        Returns:
            ``{'entities': [...], 'count': n}`` on success, or
            ``{'error': message, 'entities': []}`` on failure.
        """
        try:
            conn = self._connect(db_path)
            try:
                cursor = conn.cursor()

                # Double embedded quotes so the whole user query stays one
                # quoted FTS5 string and cannot inject operators such as OR.
                fts_phrase = '"' + query.replace('"', '""') + '"*'

                # Try FTS5 first
                try:
                    cursor.execute(
                        "SELECT id, name, type FROM entities_fts WHERE entities_fts MATCH ? LIMIT ?",
                        (fts_phrase, limit)
                    )
                    rows = cursor.fetchall()
                except sqlite3.OperationalError:
                    # Fallback to LIKE search
                    cursor.execute(
                        "SELECT id, name, type FROM entities WHERE name LIKE ? OR description LIKE ? LIMIT ?",
                        (f'%{query}%', f'%{query}%', limit)
                    )
                    rows = cursor.fetchall()

                entities = [
                    {
                        'id': row['id'],
                        'name': row['name'],
                        'type': row['type']
                    }
                    for row in rows
                ]
            finally:
                # Close on every path, including failed queries.
                conn.close()

            return {'entities': entities, 'count': len(entities)}
        except Exception as e:
            return {'error': str(e), 'entities': []}

    def get_entity_details(self, entity_id: str, domain: Optional[str] = None) -> Dict:
        """Get detailed information about an entity.

        Args:
            entity_id: Value matched against ``entities.id``.
            domain: Restrict the lookup to this domain. When omitted or not a
                known domain, all domains are checked in ``KG_PATHS`` order.

        Returns:
            Dict with ``id``, ``name``, ``type``, ``description``, ``domain``,
            ``observations`` (at most 5) and ``relations`` (at most 10) for
            the first domain containing the entity, or
            ``{'error': 'Entity <id> not found'}``.
        """
        for domain_name in self._domains_to_check(domain):
            db_path = self.KG_PATHS[domain_name]
            if not db_path.exists():
                continue

            try:
                conn = self._connect(db_path)
                try:
                    cursor = conn.cursor()

                    # Get entity
                    cursor.execute(
                        "SELECT id, name, type, description FROM entities WHERE id = ?",
                        (entity_id,)
                    )
                    entity_row = cursor.fetchone()
                    if not entity_row:
                        continue  # the finally clause still closes conn

                    entity = {
                        'id': entity_row['id'],
                        'name': entity_row['name'],
                        'type': entity_row['type'],
                        'description': entity_row['description'],
                        'domain': domain_name
                    }

                    # Get observations
                    cursor.execute(
                        "SELECT content FROM observations WHERE entity_id = ? LIMIT 5",
                        (entity_id,)
                    )
                    entity['observations'] = [row['content'] for row in cursor.fetchall()]

                    # Get relations
                    cursor.execute(
                        "SELECT from_entity_id, to_entity_id, relation_type FROM relations WHERE from_entity_id = ? OR to_entity_id = ? LIMIT 10",
                        (entity_id, entity_id)
                    )
                    entity['relations'] = [
                        {
                            'from': row['from_entity_id'],
                            'to': row['to_entity_id'],
                            'type': row['relation_type']
                        }
                        for row in cursor.fetchall()
                    ]
                    return entity
                finally:
                    conn.close()
            except Exception:
                # Best effort: a broken domain must not hide the entity in
                # another domain, so move on to the next one.
                continue

        return {'error': f'Entity {entity_id} not found'}

    def get_entities_by_type(self, entity_type: str, limit: int = 10, domain: Optional[str] = None) -> Dict:
        """Get all entities of a specific type.

        Args:
            entity_type: Value matched against ``entities.type``.
            limit: Maximum number of rows fetched per domain.
            domain: Restrict the lookup to this domain. When omitted or not a
                known domain, all domains are checked.

        Returns:
            Dict with ``type``, ``results`` (dicts with ``id``, ``name`` and
            ``domain``) and ``domains_checked`` (number of domains queried
            successfully).
        """
        results = {
            'type': entity_type,
            'results': [],
            'domains_checked': 0
        }

        for domain_name in self._domains_to_check(domain):
            db_path = self.KG_PATHS[domain_name]
            if not db_path.exists():
                continue

            try:
                conn = self._connect(db_path)
                try:
                    cursor = conn.cursor()
                    cursor.execute(
                        "SELECT id, name, type FROM entities WHERE type = ? LIMIT ?",
                        (entity_type, limit)
                    )
                    for row in cursor.fetchall():
                        results['results'].append({
                            'id': row['id'],
                            'name': row['name'],
                            'domain': domain_name
                        })
                    results['domains_checked'] += 1
                finally:
                    conn.close()
            except Exception:
                # Best effort: skip domains that cannot be queried.
                continue

        return results

    def get_kg_statistics(self) -> Dict:
        """Get statistics about KG databases.

        Returns:
            Dict with ``domains`` (domain name -> ``available`` flag plus
            ``entities`` / ``relations`` counts, or ``error`` text when the
            counts could not be read), ``total_entities`` and
            ``total_relations``.
        """
        stats = {
            'domains': {},
            'total_entities': 0,
            'total_relations': 0
        }

        for domain, db_path in self.KG_PATHS.items():
            if not db_path.exists():
                stats['domains'][domain] = {'available': False}
                continue

            try:
                conn = self._connect(db_path)
                try:
                    cursor = conn.cursor()
                    cursor.execute("SELECT COUNT(*) FROM entities")
                    entity_count = cursor.fetchone()[0]
                    cursor.execute("SELECT COUNT(*) FROM relations")
                    relation_count = cursor.fetchone()[0]
                finally:
                    conn.close()

                stats['domains'][domain] = {
                    'available': True,
                    'entities': entity_count,
                    'relations': relation_count
                }
                stats['total_entities'] += entity_count
                stats['total_relations'] += relation_count
            except Exception as e:
                stats['domains'][domain] = {'available': False, 'error': str(e)}

        return stats
if __name__ == '__main__':
    import json

    # Manual smoke test: dump the per-domain statistics, then run one
    # sample search and print both as indented JSON.
    kg = ChatKGLookup()

    print("KG Statistics:")
    stats_json = json.dumps(kg.get_kg_statistics(), indent=2)
    print(stats_json)
    print()

    print("Search 'admin':")
    search_json = json.dumps(
        kg.search_all_domains('admin', limit=5),
        indent=2,
        default=str,
    )
    print(search_json)