Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:

- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
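For orientation, a minimal sketch of the controller interface described above, assuming tmux is driven through subprocess calls and that a container target is reached via docker exec. The method names follow the commit message; everything else (signatures, defaults, the container handling, the prompt regex) is illustrative and is not the actual claude-code-tools implementation.

import hashlib
import re
import subprocess
import time
from typing import Optional


class DockerTmuxController:
    """Illustrative wrapper around tmux send-keys / capture-pane."""

    def __init__(self, session: str, container: Optional[str] = None):
        self.session = session
        self.container = container  # if set, tmux runs inside this container

    def _tmux(self, *args: str) -> str:
        cmd = ["tmux", *args]
        if self.container:
            cmd = ["docker", "exec", self.container, *cmd]
        return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout

    def send_keys(self, text: str, delay_enter: float = 0.0) -> None:
        # Send the text, optionally pause, then send Enter as a separate key.
        self._tmux("send-keys", "-t", self.session, text)
        if delay_enter:
            time.sleep(delay_enter)
        self._tmux("send-keys", "-t", self.session, "Enter")

    def capture_pane(self) -> str:
        return self._tmux("capture-pane", "-p", "-t", self.session)

    def wait_for_prompt(self, pattern: str, timeout: float = 60.0) -> bool:
        # Pattern-based completion detection: poll until the pane matches.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if re.search(pattern, self.capture_pane()):
                return True
            time.sleep(0.5)
        return False

    def wait_for_idle(self, quiet_secs: float = 2.0, timeout: float = 60.0) -> bool:
        # Content-hash-based idle detection: pane unchanged for quiet_secs.
        deadline = time.time() + timeout
        last_hash, last_change = None, time.time()
        while time.time() < deadline:
            current = hashlib.sha256(self.capture_pane().encode()).hexdigest()
            if current != last_hash:
                last_hash, last_change = current, time.time()
            elif time.time() - last_change >= quiet_secs:
                return True
            time.sleep(0.5)
        return False

    def wait_for_shell_prompt(self, timeout: float = 60.0) -> bool:
        # Shell prompt detection via a generic prompt-character regex.
        return self.wait_for_prompt(r"[$#%>]\s*\Z", timeout=timeout)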
391 lines
13 KiB
Python
#!/usr/bin/env python3
"""
Retriever Tester

Test suite for context retrieval quality:
- Query execution performance
- Result ranking quality
- Deduplication effectiveness
- Relevance scoring accuracy
"""

import time
import json
from pathlib import Path
from typing import List, Dict
from datetime import datetime


class RetrieverTester:
    """Test and validate context retrieval quality."""

    def __init__(self):
        """Initialize retriever tester."""
        self.test_results = []
        self.performance_metrics = {}

    def test_query_execution(self, query: str, timeout_secs: float = 2.0) -> Dict:
        """
        Test query execution performance.

        Args:
            query: Search query to test
            timeout_secs: Max execution time

        Returns:
            Dict with execution metrics
        """
        start_time = time.time()
        result = {
            'query': query,
            'execution_time_ms': 0,
            'within_budget': False,
            'status': 'unknown',
            'result_count': 0,
            'issues': []
        }

        try:
            import sqlite3

            # Test query against main KG databases
            kg_db_paths = [
                '/etc/luz-knowledge/research.db',
                '/etc/luz-knowledge/projects.db',
            ]

            total_results = 0
            for db_path in kg_db_paths:
                if not Path(db_path).exists():
                    continue

                try:
                    with sqlite3.connect(db_path) as conn:
                        cursor = conn.cursor()
                        # Substring scan (LIKE) on name/content
                        cursor.execute(
                            "SELECT COUNT(*) FROM entities WHERE name LIKE ? OR content LIKE ?",
                            (f'%{query}%', f'%{query}%')
                        )
                        count = cursor.fetchone()[0]
                        total_results += count
                except Exception as e:
                    result['issues'].append(f"Query error: {e}")

            elapsed = (time.time() - start_time) * 1000

            result['execution_time_ms'] = round(elapsed, 1)
            result['within_budget'] = elapsed < (timeout_secs * 1000)
            result['result_count'] = total_results
            result['status'] = 'pass' if result['within_budget'] else 'timeout'

        except Exception as e:
            result['status'] = 'error'
            result['issues'].append(str(e))

        return result

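    # Note: the query above is a plain LIKE substring scan, not FTS5. If the
    # knowledge databases ever expose an FTS5 virtual table (the name
    # entities_fts below is assumed for illustration only), the equivalent
    # full-text lookup would look roughly like:
    #
    #   cursor.execute(
    #       "SELECT COUNT(*) FROM entities_fts WHERE entities_fts MATCH ?",
    #       (query,),
    #   )
    #
    # The actual schema is not defined in this module, so this is a sketch,
    # not a drop-in replacement.
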
    def test_result_ranking(self, query: str, top_k: int = 10) -> Dict:
        """
        Test result ranking quality.

        Args:
            query: Search query
            top_k: Number of results to evaluate

        Returns:
            Dict with ranking metrics
        """
        result = {
            'query': query,
            'top_k': top_k,
            'ranking_quality': 0,
            'relevance_variance': 0,
            'issues': []
        }

        try:
            import sqlite3

            results_list = []
            for db_path in ['/etc/luz-knowledge/research.db', '/etc/luz-knowledge/projects.db']:
                if not Path(db_path).exists():
                    continue

                try:
                    with sqlite3.connect(db_path) as conn:
                        cursor = conn.cursor()
                        # Fetch candidate results (no ORDER BY; relevance is scored below)
                        cursor.execute("""
                            SELECT name, content FROM entities
                            WHERE name LIKE ? OR content LIKE ?
                            LIMIT ?
                        """, (f'%{query}%', f'%{query}%', top_k))

                        for row in cursor.fetchall():
                            name, content = row
                            # Simple relevance heuristic: name match is more relevant than content
                            relevance = 1.0 if query.lower() in name.lower() else 0.5
                            results_list.append({
                                'name': name,
                                'relevance': relevance
                            })
                except Exception as e:
                    result['issues'].append(f"Ranking error: {e}")

            if results_list:
                # Calculate ranking metrics
                relevances = [r['relevance'] for r in results_list]
                avg_relevance = sum(relevances) / len(relevances)
                variance = sum((r - avg_relevance) ** 2 for r in relevances) / len(relevances)

                result['ranking_quality'] = round(avg_relevance * 100, 1)
                result['relevance_variance'] = round(variance, 3)
                result['result_count'] = len(results_list)

        except Exception as e:
            result['issues'].append(str(e))

        return result

    def test_deduplication(self, query: str) -> Dict:
        """
        Test deduplication effectiveness.

        Args:
            query: Search query

        Returns:
            Dict with deduplication metrics
        """
        result = {
            'query': query,
            'total_results': 0,
            'unique_results': 0,
            'duplicate_count': 0,
            'dedup_efficiency': 0,
            'issues': []
        }

        try:
            import sqlite3

            seen_entities = set()
            total = 0
            duplicates = 0

            for db_path in ['/etc/luz-knowledge/research.db', '/etc/luz-knowledge/projects.db']:
                if not Path(db_path).exists():
                    continue

                try:
                    with sqlite3.connect(db_path) as conn:
                        cursor = conn.cursor()
                        cursor.execute("""
                            SELECT id, name FROM entities
                            WHERE name LIKE ? OR content LIKE ?
                        """, (f'%{query}%', f'%{query}%'))

                        for entity_id, name in cursor.fetchall():
                            total += 1
                            entity_key = (entity_id, name)

                            if entity_key in seen_entities:
                                duplicates += 1
                            else:
                                seen_entities.add(entity_key)

                except Exception as e:
                    result['issues'].append(f"Dedup error: {e}")

            result['total_results'] = total
            result['unique_results'] = len(seen_entities)
            result['duplicate_count'] = duplicates
            result['dedup_efficiency'] = (1 - (duplicates / max(total, 1))) * 100

        except Exception as e:
            result['issues'].append(str(e))

        return result

    def test_relevance_scoring(self, queries: List[str]) -> Dict:
        """
        Test relevance scoring accuracy across multiple queries.

        Args:
            queries: List of test queries

        Returns:
            Dict with relevance scoring metrics
        """
        results = {
            'tests_run': len(queries),
            'avg_relevance_score': 0,
            'consistency': 0,
            'issues': []
        }

        relevance_scores = []

        for query in queries:
            ranking = self.test_result_ranking(query, top_k=5)
            if 'ranking_quality' in ranking:
                relevance_scores.append(ranking['ranking_quality'])

        if relevance_scores:
            results['avg_relevance_score'] = round(sum(relevance_scores) / len(relevance_scores), 1)

            # Consistency = low variance in scores
            avg = results['avg_relevance_score']
            variance = sum((s - avg) ** 2 for s in relevance_scores) / len(relevance_scores)
            results['consistency'] = round(100 - (variance / 100), 1)

        return results

    def run_comprehensive_test_suite(self) -> Dict:
        """
        Run comprehensive retrieval test suite.

        Returns:
            Dict with all test results and recommendations
        """
        # Sample queries covering different domains
        test_queries = [
            'research',
            'conductor',
            'task',
            'user',
            'project',
            'knowledge',
            'system',
            'analysis',
        ]

        all_results = {
            'timestamp': datetime.now().isoformat(),
            'test_queries': test_queries,
            'execution_tests': [],
            'ranking_tests': [],
            'dedup_tests': [],
            'relevance_scores': None,
            'summary': {}
        }

        # Test 1: Query execution
        for query in test_queries:
            exec_result = self.test_query_execution(query)
            all_results['execution_tests'].append(exec_result)

        # Test 2: Result ranking
        for query in test_queries[:5]:  # Sample subset
            ranking_result = self.test_result_ranking(query)
            all_results['ranking_tests'].append(ranking_result)

        # Test 3: Deduplication
        for query in test_queries[:5]:
            dedup_result = self.test_deduplication(query)
            all_results['dedup_tests'].append(dedup_result)

        # Test 4: Relevance scoring
        relevance_result = self.test_relevance_scoring(test_queries)
        all_results['relevance_scores'] = relevance_result

        # Generate summary metrics
        all_results['summary'] = self._generate_test_summary(all_results)

        return all_results

    def _generate_test_summary(self, results: Dict) -> Dict:
        """Generate summary statistics from test results."""
        summary = {
            'execution_speed': 'unknown',
            'ranking_quality': 'unknown',
            'dedup_effectiveness': 'unknown',
            'overall_retriever_health': 0,
            'issues': [],
            'recommendations': []
        }

        # Analyze execution tests
        exec_tests = results.get('execution_tests', [])
        if exec_tests:
            within_budget = sum(1 for t in exec_tests if t['within_budget']) / len(exec_tests)
            if within_budget >= 0.95:
                summary['execution_speed'] = 'excellent'
            elif within_budget >= 0.80:
                summary['execution_speed'] = 'good'
            else:
                summary['execution_speed'] = 'slow'
                summary['issues'].append("Query execution exceeds timeout budget")

        # Analyze ranking tests
        ranking_tests = results.get('ranking_tests', [])
        if ranking_tests:
            avg_ranking = sum(t.get('ranking_quality', 0) for t in ranking_tests) / len(ranking_tests)
            if avg_ranking >= 80:
                summary['ranking_quality'] = 'excellent'
            elif avg_ranking >= 60:
                summary['ranking_quality'] = 'good'
            else:
                summary['ranking_quality'] = 'poor'

        # Analyze dedup tests
        dedup_tests = results.get('dedup_tests', [])
        if dedup_tests:
            avg_dedup = sum(t.get('dedup_efficiency', 0) for t in dedup_tests) / len(dedup_tests)
            if avg_dedup >= 95:
                summary['dedup_effectiveness'] = 'excellent'
            elif avg_dedup >= 80:
                summary['dedup_effectiveness'] = 'good'
            else:
                summary['dedup_effectiveness'] = 'poor'
                summary['issues'].append(f"Deduplication efficiency only {avg_dedup:.1f}%")

        # Overall health
        health_score = 0
        if summary['execution_speed'] in ['excellent', 'good']:
            health_score += 30
        if summary['ranking_quality'] in ['excellent', 'good']:
            health_score += 35
        if summary['dedup_effectiveness'] in ['excellent', 'good']:
            health_score += 35

        summary['overall_retriever_health'] = health_score

        # Recommendations
        if summary['execution_speed'] == 'slow':
            summary['recommendations'].append("Move LIKE scans to FTS5 indexes or add a caching layer")
        if summary['ranking_quality'] == 'poor':
            summary['recommendations'].append("Improve ranking algorithm or add semantic scoring")
        if summary['dedup_effectiveness'] == 'poor':
            summary['recommendations'].append("Strengthen entity deduplication logic")

        return summary


if __name__ == '__main__':
    tester = RetrieverTester()

    print("=" * 70)
    print("RUNNING RETRIEVER TEST SUITE")
    print("=" * 70)
    results = tester.run_comprehensive_test_suite()

    print(f"\nTests run: {len(results['test_queries'])} queries")
    print("\nSummary:")
    summary = results['summary']
    print(f"  Execution Speed: {summary['execution_speed'].upper()}")
    print(f"  Ranking Quality: {summary['ranking_quality'].upper()}")
    print(f"  Dedup Effectiveness: {summary['dedup_effectiveness'].upper()}")
    print(f"  Overall Health: {summary['overall_retriever_health']}/100")

    if summary['issues']:
        print(f"\nIssues ({len(summary['issues'])}):")
        for issue in summary['issues']:
            print(f"  - {issue}")

    if summary['recommendations']:
        print("\nRecommendations:")
        for rec in summary['recommendations']:
            print(f"  - {rec}")
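
    # Optional: persist the full results for later inspection. The output path
    # below is an assumption (nothing in this module specifies one); this also
    # gives the json import at the top of the file a use.
    output_path = Path('/tmp/retriever_test_results.json')
    output_path.write_text(json.dumps(results, indent=2))
    print(f"\nFull results written to {output_path}")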