Files
luzia/lib/langchain_kg_retriever.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

276 lines
9.7 KiB
Python
Executable File

"""
LangChain Knowledge Graph Retriever - Hybrid search combining FTS5 + Vector embeddings.
Phase 2 of Luzia modernization: Create hybrid retriever with semantic ranking.
"""
import sqlite3
import json
import os
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class RetrievalResult:
    """Single result from hybrid search."""
    entity_id: str          # Stable KG entity identifier.
    name: str               # Human-readable entity name.
    content: str            # Entity body text ("" when unavailable).
    type: str               # Entity type label from the KG.
    domain: str             # KG domain (e.g. sysadmin/users/projects/research).
    source: str             # Origin tag, e.g. "kg" or "vector".
    relevance_score: float  # Higher is better; scale depends on retrieval method.
    retrieval_method: str   # "fts5", "vector", "hybrid"
    # Fix: was annotated `Dict[str, Any]` with a `None` default; `Optional`
    # makes the contract explicit. Default behavior is unchanged.
    metadata: Optional[Dict[str, Any]] = None
class FTS5Searcher:
    """Full-text search over per-domain SQLite KG databases using FTS5.

    Each domain lives in its own database file under ``kg_path``
    (e.g. ``sysadmin.db``); missing files are skipped.
    """

    def __init__(self, kg_path: str = "/etc/luz-knowledge"):
        # Directory holding one SQLite file per KG domain.
        self.kg_path = kg_path
        self.domains = ["sysadmin", "users", "projects", "research"]

    def search(self, query: str, top_k: int = 5) -> List[RetrievalResult]:
        """Search all KG domains using FTS5.

        Args:
            query: FTS5 MATCH expression (keywords / natural language terms).
            top_k: Maximum results per domain (not a global cap).

        Returns:
            Results from every reachable domain. Per-domain failures are
            logged at DEBUG and swallowed (best-effort search).
        """
        results: List[RetrievalResult] = []
        for domain in self.domains:
            db_path = os.path.join(self.kg_path, f"{domain}.db")
            if not os.path.exists(db_path):
                continue
            try:
                conn = sqlite3.connect(db_path)
                try:
                    conn.row_factory = sqlite3.Row
                    cursor = conn.cursor()
                    # FTS5 rank is negative (more negative = better match);
                    # abs() below converts it to a positive relevance score.
                    cursor.execute("""
                        SELECT e.id, e.name, e.type, e.domain, e.content, e.source,
                               rank as relevance_score
                        FROM entities_fts
                        JOIN entities e ON entities_fts.rowid = e.rowid
                        WHERE entities_fts MATCH ?
                        ORDER BY rank
                        LIMIT ?
                    """, (query, top_k))
                    for row in cursor.fetchall():
                        results.append(RetrievalResult(
                            entity_id=row["id"],
                            name=row["name"],
                            content=row["content"] or "",
                            type=row["type"],
                            domain=row["domain"],
                            source=row["source"] or "kg",
                            relevance_score=abs(row["relevance_score"]),  # Convert rank to relevance
                            retrieval_method="fts5",
                            metadata={}
                        ))
                finally:
                    # Bug fix: the original only closed the connection on the
                    # success path, leaking it whenever the query raised.
                    conn.close()
            except Exception as e:
                logger.debug(f"FTS5 search error in {domain}: {e}")
        return results
class VectorSearcher:
    """Semantic search using a ChromaDB persistent vector store."""

    def __init__(self, vector_store_path: str = "/opt/server-agents/state/vector_store"):
        self.vector_store_path = vector_store_path
        # Set by _initialize_vector_store(); stays None when the store is
        # unavailable, in which case search() returns no results.
        self.collection = None
        self._initialize_vector_store()

    def _initialize_vector_store(self):
        """Open (or create) the "kg_entities" ChromaDB collection.

        Degrades gracefully: any failure (missing chromadb, bad path, ...)
        leaves ``self.collection`` as None instead of raising.
        """
        try:
            import chromadb
            client = chromadb.PersistentClient(path=self.vector_store_path)
            self.collection = client.get_or_create_collection(name="kg_entities")
            logger.info(f"✓ Vector store loaded: {self.collection.count()} entities indexed")
        except Exception as e:
            logger.error(f"Vector store initialization failed: {e}")
            self.collection = None

    def search(self, query: str, top_k: int = 5) -> List[RetrievalResult]:
        """Search vector store using semantic similarity.

        Args:
            query: Natural-language query embedded by ChromaDB.
            top_k: Maximum number of nearest neighbors to return.

        Returns:
            Results with scores in [0, 1]; empty list when the store is
            unavailable or the query fails (errors logged at DEBUG).
        """
        if not self.collection:
            return []
        try:
            results_raw = self.collection.query(
                query_texts=[query],
                n_results=top_k
            )
            # Robustness fix: use .get() so a response missing the optional
            # 'distances'/'metadatas' keys cannot raise KeyError.
            ids = results_raw.get('ids') or []
            distances = results_raw.get('distances') or []
            metadatas = results_raw.get('metadatas') or []
            results = []
            if ids and ids[0]:
                for i, entity_id in enumerate(ids[0]):
                    distance = distances[0][i] if distances else 0
                    metadata = metadatas[0][i] if metadatas else {}
                    # Cosine distance lies in [0, 2]; map to 0-1 similarity.
                    similarity = 1 - (distance / 2)
                    results.append(RetrievalResult(
                        entity_id=entity_id,
                        name=metadata.get("name", ""),
                        content="",  # Already in metadata
                        type=metadata.get("type", ""),
                        domain=metadata.get("domain", ""),
                        source=metadata.get("source", "vector"),
                        relevance_score=max(0, similarity),
                        retrieval_method="vector",
                        metadata=metadata
                    ))
            return results
        except Exception as e:
            logger.debug(f"Vector search error: {e}")
            return []
class HybridRetriever:
    """Combine FTS5 and vector search with intelligent ranking."""

    def __init__(self):
        self.fts5_searcher = FTS5Searcher()
        self.vector_searcher = VectorSearcher()

    def retrieve(self, query: str, top_k: int = 8, rerank: bool = True) -> List[RetrievalResult]:
        """
        Retrieve using hybrid search:
        1. Run FTS5 and vector searches
        2. Rerank (dedup + multi-method boost) or plain dedup by entity_id
        3. Sort by relevance and return top_k

        Args:
            query: Natural-language or keyword query.
            top_k: Number of results returned after merging.
            rerank: Apply the multi-method boost in _rerank_results.
        """
        fts5_results = self.fts5_searcher.search(query, top_k=10)
        vector_results = self.vector_searcher.search(query, top_k=10)
        merged = fts5_results + vector_results
        # Bug fix: rerank must see the *raw* (pre-dedup) results. The old
        # code deduplicated by entity_id first, so _rerank_results never saw
        # an entity twice and its "found by both methods" boost could never
        # fire. _rerank_results dedups itself, keeping the best score.
        if rerank and len(merged) > 1:
            combined = self._rerank_results(merged, query)
        else:
            # Plain dedup: FTS5 (keyword) hits take precedence, then vector.
            seen_ids = set()
            combined = []
            for result in merged:
                if result.entity_id not in seen_ids:
                    combined.append(result)
                    seen_ids.add(result.entity_id)
        # Sort by relevance and return top_k
        combined.sort(key=lambda x: x.relevance_score, reverse=True)
        return combined[:top_k]

    def _rerank_results(self, results: List[RetrievalResult], query: str) -> List[RetrievalResult]:
        """
        Simple reranking: Boost scores based on retrieval method combination.
        - If entity found by both FTS5 and vector: +0.2 boost (capped at 1.0)
          and the winning result is tagged "hybrid".
        - NOTE(review): vector scores are 0-1 while FTS5 scores are abs(rank)
          and can exceed 1, so cross-method ordering is approximate.
        """
        # Group occurrences of the same entity across retrieval methods.
        entity_map: Dict[str, List[RetrievalResult]] = {}
        for result in results:
            entity_map.setdefault(result.entity_id, []).append(result)
        reranked = []
        for entity_results in entity_map.values():
            # Use the best score among retrieval methods.
            best_result = max(entity_results, key=lambda x: x.relevance_score)
            if len(entity_results) > 1:
                # Entity surfaced by more than one method: boost and retag.
                best_result.relevance_score = min(1.0, best_result.relevance_score + 0.2)
                best_result.retrieval_method = "hybrid"
            reranked.append(best_result)
        return reranked
class KnowledgeGraphRetriever:
    """Main retriever: Implements LangChain-compatible interface."""

    def __init__(self):
        self.hybrid = HybridRetriever()
        logger.info("✓ KG Retriever initialized (hybrid FTS5+vector)")

    def retrieve(self, query: str, top_k: int = 8) -> List[Dict[str, Any]]:
        """
        Retrieve relevant context from KG.

        Args:
            query: Search query (natural language or keywords)
            top_k: Number of results to return

        Returns:
            List of results with name, content, domain, relevance_score
        """
        hits = self.hybrid.retrieve(query, top_k=top_k, rerank=True)
        # Flatten dataclass results into plain dicts for prompt injection.
        return [
            {
                "name": hit.name,
                "type": hit.type,
                "domain": hit.domain,
                "content": hit.content,
                "relevance": hit.relevance_score,
                "source": hit.retrieval_method,
            }
            for hit in hits
        ]
# Testing and demonstration
if __name__ == "__main__":
    banner = "=" * 60
    logger.info(banner)
    logger.info("PHASE 2: LangChain Hybrid KG Retriever")
    logger.info(banner)

    retriever = KnowledgeGraphRetriever()

    # Representative queries exercising different KG domains.
    demo_queries = (
        "authentication and security",
        "container deployment",
        "database migration",
    )
    for demo_query in demo_queries:
        logger.info(f"\nQuery: '{demo_query}'")
        hits = retriever.retrieve(demo_query, top_k=3)
        for rank, hit in enumerate(hits, 1):
            logger.info(f"  {rank}. {hit['name']} ({hit['domain']}) [{hit['source']}] - {hit['relevance']:.2f}")

    logger.info("\n" + banner)
    logger.info("✅ PHASE 2 COMPLETE: Hybrid retriever ready")
    logger.info(banner)