#!/usr/bin/env python3 """ Chat KG Lookup - Fast SQLite-based knowledge graph queries Provides sub-200ms responses for common KG queries """ import sqlite3 import time from pathlib import Path from typing import List, Dict, Optional import re class ChatKGLookup: """Direct SQLite queries to KG databases for chat interface""" KG_PATHS = { 'sysadmin': Path('/etc/luz-knowledge/sysadmin.db'), 'projects': Path('/etc/luz-knowledge/projects.db'), 'users': Path('/etc/luz-knowledge/users.db'), 'research': Path('/etc/luz-knowledge/research.db'), } def __init__(self, timeout_ms: int = 200): """Initialize with query timeout""" self.timeout_ms = timeout_ms self.timeout_seconds = timeout_ms / 1000.0 def search_all_domains(self, query: str, limit: int = 10) -> Dict: """Search query across all KG domains""" results = { 'query': query, 'domains': {}, 'total_hits': 0, 'execution_time_ms': 0 } start_time = time.time() for domain, db_path in self.KG_PATHS.items(): if not db_path.exists(): continue try: domain_results = self._search_domain(domain, db_path, query, limit) results['domains'][domain] = domain_results results['total_hits'] += len(domain_results.get('entities', [])) except Exception as e: results['domains'][domain] = {'error': str(e), 'entities': []} # Check timeout elapsed = (time.time() - start_time) * 1000 if elapsed > self.timeout_ms: results['timeout'] = True break results['execution_time_ms'] = round((time.time() - start_time) * 1000, 2) return results def _search_domain(self, domain: str, db_path: Path, query: str, limit: int) -> Dict: """Search single KG domain""" try: conn = sqlite3.connect(str(db_path), timeout=self.timeout_seconds) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Try FTS5 first try: cursor.execute( "SELECT id, name, type FROM entities_fts WHERE entities_fts MATCH ? LIMIT ?", (f'"{query}"*', limit) ) rows = cursor.fetchall() except sqlite3.OperationalError: # Fallback to LIKE search cursor.execute( "SELECT id, name, type FROM entities WHERE name LIKE ? OR description LIKE ? LIMIT ?", (f'%{query}%', f'%{query}%', limit) ) rows = cursor.fetchall() entities = [ { 'id': row['id'], 'name': row['name'], 'type': row['type'] } for row in rows ] conn.close() return {'entities': entities, 'count': len(entities)} except Exception as e: return {'error': str(e), 'entities': []} def get_entity_details(self, entity_id: str, domain: Optional[str] = None) -> Dict: """Get detailed information about an entity""" if domain and domain in self.KG_PATHS: domains_to_check = [domain] else: domains_to_check = list(self.KG_PATHS.keys()) for domain in domains_to_check: db_path = self.KG_PATHS[domain] if not db_path.exists(): continue try: conn = sqlite3.connect(str(db_path), timeout=self.timeout_seconds) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get entity cursor.execute( "SELECT id, name, type, description FROM entities WHERE id = ?", (entity_id,) ) entity_row = cursor.fetchone() if not entity_row: continue entity = { 'id': entity_row['id'], 'name': entity_row['name'], 'type': entity_row['type'], 'description': entity_row['description'], 'domain': domain } # Get observations cursor.execute( "SELECT content FROM observations WHERE entity_id = ? LIMIT 5", (entity_id,) ) entity['observations'] = [row['content'] for row in cursor.fetchall()] # Get relations cursor.execute( "SELECT from_entity_id, to_entity_id, relation_type FROM relations WHERE from_entity_id = ? OR to_entity_id = ? LIMIT 10", (entity_id, entity_id) ) entity['relations'] = [ { 'from': row['from_entity_id'], 'to': row['to_entity_id'], 'type': row['relation_type'] } for row in cursor.fetchall() ] conn.close() return entity except Exception as e: continue return {'error': f'Entity {entity_id} not found'} def get_entities_by_type(self, entity_type: str, limit: int = 10, domain: Optional[str] = None) -> Dict: """Get all entities of a specific type""" if domain and domain in self.KG_PATHS: domains_to_check = [domain] else: domains_to_check = list(self.KG_PATHS.keys()) results = { 'type': entity_type, 'results': [], 'domains_checked': 0 } for domain in domains_to_check: db_path = self.KG_PATHS[domain] if not db_path.exists(): continue try: conn = sqlite3.connect(str(db_path), timeout=self.timeout_seconds) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute( "SELECT id, name, type FROM entities WHERE type = ? LIMIT ?", (entity_type, limit) ) for row in cursor.fetchall(): results['results'].append({ 'id': row['id'], 'name': row['name'], 'domain': domain }) results['domains_checked'] += 1 conn.close() except Exception: continue return results def get_kg_statistics(self) -> Dict: """Get statistics about KG databases""" stats = { 'domains': {}, 'total_entities': 0, 'total_relations': 0 } for domain, db_path in self.KG_PATHS.items(): if not db_path.exists(): stats['domains'][domain] = {'available': False} continue try: conn = sqlite3.connect(str(db_path), timeout=self.timeout_seconds) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM entities") entity_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM relations") relation_count = cursor.fetchone()[0] stats['domains'][domain] = { 'available': True, 'entities': entity_count, 'relations': relation_count } stats['total_entities'] += entity_count stats['total_relations'] += relation_count conn.close() except Exception as e: stats['domains'][domain] = {'available': False, 'error': str(e)} return stats if __name__ == '__main__': import json lookup = ChatKGLookup() # Test searches print("KG Statistics:") print(json.dumps(lookup.get_kg_statistics(), indent=2)) print() print("Search 'admin':") results = lookup.search_all_domains('admin', limit=5) print(json.dumps(results, indent=2, default=str))