Based on claude-code-tools TmuxCLIController, this refactor:
- Adds a DockerTmuxController class for robust tmux session management
- Implements send_keys() with a configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell-prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in the agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
595 lines
22 KiB
Python
595 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Telegram Webhook Handler for Luzia
|
|
|
|
Handles incoming callback queries (button clicks), text messages, and voice/audio
|
|
messages from Bruno. Voice messages are transcribed and processed using Gemini 3.
|
|
|
|
Runs as a simple HTTP server that receives Telegram webhook updates.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
import base64
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional, Tuple
|
|
import urllib.request
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Configuration.
# Every value can be overridden through an environment variable so the handler
# can be pointed at another bot, user, state directory or model (e.g. for a
# staging instance) without editing this file. The defaults are the values
# this module has always used.

# TCP port the webhook HTTP server listens on.
WEBHOOK_PORT = int(os.environ.get("LUZIA_WEBHOOK_PORT", "8765"))
# File containing the Telegram bot token (read on every API call).
BOT_TOKEN_PATH = os.environ.get("LUZIA_BOT_TOKEN_PATH", "/etc/telegram-bot/token")
# Directory holding one <request_id>.json file per request awaiting Bruno.
PENDING_REQUESTS_DIR = Path(
    os.environ.get("LUZIA_PENDING_REQUESTS_DIR", "/var/lib/luzia/telegram_requests")
)
# Scratch directory for downloaded voice/audio files (deleted after use).
AUDIO_CACHE_DIR = Path(os.environ.get("LUZIA_AUDIO_CACHE_DIR", "/var/lib/luzia/audio_cache"))
# Only updates from this Telegram user id are acted upon (Bruno's id by default).
AUTHORIZED_USER_ID = os.environ.get("LUZIA_AUTHORIZED_USER_ID", "50639169")

# Gemini model used for audio processing; it must accept audio input.
GEMINI_MODEL = os.environ.get("LUZIA_GEMINI_MODEL", "gemini-2.0-flash")
|
|
|
|
|
|
class TelegramWebhookHandler(BaseHTTPRequestHandler):
    """Handle incoming Telegram webhook updates.

    Each POST body is parsed as a Telegram ``Update`` object and dispatched to
    a handler:
    - callback queries (inline-button clicks),
    - voice notes,
    - audio files,
    - text messages.

    Only updates whose sender id equals ``AUTHORIZED_USER_ID`` trigger any
    action.

    Pending requests are stored as ``<request_id>.json`` files in
    ``PENDING_REQUESTS_DIR``. Responses are written back into those files and
    appended to ``/var/lib/luzia/telegram_responses.log``.

    Webhook authentication: when the ``LUZIA_WEBHOOK_SECRET`` environment
    variable is non-empty, each request must carry the same value in the
    ``X-Telegram-Bot-Api-Secret-Token`` header; otherwise it gets a 403.
    Telegram sends that header when the webhook was registered with
    ``secret_token``. Without this check, anyone who can reach the port could
    forge an update that claims to come from the authorized user.
    """

    # Header in which Telegram echoes the secret_token passed to setWebhook.
    SECRET_HEADER = "X-Telegram-Bot-Api-Secret-Token"

    # Telegram caps sendMessage text at 4096 characters. Stay a little below:
    # Telegram counts UTF-16 code units, while len() counts code points.
    MAX_MESSAGE_CHARS = 4000

    # Name of the flag file (inside PENDING_REQUESTS_DIR) that holds the
    # request id awaiting a free-text reply after a "custom" button click.
    EXPECTING_REPLY_FLAG = ".expecting_reply"

    # File extension -> MIME type for audio sent to Gemini (fallback: audio/ogg).
    AUDIO_MIME_TYPES = {
        ".ogg": "audio/ogg",
        ".mp3": "audio/mp3",
        ".wav": "audio/wav",
        ".m4a": "audio/mp4",
        ".oga": "audio/ogg",
    }

    # ------------------------------------------------------------------ HTTP

    def log_message(self, format, *args):
        """Route BaseHTTPRequestHandler's access/error logging through ``logger``."""
        logger.info("Webhook: %s", format % args if args else format)

    def do_POST(self):
        """Handle a webhook POST from Telegram.

        Responses:
        - 403 if the optional secret token does not match.
        - 400 if the body is not valid JSON (or Content-Length is invalid).
        - 200 in every other case, including when processing the update
          fails.

        Telegram re-delivers any update that gets a non-2xx reply. A failing
        update would otherwise be retried over and over, repeating its side
        effects; errors are logged instead of being reported to Telegram.
        """
        if not self._secret_token_ok():
            logger.warning("Rejected webhook request with missing/invalid secret token")
            self._send_response(403, {"ok": False, "error": "forbidden"})
            return

        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self._send_response(400, {"ok": False, "error": "invalid Content-Length"})
            return
        body = self.rfile.read(content_length)

        try:
            update = json.loads(body.decode('utf-8'))
        except (UnicodeDecodeError, ValueError) as e:
            logger.error(f"Malformed update body: {e}")
            self._send_response(400, {"ok": False, "error": "malformed JSON"})
            return

        try:
            self._process_update(update)
        except Exception:
            logger.exception("Error processing update")
            self._send_response(200, {"ok": False})
            return
        self._send_response(200, {"ok": True})

    def _secret_token_ok(self) -> bool:
        """Return True if the request carries the configured webhook secret.

        No check is performed when ``LUZIA_WEBHOOK_SECRET`` is unset or empty;
        every request is then accepted.
        """
        import hmac

        expected = os.environ.get("LUZIA_WEBHOOK_SECRET", "")
        if not expected:
            return True
        received = self.headers.get(self.SECRET_HEADER) or ""
        # Constant-time comparison so the secret cannot be guessed byte by byte.
        return hmac.compare_digest(received.encode(), expected.encode())

    def _send_response(self, status: int, data: dict):
        """Send ``data`` as a JSON response with HTTP status ``status``."""
        payload = json.dumps(data).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    # -------------------------------------------------------------- dispatch

    def _process_update(self, update: dict):
        """Dispatch one Telegram update to the matching handler.

        Priority: callback query, then voice, audio, and text messages.
        Updates of any other kind, and non-object payloads, are ignored.
        """
        if not isinstance(update, dict):
            return

        if "callback_query" in update:
            self._handle_callback_query(update["callback_query"])
            return

        message = update.get("message")
        if not isinstance(message, dict):
            return

        if "voice" in message:
            self._handle_voice_message(message)
        elif "audio" in message:
            self._handle_audio_message(message)
        elif "text" in message:
            self._handle_message(message)

    # ----------------------------------------------------------------- audio

    def _handle_voice_message(self, message: dict):
        """Handle a voice note: transcribe it with Gemini and reply in the chat."""
        user_id = str(message.get("from", {}).get("id", ""))
        chat_id = message.get("chat", {}).get("id")
        voice = message.get("voice", {})
        file_id = voice.get("file_id") or ""
        duration = voice.get("duration", 0)

        logger.info(f"Voice message from {user_id}: {duration}s, file_id={file_id[:20]}...")

        if user_id != AUTHORIZED_USER_ID:
            return

        self._process_audio_message(chat_id, file_id, "voice message", "voice")

    def _handle_audio_message(self, message: dict):
        """Handle an audio file: analyse it with Gemini and reply in the chat."""
        user_id = str(message.get("from", {}).get("id", ""))
        chat_id = message.get("chat", {}).get("id")
        audio = message.get("audio", {})
        file_id = audio.get("file_id") or ""
        file_name = audio.get("file_name", "audio")
        duration = audio.get("duration", 0)

        logger.info(f"Audio file from {user_id}: {file_name}, {duration}s")

        if user_id != AUTHORIZED_USER_ID:
            return

        self._process_audio_message(chat_id, file_id, "audio file", "audio", file_name)

    def _process_audio_message(self, chat_id, file_id: str, label: str,
                               audio_type: str, file_name: Optional[str] = None):
        """Download a Telegram audio file, run it through Gemini and reply.

        Args:
            chat_id: Chat to answer in.
            file_id: Telegram file id of the audio; empty means "missing".
            label: Human-readable kind used in the log and in user-facing
                failure messages ("voice message" / "audio file").
            audio_type: Passed on to ``_process_audio_with_gemini``.
            file_name: Original file name, if any, for the Gemini prompt.

        The downloaded file is always deleted afterwards, even on failure.
        """
        self._send_chat_action(chat_id, "typing")

        if not file_id:
            self._send_reply(chat_id, f"❌ Failed to download {label}")
            return

        audio_path = None
        try:
            audio_path = self._download_telegram_file(file_id)
            if not audio_path:
                self._send_reply(chat_id, f"❌ Failed to download {label}")
                return

            response = self._process_audio_with_gemini(audio_path, audio_type, file_name)
            if response:
                self._send_reply(chat_id, response)
            else:
                self._send_reply(chat_id, f"❌ Failed to process {label}")
        except Exception as e:
            logger.error(f"Error processing {label}: {e}")
            self._send_reply(chat_id, f"❌ Error: {str(e)[:100]}")
        finally:
            if audio_path:
                Path(audio_path).unlink(missing_ok=True)

    def _download_telegram_file(self, file_id: str) -> Optional[str]:
        """Download a Telegram file into ``AUDIO_CACHE_DIR``.

        Returns the local path as a string. Returns None on any failure:
        missing bot token, API error, or network error. The caller owns the
        file and must delete it.
        """
        bot_token = self._get_bot_token()
        if not bot_token:
            return None

        local_path = None
        try:
            result = self._telegram_api(bot_token, "getFile", {"file_id": file_id}, timeout=30)
            if not result.get("ok"):
                return None
            file_path = result["result"]["file_path"]
            download_url = f"https://api.telegram.org/file/bot{bot_token}/{file_path}"

            AUDIO_CACHE_DIR.mkdir(parents=True, exist_ok=True)

            # Use a unique temp name: Telegram file_ids share long common
            # prefixes, so a truncated id is not a safe file name.
            ext = Path(file_path).suffix or ".ogg"
            fd, local_path = tempfile.mkstemp(suffix=ext, dir=AUDIO_CACHE_DIR)
            with os.fdopen(fd, "wb") as out, \
                    urllib.request.urlopen(download_url, timeout=60) as resp:
                while True:
                    chunk = resp.read(64 * 1024)
                    if not chunk:
                        break
                    out.write(chunk)

            logger.info(f"Downloaded audio to {local_path}")
            return local_path

        except Exception as e:
            logger.error(f"Failed to download file: {e}")
            if local_path:
                Path(local_path).unlink(missing_ok=True)
            return None

    @staticmethod
    def _get_gemini_api_key() -> Optional[str]:
        """Return the Gemini API key from the first source that provides one.

        Sources are tried in this order:
        1. The ``GEMINI_API_KEY`` environment variable.
        2. The ``GOOGLE_API_KEY`` environment variable.
        3. The ``GEMINI_API_KEY=`` line of ``/opt/pal-mcp-server/.env``.
        4. ``/etc/shared-ai-credentials/gemini/api_key.txt``.

        Missing or unreadable files are skipped. Returns None when no source
        has a key.
        """
        api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
        if api_key:
            return api_key

        try:
            for line in Path("/opt/pal-mcp-server/.env").read_text().splitlines():
                if line.startswith("GEMINI_API_KEY="):
                    api_key = line.split("=", 1)[1].strip().strip('"\'')
                    break
        except (OSError, UnicodeDecodeError):
            pass  # best effort: fall through to the next source
        if api_key:
            return api_key

        try:
            api_key = Path("/etc/shared-ai-credentials/gemini/api_key.txt").read_text().strip()
        except (OSError, UnicodeDecodeError):
            pass  # best effort: no more sources
        return api_key or None

    def _process_audio_with_gemini(self, audio_path: str, audio_type: str = "voice",
                                   file_name: Optional[str] = None) -> Optional[str]:
        """Send an audio file to Gemini (``GEMINI_MODEL``) and return its reply.

        If ``audio_type`` is "voice", the prompt asks for a transcription plus
        an assistant response. Any other value asks for a generic analysis of
        the file called ``file_name``.

        Returns the model's text on success. On any failure (SDK missing, no
        API key, API error) it returns a user-facing message starting with
        "❌", so the result can always be forwarded to the chat.
        """
        try:
            import google.generativeai as genai
        except ImportError:
            logger.error("google-generativeai not installed")
            return "❌ Gemini SDK not installed. Run: pip install google-generativeai"

        try:
            api_key = self._get_gemini_api_key()
            if not api_key:
                logger.error("No Google API key found")
                return "❌ Gemini API not configured"

            genai.configure(api_key=api_key)

            with open(audio_path, "rb") as f:
                audio_data = f.read()

            mime_type = self.AUDIO_MIME_TYPES.get(Path(audio_path).suffix.lower(), "audio/ogg")

            model = genai.GenerativeModel(GEMINI_MODEL)

            audio_part = {
                "inline_data": {
                    "mime_type": mime_type,
                    "data": base64.b64encode(audio_data).decode()
                }
            }

            if audio_type == "voice":
                prompt = """You are Bruno's AI assistant (Luzia).
Listen to this voice message and respond helpfully.

First, transcribe what was said, then provide a helpful response.

Format:
📝 **Transcription:** [what was said]

🤖 **Response:** [your helpful response]

If this is a command or task request, acknowledge it and explain what actions would be needed."""
            else:
                prompt = f"""Analyze this audio file ({file_name or 'audio'}).

If it contains speech, transcribe it.
If it's music or other audio, describe what you hear.

Provide any relevant insights or actions that might be helpful."""

            response = model.generate_content([prompt, audio_part])

            if response and response.text:
                logger.info("Gemini processed audio successfully")
                return response.text
            return "❌ No response from Gemini"

        except Exception as e:
            logger.error(f"Gemini processing failed: {e}")
            return f"❌ Gemini error: {str(e)[:200]}"

    # -------------------------------------------------------------- Bot API

    @staticmethod
    def _telegram_api(bot_token: str, method: str, payload: dict, timeout: float = 5) -> dict:
        """POST ``payload`` as JSON to Bot API ``method`` and return the decoded reply.

        The HTTP response is always closed. Exceptions propagate to the
        caller:
        - ``urllib.error.HTTPError`` for non-2xx replies,
        - ``URLError``/``OSError`` for network failures,
        - ``ValueError`` if the reply is not JSON.
        """
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot_token}/{method}",
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return json.loads(response.read())

    def _send_chat_action(self, chat_id: int, action: str = "typing"):
        """Send a chat action (e.g. the typing indicator); failures are only debug-logged."""
        bot_token = self._get_bot_token()
        if not bot_token:
            return
        try:
            self._telegram_api(bot_token, "sendChatAction", {"chat_id": chat_id, "action": action})
        except Exception as e:
            logger.debug(f"Failed to send chat action: {e}")

    def _answer_callback(self, callback_id: str, text: str):
        """Answer a callback query (acknowledge a button click with a toast)."""
        bot_token = self._get_bot_token()
        if not bot_token:
            return
        payload = {"callback_query_id": callback_id, "text": text, "show_alert": False}
        try:
            self._telegram_api(bot_token, "answerCallbackQuery", payload)
        except Exception as e:
            logger.error(f"Failed to answer callback: {e}")

    @staticmethod
    def _split_for_telegram(text: str, limit: int = MAX_MESSAGE_CHARS) -> list:
        """Split ``text`` into chunks of at most ``limit`` characters.

        Prefers to break just after a newline in the second half of the
        window, so paragraphs stay intact. Otherwise it cuts hard at
        ``limit``. A text that already fits is returned as a one-element
        list, even if it is empty.
        """
        limit = max(1, limit)
        if len(text) <= limit:
            return [text]
        chunks = []
        while len(text) > limit:
            cut = text.rfind("\n", 0, limit) + 1
            if cut < max(1, limit // 2):
                cut = limit
            chunks.append(text[:cut])
            text = text[cut:]
        if text:
            chunks.append(text)
        return chunks

    def _send_reply(self, chat_id: int, text: str):
        """Send ``text`` to ``chat_id``, formatted as Telegram Markdown when possible.

        Texts longer than Telegram's message limit are sent as several
        messages. If Telegram rejects a chunk with HTTP 400, it is re-sent
        once as plain text so the reply is not lost. The usual cause is
        "can't parse entities", from unbalanced ``*``/``_`` in Gemini output
        or user-supplied text.
        """
        import urllib.error

        bot_token = self._get_bot_token()
        if not bot_token:
            return

        for chunk in self._split_for_telegram(text):
            payload = {"chat_id": chat_id, "text": chunk, "parse_mode": "Markdown"}
            try:
                self._telegram_api(bot_token, "sendMessage", payload)
            except urllib.error.HTTPError as e:
                if e.code != 400:
                    logger.error(f"Failed to send reply: {e}")
                    continue
                logger.warning(f"Markdown reply rejected ({e}); retrying as plain text")
                del payload["parse_mode"]
                try:
                    self._telegram_api(bot_token, "sendMessage", payload)
                except Exception as retry_error:
                    logger.error(f"Failed to send reply: {retry_error}")
            except Exception as e:
                logger.error(f"Failed to send reply: {e}")

    def _get_bot_token(self) -> Optional[str]:
        """Read the bot token from ``BOT_TOKEN_PATH``; None if it cannot be read."""
        try:
            return Path(BOT_TOKEN_PATH).read_text().strip()
        except (OSError, UnicodeDecodeError):
            return None

    # -------------------------------------------------------- interactions

    def _handle_callback_query(self, query: dict):
        """Handle an inline-keyboard button click.

        Callback data has the form ``action:request_id[:option_index]``.
        Supported actions are approve, deny, answer and custom.
        """
        callback_id = query.get("id")
        user_id = str(query.get("from", {}).get("id", ""))
        data = query.get("data") or ""

        logger.info(f"Callback query from {user_id}: {data}")

        if user_id != AUTHORIZED_USER_ID:
            self._answer_callback(callback_id, "Unauthorized")
            return

        parts = data.split(":")
        if len(parts) < 2 or not self._is_valid_request_id(parts[1]):
            self._answer_callback(callback_id, "Invalid callback data")
            return

        action, request_id = parts[0], parts[1]
        option_index = parts[2] if len(parts) > 2 else None

        if action == "approve":
            if self._record_response(request_id, "approved", approved=True):
                self._answer_callback(callback_id, "✅ Approved!")
                self._notify_luzia(request_id, "approved")
            else:
                self._answer_callback(callback_id, "Request not found")

        elif action == "deny":
            if self._record_response(request_id, "denied", approved=False):
                self._answer_callback(callback_id, "❌ Denied")
                self._notify_luzia(request_id, "denied")
            else:
                self._answer_callback(callback_id, "Request not found")

        elif action == "answer":
            # For now the option index itself is recorded as the response.
            request = self._load_request(request_id)
            response = f"option_{option_index}"
            if request and option_index and self._record_response(request_id, response):
                self._answer_callback(callback_id, "✅ Answer recorded")
                self._notify_luzia(request_id, response)
            else:
                self._answer_callback(callback_id, "Request not found")

        elif action == "custom":
            self._answer_callback(callback_id, "Reply to this message with your answer")
            # The next plain text message is taken as the answer.
            self._set_expecting_reply(request_id)

        else:
            self._answer_callback(callback_id, "Unknown action")

    def _handle_message(self, message: dict):
        """Handle a text message from Bruno: slash commands or a pending custom reply.

        Commands may carry a ``@BotName`` suffix, as they do in group chats.
        Any other text is recorded as the answer to the request flagged by a
        previous "custom" button click, if there is one.
        """
        user_id = str(message.get("from", {}).get("id", ""))
        text = message.get("text", "")
        chat_id = message.get("chat", {}).get("id")

        logger.info(f"Message from {user_id}: {text[:50]}...")

        if user_id != AUTHORIZED_USER_ID:
            return

        parts = text.split(maxsplit=2)
        command = parts[0].split("@", 1)[0] if parts and parts[0].startswith("/") else ""
        request_id = parts[1] if len(parts) > 1 else None
        argument = parts[2] if len(parts) > 2 else ""

        if command == "/approve":
            if not request_id:
                self._send_reply(chat_id, "Usage: /approve <id>")
            elif self._record_response(request_id, "approved", approved=True):
                self._send_reply(chat_id, f"✅ Request {request_id} approved")
                self._notify_luzia(request_id, "approved")
            else:
                self._send_reply(chat_id, f"❌ Request {request_id} not found")

        elif command == "/deny":
            reason = argument or "No reason given"
            if not request_id:
                self._send_reply(chat_id, "Usage: /deny <id> <reason>")
            elif self._record_response(request_id, f"denied: {reason}", approved=False):
                self._send_reply(chat_id, f"❌ Request {request_id} denied")
                self._notify_luzia(request_id, f"denied: {reason}")
            else:
                self._send_reply(chat_id, f"❌ Request {request_id} not found")

        elif command == "/answer":
            if not request_id or not argument:
                self._send_reply(chat_id, "Usage: /answer <id> <response>")
            elif self._record_response(request_id, argument):
                self._send_reply(chat_id, f"✅ Answer recorded for {request_id}")
                self._notify_luzia(request_id, argument)
            else:
                self._send_reply(chat_id, f"❌ Request {request_id} not found")

        elif command == "/pending":
            pending = self._get_pending_requests()
            if pending:
                msg = "*Pending Requests:*\n\n"
                for req in pending[:10]:
                    msg += f"• `{req.get('request_id', '?')}` [{req.get('request_type', '?')}]\n"
                    msg += f" {str(req.get('message', ''))[:50]}...\n\n"
                self._send_reply(chat_id, msg)
            else:
                self._send_reply(chat_id, "No pending requests")

        elif command == "/help":
            help_text = """*Luzia Telegram Commands:*

/approve <id> - Approve a request
/deny <id> <reason> - Deny a request
/answer <id> <response> - Answer a question
/pending - Show pending requests
/help - Show this help"""
            self._send_reply(chat_id, help_text)

        else:
            # Not a known command: maybe the answer to a "custom" button click.
            expecting = self._check_expecting_reply()
            if expecting:
                if self._record_response(expecting, text):
                    self._send_reply(chat_id, "✅ Answer recorded")
                    self._notify_luzia(expecting, text)
                else:
                    self._send_reply(chat_id, f"❌ Request {expecting} not found")
                self._clear_expecting_reply()

    # ------------------------------------------------------ request storage

    @staticmethod
    def _is_valid_request_id(request_id) -> bool:
        """Return True if ``request_id`` is safe to use as a file name stem.

        Ids come from callback data and user-typed commands. They must not be
        able to escape ``PENDING_REQUESTS_DIR`` (e.g. ``../../etc/x``) or
        address hidden files such as the expecting-reply flag.
        """
        return (
            isinstance(request_id, str)
            and bool(request_id)
            and not request_id.startswith(".")
            and "/" not in request_id
            and "\\" not in request_id
            and "\x00" not in request_id
        )

    def _request_file(self, request_id: str) -> Optional[Path]:
        """Return the JSON file path for ``request_id``, or None if the id is unsafe."""
        if not self._is_valid_request_id(request_id):
            logger.warning(f"Rejected invalid request id: {request_id!r}")
            return None
        return PENDING_REQUESTS_DIR / f"{request_id}.json"

    def _load_request(self, request_id: str) -> Optional[dict]:
        """Load a request record; None if it is missing, unreadable or invalid JSON."""
        request_file = self._request_file(request_id)
        if request_file is None:
            return None
        try:
            return json.loads(request_file.read_text())
        except (OSError, ValueError):
            return None

    def _record_response(self, request_id: str, response: str,
                         approved: Optional[bool] = None) -> bool:
        """Store ``response`` in the request file and update its status.

        Status becomes "approved" if ``approved`` is True, "denied" if it is
        False, and "responded" otherwise.

        Returns True if the record was written. Returns False if the request
        does not exist or could not be updated; callers use this to avoid
        reporting success for unknown ids.
        """
        request_file = self._request_file(request_id)
        if request_file is None or not request_file.exists():
            logger.warning(f"Request not found: {request_id}")
            return False

        try:
            data = json.loads(request_file.read_text())
            data["response"] = response
            data["responded_at"] = datetime.now().isoformat()
            if approved is True:
                data["status"] = "approved"
            elif approved is False:
                data["status"] = "denied"
            else:
                data["status"] = "responded"

            # Write-then-rename so a concurrent reader never sees a partial file.
            # The ".json.tmp" name does not match the "*.json" glob.
            tmp_file = request_file.with_name(request_file.name + ".tmp")
            tmp_file.write_text(json.dumps(data, indent=2))
            os.replace(tmp_file, request_file)

            logger.info(f"Response recorded for {request_id}: {response[:50]}")
            return True
        except Exception as e:
            logger.error(f"Failed to record response: {e}")
            return False

    def _notify_luzia(self, request_id: str, response: str):
        """Append ``timestamp|request_id|response`` to the log file Luzia watches.

        Newlines in ``response`` are escaped as a literal backslash-n, so that
        every notification stays on a single line.
        """
        notify_file = Path("/var/lib/luzia/telegram_responses.log")
        one_line = response.replace("\r\n", "\n").replace("\r", "\n").replace("\n", "\\n")
        try:
            with open(notify_file, "a") as f:
                f.write(f"{datetime.now().isoformat()}|{request_id}|{one_line}\n")
        except Exception as e:
            logger.error(f"Failed to notify luzia: {e}")

    def _get_pending_requests(self) -> list:
        """Return request records whose status is "pending", newest ``created_at`` first.

        Unreadable, malformed or non-object request files are skipped.
        """
        pending = []
        for req_file in PENDING_REQUESTS_DIR.glob("*.json"):
            try:
                data = json.loads(req_file.read_text())
            except (OSError, ValueError):
                continue
            if isinstance(data, dict) and data.get("status") == "pending":
                pending.append(data)
        # str() so a missing or non-string created_at cannot make sorting raise.
        return sorted(pending, key=lambda r: str(r.get("created_at") or ""), reverse=True)

    def _set_expecting_reply(self, request_id: str):
        """Remember that the next plain text message answers ``request_id``."""
        PENDING_REQUESTS_DIR.mkdir(parents=True, exist_ok=True)
        (PENDING_REQUESTS_DIR / self.EXPECTING_REPLY_FLAG).write_text(request_id)

    def _check_expecting_reply(self) -> Optional[str]:
        """Return the request id awaiting a text reply, or None if there is none."""
        try:
            return (PENDING_REQUESTS_DIR / self.EXPECTING_REPLY_FLAG).read_text().strip() or None
        except FileNotFoundError:
            return None

    def _clear_expecting_reply(self):
        """Clear the expecting-reply flag (no-op if it is not set)."""
        (PENDING_REQUESTS_DIR / self.EXPECTING_REPLY_FLAG).unlink(missing_ok=True)
|
|
|
|
|
|
def run_webhook_server(port: int = WEBHOOK_PORT, host: str = "0.0.0.0"):
    """Serve ``TelegramWebhookHandler`` on ``host:port`` until interrupted.

    Creates ``PENDING_REQUESTS_DIR`` if needed. Requests are handled one at a
    time (plain ``HTTPServer``), so a slow Gemini call delays later updates.
    Ctrl-C stops the server, and the listening socket is always closed on
    exit.

    Args:
        port: TCP port to listen on (defaults to ``WEBHOOK_PORT``).
        host: Interface to bind. The default listens on all interfaces; pass
            "127.0.0.1" when a local reverse proxy terminates TLS.
    """
    PENDING_REQUESTS_DIR.mkdir(parents=True, exist_ok=True)

    server = HTTPServer((host, port), TelegramWebhookHandler)
    logger.info(f"Luzia Telegram webhook server starting on {host}:{port}")
    logger.info("Register the public HTTPS URL that forwards here with: "
                "telegram_webhook.py --setup <webhook_url>")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        logger.info("Shutting down webhook server")
    finally:
        # serve_forever() has already returned here, so shutdown() would be a
        # no-op; server_close() is what releases the listening socket.
        server.server_close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) > 1 and sys.argv[1] == "--setup":
|
|
# Setup webhook with Telegram
|
|
bot_token = Path(BOT_TOKEN_PATH).read_text().strip()
|
|
webhook_url = sys.argv[2] if len(sys.argv) > 2 else None
|
|
|
|
if not webhook_url:
|
|
print("Usage: telegram_webhook.py --setup <webhook_url>")
|
|
print("Example: telegram_webhook.py --setup https://luz.uy/luzia-webhook")
|
|
sys.exit(1)
|
|
|
|
url = f"https://api.telegram.org/bot{bot_token}/setWebhook"
|
|
data = json.dumps({"url": webhook_url}).encode()
|
|
|
|
try:
|
|
req = urllib.request.Request(
|
|
url, data=data,
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
with urllib.request.urlopen(req) as response:
|
|
result = json.loads(response.read())
|
|
print(f"Webhook setup: {result}")
|
|
except Exception as e:
|
|
print(f"Failed to setup webhook: {e}")
|
|
sys.exit(1)
|
|
else:
|
|
run_webhook_server()
|