""" Four-Bucket Context Assembly - Modernized prompt construction for luzia. Phase 4 of Luzia modernization: Integrate hybrid retriever + semantic router into luzia CLI. ENHANCED (Jan 2026): Added per-project RAG context injection. - Each project can have .knowledge/ directory with project-specific facts - Luzia injects relevant project knowledge before task dispatch - Falls back to global KG if no project-specific knowledge exists """ import json import os import logging from typing import Dict, List, Any, Optional from datetime import datetime logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') logger = logging.getLogger(__name__) class FourBucketContextAssembler: """Assemble 4-bucket context for luzia prompt injection.""" def __init__(self): self.hybrid_retriever = None self.semantic_router = None self.project_knowledge_loader = None self._initialize_components() def _initialize_components(self): """Lazy-load hybrid retriever, semantic router, and project knowledge loader.""" try: # Import after paths are set up import sys sys.path.insert(0, os.path.dirname(__file__)) from langchain_kg_retriever import KnowledgeGraphRetriever from semantic_router import SemanticRouter self.hybrid_retriever = KnowledgeGraphRetriever() self.semantic_router = SemanticRouter() logger.debug("āœ“ Hybrid retriever and semantic router loaded") except Exception as e: logger.debug(f"Could not load new retrievers (OK for fallback): {e}") self.hybrid_retriever = None self.semantic_router = None # Load project-specific knowledge loader try: from project_knowledge_loader import ProjectKnowledgeLoader self.project_knowledge_loader = ProjectKnowledgeLoader() logger.debug("āœ“ Project knowledge loader initialized") except Exception as e: logger.debug(f"Could not load project knowledge loader: {e}") self.project_knowledge_loader = None def get_global_identity(self) -> str: """Bucket 1: Global identity context (static).""" return """You are Claude, an AI assistant by Anthropic. You specialize in software engineering and systems administration. You have access to a knowledge graph of learned solutions and best practices. 
class FourBucketContextAssembler:
    """Assemble the 4-bucket context (plus Bucket 2.5) for luzia prompt injection."""

    def __init__(self):
        self.hybrid_retriever = None
        self.semantic_router = None
        self.project_knowledge_loader = None
        self._initialize_components()

    def _initialize_components(self):
        """Lazy-load hybrid retriever, semantic router, and project knowledge loader."""
        try:
            # Import after paths are set up
            import sys
            sys.path.insert(0, os.path.dirname(__file__))

            from langchain_kg_retriever import KnowledgeGraphRetriever
            from semantic_router import SemanticRouter

            self.hybrid_retriever = KnowledgeGraphRetriever()
            self.semantic_router = SemanticRouter()
            logger.debug("✓ Hybrid retriever and semantic router loaded")
        except Exception as e:
            logger.debug(f"Could not load new retrievers (OK for fallback): {e}")
            self.hybrid_retriever = None
            self.semantic_router = None

        # Load the project-specific knowledge loader
        try:
            from project_knowledge_loader import ProjectKnowledgeLoader
            self.project_knowledge_loader = ProjectKnowledgeLoader()
            logger.debug("✓ Project knowledge loader initialized")
        except Exception as e:
            logger.debug(f"Could not load project knowledge loader: {e}")
            self.project_knowledge_loader = None

    def get_global_identity(self) -> str:
        """Bucket 1: Global identity context (static)."""
        return """You are Claude, an AI assistant by Anthropic.
You specialize in software engineering and systems administration.
You have access to a knowledge graph of learned solutions and best practices.
Your goal is to help users accomplish their tasks efficiently and safely."""

    def get_project_grounding(self, project: str, user: str, cwd: str) -> str:
        """Bucket 2: Project-specific grounding (static, highest priority)."""
        return f"""PROJECT CONTEXT (HIGHEST PRIORITY):
- Project: {project}
- User: {user}
- Working Directory: {cwd}
- Permissions: Use luzia for cross-project work
- File Ownership: All changes must preserve ownership

IMPORTANT: This context is provided LAST for maximum precedence."""

    def get_project_knowledge_context(self, project: str, query: str) -> Dict[str, Any]:
        """Bucket 2.5: Project-specific RAG context from the .knowledge/ directory."""
        if not self.project_knowledge_loader:
            return {"source": "none", "context": "", "entities": []}

        try:
            # Check whether the project has a .knowledge/ directory
            if not self.project_knowledge_loader.has_knowledge(project):
                logger.debug(f"No .knowledge/ directory for {project}")
                return {"source": "none", "context": "", "entities": []}

            # Get formatted context for the prompt
            context = self.project_knowledge_loader.format_for_prompt(project, query, max_tokens=1500)

            # Get relevant entities for metadata
            entities = self.project_knowledge_loader.search_project_knowledge(project, query, top_k=5)

            logger.debug(f"Loaded project knowledge for {project}: {len(entities)} relevant entities")

            return {
                "source": "project_kg",
                "context": context,
                "entities": entities,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.debug(f"Project knowledge retrieval failed: {e}")
            return {"source": "error", "context": "", "entities": [], "error": str(e)}

    def get_intelligence_context(self, query: str, project: str, max_results: int = 5) -> Dict[str, Any]:
        """Bucket 3: Dynamic intelligence from global KG retrieval."""
        if not self.hybrid_retriever:
            return {"source": "fallback", "results": []}

        try:
            # Build the search query
            search_query = f"{project} {query}"

            # Retrieve relevant entities
            kg_results = self.hybrid_retriever.retrieve(search_query, top_k=max_results)

            logger.debug(f"Retrieved {len(kg_results)} KG results for '{search_query}'")

            return {
                "source": "hybrid_retrieval",
                "timestamp": datetime.now().isoformat(),
                "results": kg_results,
                "count": len(kg_results)
            }
        except Exception as e:
            logger.debug(f"KG retrieval failed (using fallback): {e}")
            return {"source": "fallback", "results": [], "error": str(e)}

    def get_task_context(self, query: str) -> Dict[str, Any]:
        """Bucket 4: Dynamic task context with domain detection."""
        task_context = {
            "original_query": query,
            "timestamp": datetime.now().isoformat()
        }

        if self.semantic_router:
            try:
                routing = self.semantic_router.route(query)
                task_context.update({
                    "detected_domain": routing["primary_domain"],
                    "domain_confidence": routing["confidence"],
                    "reasoning_enabled": routing["reasoning_enabled"],
                    "system_instructions": routing["system_instructions"]
                })
                logger.debug(f"Detected domain: {routing['primary_domain']} ({routing['confidence']:.2f})")
            except Exception as e:
                logger.debug(f"Domain detection failed: {e}")

        return task_context

    def assemble_prompt_context(self, query: str, project: str, user: str, cwd: str) -> str:
        """
        Assemble the complete 5-bucket context for prompt injection.

        Order (IMPORTANT - recency bias means the LAST items have the highest
        precedence):
        1. Bucket 1: Identity (global)
        2. Bucket 3: Global Intelligence (learned solutions from /etc/luz-knowledge/)
        3. Bucket 2.5: Project Knowledge (from the .knowledge/ directory - NEW)
        4. Bucket 4: Task (domain-specific)
        5. Bucket 2: Grounding (project, placed LAST for precedence)
        """
        buckets = []

        # Bucket 1: Identity
        buckets.append("## SYSTEM CONTEXT\n" + self.get_global_identity())

        # Bucket 3: Global Intelligence
        intelligence = self.get_intelligence_context(query, project)
        if intelligence.get("results"):
            intel_text = "## LEARNED KNOWLEDGE\nRelevant solutions from global knowledge graph:\n"
            for result in intelligence["results"]:
                intel_text += f"\n- {result.get('name', 'Unknown')}"
                if result.get('content'):
                    # Truncate long content; only add an ellipsis when truncated
                    content_preview = result['content'][:100]
                    suffix = "..." if len(result['content']) > 100 else ""
                    intel_text += f": {content_preview}{suffix}"
            buckets.append(intel_text)

        # Bucket 2.5: Project-Specific Knowledge (NEW - RAG from .knowledge/)
        project_knowledge = self.get_project_knowledge_context(project, query)
        if project_knowledge.get("context"):
            buckets.append("## PROJECT-SPECIFIC KNOWLEDGE\n" + project_knowledge["context"])
            logger.debug(f"Injected project knowledge ({len(project_knowledge.get('entities', []))} entities)")

        # Bucket 4: Task
        task_info = self.get_task_context(query)
        task_text = f"## TASK CONTEXT\nDetected Domain: {task_info.get('detected_domain', 'general')}\n"
        if task_info.get("system_instructions"):
            task_text += f"\n{task_info['system_instructions']}\n"
        buckets.append(task_text)

        # Bucket 2: Grounding (LAST for precedence)
        buckets.append("## PROJECT GROUNDING (HIGHEST PRIORITY)\n" + self.get_project_grounding(project, user, cwd))

        return "\n\n".join(buckets)
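
# For reference, the assembled prompt is a sequence of markdown-style
# sections. With every bucket populated it looks roughly like this
# (content elided; the LEARNED KNOWLEDGE and PROJECT-SPECIFIC KNOWLEDGE
# sections are skipped when retrieval returns nothing):
#
#   ## SYSTEM CONTEXT
#   ## LEARNED KNOWLEDGE
#   ## PROJECT-SPECIFIC KNOWLEDGE
#   ## TASK CONTEXT
#   ## PROJECT GROUNDING (HIGHEST PRIORITY)
#
# Grounding is emitted last on purpose: with recency bias, the final
# section carries the most weight.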
class ContextCache:
    """Cache assembled context for efficiency."""

    def __init__(self, cache_dir: str = "/tmp/luzia_context_cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def get_cache_key(self, query: str, project: str) -> str:
        """Generate a cache key for the context."""
        key = f"{project}:{query}"
        return hashlib.md5(key.encode()).hexdigest()[:16]

    def get(self, query: str, project: str) -> Optional[str]:
        """Retrieve cached context if available."""
        try:
            cache_file = os.path.join(self.cache_dir, self.get_cache_key(query, project))
            if os.path.exists(cache_file):
                with open(cache_file, 'r') as f:
                    data = json.load(f)
                if (datetime.now().timestamp() - data['timestamp']) < 3600:  # 1 hour TTL
                    logger.debug("Using cached context")
                    return data['context']
        except Exception as e:
            logger.debug(f"Cache read failed: {e}")
        return None

    def set(self, query: str, project: str, context: str):
        """Cache assembled context."""
        try:
            cache_file = os.path.join(self.cache_dir, self.get_cache_key(query, project))
            data = {
                'timestamp': datetime.now().timestamp(),
                'context': context
            }
            with open(cache_file, 'w') as f:
                json.dump(data, f)
        except Exception as e:
            logger.debug(f"Cache write failed: {e}")


# Factory function for luzia integration
def create_context_assembler() -> FourBucketContextAssembler:
    """Factory function to create and configure an assembler."""
    return FourBucketContextAssembler()


def assemble_prompt_context(query: str, project: str, user: str, cwd: str) -> str:
    """
    High-level API for luzia to use.

    Usage in the luzia CLI:
        from four_bucket_context import assemble_prompt_context
        context = assemble_prompt_context(task_query, project_name, user, cwd)
    """
    assembler = create_context_assembler()
    return assembler.assemble_prompt_context(query, project, user, cwd)
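
# ContextCache above is defined but not wired into the high-level API. The
# wrapper below is a minimal, illustrative sketch of how a caller could
# combine the two; assemble_prompt_context_cached is not part of the
# original API. Caveat: ContextCache.get_cache_key only hashes
# (project, query), so calls differing only in user or cwd share an entry.
def assemble_prompt_context_cached(query: str, project: str, user: str, cwd: str) -> str:
    """Like assemble_prompt_context(), but memoized via ContextCache (sketch)."""
    cache = ContextCache()
    cached = cache.get(query, project)
    if cached is not None:
        return cached
    context = assemble_prompt_context(query, project, user, cwd)
    cache.set(query, project, context)
    return context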
# Testing
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("PHASE 4+: Five-Bucket Context Assembly (with Project RAG)")
    logger.info("=" * 60)

    # Test context assembly
    test_query = "Create a REST API for user authentication with database"
    test_project = "musica"
    test_user = "admin"
    test_cwd = "/home/musica"

    logger.info(f"\nAssembling context for: {test_query}")
    assembler = create_context_assembler()

    # Show project knowledge status
    if assembler.project_knowledge_loader:
        logger.info("\nProject Knowledge Status:")
        projects_status = assembler.project_knowledge_loader.list_projects_with_knowledge()
        for p in projects_status[:5]:  # Show the first 5
            status = "Has KG" if p["has_knowledge"] else "No KG"
            logger.info(f"  {p['project']}: {status}")

    context = assembler.assemble_prompt_context(test_query, test_project, test_user, test_cwd)

    logger.info("\nGenerated 5-Bucket Context:")
    logger.info("-" * 60)
    print(context[:2000])
    if len(context) > 2000:
        print(f"\n... ({len(context) - 2000} more characters)")
    logger.info("-" * 60)

    logger.info("\n✅ PHASE 4+ COMPLETE: Ready for luzia integration with project RAG")