Files
luzia/lib/qa_postflight.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on the claude-code-tools TmuxCLIController, this refactor:

- Adds a DockerTmuxController class for robust tmux session management (interface sketched below)
- Implements send_keys() with a configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00
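
A rough sketch of the controller interface the commit bullets describe, for orientation only: the class and method names follow the commit message, but the signatures, defaults, and docker/tmux invocation details below are assumptions, not the actual implementation (which is not part of the file shown on this page).

import hashlib
import re
import subprocess
import time


class DockerTmuxController:
    """Illustrative sketch only - drives a tmux session inside a container."""

    def __init__(self, container: str, session: str = "main"):
        self.container = container
        self.session = session

    def _tmux(self, *args: str) -> str:
        # Run a tmux command inside the container via `docker exec`.
        cmd = ["docker", "exec", self.container, "tmux", *args]
        return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout

    def send_keys(self, text: str, delay_enter: float = 0.0) -> None:
        # Type text into the session, optionally pausing before pressing Enter.
        self._tmux("send-keys", "-t", self.session, text)
        if delay_enter:
            time.sleep(delay_enter)
        self._tmux("send-keys", "-t", self.session, "Enter")

    def capture_pane(self) -> str:
        # Retrieve the current pane contents.
        return self._tmux("capture-pane", "-p", "-t", self.session)

    def wait_for_prompt(self, pattern: str, timeout: float = 60.0) -> bool:
        # Pattern-based completion detection: poll until the regex appears.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if re.search(pattern, self.capture_pane()):
                return True
            time.sleep(1.0)
        return False

    def wait_for_shell_prompt(self, timeout: float = 60.0) -> bool:
        # Shell prompt detection as a special case of wait_for_prompt.
        return self.wait_for_prompt(r"[$#]\s*$", timeout=timeout)

    def wait_for_idle(self, poll: float = 5.0, timeout: float = 300.0) -> bool:
        # Content-hash-based idle detection: idle once the pane stops changing.
        deadline = time.time() + timeout
        last_hash = None
        while time.time() < deadline:
            current = hashlib.sha256(self.capture_pane().encode()).hexdigest()
            if current == last_hash:
                return True
            last_hash = current
            time.sleep(poll)
        return False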

477 lines
16 KiB
Python

#!/usr/bin/env python3
"""
QA Postflight - Post-task validation and learning capture
Runs after each task completes to:
1. Validate task output quality
2. Detect common error patterns
3. Capture learnings for the knowledge graph
4. Generate QA report
"""
import json
import re
import os
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional
import logging
# Configure logging
log_dir = Path("/var/log/luz-orchestrator")
log_dir.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Paths
JOBS_DIR = Path("/var/log/luz-orchestrator/jobs")
QA_REPORTS_DIR = Path("/var/log/luz-orchestrator/qa-reports")
LEARNING_LOG = log_dir / "learning-captures.jsonl"


class QAPostflight:
    """Post-task QA validation and learning capture."""

    # Error patterns to detect
    ERROR_PATTERNS = [
        (r"error:|Error:|ERROR:", "error_detected", "high"),
        (r"exception:|Exception:|EXCEPTION:", "exception_detected", "high"),
        (r"failed|Failed|FAILED", "failure_detected", "medium"),
        (r"permission denied|Permission denied", "permission_error", "high"),
        (r"not found|Not found|NOT FOUND", "not_found_error", "medium"),
        (r"timeout|Timeout|TIMEOUT", "timeout_error", "high"),
        (r"connection refused|Connection refused", "connection_error", "high"),
        (r"syntax error|SyntaxError", "syntax_error", "high"),
        (r"import error|ImportError|ModuleNotFoundError", "import_error", "high"),
        (r"GOOGLE_KEY not configured|API.*not configured", "config_error", "medium"),
    ]

    # Success patterns
    SUCCESS_PATTERNS = [
        (r"completed successfully|task completed|done", "success_signal"),
        (r"tests? passed|all.*pass", "tests_passed"),
        (r"deployed|deployment.*success", "deployment_success"),
        (r"created|updated|fixed", "action_completed"),
    ]

    # Learning extraction patterns
    LEARNING_PATTERNS = [
        (r"learned?:?\s*(.+?)(?:\n|$)", "explicit_learning"),
        (r"solution:?\s*(.+?)(?:\n|$)", "solution_found"),
        (r"fixed by:?\s*(.+?)(?:\n|$)", "fix_applied"),
        (r"root cause:?\s*(.+?)(?:\n|$)", "root_cause"),
        (r"workaround:?\s*(.+?)(?:\n|$)", "workaround"),
    ]
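
    # Illustrative example (not from any real job log): a transcript line such as
    #   "Learned: the deploy script needs GOOGLE_KEY exported before running"
    # would be captured above as an "explicit_learning" entry.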

    def __init__(self):
        QA_REPORTS_DIR.mkdir(parents=True, exist_ok=True)

    def validate_task(self, job_id: str) -> Dict[str, Any]:
        """
        Run full postflight validation on a completed task.

        Returns validation report with:
        - exit_code analysis
        - error detection
        - success signals
        - quality score
        - extracted learnings
        """
        job_dir = JOBS_DIR / job_id
        report = {
            "job_id": job_id,
            "timestamp": datetime.now().isoformat(),
            "validated": False,
            "exit_code": None,
            "quality_score": 0,
            "errors": [],
            "warnings": [],
            "successes": [],
            "learnings": [],
            "recommendations": [],
        }

        if not job_dir.exists():
            report["errors"].append(f"Job directory not found: {job_dir}")
            return report

        # Read output file
        output_file = job_dir / "output.log"
        output_content = ""
        if output_file.exists():
            try:
                output_content = output_file.read_text(errors='ignore')
            except Exception as e:
                report["warnings"].append(f"Could not read output: {e}")

        # Read metadata
        meta_file = job_dir / "meta.json"
        meta = {}
        if meta_file.exists():
            try:
                meta = json.loads(meta_file.read_text())
            except:
                pass
        report["project"] = meta.get("project", "unknown")
        report["task"] = meta.get("task", "")[:200]

        # Extract exit code
        report["exit_code"] = self._extract_exit_code(output_content)

        # Run validations
        report["errors"] = self._detect_errors(output_content)
        report["successes"] = self._detect_successes(output_content)
        report["learnings"] = self._extract_learnings(output_content)

        # Calculate quality score
        report["quality_score"] = self._calculate_quality_score(report)

        # Generate recommendations
        report["recommendations"] = self._generate_recommendations(report)

        report["validated"] = True

        # Save report
        self._save_report(report)

        # Capture learnings
        if report["learnings"]:
            self._capture_learnings(report)

        return report

    def _extract_exit_code(self, content: str) -> Optional[int]:
        """Extract exit code from output."""
        match = re.search(r'exit:(\d+)', content)
        if match:
            return int(match.group(1))
        return None
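
    # Note: the 'exit:<code>' marker is assumed to be appended to output.log by
    # the job runner; for example (illustrative), a log ending in "exit:0" maps
    # to exit code 0 here, and a log without the marker yields None.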

    def _detect_errors(self, content: str) -> List[Dict[str, Any]]:
        """Detect error patterns in output."""
        errors = []
        for pattern, error_type, severity in self.ERROR_PATTERNS:
            matches = re.findall(pattern, content, re.IGNORECASE)
            if matches:
                # Get context around first match
                match = re.search(pattern, content, re.IGNORECASE)
                if match:
                    start = max(0, match.start() - 50)
                    end = min(len(content), match.end() + 100)
                    context = content[start:end].strip()
                    errors.append({
                        "type": error_type,
                        "severity": severity,
                        "count": len(matches),
                        "context": context[:200],
                    })
        return errors

    def _detect_successes(self, content: str) -> List[Dict[str, str]]:
        """Detect success patterns in output."""
        successes = []
        for pattern, success_type in self.SUCCESS_PATTERNS:
            if re.search(pattern, content, re.IGNORECASE):
                successes.append({"type": success_type})
        return successes

    def _extract_learnings(self, content: str) -> List[Dict[str, str]]:
        """Extract learnings from output."""
        learnings = []
        for pattern, learning_type in self.LEARNING_PATTERNS:
            matches = re.findall(pattern, content, re.IGNORECASE)
            for match in matches:
                if len(match.strip()) > 10:  # Filter noise
                    learnings.append({
                        "type": learning_type,
                        "content": match.strip()[:500],
                    })
        return learnings

    def _calculate_quality_score(self, report: Dict) -> int:
        """Calculate quality score 0-100."""
        score = 50  # Base score

        # Exit code impact
        if report["exit_code"] == 0:
            score += 30
        elif report["exit_code"] is not None:
            score -= 20

        # Error impact
        for error in report["errors"]:
            if error["severity"] == "high":
                score -= 15
            elif error["severity"] == "medium":
                score -= 8

        # Success signals boost
        score += len(report["successes"]) * 5

        # Learnings boost (shows reflection)
        score += len(report["learnings"]) * 3

        return max(0, min(100, score))
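
    # Worked example (hypothetical numbers): exit code 0, two success signals,
    # one high-severity error, and one captured learning gives
    # 50 + 30 + (2 * 5) - 15 + 3 = 78.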

    def _generate_recommendations(self, report: Dict) -> List[str]:
        """Generate actionable recommendations."""
        recs = []

        if report["exit_code"] != 0 and report["exit_code"] is not None:
            recs.append("Task failed - review error logs and consider retry")

        for error in report["errors"]:
            if error["type"] == "config_error":
                recs.append("Configuration error detected - check environment variables")
            elif error["type"] == "permission_error":
                recs.append("Permission issue - verify file ownership and access rights")
            elif error["type"] == "timeout_error":
                recs.append("Timeout occurred - consider increasing timeout or optimizing task")
            elif error["type"] == "import_error":
                recs.append("Import error - check dependencies are installed")

        if report["quality_score"] < 50:
            recs.append("Low quality score - task may need review or retry")

        if not report["learnings"]:
            recs.append("No learnings captured - consider documenting key insights")

        return recs

    def _save_report(self, report: Dict):
        """Save QA report to file."""
        report_file = QA_REPORTS_DIR / f"{report['job_id']}.json"
        try:
            with open(report_file, 'w') as f:
                json.dump(report, f, indent=2, default=str)
            logger.info(f"QA report saved: {report_file}")
        except Exception as e:
            logger.error(f"Failed to save QA report: {e}")

    def _capture_learnings(self, report: Dict):
        """Capture learnings to learning log."""
        try:
            with open(LEARNING_LOG, 'a') as f:
                for learning in report["learnings"]:
                    entry = {
                        "timestamp": report["timestamp"],
                        "job_id": report["job_id"],
                        "project": report["project"],
                        "type": learning["type"],
                        "content": learning["content"],
                        "quality_score": report["quality_score"],
                    }
                    f.write(json.dumps(entry) + "\n")
            logger.info(f"Captured {len(report['learnings'])} learnings from {report['job_id']}")
        except Exception as e:
            logger.error(f"Failed to capture learnings: {e}")


class PerTaskLearning:
    """Per-task learning capture and KG integration."""

    def __init__(self):
        self.kg_path = Path("/etc/luz-knowledge/research.db")

    def capture_task_learning(self, job_id: str, report: Dict) -> Dict[str, Any]:
        """
        Capture learnings from task and store in KG.

        Extracts:
        - Solutions found
        - Errors resolved
        - Patterns discovered
        - Tools/commands used
        """
        result = {
            "job_id": job_id,
            "learnings_stored": 0,
            "relations_created": 0,
        }

        if not report.get("learnings"):
            return result

        # Try to store in KG
        try:
            from knowledge_graph import KnowledgeGraph
            kg = KnowledgeGraph("research")

            for learning in report["learnings"]:
                # Create learning entity
                entity_name = f"learning_{job_id}_{learning['type']}"
                content = f"""
Project: {report.get('project', 'unknown')}
Task: {report.get('task', '')[:100]}
Type: {learning['type']}
Learning: {learning['content']}
Quality Score: {report.get('quality_score', 0)}
"""
                kg.add_entity(
                    name=entity_name,
                    entity_type="finding",
                    content=content,
                    metadata={
                        "job_id": job_id,
                        "project": report.get("project"),
                        "learning_type": learning["type"],
                        "quality_score": report.get("quality_score", 0),
                    },
                    source=f"job:{job_id}"
                )
                result["learnings_stored"] += 1

                # Create relation to project if exists
                project = report.get("project")
                if project:
                    try:
                        kg.add_relation(entity_name, project, "learned_from")
                        result["relations_created"] += 1
                    except:
                        pass

            logger.info(f"Stored {result['learnings_stored']} learnings in KG for {job_id}")

        except ImportError:
            logger.warning("KnowledgeGraph not available - learnings stored to log only")
        except Exception as e:
            logger.error(f"Failed to store learnings in KG: {e}")

        return result


def _send_telegram_notification(report: Dict[str, Any]) -> bool:
    """
    Send telegram notification for important task completions.

    Notifies for:
    - Task failures (exit_code != 0)
    - Low quality score (<50)
    - High severity errors
    """
    try:
        # Import telegram bridge
        import sys
        sys.path.insert(0, str(Path(__file__).parent))
        from telegram_bridge import notify_bruno as send_notification

        job_id = report.get("job_id", "unknown")[:8]
        project = report.get("project", "luzia")
        exit_code = report.get("exit_code")
        quality = report.get("quality_score", 0)

        # Determine if notification needed and severity
        should_notify = False
        severity = "info"
        message = ""

        # Task failure
        if exit_code is not None and exit_code != 0:
            should_notify = True
            severity = "critical" if exit_code in [126, 137, 254] else "warning"
            message = f"Task `{job_id}` failed (exit {exit_code})"

        # Low quality score
        elif quality < 50:
            should_notify = True
            severity = "warning"
            message = f"Task `{job_id}` low quality ({quality}/100)"

        # High severity errors detected
        elif any(e.get("severity") == "high" for e in report.get("errors", [])):
            should_notify = True
            severity = "warning"
            high_errors = [e["type"] for e in report.get("errors", []) if e.get("severity") == "high"]
            message = f"Task `{job_id}` errors: {', '.join(high_errors[:3])}"

        # Success with learnings (optional notification)
        elif exit_code == 0 and report.get("learnings"):
            # Only notify on success if there are significant learnings
            if len(report.get("learnings", [])) >= 2:
                should_notify = True
                severity = "info"
                message = f"Task `{job_id}` completed with {len(report['learnings'])} learnings"

        if should_notify:
            send_notification(message, project, job_id, severity)
            logger.info(f"Telegram notification sent for {job_id}")
            return True

    except ImportError:
        logger.debug("Telegram bridge not available - notification skipped")
    except Exception as e:
        logger.warning(f"Failed to send telegram notification: {e}")

    return False


def run_postflight(job_id: str) -> Dict[str, Any]:
    """
    Main entry point for postflight validation.

    Called after task completion to:
    1. Validate output quality
    2. Extract and store learnings
    3. Generate QA report
    4. Send telegram notification for important events
    """
    logger.info(f"Running postflight for job: {job_id}")

    qa = QAPostflight()
    report = qa.validate_task(job_id)

    # Per-task learning
    learning = PerTaskLearning()
    learning_result = learning.capture_task_learning(job_id, report)
    report["learning_result"] = learning_result

    # Send telegram notification for important events
    report["telegram_notified"] = _send_telegram_notification(report)

    # Log summary
    logger.info(
        f"Postflight complete for {job_id}: "
        f"score={report['quality_score']}, "
        f"errors={len(report['errors'])}, "
        f"learnings={len(report['learnings'])}, "
        f"notified={report.get('telegram_notified', False)}"
    )

    return report


# CLI interface
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: qa_postflight.py <job_id>")
        print("       qa_postflight.py --recent [count]")
        sys.exit(1)

    if sys.argv[1] == "--recent":
        # Run postflight on recent jobs
        count = int(sys.argv[2]) if len(sys.argv) > 2 else 5
        jobs = sorted(JOBS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True)[:count]
        for job_dir in jobs:
            job_id = job_dir.name
            print(f"\n=== Postflight: {job_id} ===")
            report = run_postflight(job_id)
            print(f"  Score: {report['quality_score']}/100")
            print(f"  Errors: {len(report['errors'])}")
            print(f"  Learnings: {len(report['learnings'])}")
            if report['recommendations']:
                print(f"  Recommendations:")
                for rec in report['recommendations'][:3]:
                    print(f"    - {rec}")
    else:
        job_id = sys.argv[1]
        report = run_postflight(job_id)
        print(json.dumps(report, indent=2, default=str))