Based on claude-code-tools TmuxCLIController, this refactor:
- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

331 lines | 12 KiB | Python | Executable File
"""
|
|
Modernization Test Suite - Validate all 4 phases before production deployment.
|
|
Phase 5 of Luzia modernization: Testing and validation.
|
|
"""
|
|
|
|
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class TestResult:
|
|
"""Result of a single test."""
|
|
test_name: str
|
|
passed: bool
|
|
duration_ms: float
|
|
message: str
|
|
details: Dict[str, Any] = None
|
|
|
|
|
|
class ModernizationTestSuite:
|
|
"""Test suite for all modernization phases."""
|
|
|
|
def __init__(self):
|
|
self.results = []
|
|
|
|
def run_all_tests(self) -> Tuple[List[TestResult], Dict[str, Any]]:
|
|
"""Run all tests and return results."""
|
|
|
|
logger.info("=" * 70)
|
|
logger.info("LUZIA MODERNIZATION TEST SUITE")
|
|
logger.info("=" * 70)
|
|
|
|
# Phase 1: Vector Store
|
|
logger.info("\n[PHASE 1] Testing vector store...")
|
|
self._test_vector_store()
|
|
|
|
# Phase 2: Hybrid Retriever
|
|
logger.info("\n[PHASE 2] Testing hybrid retriever...")
|
|
self._test_hybrid_retriever()
|
|
|
|
# Phase 3: Semantic Router
|
|
logger.info("\n[PHASE 3] Testing semantic router...")
|
|
self._test_semantic_router()
|
|
|
|
# Phase 4: Context Assembly
|
|
logger.info("\n[PHASE 4] Testing 4-bucket context...")
|
|
self._test_four_bucket_context()
|
|
|
|
# Integration Tests
|
|
logger.info("\n[INTEGRATION] Testing end-to-end flow...")
|
|
self._test_integration()
|
|
|
|
# Summary
|
|
return self._print_summary()
|
|
|
|
def _test_vector_store(self):
|
|
"""Test vector store initialization and queries."""
|
|
test_name = "Vector Store"
|
|
start = time.time()
|
|
|
|
try:
|
|
import chromadb
|
|
client = chromadb.PersistentClient(path="/opt/server-agents/state/vector_store")
|
|
collection = client.get_or_create_collection(name="kg_entities")
|
|
|
|
count = collection.count()
|
|
if count == 0:
|
|
raise Exception("Vector store is empty")
|
|
|
|
# Test query
|
|
results = collection.query(query_texts=["authentication"], n_results=3)
|
|
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=True,
|
|
duration_ms=duration_ms,
|
|
message=f"✓ Vector store operational with {count} entities",
|
|
details={"entities": count, "test_query_results": len(results['ids'][0]) if results['ids'] else 0}
|
|
))
|
|
logger.info(f" ✓ {count} entities indexed")
|
|
logger.info(f" ✓ Test query returned {len(results['ids'][0]) if results['ids'] else 0} results")
|
|
except Exception as e:
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=False,
|
|
duration_ms=duration_ms,
|
|
message=f"✗ Vector store failed: {str(e)}"
|
|
))
|
|
logger.error(f" ✗ {str(e)}")
|
|
|
|
def _test_hybrid_retriever(self):
|
|
"""Test hybrid retriever combining FTS5 + vector."""
|
|
test_name = "Hybrid Retriever"
|
|
start = time.time()
|
|
|
|
try:
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from langchain_kg_retriever import HybridRetriever
|
|
|
|
retriever = HybridRetriever()
|
|
|
|
# Test queries
|
|
test_queries = ["authentication", "deployment", "database"]
|
|
all_results = []
|
|
|
|
for query in test_queries:
|
|
results = retriever.retrieve(query, top_k=3)
|
|
all_results.extend(results)
|
|
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=len(all_results) > 0,
|
|
duration_ms=duration_ms,
|
|
message=f"✓ Hybrid retriever returned {len(all_results)} combined results",
|
|
details={"queries_tested": len(test_queries), "total_results": len(all_results)}
|
|
))
|
|
logger.info(f" ✓ {len(test_queries)} test queries executed")
|
|
logger.info(f" ✓ Retrieved {len(all_results)} combined results")
|
|
except Exception as e:
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=False,
|
|
duration_ms=duration_ms,
|
|
message=f"✗ Hybrid retriever failed: {str(e)}"
|
|
))
|
|
logger.error(f" ✗ {str(e)}")
|
|
|
|
def _test_semantic_router(self):
|
|
"""Test semantic router domain detection."""
|
|
test_name = "Semantic Router"
|
|
start = time.time()
|
|
|
|
try:
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from semantic_router import SemanticRouter
|
|
|
|
router = SemanticRouter()
|
|
|
|
# Test domain detection
|
|
test_cases = [
|
|
("Build REST API", "backend"),
|
|
("Fix React component", "frontend"),
|
|
("Deploy Kubernetes", "devops"),
|
|
("Research patterns", "research"),
|
|
("Audit security", "security"),
|
|
("Configure permissions", "system")
|
|
]
|
|
|
|
correct_detections = 0
|
|
for query, expected_domain in test_cases:
|
|
result = router.route(query)
|
|
if result['primary_domain'] == expected_domain:
|
|
correct_detections += 1
|
|
|
|
duration_ms = (time.time() - start) * 1000
|
|
accuracy = (correct_detections / len(test_cases)) * 100
|
|
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=accuracy >= 60, # 60% accuracy threshold
|
|
duration_ms=duration_ms,
|
|
message=f"✓ Domain detection accuracy: {accuracy:.1f}%",
|
|
details={"test_cases": len(test_cases), "correct": correct_detections}
|
|
))
|
|
logger.info(f" ✓ Tested {len(test_cases)} domain detection cases")
|
|
logger.info(f" ✓ Accuracy: {accuracy:.1f}%")
|
|
except Exception as e:
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=False,
|
|
duration_ms=duration_ms,
|
|
message=f"✗ Semantic router failed: {str(e)}"
|
|
))
|
|
logger.error(f" ✗ {str(e)}")
|
|
|
|
def _test_four_bucket_context(self):
|
|
"""Test 4-bucket context assembly."""
|
|
test_name = "Four-Bucket Context"
|
|
start = time.time()
|
|
|
|
try:
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from four_bucket_context import assemble_prompt_context
|
|
|
|
context = assemble_prompt_context(
|
|
query="Create authentication API",
|
|
project="musica",
|
|
user="admin",
|
|
cwd="/home/musica"
|
|
)
|
|
|
|
# Verify all buckets are present
|
|
buckets_found = {
|
|
"identity": "SYSTEM CONTEXT" in context,
|
|
"intelligence": "LEARNED KNOWLEDGE" in context,
|
|
"task": "TASK CONTEXT" in context,
|
|
"grounding": "PROJECT GROUNDING" in context
|
|
}
|
|
|
|
all_buckets_present = all(buckets_found.values())
|
|
context_length = len(context)
|
|
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=all_buckets_present,
|
|
duration_ms=duration_ms,
|
|
message=f"✓ All 4 buckets assembled ({context_length} chars)",
|
|
details={"buckets": buckets_found, "context_length": context_length}
|
|
))
|
|
logger.info(f" ✓ All 4 buckets present: {buckets_found}")
|
|
logger.info(f" ✓ Context length: {context_length} characters")
|
|
except Exception as e:
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=False,
|
|
duration_ms=duration_ms,
|
|
message=f"✗ Context assembly failed: {str(e)}"
|
|
))
|
|
logger.error(f" ✗ {str(e)}")
|
|
|
|
def _test_integration(self):
|
|
"""Test end-to-end integration."""
|
|
test_name = "End-to-End Integration"
|
|
start = time.time()
|
|
|
|
try:
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from four_bucket_context import assemble_prompt_context
|
|
from langchain_kg_retriever import HybridRetriever
|
|
from semantic_router import SemanticRouter
|
|
|
|
# Simulate luzia dispatch
|
|
queries = [
|
|
("Fix database performance issue", "admin", "/home/admin"),
|
|
("Deploy new frontend component", "musica", "/home/musica"),
|
|
("Configure system permissions", "overbits", "/home/overbits")
|
|
]
|
|
|
|
successful_contexts = 0
|
|
for query, project, cwd in queries:
|
|
context = assemble_prompt_context(query, project, "admin", cwd)
|
|
if context and len(context) > 100: # Reasonable context size
|
|
successful_contexts += 1
|
|
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=successful_contexts == len(queries),
|
|
duration_ms=duration_ms,
|
|
message=f"✓ {successful_contexts}/{len(queries)} contexts assembled successfully",
|
|
details={"total_queries": len(queries), "successful": successful_contexts}
|
|
))
|
|
logger.info(f" ✓ Processed {len(queries)} queries")
|
|
logger.info(f" ✓ Success rate: {successful_contexts}/{len(queries)}")
|
|
except Exception as e:
|
|
duration_ms = (time.time() - start) * 1000
|
|
self.results.append(TestResult(
|
|
test_name=test_name,
|
|
passed=False,
|
|
duration_ms=duration_ms,
|
|
message=f"✗ Integration test failed: {str(e)}"
|
|
))
|
|
logger.error(f" ✗ {str(e)}")
|
|
|
|
def _print_summary(self) -> Tuple[List[TestResult], Dict[str, Any]]:
|
|
"""Print test summary and statistics."""
|
|
|
|
total_tests = len(self.results)
|
|
passed_tests = sum(1 for r in self.results if r.passed)
|
|
failed_tests = total_tests - passed_tests
|
|
total_duration_ms = sum(r.duration_ms for r in self.results)
|
|
|
|
logger.info("\n" + "=" * 70)
|
|
logger.info("TEST RESULTS SUMMARY")
|
|
logger.info("=" * 70)
|
|
|
|
for result in self.results:
|
|
status = "✓ PASS" if result.passed else "✗ FAIL"
|
|
logger.info(f"{status} | {result.test_name:30} | {result.duration_ms:7.1f}ms | {result.message}")
|
|
|
|
logger.info("=" * 70)
|
|
logger.info(f"TOTAL: {passed_tests}/{total_tests} passed in {total_duration_ms:.1f}ms")
|
|
|
|
if failed_tests == 0:
|
|
logger.info("✅ ALL TESTS PASSED - Ready for production deployment")
|
|
else:
|
|
logger.warning(f"⚠️ {failed_tests} test(s) failed - Review before deployment")
|
|
|
|
logger.info("=" * 70)
|
|
|
|
summary = {
|
|
"total_tests": total_tests,
|
|
"passed": passed_tests,
|
|
"failed": failed_tests,
|
|
"success_rate": (passed_tests / total_tests) * 100,
|
|
"total_duration_ms": total_duration_ms,
|
|
"ready_for_production": failed_tests == 0
|
|
}
|
|
|
|
return self.results, summary
|
|
|
|
|
|
if __name__ == "__main__":
|
|
suite = ModernizationTestSuite()
|
|
results, summary = suite.run_all_tests()
|
|
|
|
# Exit with appropriate code
|
|
import sys
|
|
sys.exit(0 if summary["ready_for_production"] else 1)
|