From aab4ab876713bf6bea1bf9cc80183b7f0cbc2988 Mon Sep 17 00:00:00 2001 From: Aditya Pulipaka Date: Sun, 19 Oct 2025 14:52:08 -0500 Subject: [PATCH] erm herro --- ai_intelligence_layer/main.py | 85 +++++++++++++++++++++++- scripts/simulate_pi_websocket.py | 109 ++++++++++++++++++------------- 2 files changed, 147 insertions(+), 47 deletions(-) diff --git a/ai_intelligence_layer/main.py b/ai_intelligence_layer/main.py index 9d6e10f..ddeba66 100644 --- a/ai_intelligence_layer/main.py +++ b/ai_intelligence_layer/main.py @@ -358,7 +358,52 @@ async def websocket_dashboard_endpoint(websocket: WebSocket): await dashboard_manager.connect(websocket) try: - # Keep connection alive + # Send historical data from current session immediately after connection + buffer_data = telemetry_buffer.get_all() + if buffer_data and current_race_context: + logger.info(f"[Dashboard] Sending {len(buffer_data)} historical lap records to new dashboard") + + # Reverse to get chronological order (oldest to newest) + buffer_data.reverse() + + # Send each historical lap as a lap_data message + for i, telemetry in enumerate(buffer_data): + try: + # Find matching strategy from history if available + lap_strategy = None + for strat in strategy_history: + if strat.get("lap") == telemetry.lap: + lap_strategy = { + "strategy_name": strat.get("strategy_name"), + "risk_level": strat.get("risk_level"), + "brief_description": strat.get("brief_description"), + "reasoning": strat.get("reasoning") + } + break + + # Send historical lap data + await websocket.send_json({ + "type": "lap_data", + "vehicle_id": 1, # Assume single vehicle for now + "lap_data": telemetry.model_dump(), + "race_context": { + "position": current_race_context.driver_state.current_position, + "gap_to_leader": current_race_context.driver_state.gap_to_leader, + "gap_to_ahead": current_race_context.driver_state.gap_to_ahead + }, + "control_output": last_control_command if i == len(buffer_data) - 1 else {"brake_bias": 5, "differential_slip": 5}, + "strategy": lap_strategy, + "timestamp": datetime.now().isoformat(), + "historical": True # Mark as historical data + }) + except Exception as e: + logger.error(f"[Dashboard] Error sending historical lap {telemetry.lap}: {e}") + + logger.info(f"[Dashboard] Historical data transmission complete") + else: + logger.info("[Dashboard] No historical data to send (buffer empty or no race context)") + + # Keep connection alive and handle incoming messages while True: # Receive any messages (mostly just keepalive pings) data = await websocket.receive_text() @@ -457,6 +502,28 @@ async def websocket_pi_endpoint(websocket: WebSocket): "message": "Processing strategies, please wait..." }) + # Create a background task to send periodic keepalive pings during strategy generation + # This prevents WebSocket timeout during long AI operations + keepalive_active = asyncio.Event() + + async def send_keepalive(): + """Send periodic pings to keep WebSocket alive during long operations.""" + while not keepalive_active.is_set(): + try: + await asyncio.sleep(10) # Send keepalive every 10 seconds + if not keepalive_active.is_set(): + await websocket.send_json({ + "type": "keepalive", + "timestamp": datetime.now().isoformat() + }) + logger.debug(f"[WebSocket] Sent keepalive ping for lap {lap_number}") + except Exception as e: + logger.error(f"[WebSocket] Keepalive error: {e}") + break + + # Start keepalive task + keepalive_task = asyncio.create_task(send_keepalive()) + # Generate strategies (this is the slow part) try: response = await strategy_generator.generate( @@ -465,6 +532,10 @@ async def websocket_pi_endpoint(websocket: WebSocket): strategy_history=strategy_history ) + # Stop keepalive task + keepalive_active.set() + await keepalive_task + # Extract top strategy (first one) top_strategy = response.strategies[0] if response.strategies else None @@ -533,6 +604,13 @@ async def websocket_pi_endpoint(websocket: WebSocket): logger.info(f"{'='*60}\n") except Exception as e: + # Stop keepalive task on error + keepalive_active.set() + try: + await keepalive_task + except: + pass + logger.error(f"[WebSocket] Strategy generation failed: {e}") # Send error but keep neutral controls await websocket.send_json({ @@ -694,7 +772,10 @@ if __name__ == "__main__": "main:app", host=settings.ai_service_host, port=settings.ai_service_port, - reload=True + reload=True, + ws_ping_interval=20, # Send ping every 20 seconds + ws_ping_timeout=60, # Wait up to 60 seconds for pong response + timeout_keep_alive=75 # HTTP keepalive timeout ) diff --git a/scripts/simulate_pi_websocket.py b/scripts/simulate_pi_websocket.py index 8d597af..e67262c 100644 --- a/scripts/simulate_pi_websocket.py +++ b/scripts/simulate_pi_websocket.py @@ -9,7 +9,7 @@ Connects to AI Intelligence Layer via WebSocket and: 4. Generates voice announcements for strategy updates Usage: - python simulate_pi_websocket.py --interval 5 --ws-url ws://192.168.137.134:9000/ws/pi --enable-voice + python simulate_pi_websocket.py --interval 5 --ws-url ws://localhost:9000/ws/pi --enable-voice """ from __future__ import annotations @@ -306,7 +306,7 @@ class VoiceAnnouncer: class PiSimulator: """WebSocket-based Pi simulator with control feedback and voice announcements.""" - def __init__(self, csv_path: Path, ws_url: str, interval: float = 60.0, enrichment_url: str = "http://192.168.137.134:8000", voice_enabled: bool = False): + def __init__(self, csv_path: Path, ws_url: str, interval: float = 60.0, enrichment_url: str = "http://localhost:8000", voice_enabled: bool = False): self.csv_path = csv_path self.ws_url = ws_url self.enrichment_url = enrichment_url @@ -529,51 +529,70 @@ class PiSimulator: logger.info(f"[ACK] {message}") # Now wait for the actual control command update + # Keep receiving messages until we get the update (ignoring keepalives) try: - update = await asyncio.wait_for(websocket.recv(), timeout=45.0) - update_data = json.loads(update) + timeout_remaining = 45.0 + start_time = asyncio.get_event_loop().time() - if update_data.get("type") == "control_command_update": - brake_bias = update_data.get("brake_bias", 5) - diff_slip = update_data.get("differential_slip", 5) - strategy_name = update_data.get("strategy_name", "N/A") - risk_level = update_data.get("risk_level", "medium") - reasoning = update_data.get("reasoning", "") + while timeout_remaining > 0: + update = await asyncio.wait_for(websocket.recv(), timeout=timeout_remaining) + update_data = json.loads(update) - # Check if controls changed from previous - controls_changed = ( - self.current_controls["brake_bias"] != brake_bias or - self.current_controls["differential_slip"] != diff_slip - ) + # Ignore keepalive messages + if update_data.get("type") == "keepalive": + logger.debug(f"[KEEPALIVE] Received ping from server during strategy generation") + elapsed = asyncio.get_event_loop().time() - start_time + timeout_remaining = 45.0 - elapsed + continue - # Check if risk level changed - risk_level_changed = ( - self.current_risk_level is not None and - self.current_risk_level != risk_level - ) + # Process control command update + if update_data.get("type") == "control_command_update": + brake_bias = update_data.get("brake_bias", 5) + diff_slip = update_data.get("differential_slip", 5) + strategy_name = update_data.get("strategy_name", "N/A") + risk_level = update_data.get("risk_level", "medium") + reasoning = update_data.get("reasoning", "") + + # Check if controls changed from previous + controls_changed = ( + self.current_controls["brake_bias"] != brake_bias or + self.current_controls["differential_slip"] != diff_slip + ) + + # Check if risk level changed + risk_level_changed = ( + self.current_risk_level is not None and + self.current_risk_level != risk_level + ) + + self.previous_controls = self.current_controls.copy() + self.current_controls["brake_bias"] = brake_bias + self.current_controls["differential_slip"] = diff_slip + self.current_risk_level = risk_level + + logger.info(f"[UPDATED] Strategy-Based Control:") + logger.info(f" ├─ Brake Bias: {brake_bias}/10") + logger.info(f" ├─ Differential Slip: {diff_slip}/10") + logger.info(f" ├─ Strategy: {strategy_name}") + logger.info(f" ├─ Risk Level: {risk_level}") + if reasoning: + logger.info(f" └─ Reasoning: {reasoning[:100]}...") + + self.apply_controls(brake_bias, diff_slip) + + # Voice announcement if controls OR risk level changed + if controls_changed or risk_level_changed: + if risk_level_changed and not controls_changed: + logger.info(f"[VOICE] Risk level changed to {risk_level}") + await self.voice_announcer.announce_strategy(update_data) + else: + logger.info(f"[VOICE] Skipping announcement - controls and risk level unchanged") + break # Exit loop after processing update - self.previous_controls = self.current_controls.copy() - self.current_controls["brake_bias"] = brake_bias - self.current_controls["differential_slip"] = diff_slip - self.current_risk_level = risk_level + # Update timeout + elapsed = asyncio.get_event_loop().time() - start_time + timeout_remaining = 45.0 - elapsed - logger.info(f"[UPDATED] Strategy-Based Control:") - logger.info(f" ├─ Brake Bias: {brake_bias}/10") - logger.info(f" ├─ Differential Slip: {diff_slip}/10") - logger.info(f" ├─ Strategy: {strategy_name}") - logger.info(f" ├─ Risk Level: {risk_level}") - if reasoning: - logger.info(f" └─ Reasoning: {reasoning[:100]}...") - - self.apply_controls(brake_bias, diff_slip) - - # Voice announcement if controls OR risk level changed - if controls_changed or risk_level_changed: - if risk_level_changed and not controls_changed: - logger.info(f"[VOICE] Risk level changed to {risk_level}") - await self.voice_announcer.announce_strategy(update_data) - else: - logger.info(f"[VOICE] Skipping announcement - controls and risk level unchanged") except asyncio.TimeoutError: logger.warning("[TIMEOUT] Strategy generation took too long") @@ -734,14 +753,14 @@ async def main(): parser.add_argument( "--ws-url", type=str, - default="ws://192.168.137.134:9000/ws/pi", - help="WebSocket URL for AI layer (default: ws://192.168.137.134:9000/ws/pi)" + default="ws://localhost:9000/ws/pi", + help="WebSocket URL for AI layer (default: ws://localhost:9000/ws/pi)" ) parser.add_argument( "--enrichment-url", type=str, - default="http://192.168.137.134:8000", - help="Enrichment service URL (default: http://192.168.137.134:8000)" + default="http://localhost:8000", + help="Enrichment service URL (default: http://localhost:8000)" ) parser.add_argument( "--csv",