New tests verify:
- DockerTmuxAdapter inherits from TmuxCLIController (architecture)
- DockerTmuxController backward compatibility alias works
- extract_response correctly parses tmux output
- extract_response detects questions ending with ?
- Question detection logic identifies questions correctly
- load_state returns proper default state
- Cockpit state tracks awaiting_response flag

53 tests total, all passing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
619 lines
20 KiB
Python
619 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Integration tests for Luzia orchestrator components
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Put the orchestrator library at the front of the import path
sys.path.insert(0, '/opt/server-agents/orchestrator/lib')

# Shared tally updated by every decorated test
RESULTS = dict(passed=0, failed=0, errors=[])
|
|
|
|
def test(name):
    """Build a decorator that runs a test function and records its outcome.

    Args:
        name: Human-readable label printed next to the result and used as the
            prefix of any entry appended to ``RESULTS['errors']``.

    Returns:
        A decorator. The function it produces takes no arguments, calls the
        wrapped test, and returns ``True`` when the test finishes without
        raising or ``False`` when it raises any ``Exception``. The outcome is
        also counted in ``RESULTS`` and printed.
    """
    import functools

    def decorator(func):
        # functools.wraps carries over __name__, __doc__, __module__ and
        # __wrapped__ rather than the name alone.
        @functools.wraps(func)
        def wrapper():
            try:
                func()
            except Exception as e:
                # Assertion failures are reported by message only; any other
                # exception also shows its type so crashes stand out.
                if isinstance(e, AssertionError):
                    detail = f"{name}: {e}"
                else:
                    detail = f"{name}: {type(e).__name__}: {e}"
                RESULTS['failed'] += 1
                RESULTS['errors'].append(detail)
                print(f" ✗ {detail}")
                return False
            else:
                # Kept outside the try so a test is never counted as both
                # passed and failed.
                RESULTS['passed'] += 1
                print(f" ✓ {name}")
                return True

        return wrapper

    return decorator
|
|
|
|
|
|
# =============================================================================
# Section: Chat Memory Lookup Tests
# =============================================================================

# Section heading for the console report
print("\n### Chat Memory Lookup Tests ###")
|
|
|
|
@test("ChatMemoryLookup imports")
def test_chat_memory_import():
    """Check that ChatMemoryLookup can be imported from chat_memory_lookup."""
    from chat_memory_lookup import ChatMemoryLookup as lookup_cls

    assert lookup_cls is not None
|
|
|
|
@test("ChatMemoryLookup initializes")
def test_chat_memory_init():
    """Check that the timeout_ms argument is exposed as an attribute."""
    from chat_memory_lookup import ChatMemoryLookup as lookup_cls

    configured_timeout = 150
    instance = lookup_cls(timeout_ms=configured_timeout)
    assert instance.timeout_ms == configured_timeout
|
|
|
|
@test("ChatMemoryLookup.memory_statistics returns data")
def test_chat_memory_stats():
    """Check memory_statistics() reports availability and a positive count."""
    from chat_memory_lookup import ChatMemoryLookup as lookup_cls

    report = lookup_cls().memory_statistics()

    # Availability first, then the entity count
    assert 'available' in report
    assert report['available'] == True
    assert 'entities' in report
    assert report['entities'] > 0
|
|
|
|
@test("ChatMemoryLookup.list_all_projects returns projects")
def test_chat_memory_projects():
    """Check list_all_projects() has non-empty 'projects' and positive 'count'."""
    from chat_memory_lookup import ChatMemoryLookup as lookup_cls

    listing = lookup_cls().list_all_projects()

    for required_key in ('projects', 'count'):
        assert required_key in listing
    assert listing['count'] > 0
    assert len(listing['projects']) > 0
|
|
|
|
@test("ChatMemoryLookup.search_entities works")
def test_chat_memory_search():
    """Check search_entities() returns both 'entities' and 'count' keys."""
    from chat_memory_lookup import ChatMemoryLookup as lookup_cls

    hits = lookup_cls().search_entities('admin', limit=5)

    for required_key in ('entities', 'count'):
        assert required_key in hits
|
|
|
|
# Run the Chat Memory Lookup tests in declaration order
for _run in (
    test_chat_memory_import,
    test_chat_memory_init,
    test_chat_memory_stats,
    test_chat_memory_projects,
    test_chat_memory_search,
):
    _run()


# =============================================================================
# Section: Chat Intent Parser Tests
# =============================================================================

print("\n### Chat Intent Parser Tests ###")
|
|
|
|
@test("ChatIntentParser imports")
def test_intent_import():
    """Check that ChatIntentParser can be imported from chat_intent_parser."""
    from chat_intent_parser import ChatIntentParser as parser_cls

    assert parser_cls is not None
|
|
|
|
@test("ChatIntentParser.parse returns intent structure")
def test_intent_parse():
    """Check parse() returns 'intent', 'keywords' and 'scope' keys."""
    from chat_intent_parser import ChatIntentParser as parser_cls

    parsed = parser_cls().parse("list projects")

    for required_key in ('intent', 'keywords', 'scope'):
        assert required_key in parsed
|
|
|
|
@test("ChatIntentParser detects project_info intent")
def test_intent_project():
    """Check "list projects" parses to project_info with a 'projects' keyword."""
    from chat_intent_parser import ChatIntentParser as parser_cls

    parsed = parser_cls().parse("list projects")

    assert parsed['intent'] == 'project_info'
    assert 'projects' in parsed['keywords']
|
|
|
|
@test("ChatIntentParser detects system_status intent")
def test_intent_status():
    """Check "system status" parses to the system_status intent."""
    from chat_intent_parser import ChatIntentParser as parser_cls

    parsed = parser_cls().parse("system status")

    assert parsed['intent'] == 'system_status'
|
|
|
|
@test("ChatIntentParser.extract_search_term works")
def test_intent_search_term():
    """Check extract_search_term() yields a non-empty, non-None result."""
    from chat_intent_parser import ChatIntentParser as parser_cls

    extracted = parser_cls().extract_search_term("search for authentication")

    assert extracted is not None
    assert len(extracted) > 0
|
|
|
|
# Run the Chat Intent Parser tests in declaration order
for _run in (
    test_intent_import,
    test_intent_parse,
    test_intent_project,
    test_intent_status,
    test_intent_search_term,
):
    _run()


# =============================================================================
# Section: Chat Orchestrator Tests
# =============================================================================

print("\n### Chat Orchestrator Tests ###")
|
|
|
|
@test("ChatOrchestrator imports")
def test_orchestrator_import():
    """Check that ChatOrchestrator can be imported from chat_orchestrator."""
    from chat_orchestrator import ChatOrchestrator as orchestrator_cls

    assert orchestrator_cls is not None
|
|
|
|
@test("ChatOrchestrator initializes")
def test_orchestrator_init():
    """Check that the timeout_ms argument is exposed as an attribute."""
    from chat_orchestrator import ChatOrchestrator as orchestrator_cls

    configured_timeout = 500
    instance = orchestrator_cls(timeout_ms=configured_timeout)
    assert instance.timeout_ms == configured_timeout
|
|
|
|
@test("ChatOrchestrator.process_query returns response")
def test_orchestrator_query():
    """Check process_query("help") returns a response with status 'success'."""
    from chat_orchestrator import ChatOrchestrator as orchestrator_cls

    reply = orchestrator_cls().process_query("help")

    assert 'response' in reply
    assert 'status' in reply
    assert reply['status'] == 'success'
|
|
|
|
@test("ChatOrchestrator handles system status query")
def test_orchestrator_status():
    """Check a "system status" query returns a response and a timing field."""
    from chat_orchestrator import ChatOrchestrator as orchestrator_cls

    reply = orchestrator_cls().process_query("system status")

    for required_key in ('response', 'execution_time_ms'):
        assert required_key in reply
|
|
|
|
@test("ChatOrchestrator handles project query")
def test_orchestrator_projects():
    """Check a "list projects" query produces a response mentioning projects."""
    from chat_orchestrator import ChatOrchestrator as orchestrator_cls

    reply = orchestrator_cls().process_query("list projects")

    assert 'response' in reply
    text = reply['response']
    assert 'Projects' in text or 'project' in text.lower()
|
|
|
|
# Run the Chat Orchestrator tests in declaration order
for _run in (
    test_orchestrator_import,
    test_orchestrator_init,
    test_orchestrator_query,
    test_orchestrator_status,
    test_orchestrator_projects,
):
    _run()


# =============================================================================
# Section: Chat Response Formatter Tests
# =============================================================================

print("\n### Chat Response Formatter Tests ###")
|
|
|
|
@test("ChatResponseFormatter imports")
def test_formatter_import():
    """Check ChatResponseFormatter imports from chat_response_formatter."""
    from chat_response_formatter import ChatResponseFormatter as formatter_cls

    assert formatter_cls is not None
|
|
|
|
@test("ChatResponseFormatter.format_help returns markdown")
def test_formatter_help():
    """Check format_help() has a markdown header and more than 100 characters."""
    from chat_response_formatter import ChatResponseFormatter as formatter_cls

    rendered = formatter_cls().format_help()

    assert '# ' in rendered  # Has markdown header
    assert len(rendered) > 100
|
|
|
|
@test("ChatResponseFormatter.format_response_time works")
def test_formatter_time():
    """Check format_response_time() labels for a 5 ms and a 150 ms duration."""
    from chat_response_formatter import ChatResponseFormatter as formatter_cls

    fmt = formatter_cls()

    instant_label = fmt.format_response_time(5)
    assert 'instant' in instant_label

    fast_label = fmt.format_response_time(150)
    assert 'fast' in fast_label.lower() or 'ms' in fast_label
|
|
|
|
@test("ChatResponseFormatter.format_project_list works")
def test_formatter_projects():
    """Check format_project_list() renders a one-project payload."""
    from chat_response_formatter import ChatResponseFormatter as formatter_cls

    fmt = formatter_cls()
    payload = {'projects': [{'name': 'test', 'type': 'project'}], 'count': 1}
    rendered = fmt.format_project_list(payload)

    assert 'test' in rendered
    assert 'Project' in rendered or '1' in rendered
|
|
|
|
# Run the Chat Response Formatter tests in declaration order
for _run in (
    test_formatter_import,
    test_formatter_help,
    test_formatter_time,
    test_formatter_projects,
):
    _run()


# =============================================================================
# Section: Chat Bash Executor Tests
# =============================================================================

print("\n### Chat Bash Executor Tests ###")
|
|
|
|
@test("ChatBashExecutor imports")
def test_bash_import():
    """Check that ChatBashExecutor can be imported from chat_bash_executor."""
    from chat_bash_executor import ChatBashExecutor as executor_cls

    assert executor_cls is not None
|
|
|
|
@test("ChatBashExecutor.execute runs uptime")
def test_bash_uptime():
    """Check execute('uptime') reports success and includes an 'output' key."""
    from chat_bash_executor import ChatBashExecutor as executor_cls

    outcome = executor_cls().execute('uptime')

    assert 'success' in outcome
    assert outcome['success'] == True
    assert 'output' in outcome
|
|
|
|
@test("ChatBashExecutor.execute runs disk")
def test_bash_disk():
    """Check execute('disk') reports success."""
    from chat_bash_executor import ChatBashExecutor as executor_cls

    outcome = executor_cls().execute('disk')

    assert outcome['success'] == True
|
|
|
|
@test("ChatBashExecutor rejects unknown commands")
def test_bash_reject():
    """Check an unrecognised command yields a 'not allowed' error."""
    from chat_bash_executor import ChatBashExecutor as executor_cls

    outcome = executor_cls().execute('unknown_dangerous_cmd')

    # Unknown commands return error without success key
    assert 'error' in outcome
    message = outcome['error']
    assert 'not allowed' in message.lower()
|
|
|
|
# Run the Chat Bash Executor tests in declaration order
for _run in (
    test_bash_import,
    test_bash_uptime,
    test_bash_disk,
    test_bash_reject,
):
    _run()


# =============================================================================
# Section: Task Watchdog Tests
# =============================================================================

print("\n### Task Watchdog Tests ###")
|
|
|
|
@test("TaskWatchdog imports")
def test_watchdog_import():
    """Check that TaskWatchdog can be imported from task_watchdog."""
    from task_watchdog import TaskWatchdog as watchdog_cls

    assert watchdog_cls is not None
|
|
|
|
@test("TaskWatchdog initializes")
def test_watchdog_init():
    """Check the heartbeat and lock timeouts on a fresh TaskWatchdog."""
    from task_watchdog import TaskWatchdog as watchdog_cls

    dog = watchdog_cls()

    assert dog.HEARTBEAT_TIMEOUT_SECONDS == 300
    assert dog.LOCK_TIMEOUT_SECONDS == 3600
|
|
|
|
@test("TaskWatchdog.check_heartbeats runs")
def test_watchdog_heartbeats():
    """Check check_heartbeats() returns a list."""
    from task_watchdog import TaskWatchdog as watchdog_cls

    stuck_tasks = watchdog_cls().check_heartbeats()

    assert isinstance(stuck_tasks, list)
|
|
|
|
@test("TaskWatchdog.get_project_queue_status returns dict")
def test_watchdog_queue_status():
    """Check get_project_queue_status() returns a dict."""
    from task_watchdog import TaskWatchdog as watchdog_cls

    queue_status = watchdog_cls().get_project_queue_status()

    assert isinstance(queue_status, dict)
|
|
|
|
@test("TaskWatchdog.is_project_blocked returns tuple")
def test_watchdog_blocked():
    """Check is_project_blocked() unpacks into two values, the first a bool."""
    from task_watchdog import TaskWatchdog as watchdog_cls

    dog = watchdog_cls()
    # The second element is unpacked but not inspected
    is_blocked, _reason = dog.is_project_blocked('test_nonexistent')

    assert isinstance(is_blocked, bool)
|
|
|
|
@test("TaskWatchdog.run_check returns summary")
def test_watchdog_check():
    """Check run_check() returns the three expected top-level keys."""
    from task_watchdog import TaskWatchdog as watchdog_cls

    report = watchdog_cls().run_check()

    for required_key in ('timestamp', 'stuck_tasks', 'project_status'):
        assert required_key in report
|
|
|
|
# Run the Task Watchdog tests in declaration order
for _run in (
    test_watchdog_import,
    test_watchdog_init,
    test_watchdog_heartbeats,
    test_watchdog_queue_status,
    test_watchdog_blocked,
    test_watchdog_check,
):
    _run()


# =============================================================================
# Section: Task Completion Tests
# =============================================================================

print("\n### Task Completion Tests ###")
|
|
|
|
@test("TaskCompletion imports")
def test_completion_import():
    """Check the class and both module-level helpers import from task_completion."""
    from task_completion import TaskCompletion, complete_task, fail_task

    for exported in (TaskCompletion, complete_task, fail_task):
        assert exported is not None
|
|
|
|
@test("TaskCompletion initializes")
def test_completion_init():
    """Check TaskCompletion() constructs and exposes a queryable COMPLETED_DIR.

    The directory may not exist yet, so only the type of the ``exists()``
    answer is checked, not its value.
    """
    from task_completion import TaskCompletion

    handler = TaskCompletion()
    # An ``... or True`` assertion can never fail; assert on the return type so
    # the check is meaningful while still tolerating a missing directory.
    assert isinstance(handler.COMPLETED_DIR.exists(), bool)
|
|
|
|
@test("TaskCompletion.complete_task handles missing task")
def test_completion_missing():
    """Check completing an unknown task id fails with a 'not found' error."""
    from task_completion import TaskCompletion as completion_cls

    outcome = completion_cls().complete_task('nonexistent-task-12345')

    assert outcome['success'] == False
    error_text = outcome.get('error', '')
    assert 'not found' in error_text.lower()
|
|
|
|
@test("TaskCompletion.fail_task handles missing task")
def test_fail_missing():
    """Check failing an unknown task id reports success == False."""
    from task_completion import TaskCompletion as completion_cls

    outcome = completion_cls().fail_task('nonexistent-task-12345', 'test error')

    assert outcome['success'] == False
|
|
|
|
@test("TaskCompletion.set_awaiting_human handles missing task")
def test_awaiting_missing():
    """Check set_awaiting_human() on an unknown task id reports success == False."""
    from task_completion import TaskCompletion as completion_cls

    outcome = completion_cls().set_awaiting_human('nonexistent-task-12345', 'question?')

    assert outcome['success'] == False
|
|
|
|
# Run the Task Completion tests in declaration order
for _run in (
    test_completion_import,
    test_completion_init,
    test_completion_missing,
    test_fail_missing,
    test_awaiting_missing,
):
    _run()


# =============================================================================
# Section: Cockpit Tests
# =============================================================================

print("\n### Cockpit Tests ###")
|
|
|
|
@test("Cockpit module imports")
def test_cockpit_import():
    """Check the four cockpit entry points import and are all defined."""
    from cockpit import cockpit_status, cockpit_start, cockpit_stop, cockpit_send

    assert cockpit_status is not None
    assert cockpit_start is not None
    assert cockpit_stop is not None
    # cockpit_send is imported with the others, so verify it the same way.
    assert cockpit_send is not None
|
|
|
|
@test("cockpit_status returns state")
def test_cockpit_status():
    """Check cockpit_status('admin') returns a dict."""
    from cockpit import cockpit_status as query_status

    # Should have status info even if not running
    state = query_status('admin')

    assert isinstance(state, dict)
|
|
|
|
@test("container_exists helper works")
def test_cockpit_container_exists():
    """Check container_exists() returns the bool False for an unknown name."""
    from cockpit import container_exists as exists_fn

    # Test with a non-existent container
    found = exists_fn('nonexistent-container-12345')

    assert isinstance(found, bool)
    assert found == False
|
|
|
|
@test("get_container_name generates correct name")
def test_cockpit_container_name():
    """Check the generated name contains the project name and 'cockpit'."""
    from cockpit import get_container_name as name_for

    generated = name_for('testproject')

    assert 'testproject' in generated
    assert 'cockpit' in generated.lower()
|
|
|
|
# Run the Cockpit tests in declaration order
for _run in (
    test_cockpit_import,
    test_cockpit_status,
    test_cockpit_container_exists,
    test_cockpit_container_name,
):
    _run()


# =============================================================================
# Section: Cockpit Question Passing Tests
# =============================================================================

print("\n### Cockpit Question Passing Tests ###")
|
|
|
|
@test("DockerTmuxAdapter inherits from TmuxCLIController")
def test_cockpit_adapter_inheritance():
    """Check DockerTmuxAdapter is a subclass of TmuxCLIController."""
    from cockpit import DockerTmuxAdapter as adapter_cls
    from claude_code_tools.tmux_cli_controller import TmuxCLIController as base_cls

    assert issubclass(adapter_cls, base_cls)
|
|
|
|
@test("DockerTmuxController alias works")
def test_cockpit_alias():
    """Check DockerTmuxController is the very same object as DockerTmuxAdapter."""
    from cockpit import DockerTmuxAdapter as current_cls, DockerTmuxController as legacy_cls

    assert legacy_cls is current_cls
|
|
|
|
@test("DockerTmuxAdapter.extract_response parses output correctly")
def test_cockpit_extract_response():
    """Check extract_response() keeps the reply text and omits shell prompts."""
    from cockpit import DockerTmuxAdapter as adapter_cls

    adapter = adapter_cls('test-container')

    # Simulated tmux output: command line, two reply lines, trailing prompt
    captured = "\n".join([
        "root@host:/workspace# echo 'test' | claude --print -p",
        "This is Claude's response.",
        "It has multiple lines.",
        "root@host:/workspace#",
    ])

    reply = adapter.extract_response(captured, command_marker="claude --print")

    assert "This is Claude's response" in reply
    assert "multiple lines" in reply
    # Should not include shell prompts
    assert "root@host" not in reply
|
|
|
|
@test("DockerTmuxAdapter.extract_response detects questions")
def test_cockpit_extract_response_question():
    """Check an extracted reply keeps its question and ends with '?'."""
    from cockpit import DockerTmuxAdapter as adapter_cls

    adapter = adapter_cls('test-container')

    # Simulated tmux output whose reply is a single question
    captured = "\n".join([
        "root@host:/workspace# echo 'task' | claude --print -p",
        "I understand. Before I proceed, what authentication method would you prefer?",
        "root@host:/workspace#",
    ])

    reply = adapter.extract_response(captured, command_marker="claude --print")

    # Response should contain the question
    assert "what authentication method" in reply

    # Verify it ends with ?
    non_blank = []
    for raw_line in reply.split('\n'):
        if raw_line.strip():
            non_blank.append(raw_line.strip())
    assert non_blank[-1].endswith('?')
|
|
|
|
@test("Question detection identifies questions correctly")
def test_cockpit_question_detection():
    """Check the trailing-'?' rule against known questions and statements."""
    # Test the question detection logic used in cockpit
    # NOTE(review): the rule is re-implemented inline here rather than imported;
    # confirm it still matches what cockpit does.
    expectations = {
        "What file should I modify?": True,
        "I completed the task.": False,
        "Should I proceed with the changes?": True,
        "Done.": False,
        "Which approach do you prefer?": True,
        "Task finished successfully": False,
    }

    for text, expected in expectations.items():
        is_question = text.strip().endswith('?')
        assert is_question == expected, f"Failed for: {text}"
|
|
|
|
@test("load_state returns default state for new project")
def test_cockpit_load_state_default():
    """Check the state returned by load_state() for an unknown project name."""
    from cockpit import load_state as load_cockpit_state

    # Use a project name that won't exist
    fresh = load_cockpit_state('nonexistent-test-project-xyz')

    assert fresh['awaiting_response'] == False
    assert fresh['last_question'] is None
    assert fresh['status'] == 'not_started'
|
|
|
|
@test("Cockpit state tracks awaiting_response flag")
def test_cockpit_awaiting_response_tracking():
    """Sanity-check the shape of a cockpit state that is awaiting a reply.

    NOTE(review): this only inspects a hand-built dict and never calls into
    the cockpit module, so it cannot catch regressions there.
    """
    # Create a mock state to verify structure
    mock_state = {
        "project": "test",
        "session_id": "test-uuid",
        "status": "running",
        "session_started": True,
        "awaiting_response": True,
        "last_question": "What should I do next?",
        "last_output": "Previous output",
    }

    # Verify all required fields exist
    assert 'awaiting_response' in mock_state
    assert 'last_question' in mock_state
    assert mock_state['awaiting_response'] == True
    assert mock_state['last_question'].endswith('?')
|
|
|
|
# Run the Cockpit Question Passing tests in declaration order
for _run in (
    test_cockpit_adapter_inheritance,
    test_cockpit_alias,
    test_cockpit_extract_response,
    test_cockpit_extract_response_question,
    test_cockpit_question_detection,
    test_cockpit_load_state_default,
    test_cockpit_awaiting_response_tracking,
):
    _run()


# =============================================================================
# Section: KG Lookup Tests
# =============================================================================

print("\n### KG Lookup Tests ###")
|
|
|
|
@test("ChatKGLookup imports")
def test_kg_import():
    """Check that ChatKGLookup can be imported from chat_kg_lookup."""
    from chat_kg_lookup import ChatKGLookup as kg_cls

    assert kg_cls is not None
|
|
|
|
@test("ChatKGLookup.get_kg_statistics returns stats")
def test_kg_stats():
    """Check get_kg_statistics() returns a dict."""
    from chat_kg_lookup import ChatKGLookup as kg_cls

    statistics = kg_cls().get_kg_statistics()

    assert isinstance(statistics, dict)
|
|
|
|
@test("ChatKGLookup.search_all_domains works")
def test_kg_search():
    """Check search_all_domains('admin', limit=5) returns a dict."""
    from chat_kg_lookup import ChatKGLookup as kg_cls

    matches = kg_cls().search_all_domains('admin', limit=5)

    assert isinstance(matches, dict)
|
|
|
|
# Run the KG Lookup tests in declaration order
for _run in (
    test_kg_import,
    test_kg_stats,
    test_kg_search,
):
    _run()


# =============================================================================
# Section: CLI Integration Tests
# =============================================================================

print("\n### CLI Integration Tests ###")

# Needed by the CLI tests that follow
import subprocess
|
|
|
|
@test("luzia --help works")
def test_cli_help():
    """Check `luzia --help` exits 0 and mentions 'luzia' or 'usage' on stdout."""
    completed = subprocess.run(
        ['luzia', '--help'],
        capture_output=True,
        text=True,
        timeout=10,
    )

    assert completed.returncode == 0
    stdout_lower = completed.stdout.lower()
    assert 'luzia' in stdout_lower or 'usage' in stdout_lower
|
|
|
|
@test("luzia chat help works")
def test_cli_chat_help():
    """Check `luzia chat help` exits 0 and mentions chat on stdout."""
    completed = subprocess.run(
        ['luzia', 'chat', 'help'],
        capture_output=True,
        text=True,
        timeout=10,
    )

    assert completed.returncode == 0
    stdout_text = completed.stdout
    assert 'Chat' in stdout_text or 'chat' in stdout_text.lower()
|
|
|
|
@test("luzia watchdog status works")
def test_cli_watchdog():
    """Check `luzia watchdog status` exits 0 and prints 'PROJECT' or 'Queue'."""
    completed = subprocess.run(
        ['luzia', 'watchdog', 'status'],
        capture_output=True,
        text=True,
        timeout=10,
    )

    assert completed.returncode == 0
    stdout_text = completed.stdout
    assert 'PROJECT' in stdout_text or 'Queue' in stdout_text
|
|
|
|
@test("luzia cockpit status works")
def test_cli_cockpit():
    """Check `luzia cockpit status` exits with return code 0."""
    completed = subprocess.run(
        ['luzia', 'cockpit', 'status'],
        capture_output=True,
        text=True,
        timeout=10,
    )

    assert completed.returncode == 0
|
|
|
|
@test("luzia list works")
def test_cli_list():
    """Check `luzia list` exits with return code 0."""
    completed = subprocess.run(
        ['luzia', 'list'],
        capture_output=True,
        text=True,
        timeout=10,
    )

    assert completed.returncode == 0
|
|
|
|
# Run the CLI Integration tests in declaration order
for _run in (
    test_cli_help,
    test_cli_chat_help,
    test_cli_watchdog,
    test_cli_cockpit,
    test_cli_list,
):
    _run()
|
|
|
|
|
|
# =============================================================================
# Section: Summary
# =============================================================================

# Totals framed by two 60-character rules
_rule = "=" * 60
print("\n" + _rule)
print(f"RESULTS: {RESULTS['passed']} passed, {RESULTS['failed']} failed")
print(_rule)

# List each recorded failure, if there were any
_failures = RESULTS['errors']
if _failures:
    print("\nFailed tests:")
    for err in _failures:
        print(f" - {err}")

# Exit status is non-zero when at least one test failed
sys.exit(1 if RESULTS['failed'] != 0 else 0)
|