The voices... They're getting louder.

This commit is contained in:
Aditya Pulipaka
2025-10-19 06:58:39 -05:00
parent 89924c1580
commit 154283e7c6
13 changed files with 1167 additions and 25 deletions

View File

@@ -6,9 +6,10 @@ Connects to AI Intelligence Layer via WebSocket and:
1. Streams lap telemetry to AI layer
2. Receives control commands (brake_bias, differential_slip) from AI layer
3. Applies control adjustments in real-time
4. Generates voice announcements for strategy updates
Usage:
python simulate_pi_websocket.py --interval 5 --ws-url ws://localhost:9000/ws/pi
python simulate_pi_websocket.py --interval 5 --ws-url ws://localhost:9000/ws/pi --enable-voice
"""
from __future__ import annotations
@@ -19,6 +20,8 @@ import logging
from pathlib import Path
from typing import Dict, Any, Optional
import sys
import os
from datetime import datetime
try:
import pandas as pd
@@ -29,6 +32,19 @@ except ImportError:
print("Run: pip install pandas websockets")
sys.exit(1)
# Optional voice support
try:
from elevenlabs.client import ElevenLabs
from elevenlabs import save
from dotenv import load_dotenv
# Load .env from root directory (default behavior)
load_dotenv()
VOICE_AVAILABLE = True
except ImportError:
VOICE_AVAILABLE = False
print("Note: elevenlabs not installed. Voice features disabled.")
print("To enable voice: pip install elevenlabs python-dotenv")
# Configure logging
logging.basicConfig(
level=logging.INFO,
@@ -37,10 +53,260 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
class PiSimulator:
"""WebSocket-based Pi simulator with control feedback."""
class VoiceAnnouncer:
"""ElevenLabs text-to-speech announcer for race engineer communications."""
def __init__(self, csv_path: Path, ws_url: str, interval: float = 60.0, enrichment_url: str = "http://localhost:8000"):
def __init__(self, enabled: bool = True):
"""Initialize ElevenLabs voice engine if available."""
self.enabled = enabled and VOICE_AVAILABLE
self.client = None
self.audio_dir = Path("data/audio")
# Use exact same voice as voice_service.py
self.voice_id = "mbBupyLcEivjpxh8Brkf" # Rachel voice
if self.enabled:
try:
api_key = os.getenv("ELEVENLABS_API_KEY")
if not api_key:
logger.warning("⚠ ELEVENLABS_API_KEY not found in environment")
self.enabled = False
return
self.client = ElevenLabs(api_key=api_key)
self.audio_dir.mkdir(parents=True, exist_ok=True)
logger.info("✓ Voice announcer initialized (ElevenLabs)")
except Exception as e:
logger.warning(f"⚠ Voice engine initialization failed: {e}")
self.enabled = False
def _format_strategy_message(self, data: Dict[str, Any]) -> str:
"""
Format strategy update into natural race engineer speech.
Args:
data: Control command update from AI layer
Returns:
Formatted message string
"""
lap = data.get('lap', 0)
strategy_name = data.get('strategy_name', 'Unknown')
brake_bias = data.get('brake_bias', 5)
diff_slip = data.get('differential_slip', 5)
reasoning = data.get('reasoning', '')
risk_level = data.get('risk_level', '')
# Build natural message
parts = []
# Opening with lap number
parts.append(f"Lap {lap}.")
# Strategy announcement with risk level
if strategy_name and strategy_name != "N/A":
# Simplify strategy name for speech
clean_strategy = strategy_name.replace('-', ' ').replace('_', ' ')
if risk_level:
parts.append(f"Running {clean_strategy} strategy, {risk_level} risk.")
else:
parts.append(f"Running {clean_strategy} strategy.")
# Control adjustments with specific values
control_messages = []
# Brake bias announcement with context
if brake_bias < 4:
control_messages.append(f"Brake bias set to {brake_bias}, forward biased for sharper turn in response")
elif brake_bias == 4:
control_messages.append(f"Brake bias {brake_bias}, slightly forward to help rotation")
elif brake_bias > 6:
control_messages.append(f"Brake bias set to {brake_bias}, rearward to protect front tire wear")
elif brake_bias == 6:
control_messages.append(f"Brake bias {brake_bias}, slightly rear for front tire management")
else:
control_messages.append(f"Brake bias neutral at {brake_bias}")
# Differential slip announcement with context
if diff_slip < 4:
control_messages.append(f"Differential at {diff_slip}, tightened for better rotation through corners")
elif diff_slip == 4:
control_messages.append(f"Differential {diff_slip}, slightly tight for rotation")
elif diff_slip > 6:
control_messages.append(f"Differential set to {diff_slip}, loosened to reduce rear tire degradation")
elif diff_slip == 6:
control_messages.append(f"Differential {diff_slip}, slightly loose for tire preservation")
else:
control_messages.append(f"Differential neutral at {diff_slip}")
if control_messages:
parts.append(". ".join(control_messages) + ".")
# Key reasoning excerpt (first sentence only)
if reasoning:
# Extract first meaningful sentence
sentences = reasoning.split('.')
if sentences:
key_reason = sentences[0].strip()
if len(key_reason) > 20 and len(key_reason) < 150: # Slightly longer for more context
parts.append(key_reason + ".")
return " ".join(parts)
def _format_control_message(self, data: Dict[str, Any]) -> str:
"""
Format control command into brief message.
Args:
data: Control command from AI layer
Returns:
Formatted message string
"""
lap = data.get('lap', 0)
brake_bias = data.get('brake_bias', 5)
diff_slip = data.get('differential_slip', 5)
message = data.get('message', '')
# For early laps or non-strategy updates
if message and "Collecting data" in message:
return f"Lap {lap}. Collecting baseline data."
if brake_bias == 5 and diff_slip == 5:
return f"Lap {lap}. Maintaining neutral settings."
return f"Lap {lap}. Controls adjusted."
async def announce_strategy(self, data: Dict[str, Any]):
"""
Announce strategy update with ElevenLabs voice synthesis.
Args:
data: Control command update from AI layer
"""
if not self.enabled:
return
try:
# Format message
message = self._format_strategy_message(data)
logger.info(f"[VOICE] Announcing: {message}")
# Generate unique audio filename
lap = data.get('lap', 0)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
audio_path = self.audio_dir / f"lap_{lap}_{timestamp}.mp3"
# Synthesize with ElevenLabs (exact same settings as voice_service.py)
def synthesize():
try:
audio = self.client.text_to_speech.convert(
voice_id=self.voice_id,
text=message,
model_id="eleven_multilingual_v2", # Fast, low-latency model
voice_settings={
"stability": 0.4,
"similarity_boost": 0.95,
"style": 0.7,
"use_speaker_boost": True
}
)
save(audio, str(audio_path))
logger.info(f"[VOICE] Saved to {audio_path}")
# Play the audio
if sys.platform == "darwin": # macOS
os.system(f"afplay {audio_path}")
elif sys.platform == "linux":
os.system(f"mpg123 {audio_path} || ffplay -nodisp -autoexit {audio_path}")
elif sys.platform == "win32":
os.system(f"start {audio_path}")
# Clean up audio file after playing
try:
if audio_path.exists():
audio_path.unlink()
logger.info(f"[VOICE] Cleaned up {audio_path}")
except Exception as cleanup_error:
logger.warning(f"[VOICE] Failed to delete audio file: {cleanup_error}")
except Exception as e:
logger.error(f"[VOICE] Synthesis error: {e}")
# Run in separate thread to avoid blocking
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, synthesize)
except Exception as e:
logger.error(f"[VOICE] Announcement failed: {e}")
async def announce_control(self, data: Dict[str, Any]):
"""
Announce control command with ElevenLabs voice synthesis (brief version).
Args:
data: Control command from AI layer
"""
if not self.enabled:
return
try:
# Format message
message = self._format_control_message(data)
logger.info(f"[VOICE] Announcing: {message}")
# Generate unique audio filename
lap = data.get('lap', 0)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
audio_path = self.audio_dir / f"lap_{lap}_control_{timestamp}.mp3"
# Synthesize with ElevenLabs (exact same settings as voice_service.py)
def synthesize():
try:
audio = self.client.text_to_speech.convert(
voice_id=self.voice_id,
text=message,
model_id="eleven_multilingual_v2", # Fast, low-latency model
voice_settings={
"stability": 0.4,
"similarity_boost": 0.95,
"style": 0.7,
"use_speaker_boost": True
}
)
save(audio, str(audio_path))
logger.info(f"[VOICE] Saved to {audio_path}")
# Play the audio
if sys.platform == "darwin": # macOS
os.system(f"afplay {audio_path}")
elif sys.platform == "linux":
os.system(f"mpg123 {audio_path} || ffplay -nodisp -autoexit {audio_path}")
elif sys.platform == "win32":
os.system(f"start {audio_path}")
# Clean up audio file after playing
try:
if audio_path.exists():
audio_path.unlink()
logger.info(f"[VOICE] Cleaned up {audio_path}")
except Exception as cleanup_error:
logger.warning(f"[VOICE] Failed to delete audio file: {cleanup_error}")
except Exception as e:
logger.error(f"[VOICE] Synthesis error: {e}")
# Run in separate thread to avoid blocking
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, synthesize)
except Exception as e:
logger.error(f"[VOICE] Announcement failed: {e}")
class PiSimulator:
"""WebSocket-based Pi simulator with control feedback and voice announcements."""
def __init__(self, csv_path: Path, ws_url: str, interval: float = 60.0, enrichment_url: str = "http://localhost:8000", voice_enabled: bool = False):
self.csv_path = csv_path
self.ws_url = ws_url
self.enrichment_url = enrichment_url
@@ -50,6 +316,12 @@ class PiSimulator:
"brake_bias": 5,
"differential_slip": 5
}
self.previous_controls = {
"brake_bias": 5,
"differential_slip": 5
}
self.current_risk_level: Optional[str] = None
self.voice_announcer = VoiceAnnouncer(enabled=voice_enabled)
def load_lap_csv(self) -> pd.DataFrame:
"""Load lap-level CSV data."""
@@ -245,12 +517,73 @@ class PiSimulator:
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
response_data = json.loads(response)
if response_data.get("type") == "control_command":
# Handle silent acknowledgment (no control update, no voice)
if response_data.get("type") == "acknowledgment":
message = response_data.get("message", "")
logger.info(f"[ACK] {message}")
# Now wait for the actual control command update
try:
update = await asyncio.wait_for(websocket.recv(), timeout=45.0)
update_data = json.loads(update)
if update_data.get("type") == "control_command_update":
brake_bias = update_data.get("brake_bias", 5)
diff_slip = update_data.get("differential_slip", 5)
strategy_name = update_data.get("strategy_name", "N/A")
risk_level = update_data.get("risk_level", "medium")
reasoning = update_data.get("reasoning", "")
# Check if controls changed from previous
controls_changed = (
self.current_controls["brake_bias"] != brake_bias or
self.current_controls["differential_slip"] != diff_slip
)
# Check if risk level changed
risk_level_changed = (
self.current_risk_level is not None and
self.current_risk_level != risk_level
)
self.previous_controls = self.current_controls.copy()
self.current_controls["brake_bias"] = brake_bias
self.current_controls["differential_slip"] = diff_slip
self.current_risk_level = risk_level
logger.info(f"[UPDATED] Strategy-Based Control:")
logger.info(f" ├─ Brake Bias: {brake_bias}/10")
logger.info(f" ├─ Differential Slip: {diff_slip}/10")
logger.info(f" ├─ Strategy: {strategy_name}")
logger.info(f" ├─ Risk Level: {risk_level}")
if reasoning:
logger.info(f" └─ Reasoning: {reasoning[:100]}...")
self.apply_controls(brake_bias, diff_slip)
# Voice announcement if controls OR risk level changed
if controls_changed or risk_level_changed:
if risk_level_changed and not controls_changed:
logger.info(f"[VOICE] Risk level changed to {risk_level}")
await self.voice_announcer.announce_strategy(update_data)
else:
logger.info(f"[VOICE] Skipping announcement - controls and risk level unchanged")
except asyncio.TimeoutError:
logger.warning("[TIMEOUT] Strategy generation took too long")
elif response_data.get("type") == "control_command":
brake_bias = response_data.get("brake_bias", 5)
diff_slip = response_data.get("differential_slip", 5)
strategy_name = response_data.get("strategy_name", "N/A")
message = response_data.get("message")
# Store previous values before updating
controls_changed = (
self.current_controls["brake_bias"] != brake_bias or
self.current_controls["differential_slip"] != diff_slip
)
self.previous_controls = self.current_controls.copy()
self.current_controls["brake_bias"] = brake_bias
self.current_controls["differential_slip"] = diff_slip
@@ -265,6 +598,10 @@ class PiSimulator:
# Apply controls (in real Pi, this would adjust hardware)
self.apply_controls(brake_bias, diff_slip)
# Voice announcement ONLY if controls changed
if controls_changed:
await self.voice_announcer.announce_control(response_data)
# If message indicates processing, wait for update
if message and "Processing" in message:
logger.info(" AI is generating strategies, waiting for update...")
@@ -276,16 +613,43 @@ class PiSimulator:
brake_bias = update_data.get("brake_bias", 5)
diff_slip = update_data.get("differential_slip", 5)
strategy_name = update_data.get("strategy_name", "N/A")
risk_level = update_data.get("risk_level", "medium")
reasoning = update_data.get("reasoning", "")
# Check if controls changed from previous
controls_changed = (
self.current_controls["brake_bias"] != brake_bias or
self.current_controls["differential_slip"] != diff_slip
)
# Check if risk level changed
risk_level_changed = (
self.current_risk_level is not None and
self.current_risk_level != risk_level
)
self.previous_controls = self.current_controls.copy()
self.current_controls["brake_bias"] = brake_bias
self.current_controls["differential_slip"] = diff_slip
self.current_risk_level = risk_level
logger.info(f"[UPDATED] Strategy-Based Control:")
logger.info(f" ├─ Brake Bias: {brake_bias}/10")
logger.info(f" ├─ Differential Slip: {diff_slip}/10")
logger.info(f" ─ Strategy: {strategy_name}")
logger.info(f" ─ Strategy: {strategy_name}")
logger.info(f" ├─ Risk Level: {risk_level}")
if reasoning:
logger.info(f" └─ Reasoning: {reasoning[:100]}...")
self.apply_controls(brake_bias, diff_slip)
# Voice announcement if controls OR risk level changed
if controls_changed or risk_level_changed:
if risk_level_changed and not controls_changed:
logger.info(f"[VOICE] Risk level changed to {risk_level}")
await self.voice_announcer.announce_strategy(update_data)
else:
logger.info(f"[VOICE] Skipping announcement - controls and risk level unchanged")
except asyncio.TimeoutError:
logger.warning("[TIMEOUT] Strategy generation took too long")
@@ -344,7 +708,7 @@ class PiSimulator:
async def main():
parser = argparse.ArgumentParser(
description="WebSocket-based Raspberry Pi Telemetry Simulator"
description="WebSocket-based Raspberry Pi Telemetry Simulator with Voice Announcements"
)
parser.add_argument(
"--interval",
@@ -370,6 +734,11 @@ async def main():
default=None,
help="Path to lap CSV file (default: scripts/ALONSO_2023_MONZA_LAPS.csv)"
)
parser.add_argument(
"--enable-voice",
action="store_true",
help="Enable voice announcements for strategy updates (requires elevenlabs and ELEVENLABS_API_KEY)"
)
args = parser.parse_args()
@@ -389,7 +758,8 @@ async def main():
csv_path=csv_path,
ws_url=args.ws_url,
enrichment_url=args.enrichment_url,
interval=args.interval
interval=args.interval,
voice_enabled=args.enable_voice
)
logger.info("Starting WebSocket Pi Simulator")
@@ -397,6 +767,7 @@ async def main():
logger.info(f"Enrichment Service: {args.enrichment_url}")
logger.info(f"WebSocket URL: {args.ws_url}")
logger.info(f"Interval: {args.interval}s per lap")
logger.info(f"Voice Announcements: {'Enabled' if args.enable_voice and VOICE_AVAILABLE else 'Disabled'}")
logger.info("-" * 60)
await simulator.stream_telemetry()

76
scripts/test_voice.py Normal file
View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""
Quick test script for ElevenLabs voice announcements.
"""
import sys
import os
from pathlib import Path
sys.path.insert(0, '.')
try:
from elevenlabs.client import ElevenLabs
from elevenlabs import save
from dotenv import load_dotenv
load_dotenv()
# Check API key
api_key = os.getenv("ELEVENLABS_API_KEY")
if not api_key:
print("✗ ELEVENLABS_API_KEY not found in environment")
print("Create a .env file with: ELEVENLABS_API_KEY=your_key_here")
sys.exit(1)
# Initialize client with same settings as voice_service.py
client = ElevenLabs(api_key=api_key)
voice_id = "mbBupyLcEivjpxh8Brkf" # Rachel voice
# Test message
test_message = "Lap 3. Strategy: Conservative One Stop. Brake bias forward for turn in. Current tire degradation suggests extended first stint."
print(f"Testing ElevenLabs voice announcement...")
print(f"Voice ID: {voice_id} (Rachel)")
print(f"Message: {test_message}")
print("-" * 60)
# Synthesize
audio = client.text_to_speech.convert(
voice_id=voice_id,
text=test_message,
model_id="eleven_multilingual_v2",
voice_settings={
"stability": 0.4,
"similarity_boost": 0.95,
"style": 0.7,
"use_speaker_boost": True
}
)
# Save audio
output_dir = Path("data/audio")
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / "test_voice.mp3"
save(audio, str(output_path))
print(f"✓ Audio saved to: {output_path}")
# Play audio
print("✓ Playing audio...")
if sys.platform == "darwin": # macOS
os.system(f"afplay {output_path}")
elif sys.platform == "linux":
os.system(f"mpg123 {output_path} || ffplay -nodisp -autoexit {output_path}")
elif sys.platform == "win32":
os.system(f"start {output_path}")
print("✓ Voice test completed successfully!")
except ImportError as e:
print(f"✗ elevenlabs not available: {e}")
print("Install with: pip install elevenlabs python-dotenv")
except Exception as e:
print(f"✗ Voice test failed: {e}")
import traceback
traceback.print_exc()

View File

@@ -13,13 +13,8 @@ from dotenv import load_dotenv
load_dotenv()
class RaceEngineerVoice:
def __init__(self, voice_id: str = "mbBupyLcEivjpxh8Brkf"): # Default: Rachel
"""
Initialize ElevenLabs voice service.
def __init__(self, voice_id: str = "mbBupyLcEivjpxh8Brkf"):
Args:
voice_id: ElevenLabs voice ID (Rachel is default, professional female voice)
"""
self.client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY"))
self.voice_id = voice_id