Files
luzia/lib/chat_orchestrator.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

259 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""
Chat Orchestrator - Main coordinator for Luzia chat functionality
"""
import time
import sys
from typing import Dict, Optional
# Import all components
from chat_kg_lookup import ChatKGLookup
from chat_memory_lookup import ChatMemoryLookup
from chat_bash_executor import ChatBashExecutor
from chat_intent_parser import ChatIntentParser
from chat_response_formatter import ChatResponseFormatter
class ChatOrchestrator:
    """Main coordinator for chat operations.

    Each query is parsed by the intent parser, routed to the knowledge-graph,
    local-memory, bash or reasoning handler, and rendered through the response
    formatter. Every handler returns a plain ``dict`` that always contains the
    keys ``query``, ``response``, ``execution_time_ms`` and ``status``.
    """

    # Keyword -> command name passed to ``self.bash_executor.execute``.
    # Keywords are tested in insertion order as substrings of the lower-cased
    # query; the first one found wins.
    _BASH_KEYWORD_COMMANDS: Dict[str, str] = {
        'uptime': 'uptime',
        'status': 'uptime',
        'disk': 'disk',
        'memory': 'memory',
        'services': 'active_services',
        'running': 'active_services',
        'load': 'load',
    }
    # Command used when no keyword matches the query.
    _DEFAULT_BASH_COMMAND = 'uptime'

    # Inputs (compared lower-cased) that end the interactive session.
    _EXIT_WORDS = ('exit', 'quit', 'bye')

    # Canned answer for queries whose scope is 'reasoning'.
    _REASONING_RESPONSE = (
        "# Deep Analysis Required\n"
        "This query requires advanced reasoning beyond fast lookup.\n"
        "**Recommendation:** Use `luzia think deep \"<query>\"` for Gemini 3 Flash analysis.\n"
        "For now, try:\n"
        "- `luzia health --report` for system analysis\n"
        "- `luzia docs <query>` for knowledge lookup\n"
    )

    def __init__(self, timeout_ms: int = 500):
        """Create the lookup, executor, parser and formatter components.

        Args:
            timeout_ms: Overall time budget in milliseconds. It is only
                stored on the instance; each component is created with its
                own fixed timeout (200 / 150 / 300 ms).
        """
        self.timeout_ms = timeout_ms
        self.kg_lookup = ChatKGLookup(timeout_ms=200)
        self.memory_lookup = ChatMemoryLookup(timeout_ms=150)
        self.bash_executor = ChatBashExecutor(timeout_ms=300)
        self.intent_parser = ChatIntentParser()
        self.formatter = ChatResponseFormatter()
        # One {'query': ..., 'result': ...} entry per interactive exchange.
        self.conversation_history = []

    @staticmethod
    def _elapsed_ms(start_time: float) -> float:
        """Return milliseconds elapsed since *start_time*, rounded to 2 places."""
        return round((time.time() - start_time) * 1000, 2)

    def process_query(self, query: str) -> Dict:
        """Process a single query and return the response dict.

        The literal query ``help`` (case-insensitive, surrounding whitespace
        ignored) short-circuits to the formatter's help text without
        consulting the intent parser. Otherwise the parsed ``scope`` selects
        the handler; for any other scope the parsed ``intent`` decides, with
        the knowledge-graph search as the final fallback.

        Args:
            query: Raw user query text.

        Returns:
            The dict produced by the selected handler.
        """
        start_time = time.time()

        # Help needs no intent information, so answer it before parsing.
        if query.strip().lower() == 'help':
            return {
                'query': query,
                'response': self.formatter.format_help(),
                'execution_time_ms': self._elapsed_ms(start_time),
                'status': 'success',
            }

        intent_result = self.intent_parser.parse(query)

        # An explicit scope takes priority over the intent.
        scope = intent_result['scope']
        if scope == 'bash':
            handler = self._handle_bash_query
        elif scope == 'local_memory':
            handler = self._handle_memory_query
        elif scope == 'reasoning':
            handler = self._handle_reasoning_query
        else:
            # Unrecognised scope: fall back to routing on the intent.
            intent = intent_result['intent']
            if intent == 'system_status':
                handler = self._handle_bash_query
            elif intent == 'project_info':
                handler = self._handle_memory_query
            else:
                handler = self._handle_kg_query
        return handler(query, intent_result, start_time)

    def _handle_kg_query(self, query: str, intent_result: Dict, start_time: float) -> Dict:
        """Search the knowledge graph for the term extracted from *query*.

        Args:
            query: Raw user query text.
            intent_result: Parser output; its ``intent`` is echoed back.
            start_time: ``time.time()`` value taken when processing began.

        Returns:
            Response dict including the ``search_term`` that was used.
        """
        search_term = self.intent_parser.extract_search_term(query)
        results = self.kg_lookup.search_all_domains(search_term, limit=10)
        response_text = self.formatter.format_kg_search_results(results)
        execution_time = self._elapsed_ms(start_time)
        return {
            'query': query,
            'intent': intent_result['intent'],
            'search_term': search_term,
            'response': response_text,
            'execution_time_ms': execution_time,
            'status': 'success',
            'response_time_indicator': self.formatter.format_response_time(execution_time),
        }

    def _handle_memory_query(self, query: str, intent_result: Dict, start_time: float) -> Dict:
        """Answer a query from local memory.

        When the parsed keywords mention ``project``/``projects`` the full
        project list is returned; otherwise entities matching the extracted
        search term are looked up.

        Args:
            query: Raw user query text.
            intent_result: Parser output; ``keywords`` and ``intent`` are read.
            start_time: ``time.time()`` value taken when processing began.

        Returns:
            Response dict with the formatted text.
        """
        keywords = intent_result['keywords']
        if 'project' in keywords or 'projects' in keywords:
            # Listing every project does not depend on a search term.
            results = self.memory_lookup.list_all_projects()
            response_text = self.formatter.format_project_list(results)
        else:
            search_term = self.intent_parser.extract_search_term(query)
            results = self.memory_lookup.search_entities(search_term, limit=10)
            # Format the results when something was found and fall back to
            # the help text when nothing matched. The original condition was
            # inverted and showed help precisely when entities were found.
            # NOTE(review): confirm format_memory_statistics is the intended
            # formatter for entity search results.
            if results.get('entities'):
                response_text = self.formatter.format_memory_statistics(results)
            else:
                response_text = self.formatter.format_help()
        execution_time = self._elapsed_ms(start_time)
        return {
            'query': query,
            'intent': intent_result['intent'],
            'response': response_text,
            'execution_time_ms': execution_time,
            'status': 'success',
            'response_time_indicator': self.formatter.format_response_time(execution_time),
        }

    def _handle_bash_query(self, query: str, intent_result: Dict, start_time: float) -> Dict:
        """Map the query to a named command and run it via the bash executor.

        Args:
            query: Raw user query text; matched case-insensitively.
            intent_result: Parser output; its ``intent`` is echoed back.
            start_time: ``time.time()`` value taken when processing began.

        Returns:
            Response dict including the chosen ``command``. ``status`` is
            ``'success'`` when the executor result has a truthy ``success``
            entry and ``'error'`` otherwise.
        """
        query_lower = query.lower()
        command_name = next(
            (
                command
                for keyword, command in self._BASH_KEYWORD_COMMANDS.items()
                if keyword in query_lower
            ),
            self._DEFAULT_BASH_COMMAND,
        )
        result = self.bash_executor.execute(command_name)
        response_text = self.formatter.format_command_output(result)
        execution_time = self._elapsed_ms(start_time)
        return {
            'query': query,
            'intent': intent_result['intent'],
            'command': command_name,
            'response': response_text,
            'execution_time_ms': execution_time,
            'status': 'success' if result.get('success') else 'error',
            'response_time_indicator': self.formatter.format_response_time(execution_time),
        }

    def _handle_reasoning_query(self, query: str, intent_result: Dict, start_time: float) -> Dict:
        """Return a canned answer pointing the user at ``luzia think deep``.

        No lookup is performed here; the response only recommends other
        commands and is marked with status ``'deferred'``.

        Args:
            query: Raw user query text.
            intent_result: Parser output; its ``intent`` is echoed back.
            start_time: ``time.time()`` value taken when processing began.
        """
        return {
            'query': query,
            'intent': intent_result['intent'],
            'response': self._REASONING_RESPONSE,
            'execution_time_ms': self._elapsed_ms(start_time),
            'status': 'deferred',
            'note': 'Requires deep reasoning - use luzia think deep',
        }

    def start_interactive_session(self):
        """Run a read-process-print loop on stdin/stdout.

        Blank input is ignored. The loop ends on ``exit``/``quit``/``bye``,
        on Ctrl-C, or when stdin reaches end-of-file. Any other exception
        raised while handling a query is printed and the loop continues.
        Each answered query is appended to ``self.conversation_history``.
        """
        print("╔════════════════════════════════════════════════════════════╗")
        print("║ Luzia Chat Mode ║")
        print("║ Type 'help' for commands ║")
        print("║ Type 'exit' to quit ║")
        print("╚════════════════════════════════════════════════════════════╝")
        print()
        while True:
            try:
                user_input = input("luzia chat> ").strip()
                if not user_input:
                    continue
                if user_input.lower() in self._EXIT_WORDS:
                    print("Goodbye!")
                    break

                result = self.process_query(user_input)

                print()
                print(result['response'])
                print()
                # Not every handler supplies a timing indicator.
                print(f"*{result.get('response_time_indicator', 'processed')}*")
                print()

                self.conversation_history.append({
                    'query': user_input,
                    'result': result,
                })
            except (KeyboardInterrupt, EOFError):
                # EOFError (Ctrl-D or exhausted piped stdin) must end the
                # session: input() would raise it again on every iteration,
                # so treating it as a generic error would loop forever.
                print("\nGoodbye!")
                break
            except Exception as e:
                # Keep the session alive when a single query fails.
                print(f"Error: {e}")
                print()

    def get_statistics(self) -> Dict:
        """Collect statistics from the KG lookup, memory lookup and executor.

        Returns:
            Dict with ``kg_statistics``, ``memory_statistics``,
            ``system_status`` and ``allowed_bash_commands`` (the keys of the
            executor's ``ALLOWED_COMMANDS`` mapping).
        """
        return {
            'kg_statistics': self.kg_lookup.get_kg_statistics(),
            'memory_statistics': self.memory_lookup.memory_statistics(),
            'system_status': self.bash_executor.system_status(),
            'allowed_bash_commands': list(self.bash_executor.ALLOWED_COMMANDS.keys()),
        }
def main(argv: Optional[list] = None):
    """Command-line entry point for Luzia chat mode.

    Modes, checked in this order:
      * ``--help-commands``: print the formatter's help text and return.
      * ``--stats``: print the orchestrator statistics as indented JSON.
      * ``--interactive`` or no query words: start the interactive session.
      * otherwise: join the query words with spaces, process them once and
        print the response followed by its timing indicator.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script guard relies on.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Luzia Chat Mode')
    parser.add_argument('query', nargs='*', help='Query to process')
    parser.add_argument('--interactive', '-i', action='store_true', help='Start interactive session')
    parser.add_argument('--stats', action='store_true', help='Show system statistics')
    parser.add_argument('--help-commands', action='store_true', help='Show available commands')
    args = parser.parse_args(argv)

    if args.help_commands:
        # Only the formatter is needed here, so avoid constructing the full
        # orchestrator (and all of its lookup/executor components).
        print(ChatResponseFormatter().format_help())
        return

    orchestrator = ChatOrchestrator()

    if args.stats:
        import json
        print(json.dumps(orchestrator.get_statistics(), indent=2))
        return

    if args.interactive or not args.query:
        orchestrator.start_interactive_session()
        return

    result = orchestrator.process_query(' '.join(args.query))
    print()
    print(result['response'])
    print()
    # Not every handler supplies a timing indicator.
    print(f"*{result.get('response_time_indicator', 'processed')}*")


if __name__ == '__main__':
    main()