#!/usr/bin/env python3
"""
Luz Knowledge Graph - Centralized documentation storage

Four domains:
- sysadmin: Server admin docs, commands, procedures
- users: User management, permissions, workflows
- projects: Project-specific docs, features, APIs
- research: Research sessions, findings, sources

All use SQLite with FTS5 for full-text search.
"""

import json
import sqlite3
import uuid
import time
import os
import grp
import pwd
from contextlib import closing
from pathlib import Path
from typing import Optional, Dict, List, Any
from datetime import datetime

# Knowledge graph paths
KG_BASE = Path("/etc/luz-knowledge")
KG_PATHS = {
    "sysadmin": KG_BASE / "sysadmin.db",
    "users": KG_BASE / "users.db",
    "projects": KG_BASE / "projects.db",
    "research": KG_BASE / "research.db",
}

# Entity types per domain
ENTITY_TYPES = {
    "sysadmin": ["command", "service", "config", "procedure", "troubleshooting", "architecture"],
    "users": ["user_type", "permission", "workflow", "guide", "policy"],
    "projects": ["project", "feature", "api", "component", "changelog", "config"],
    "research": ["session", "finding", "source", "synthesis", "query"],
}

# Relation types
RELATION_TYPES = [
    "relates_to",   # General relation
    "depends_on",   # Dependency
    "documents",    # Documentation link
    "implements",   # Implementation
    "supersedes",   # Replacement
    "contains",     # Parent-child
    "references",   # Cross-reference
    "triggers",     # Causal
]

# Access control per domain
# Format: {domain: {"read": [users/groups], "write": [users/groups]}}
# Each entry is matched against the current username AND the user's group
# names (see check_permission), so "admin" matches a user or a group named
# admin, and "operators" matches the operators group.
# Special value: "all" = everyone. The root user is always allowed.
KG_PERMISSIONS = {
    "sysadmin": {"read": ["admin"], "write": ["admin"]},
    "users": {"read": ["admin"], "write": ["admin"]},
    "projects": {"read": ["admin", "operators"], "write": ["admin", "operators"]},
    "research": {"read": ["all"], "write": ["all"]},  # All users can write research via Zen
}


def get_current_user() -> str:
    """Return the login name of the process's real user ID."""
    return pwd.getpwuid(os.getuid()).pw_name


def get_user_groups(username: Optional[str] = None) -> List[str]:
    """Return the names of the groups *username* belongs to.

    Supplementary groups come from the member lists in the group database.
    The user's primary group (from the passwd entry) is appended if it is
    not already present.

    Args:
        username: User to look up. Defaults to the current user.

    Returns:
        List of group names. Returns an empty list if the user or their
        primary group cannot be resolved.
    """
    if username is None:
        username = get_current_user()
    try:
        groups = [g.gr_name for g in grp.getgrall() if username in g.gr_mem]
        # Add the primary group, which is usually not listed in gr_mem.
        primary_gid = pwd.getpwnam(username).pw_gid
        primary_group = grp.getgrgid(primary_gid).gr_name
    except KeyError:
        return []
    if primary_group not in groups:
        groups.append(primary_group)
    return groups


def check_permission(domain: str, action: str) -> bool:
    """Check if the current user has permission for an action on a domain.

    Args:
        domain: KG domain (sysadmin, users, projects, research).
        action: "read" or "write".

    Returns:
        True if permitted, False otherwise. Unknown domains or actions are
        always denied.
    """
    if domain not in KG_PERMISSIONS:
        return False

    allowed = KG_PERMISSIONS[domain].get(action, [])

    # "all" means everyone; checked first so no user lookup is needed.
    if "all" in allowed:
        return True

    username = get_current_user()

    # Root always has access; otherwise try a direct username match.
    if username == "root" or username in allowed:
        return True

    # Fall back to group membership.
    user_groups = get_user_groups(username)
    return any(group in user_groups for group in allowed)


class KnowledgeGraph:
    """Knowledge graph operations for a single domain.

    Each domain lives in its own SQLite file (see KG_PATHS). Every method
    opens a fresh connection and closes it before returning, even when a
    statement raises.
    """

    def __init__(self, domain: str, skip_permission_check: bool = False):
        """Bind to *domain*'s database and make sure its schema exists.

        Args:
            domain: One of the keys of KG_PATHS.
            skip_permission_check: If True, the read/write ACL checks are
                bypassed for this instance.

        Raises:
            ValueError: If *domain* is unknown.
        """
        if domain not in KG_PATHS:
            raise ValueError(f"Unknown domain: {domain}. Valid: {list(KG_PATHS.keys())}")
        self.domain = domain
        self.db_path = KG_PATHS[domain]
        self._skip_permission_check = skip_permission_check
        # NOTE(review): the schema is created before any permission check, so
        # constructing an instance may create the database file even for users
        # without access to this domain.
        self._ensure_schema()

    def _check_read(self):
        """Raise PermissionError unless the current user may read this domain."""
        if self._skip_permission_check:
            return
        if not check_permission(self.domain, "read"):
            user = get_current_user()
            raise PermissionError(f"User '{user}' does not have read access to '{self.domain}' KG")

    def _check_write(self):
        """Raise PermissionError unless the current user may write this domain."""
        if self._skip_permission_check:
            return
        if not check_permission(self.domain, "write"):
            user = get_current_user()
            raise PermissionError(f"User '{user}' does not have write access to '{self.domain}' KG")

    def _ensure_schema(self):
        """Create tables, the FTS5 index, sync triggers and indexes if missing."""
        # Create this database's own directory. All default paths live under
        # KG_BASE, so this matches the old behaviour while also honouring
        # overridden KG_PATHS entries.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        with closing(sqlite3.connect(self.db_path)) as conn:
            c = conn.cursor()

            # Main entities table
            c.execute('''
                CREATE TABLE IF NOT EXISTS entities (
                    id TEXT PRIMARY KEY,
                    name TEXT UNIQUE NOT NULL,
                    type TEXT NOT NULL,
                    domain TEXT NOT NULL,
                    content TEXT,
                    metadata TEXT,
                    created_at REAL,
                    updated_at REAL,
                    source TEXT
                )
            ''')

            # Relations table
            c.execute('''
                CREATE TABLE IF NOT EXISTS relations (
                    id TEXT PRIMARY KEY,
                    source_id TEXT NOT NULL,
                    target_id TEXT NOT NULL,
                    relation TEXT NOT NULL,
                    context TEXT,
                    weight INTEGER DEFAULT 1,
                    created_at REAL,
                    FOREIGN KEY (source_id) REFERENCES entities(id),
                    FOREIGN KEY (target_id) REFERENCES entities(id)
                )
            ''')

            # Observations table (notes, QA findings, etc.)
            c.execute('''
                CREATE TABLE IF NOT EXISTS observations (
                    id TEXT PRIMARY KEY,
                    entity_id TEXT NOT NULL,
                    content TEXT NOT NULL,
                    observer TEXT,
                    created_at REAL,
                    FOREIGN KEY (entity_id) REFERENCES entities(id)
                )
            ''')

            # FTS5 virtual table for full-text search (external content: the
            # index stores no copy of the text; it points at entities.rowid).
            # NOTE(review): entities has no INTEGER PRIMARY KEY, so its implicit
            # rowid may change on VACUUM, which would desynchronise this index.
            # Fixing that needs a schema migration; until then, run
            # INSERT INTO entities_fts(entities_fts) VALUES('rebuild') after VACUUM.
            c.execute('''
                CREATE VIRTUAL TABLE IF NOT EXISTS entities_fts USING fts5(
                    name, type, content, metadata,
                    content='entities',
                    content_rowid='rowid'
                )
            ''')

            # Triggers to keep FTS in sync
            c.execute('''
                CREATE TRIGGER IF NOT EXISTS entities_ai AFTER INSERT ON entities BEGIN
                    INSERT INTO entities_fts(rowid, name, type, content, metadata)
                    VALUES (NEW.rowid, NEW.name, NEW.type, NEW.content, NEW.metadata);
                END
            ''')

            c.execute('''
                CREATE TRIGGER IF NOT EXISTS entities_ad AFTER DELETE ON entities BEGIN
                    INSERT INTO entities_fts(entities_fts, rowid, name, type, content, metadata)
                    VALUES ('delete', OLD.rowid, OLD.name, OLD.type, OLD.content, OLD.metadata);
                END
            ''')

            c.execute('''
                CREATE TRIGGER IF NOT EXISTS entities_au AFTER UPDATE ON entities BEGIN
                    INSERT INTO entities_fts(entities_fts, rowid, name, type, content, metadata)
                    VALUES ('delete', OLD.rowid, OLD.name, OLD.type, OLD.content, OLD.metadata);
                    INSERT INTO entities_fts(rowid, name, type, content, metadata)
                    VALUES (NEW.rowid, NEW.name, NEW.type, NEW.content, NEW.metadata);
                END
            ''')

            # Indexes
            c.execute('CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_relations_source ON relations(source_id)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_relations_target ON relations(target_id)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_observations_entity ON observations(entity_id)')

            conn.commit()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    # --- Entity Operations ---

    def add_entity(self, name: str, entity_type: str, content: str = "",
                   metadata: Optional[dict] = None, source: Optional[str] = None) -> str:
        """Add an entity, or update the existing entity with the same name.

        On update, type, content, metadata, updated_at and source are
        overwritten. The original id and created_at are kept.

        Args:
            name: Unique entity name (the upsert key).
            entity_type: Must be one of ENTITY_TYPES[domain].
            content: Free text, indexed for full-text search.
            metadata: JSON-serialisable dict. A falsy value is stored as "{}".
            source: Optional provenance string.

        Returns:
            The id of the inserted or updated entity.

        Raises:
            PermissionError: If the user lacks write access.
            ValueError: If *entity_type* is not valid for this domain.
        """
        self._check_write()

        valid = ENTITY_TYPES.get(self.domain, [])
        if entity_type not in valid:
            raise ValueError(f"Invalid type '{entity_type}' for {self.domain}. Valid: {valid}")

        now = time.time()
        entity_id = str(uuid.uuid4())
        metadata_json = json.dumps(metadata) if metadata else "{}"

        with closing(self._connect()) as conn:
            c = conn.cursor()
            # Upsert keyed on the UNIQUE name column
            c.execute('''
                INSERT INTO entities (id, name, type, domain, content, metadata, created_at, updated_at, source)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(name) DO UPDATE SET
                    type = excluded.type,
                    content = excluded.content,
                    metadata = excluded.metadata,
                    updated_at = excluded.updated_at,
                    source = excluded.source
            ''', (entity_id, name, entity_type, self.domain, content, metadata_json, now, now, source))

            # The generated id is discarded on conflict, so read back the real one.
            c.execute('SELECT id FROM entities WHERE name = ?', (name,))
            row = c.fetchone()
            conn.commit()

        return row['id'] if row else entity_id

    def get_entity(self, name: str) -> Optional[Dict]:
        """Return the entity called *name*, or None if absent.

        The "metadata" field is decoded from JSON into a dict.
        """
        self._check_read()
        with closing(self._connect()) as conn:
            c = conn.cursor()
            c.execute('SELECT * FROM entities WHERE name = ?', (name,))
            row = c.fetchone()

        if not row:
            return None

        return {
            "id": row["id"],
            "name": row["name"],
            "type": row["type"],
            "domain": row["domain"],
            "content": row["content"],
            "metadata": json.loads(row["metadata"]) if row["metadata"] else {},
            "created_at": row["created_at"],
            "updated_at": row["updated_at"],
            "source": row["source"],
        }

    def get_entity_by_id(self, entity_id: str) -> Optional[Dict]:
        """Return the raw entity row with id *entity_id*, or None if absent.

        Unlike get_entity, "metadata" is returned as the stored JSON string.
        """
        self._check_read()
        with closing(self._connect()) as conn:
            c = conn.cursor()
            c.execute('SELECT * FROM entities WHERE id = ?', (entity_id,))
            row = c.fetchone()

        if not row:
            return None
        return dict(row)

    def list_entities(self, entity_type: Optional[str] = None, limit: int = 100) -> List[Dict]:
        """List raw entity rows, most recently updated first.

        Args:
            entity_type: If given (and non-empty), only rows of this type.
            limit: Maximum number of rows returned.
        """
        self._check_read()
        with closing(self._connect()) as conn:
            c = conn.cursor()
            if entity_type:
                c.execute('''
                    SELECT * FROM entities WHERE type = ?
                    ORDER BY updated_at DESC LIMIT ?
                ''', (entity_type, limit))
            else:
                c.execute('''
                    SELECT * FROM entities
                    ORDER BY updated_at DESC LIMIT ?
                ''', (limit,))
            rows = c.fetchall()

        return [dict(row) for row in rows]

    def delete_entity(self, name: str) -> bool:
        """Delete an entity and its relations and observations.

        Related rows are removed explicitly because SQLite does not enforce
        the declared foreign keys unless PRAGMA foreign_keys is enabled.

        Returns:
            True if the entity existed and was deleted, False otherwise.
        """
        self._check_write()
        with closing(self._connect()) as conn:
            c = conn.cursor()

            # Get entity ID
            c.execute('SELECT id FROM entities WHERE name = ?', (name,))
            row = c.fetchone()
            if not row:
                return False

            entity_id = row['id']

            # Delete relations in either direction
            c.execute('DELETE FROM relations WHERE source_id = ? OR target_id = ?', (entity_id, entity_id))
            # Delete observations
            c.execute('DELETE FROM observations WHERE entity_id = ?', (entity_id,))
            # Delete the entity (the entities_ad trigger updates the FTS index)
            c.execute('DELETE FROM entities WHERE id = ?', (entity_id,))

            conn.commit()

        return True

    # --- Search ---

    def search(self, query: str, limit: int = 20) -> List[Dict]:
        """Run a full-text search over entity name, type, content and metadata.

        *query* is passed to FTS5 MATCH as-is, so FTS5 query syntax applies. A
        malformed query raises sqlite3.OperationalError.

        Returns:
            Raw entity rows plus an FTS5 "rank" column, best matches first.
        """
        self._check_read()
        with closing(self._connect()) as conn:
            c = conn.cursor()
            c.execute('''
                SELECT e.*, rank
                FROM entities_fts fts
                JOIN entities e ON e.rowid = fts.rowid
                WHERE entities_fts MATCH ?
                ORDER BY rank
                LIMIT ?
            ''', (query, limit))
            rows = c.fetchall()

        return [dict(row) for row in rows]

    # --- Relations ---

    def add_relation(self, source_name: str, target_name: str, relation: str,
                     context: Optional[str] = None, weight: int = 1) -> Optional[str]:
        """Add a directed relation from *source_name* to *target_name*.

        Returns:
            The new relation id, or None if either entity does not exist.

        Raises:
            PermissionError: If the user lacks write access.
            ValueError: If *relation* is not in RELATION_TYPES.
        """
        self._check_write()

        if relation not in RELATION_TYPES:
            raise ValueError(f"Invalid relation: {relation}. Valid: {RELATION_TYPES}")

        with closing(self._connect()) as conn:
            c = conn.cursor()

            # Resolve entity IDs
            c.execute('SELECT id FROM entities WHERE name = ?', (source_name,))
            source = c.fetchone()
            c.execute('SELECT id FROM entities WHERE name = ?', (target_name,))
            target = c.fetchone()

            if not source or not target:
                return None

            rel_id = str(uuid.uuid4())
            now = time.time()

            c.execute('''
                INSERT INTO relations (id, source_id, target_id, relation, context, weight, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (rel_id, source['id'], target['id'], relation, context, weight, now))

            conn.commit()

        return rel_id

    def get_relations(self, entity_name: str, direction: str = "both") -> List[Dict]:
        """Return relations touching *entity_name*.

        Args:
            entity_name: Entity to look up.
            direction: "outgoing" adds target_name, "incoming" adds
                source_name, and any other value returns both directions
                with both names.

        Returns:
            Raw relation rows plus the joined name column(s). Returns an
            empty list if the entity does not exist.
        """
        self._check_read()
        with closing(self._connect()) as conn:
            c = conn.cursor()

            c.execute('SELECT id FROM entities WHERE name = ?', (entity_name,))
            row = c.fetchone()
            if not row:
                return []

            entity_id = row['id']

            if direction == "outgoing":
                c.execute('''
                    SELECT r.*, e.name as target_name
                    FROM relations r
                    JOIN entities e ON e.id = r.target_id
                    WHERE r.source_id = ?
                ''', (entity_id,))
            elif direction == "incoming":
                c.execute('''
                    SELECT r.*, e.name as source_name
                    FROM relations r
                    JOIN entities e ON e.id = r.source_id
                    WHERE r.target_id = ?
                ''', (entity_id,))
            else:
                c.execute('''
                    SELECT r.*, s.name as source_name, t.name as target_name
                    FROM relations r
                    JOIN entities s ON s.id = r.source_id
                    JOIN entities t ON t.id = r.target_id
                    WHERE r.source_id = ? OR r.target_id = ?
                ''', (entity_id, entity_id))

            rows = c.fetchall()

        return [dict(row) for row in rows]

    # --- Observations ---

    def add_observation(self, entity_name: str, content: str, observer: str = "system") -> Optional[str]:
        """Attach an observation note to *entity_name*.

        Returns:
            The new observation id, or None if the entity does not exist.
        """
        self._check_write()
        with closing(self._connect()) as conn:
            c = conn.cursor()

            c.execute('SELECT id FROM entities WHERE name = ?', (entity_name,))
            row = c.fetchone()
            if not row:
                return None

            obs_id = str(uuid.uuid4())
            now = time.time()

            c.execute('''
                INSERT INTO observations (id, entity_id, content, observer, created_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (obs_id, row['id'], content, observer, now))

            conn.commit()

        return obs_id

    def get_observations(self, entity_name: str) -> List[Dict]:
        """Return the observations for *entity_name*, newest first.

        Returns an empty list if the entity does not exist.
        """
        self._check_read()
        with closing(self._connect()) as conn:
            c = conn.cursor()

            c.execute('SELECT id FROM entities WHERE name = ?', (entity_name,))
            row = c.fetchone()
            if not row:
                return []

            c.execute('''
                SELECT * FROM observations WHERE entity_id = ?
                ORDER BY created_at DESC
            ''', (row['id'],))
            rows = c.fetchall()

        return [dict(row) for row in rows]

    # --- Stats ---

    def stats(self) -> Dict:
        """Return row counts and a per-type entity breakdown for this domain.

        Raises:
            PermissionError: If the user lacks read access. This matches every
                other read method; previously stats bypassed the ACL.
        """
        self._check_read()
        with closing(self._connect()) as conn:
            c = conn.cursor()

            c.execute('SELECT COUNT(*) as count FROM entities')
            entities = c.fetchone()['count']

            c.execute('SELECT COUNT(*) as count FROM relations')
            relations = c.fetchone()['count']

            c.execute('SELECT COUNT(*) as count FROM observations')
            observations = c.fetchone()['count']

            c.execute('SELECT type, COUNT(*) as count FROM entities GROUP BY type')
            by_type = {row['type']: row['count'] for row in c.fetchall()}

        return {
            "domain": self.domain,
            "entities": entities,
            "relations": relations,
            "observations": observations,
            "by_type": by_type,
        }


# --- Cross-Domain Search ---

def search_all(query: str, limit: int = 20) -> Dict[str, List[Dict]]:
    """Search every domain's knowledge graph.

    A failing domain (no permission, bad FTS query, I/O error, ...) does not
    abort the whole search. Its entry becomes [{"error": <message>}].
    """
    results = {}
    for domain in KG_PATHS.keys():
        try:
            kg = KnowledgeGraph(domain)
            results[domain] = kg.search(query, limit)
        except Exception as e:
            results[domain] = [{"error": str(e)}]
    return results


def get_all_stats() -> Dict[str, Dict]:
    """Collect stats from every domain.

    A failing domain's entry becomes {"error": <message>}.
    """
    stats = {}
    for domain in KG_PATHS.keys():
        try:
            kg = KnowledgeGraph(domain)
            stats[domain] = kg.stats()
        except Exception as e:
            stats[domain] = {"error": str(e)}
    return stats


# --- CLI for testing ---

if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: knowledge_graph.py <command> [args]")
        print("Commands:")
        print("  stats                                 - Show all KG stats")
        print("  search <query>                        - Search all KGs")
        print("  add <domain> <name> <type> [content]  - Add or update an entity")
        print("  get <domain> <name>                   - Show one entity as JSON")
        print("  list <domain> [type]                  - List entities")
        sys.exit(1)

    cmd = sys.argv[1]

    if cmd == "stats":
        for domain, s in get_all_stats().items():
            print(f"\n{domain}:")
            for k, v in s.items():
                print(f"  {k}: {v}")

    elif cmd == "search" and len(sys.argv) >= 3:
        query = " ".join(sys.argv[2:])
        results = search_all(query)
        for domain, entities in results.items():
            # Domains that errored (e.g. no access) are skipped silently.
            if entities and not entities[0].get("error"):
                print(f"\n{domain}:")
                for e in entities:
                    print(f"  - {e.get('name', 'unknown')}: {e.get('type', '')}")

    elif cmd == "add" and len(sys.argv) >= 5:
        domain, name, etype = sys.argv[2:5]
        content = " ".join(sys.argv[5:]) if len(sys.argv) > 5 else ""
        kg = KnowledgeGraph(domain)
        eid = kg.add_entity(name, etype, content)
        print(f"Added: {eid}")

    elif cmd == "get" and len(sys.argv) >= 4:
        kg = KnowledgeGraph(sys.argv[2])
        entity = kg.get_entity(sys.argv[3])
        print(json.dumps(entity, indent=2))

    elif cmd == "list" and len(sys.argv) >= 3:
        kg = KnowledgeGraph(sys.argv[2])
        etype = sys.argv[3] if len(sys.argv) > 3 else None
        for e in kg.list_entities(etype):
            print(f"  - {e['name']}: {e['type']}")

    else:
        print(f"Unknown command: {cmd}")
        sys.exit(1)