"""
Vector Store Builder - Embeds KG entities into ChromaDB for semantic search.

Phase 1 of Luzia modernization: Create hybrid retriever with vector+keyword search.
"""

import sqlite3
import json
import os
import sys
from contextlib import closing
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional
import logging

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)


class KnowledgeGraphLoader:
    """Load entities from existing SQLite KG databases.

    Each domain is expected to live in ``<kg_path>/<domain>.db`` and to expose
    an ``entities`` table with the columns named in ``_ENTITY_QUERY``.
    """

    DEFAULT_DOMAINS = ("sysadmin", "users", "projects", "research")

    _ENTITY_QUERY = (
        "SELECT id, name, type, domain, content, metadata, created_at FROM entities"
    )

    def __init__(self, kg_path: str = "/etc/luz-knowledge",
                 domains: Optional[List[str]] = None):
        """Remember where the domain databases live.

        Args:
            kg_path: Directory containing the ``<domain>.db`` files.
            domains: Domain names to load; defaults to ``DEFAULT_DOMAINS``.
        """
        self.kg_path = kg_path
        self.domains = list(self.DEFAULT_DOMAINS) if domains is None else list(domains)

    def load_all_entities(self) -> List[Dict[str, Any]]:
        """Load all entities from all domain KGs.

        Missing databases are skipped with a warning and databases that fail
        to load are skipped with an error log, so one bad domain never
        prevents the others from being returned.

        Returns:
            One dict per entity row, in domain order, each carrying the row's
            columns plus ``source`` and the ``document`` text used for embedding.
        """
        all_entities: List[Dict[str, Any]] = []

        for domain in self.domains:
            db_path = os.path.join(self.kg_path, f"{domain}.db")

            if not os.path.exists(db_path):
                logger.warning(f"Domain KG not found: {db_path}")
                continue

            try:
                domain_entities = self._load_domain(db_path)
            except sqlite3.Error as e:
                logger.error(f"Error loading {domain}.db: {e}")
                continue

            all_entities.extend(domain_entities)
            logger.info(f"Loaded {len(domain_entities)} entities from {domain}.db")

        logger.info(f"Total entities loaded: {len(all_entities)}")
        return all_entities

    def _load_domain(self, db_path: str) -> List[Dict[str, Any]]:
        """Read every entity row from one domain database."""
        # closing() guarantees the connection is released even when the query
        # fails; a sqlite3 connection used directly as a context manager only
        # manages the transaction and does not close the connection.
        with closing(sqlite3.connect(db_path)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(self._ENTITY_QUERY).fetchall()
        return [self._row_to_entity(row) for row in rows]

    @classmethod
    def _row_to_entity(cls, row: sqlite3.Row) -> Dict[str, Any]:
        """Convert one ``entities`` row into the dict shape used by the builder."""
        content = row["content"] or ""
        return {
            "id": row["id"],
            "name": row["name"],
            "type": row["type"],
            "domain": row["domain"],
            "content": content,
            "metadata": cls._parse_metadata(row["metadata"], row["id"]),
            "created_at": row["created_at"],
            "source": "kg",
            "document": f"{row['name']}: {content}",  # For embedding
        }

    @staticmethod
    def _parse_metadata(raw: Any, entity_id: Any) -> Any:
        """Decode the JSON metadata column, falling back to ``{}`` when unusable."""
        if not raw:
            return {}
        try:
            return json.loads(raw)
        except (ValueError, TypeError) as e:
            # A single malformed row must not discard the rest of its domain.
            logger.warning(f"Ignoring invalid metadata for entity {entity_id!r}: {e}")
            return {}


class ChromaDBVectorStore:
    """Manage ChromaDB vector store for semantic search."""

    def __init__(self, vector_store_path: str = "/opt/server-agents/state/vector_store"):
        """Create the storage directory and bind the ``chromadb`` module.

        Args:
            vector_store_path: Directory handed to the persistent client.

        Exits the process with status 1 when ``chromadb`` is not installed.
        """
        self.vector_store_path = vector_store_path
        Path(vector_store_path).mkdir(parents=True, exist_ok=True)

        # Import chroma lazily so the module can be imported without it
        try:
            import chromadb
            self.chroma = chromadb
        except ImportError:
            logger.error("chromadb not installed. Install with: pip install chromadb")
            sys.exit(1)

    def create_client(self):
        """Create ChromaDB client for persistent storage."""
        return self.chroma.PersistentClient(path=self.vector_store_path)

    def get_or_create_collection(self, client, name: str = "kg_entities"):
        """Get or create a collection in ChromaDB.

        Args:
            client: Client returned by ``create_client``.
            name: Collection name.
        """
        return client.get_or_create_collection(
            name=name,
            metadata={
                "description": "Knowledge Graph entities with semantic embeddings",
                "created_at": datetime.now().isoformat(),
                "version": "1.0"
            }
        )


class EmbeddingGenerator:
    """Generate embeddings using ChromaDB's built-in default embeddings.

    This class is a placeholder: both ``embed_*`` methods return ``None``
    because the collection embeds documents itself when they are stored.
    """

    def __init__(self):
        self.embeddings = True  # ChromaDB handles embeddings internally
        logger.info("✓ Using ChromaDB default embeddings (all-MiniLM-L6-v2)")

    def embed_text(self, text: str) -> Optional[List[float]]:
        """ChromaDB embeds texts automatically when added to collection."""
        return None  # Not needed - ChromaDB handles it

    def embed_batch(self, texts: List[str]) -> Optional[List[List[float]]]:
        """ChromaDB embeds texts automatically when added to collection."""
        return None  # Not needed - ChromaDB handles it


class VectorStoreBuilder:
    """Main builder: load KG entities → embed → store in ChromaDB."""

    def __init__(self, kg_path: str = "/etc/luz-knowledge",
                 vector_store_path: str = "/opt/server-agents/state/vector_store"):
        """Wire up the loader, the vector store and the embedding placeholder.

        Args:
            kg_path: Directory containing the domain SQLite databases.
            vector_store_path: Directory for the persistent ChromaDB store.
        """
        self.kg_loader = KnowledgeGraphLoader(kg_path)
        self.vector_store = ChromaDBVectorStore(vector_store_path)
        self.embedding_gen = EmbeddingGenerator()

    def build(self, batch_size: int = 50) -> Dict[str, Any]:
        """Build complete vector store from KG entities.

        Args:
            batch_size: Number of entities written to ChromaDB per call.

        Returns:
            On success, a dict with ``success``, ``entities_loaded``,
            ``entities_stored``, ``vector_store_path`` and ``timestamp``.
            On failure, ``{"success": False, "error": <message>}``.
        """
        if batch_size < 1:
            return {"success": False, "error": "batch_size must be a positive integer"}

        logger.info("=" * 60)
        logger.info("PHASE 1: Build ChromaDB Vector Store")
        logger.info("=" * 60)

        # Step 1: Load entities
        logger.info("\n[1/3] Loading entities from KG...")
        entities = self.kg_loader.load_all_entities()

        if not entities:
            logger.error("No entities loaded!")
            return {"success": False, "error": "No entities loaded"}

        # Each domain database assigns its own ids, so the same id can appear
        # more than once; a single write must not carry repeated ids.
        unique_entities = self._dedupe_by_id(entities)

        # Step 2: Store in ChromaDB (embeddings generated automatically)
        logger.info(f"\n[2/3] Storing {len(unique_entities)} entities in ChromaDB...")
        logger.info("(ChromaDB auto-generates embeddings using all-MiniLM-L6-v2)")

        try:
            client = self.vector_store.create_client()
            collection = self.vector_store.get_or_create_collection(client)
            self._store_entities(collection, unique_entities, batch_size)
            logger.info(f"✓ Stored all {len(unique_entities)} entities in ChromaDB")
        except Exception as e:
            logger.error(f"Failed to store in ChromaDB: {e}")
            return {"success": False, "error": str(e)}

        # Step 3: Verify
        logger.info("\n[3/3] Verifying vector store...")
        try:
            self._verify(collection)
        except Exception as e:
            logger.error(f"Verification failed: {e}")
            return {"success": False, "error": str(e)}

        result = {
            "success": True,
            "entities_loaded": len(entities),
            "entities_stored": len(unique_entities),
            "vector_store_path": self.vector_store.vector_store_path,
            "timestamp": datetime.now().isoformat()
        }

        logger.info("\n" + "=" * 60)
        logger.info("✅ PHASE 1 COMPLETE: Vector store built successfully")
        logger.info("=" * 60)
        logger.info(f"  • {result['entities_loaded']} entities embedded")
        logger.info(f"  • Stored at: {result['vector_store_path']}")
        logger.info("  • Ready for Phase 2: Hybrid retriever creation")

        return result

    @staticmethod
    def _dedupe_by_id(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return entities with unique string ids, keeping the first occurrence."""
        unique: Dict[str, Dict[str, Any]] = {}
        for entity in entities:
            entity_id = str(entity["id"])
            if entity_id in unique:
                logger.warning(
                    f"Skipping duplicate entity id {entity_id!r} "
                    f"(domain: {entity['domain']})"
                )
                continue
            unique[entity_id] = entity
        return list(unique.values())

    @staticmethod
    def _store_entities(collection, entities: List[Dict[str, Any]],
                        batch_size: int) -> None:
        """Write entities to the collection in batches of ``batch_size``."""
        total = len(entities)
        for start in range(0, total, batch_size):
            batch = entities[start:start + batch_size]
            # upsert (rather than add) so that re-running the builder refreshes
            # entities that already exist in the persistent store.
            collection.upsert(
                ids=[str(entity["id"]) for entity in batch],
                documents=[entity["document"] for entity in batch],
                metadatas=[
                    {
                        "name": entity["name"],
                        "type": entity["type"],
                        "domain": entity["domain"],
                        "source": entity["source"],
                    }
                    for entity in batch
                ],
            )
            logger.info(f"  Stored {start + len(batch)}/{total} entities")

    @staticmethod
    def _verify(collection) -> None:
        """Run one sample semantic query and log the top matches."""
        test_query = "authentication security login"
        results = collection.query(
            query_texts=[test_query],
            n_results=3
        )

        matched_ids = results['ids'][0]
        matched_meta = results['metadatas'][0]
        logger.info(f"✓ Test query returned {len(matched_ids)} results")
        for i, meta in enumerate(matched_meta[:len(matched_ids)]):
            logger.info(f"  {i+1}. {meta['name']} (domain: {meta['domain']})")


if __name__ == "__main__":
    builder = VectorStoreBuilder()
    result = builder.build()

    # Exit with status
    sys.exit(0 if result["success"] else 1)