Files
Guido.Tech/ai_intelligence_layer/main.py

782 lines
34 KiB
Python
Raw Normal View History

2025-10-18 22:36:20 -05:00
"""
AI Intelligence Layer - FastAPI Application
Port: 9000
Provides F1 race strategy generation and analysis using Gemini AI.
Supports WebSocket connections from Pi for bidirectional control.
2025-10-18 22:36:20 -05:00
"""
from fastapi import FastAPI, HTTPException, status, WebSocket, WebSocketDisconnect
2025-10-18 22:36:20 -05:00
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
2025-10-18 22:36:20 -05:00
from contextlib import asynccontextmanager
import logging
import asyncio
import random
from typing import Dict, Any, List
from datetime import datetime
import json
2025-10-19 06:58:39 -05:00
from dotenv import load_dotenv
# Load environment variables from .env file in project root
load_dotenv()
2025-10-18 22:36:20 -05:00
from config import get_settings
from models.input_models import (
BrainstormRequest,
2025-10-18 23:56:53 -05:00
# AnalyzeRequest, # Disabled - not using analysis
EnrichedTelemetryWebhook,
2025-10-19 02:00:56 -05:00
EnrichedTelemetryWithContext,
2025-10-18 23:56:53 -05:00
RaceContext # Import for global storage
2025-10-18 22:36:20 -05:00
)
from models.output_models import (
BrainstormResponse,
2025-10-18 23:56:53 -05:00
# AnalyzeResponse, # Disabled - not using analysis
2025-10-18 22:36:20 -05:00
HealthResponse
)
from services.strategy_generator import StrategyGenerator
2025-10-18 23:56:53 -05:00
# from services.strategy_analyzer import StrategyAnalyzer # Disabled - not using analysis
2025-10-18 22:36:20 -05:00
from services.telemetry_client import TelemetryClient
from utils.telemetry_buffer import TelemetryBuffer
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Global instances
telemetry_buffer: TelemetryBuffer = None
strategy_generator: StrategyGenerator = None
2025-10-18 23:56:53 -05:00
# strategy_analyzer: StrategyAnalyzer = None # Disabled - not using analysis
2025-10-18 22:36:20 -05:00
telemetry_client: TelemetryClient = None
2025-10-18 23:56:53 -05:00
current_race_context: RaceContext = None # Store race context globally
last_control_command: Dict[str, int] = {"brake_bias": 5, "differential_slip": 5} # Store last command
strategy_history: List[Dict[str, Any]] = [] # Track past strategies for continuity
# WebSocket connection manager
class ConnectionManager:
"""Manages WebSocket connections from Pi clients."""
def __init__(self):
self.active_connections: List[WebSocket] = []
self.vehicle_counter = 0
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
self.vehicle_counter += 1
logger.info(f"Pi client connected. Total connections: {len(self.active_connections)}")
return self.vehicle_counter
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
logger.info(f"Pi client disconnected. Total connections: {len(self.active_connections)}")
async def send_control_command(self, websocket: WebSocket, command: Dict[str, Any]):
"""Send control command to specific Pi client."""
await websocket.send_json(command)
async def broadcast_control_command(self, command: Dict[str, Any]):
"""Broadcast control command to all connected Pi clients."""
for connection in self.active_connections:
try:
await connection.send_json(command)
except Exception as e:
logger.error(f"Error broadcasting to client: {e}")
class DashboardManager:
"""Manages WebSocket connections for dashboard clients."""
def __init__(self):
self.active_dashboards: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_dashboards.append(websocket)
logger.info(f"Dashboard connected. Total dashboards: {len(self.active_dashboards)}")
def disconnect(self, websocket: WebSocket):
if websocket in self.active_dashboards:
self.active_dashboards.remove(websocket)
logger.info(f"Dashboard disconnected. Total dashboards: {len(self.active_dashboards)}")
async def broadcast(self, message: Dict[str, Any]):
"""Broadcast message to all connected dashboards."""
disconnected = []
for dashboard in self.active_dashboards:
try:
await dashboard.send_json(message)
except Exception as e:
logger.error(f"Error broadcasting to dashboard: {e}")
disconnected.append(dashboard)
# Clean up disconnected dashboards
for dashboard in disconnected:
self.disconnect(dashboard)
websocket_manager = ConnectionManager()
dashboard_manager = DashboardManager()
2025-10-18 22:36:20 -05:00
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifecycle manager for FastAPI application."""
2025-10-18 23:56:53 -05:00
global telemetry_buffer, strategy_generator, telemetry_client
2025-10-18 22:36:20 -05:00
settings = get_settings()
logger.info(f"Starting AI Intelligence Layer on port {settings.ai_service_port}")
logger.info(f"Demo mode: {settings.demo_mode}")
2025-10-18 23:56:53 -05:00
logger.info(f"Strategy count: {settings.strategy_count}")
2025-10-18 22:36:20 -05:00
# Initialize services
telemetry_buffer = TelemetryBuffer()
strategy_generator = StrategyGenerator()
2025-10-18 23:56:53 -05:00
# strategy_analyzer = StrategyAnalyzer() # Disabled - not using analysis
2025-10-18 22:36:20 -05:00
telemetry_client = TelemetryClient()
logger.info("All services initialized successfully")
yield
# Cleanup
logger.info("Shutting down AI Intelligence Layer")
# Create FastAPI app
app = FastAPI(
title="F1 AI Intelligence Layer",
description="Advanced race strategy generation and analysis using HPC telemetry data",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
async def dashboard():
"""Serve the dashboard HTML page."""
return FileResponse("static/dashboard.html")
2025-10-18 22:36:20 -05:00
@app.get("/api/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint."""
settings = get_settings()
return HealthResponse(
status="healthy",
service="AI Intelligence Layer",
version="1.0.0",
demo_mode=settings.demo_mode,
enrichment_service_url=settings.enrichment_service_url
)
@app.post("/api/ingest/enriched")
2025-10-19 02:00:56 -05:00
async def ingest_enriched_telemetry(data: EnrichedTelemetryWithContext):
2025-10-18 22:36:20 -05:00
"""
Webhook receiver for enriched telemetry data from HPC enrichment module.
This is called when enrichment service has NEXT_STAGE_CALLBACK_URL configured.
2025-10-19 02:00:56 -05:00
Receives enriched telemetry + race context and automatically triggers strategy brainstorming.
2025-10-18 22:36:20 -05:00
"""
2025-10-19 02:00:56 -05:00
global current_race_context
2025-10-18 22:36:20 -05:00
try:
2025-10-19 02:00:56 -05:00
logger.info(f"Received enriched telemetry webhook: lap {data.enriched_telemetry.lap}")
# Store telemetry in buffer
telemetry_buffer.add(data.enriched_telemetry)
# Update global race context
current_race_context = data.race_context
# Automatically trigger strategy brainstorming
buffer_data = telemetry_buffer.get_latest(limit=10)
if buffer_data and len(buffer_data) >= 3: # Wait for at least 3 laps of data
logger.info(f"Auto-triggering strategy brainstorm with {len(buffer_data)} telemetry records")
try:
# Generate strategies
response = await strategy_generator.generate(
enriched_telemetry=buffer_data,
race_context=data.race_context
)
logger.info(f"Auto-generated {len(response.strategies)} strategies for lap {data.enriched_telemetry.lap}")
return {
"status": "received_and_processed",
"lap": data.enriched_telemetry.lap,
"buffer_size": telemetry_buffer.size(),
"strategies_generated": len(response.strategies),
"strategies": [s.model_dump() for s in response.strategies]
}
except Exception as e:
logger.error(f"Error in auto-brainstorm: {e}", exc_info=True)
# Still return success for ingestion even if brainstorm fails
return {
"status": "received_but_brainstorm_failed",
"lap": data.enriched_telemetry.lap,
"buffer_size": telemetry_buffer.size(),
"error": str(e)
}
else:
logger.info(f"Buffer has only {len(buffer_data) if buffer_data else 0} records, waiting for more data before brainstorming")
return {
"status": "received_waiting_for_more_data",
"lap": data.enriched_telemetry.lap,
"buffer_size": telemetry_buffer.size()
}
2025-10-18 22:36:20 -05:00
except Exception as e:
logger.error(f"Error ingesting telemetry: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to ingest telemetry: {str(e)}"
)
@app.post("/api/strategy/brainstorm", response_model=BrainstormResponse)
async def brainstorm_strategies(request: BrainstormRequest):
"""
Generate 20 diverse race strategies based on enriched telemetry and race context.
This is Step 1 of the AI strategy process.
"""
try:
logger.info(f"Brainstorming strategies for {request.race_context.driver_state.driver_name}")
logger.info(f"Current lap: {request.race_context.race_info.current_lap}/{request.race_context.race_info.total_laps}")
# If no enriched telemetry provided, try buffer first, then enrichment service
enriched_data = request.enriched_telemetry
if not enriched_data:
# First try to get from webhook buffer (push model)
buffer_data = telemetry_buffer.get_latest(limit=10)
if buffer_data:
logger.info(f"Using {len(buffer_data)} telemetry records from webhook buffer")
enriched_data = buffer_data
else:
# Fallback: fetch from enrichment service (pull model)
logger.info("No telemetry in buffer, fetching from enrichment service...")
enriched_data = await telemetry_client.fetch_latest()
if not enriched_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No enriched telemetry available. Please provide data, ensure enrichment service is running, or configure webhook push."
)
# Generate strategies
response = await strategy_generator.generate(
enriched_telemetry=enriched_data,
race_context=request.race_context
)
logger.info(f"Generated {len(response.strategies)} strategies")
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in brainstorm: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Strategy generation failed: {str(e)}"
)
2025-10-18 23:56:53 -05:00
# ANALYSIS ENDPOINT DISABLED FOR SPEED
# Uncomment below to re-enable full analysis workflow
"""
2025-10-18 22:36:20 -05:00
@app.post("/api/strategy/analyze", response_model=AnalyzeResponse)
async def analyze_strategies(request: AnalyzeRequest):
2025-10-18 23:56:53 -05:00
'''
2025-10-18 22:36:20 -05:00
Analyze 20 strategies and select top 3 with detailed rationale.
This is Step 2 of the AI strategy process.
2025-10-18 23:56:53 -05:00
'''
2025-10-18 22:36:20 -05:00
try:
logger.info(f"Analyzing {len(request.strategies)} strategies")
logger.info(f"Current lap: {request.race_context.race_info.current_lap}")
# If no enriched telemetry provided, try buffer first, then enrichment service
enriched_data = request.enriched_telemetry
if not enriched_data:
# First try to get from webhook buffer (push model)
buffer_data = telemetry_buffer.get_latest(limit=10)
if buffer_data:
logger.info(f"Using {len(buffer_data)} telemetry records from webhook buffer")
enriched_data = buffer_data
else:
# Fallback: fetch from enrichment service (pull model)
logger.info("No telemetry in buffer, fetching from enrichment service...")
enriched_data = await telemetry_client.fetch_latest()
if not enriched_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No enriched telemetry available. Please provide data, ensure enrichment service is running, or configure webhook push."
)
# Analyze strategies
response = await strategy_analyzer.analyze(
enriched_telemetry=enriched_data,
race_context=request.race_context,
strategies=request.strategies
)
logger.info(f"Selected top 3 strategies: {[s.strategy_name for s in response.top_strategies]}")
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in analyze: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Strategy analysis failed: {str(e)}"
)
2025-10-18 23:56:53 -05:00
"""
2025-10-18 22:36:20 -05:00
@app.websocket("/ws/dashboard")
async def websocket_dashboard_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for dashboard clients.
Broadcasts vehicle connection status and lap data updates.
"""
await dashboard_manager.connect(websocket)
try:
2025-10-19 14:52:08 -05:00
# Send historical data from current session immediately after connection
buffer_data = telemetry_buffer.get_all()
if buffer_data and current_race_context:
logger.info(f"[Dashboard] Sending {len(buffer_data)} historical lap records to new dashboard")
# Reverse to get chronological order (oldest to newest)
buffer_data.reverse()
# Send each historical lap as a lap_data message
for i, telemetry in enumerate(buffer_data):
try:
# Find matching strategy from history if available
lap_strategy = None
for strat in strategy_history:
if strat.get("lap") == telemetry.lap:
lap_strategy = {
"strategy_name": strat.get("strategy_name"),
"risk_level": strat.get("risk_level"),
"brief_description": strat.get("brief_description"),
"reasoning": strat.get("reasoning")
}
break
# Send historical lap data
await websocket.send_json({
"type": "lap_data",
"vehicle_id": 1, # Assume single vehicle for now
"lap_data": telemetry.model_dump(),
"race_context": {
"position": current_race_context.driver_state.current_position,
"gap_to_leader": current_race_context.driver_state.gap_to_leader,
"gap_to_ahead": current_race_context.driver_state.gap_to_ahead
},
"control_output": last_control_command if i == len(buffer_data) - 1 else {"brake_bias": 5, "differential_slip": 5},
"strategy": lap_strategy,
"timestamp": datetime.now().isoformat(),
"historical": True # Mark as historical data
})
except Exception as e:
logger.error(f"[Dashboard] Error sending historical lap {telemetry.lap}: {e}")
logger.info(f"[Dashboard] Historical data transmission complete")
else:
logger.info("[Dashboard] No historical data to send (buffer empty or no race context)")
# Keep connection alive and handle incoming messages
while True:
# Receive any messages (mostly just keepalive pings)
data = await websocket.receive_text()
# Echo back if needed
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
logger.info("[Dashboard] Client disconnected")
except Exception as e:
logger.error(f"[Dashboard] Error: {e}")
finally:
dashboard_manager.disconnect(websocket)
@app.websocket("/ws/pi")
async def websocket_pi_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for Raspberry Pi clients.
Flow:
1. Pi connects and streams lap telemetry via WebSocket
2. AI layer processes telemetry and generates strategies
3. AI layer pushes control commands back to Pi (brake_bias, differential_slip)
"""
global current_race_context, last_control_command, strategy_history
vehicle_id = await websocket_manager.connect(websocket)
# Clear telemetry buffer for fresh connection
# This ensures lap counting starts from scratch for each Pi session
telemetry_buffer.clear()
# Reset last control command to neutral for new session
last_control_command = {"brake_bias": 5, "differential_slip": 5}
# Clear strategy history for new race
strategy_history = []
logger.info("[WebSocket] Telemetry buffer cleared for new connection")
logger.info("[WebSocket] Strategy history cleared for new race")
# Notify dashboards of new vehicle connection
await dashboard_manager.broadcast({
"type": "vehicle_connected",
"vehicle_id": vehicle_id,
"timestamp": datetime.now().isoformat()
})
try:
# Send initial welcome message
await websocket.send_json({
"type": "connection_established",
"message": "Connected to AI Intelligence Layer",
"status": "ready",
"buffer_cleared": True
})
# Main message loop
while True:
# Receive telemetry from Pi
data = await websocket.receive_json()
message_type = data.get("type", "telemetry")
if message_type == "telemetry":
# Process incoming lap telemetry
lap_number = data.get("lap_number", 0)
# Store in buffer (convert to EnrichedTelemetryWebhook format)
# Note: This assumes data is already enriched. If raw, route through enrichment first.
enriched = data.get("enriched_telemetry")
race_context_data = data.get("race_context")
if enriched and race_context_data:
try:
# Parse enriched telemetry
enriched_obj = EnrichedTelemetryWebhook(**enriched)
telemetry_buffer.add(enriched_obj)
# Update race context
current_race_context = RaceContext(**race_context_data)
# Auto-generate strategies if we have enough data
buffer_data = telemetry_buffer.get_latest(limit=10)
if len(buffer_data) >= 3:
logger.info(f"\n{'='*60}")
logger.info(f"LAP {lap_number} - GENERATING STRATEGY")
logger.info(f"{'='*60}")
2025-10-19 06:58:39 -05:00
# Send SILENT acknowledgment to prevent timeout (no control update)
# This tells the Pi "we're working on it" without triggering voice/controls
await websocket.send_json({
2025-10-19 06:58:39 -05:00
"type": "acknowledgment",
"lap": lap_number,
2025-10-19 06:58:39 -05:00
"message": "Processing strategies, please wait..."
})
2025-10-19 14:52:08 -05:00
# Create a background task to send periodic keepalive pings during strategy generation
# This prevents WebSocket timeout during long AI operations
keepalive_active = asyncio.Event()
async def send_keepalive():
"""Send periodic pings to keep WebSocket alive during long operations."""
while not keepalive_active.is_set():
try:
await asyncio.sleep(10) # Send keepalive every 10 seconds
if not keepalive_active.is_set():
await websocket.send_json({
"type": "keepalive",
"timestamp": datetime.now().isoformat()
})
logger.debug(f"[WebSocket] Sent keepalive ping for lap {lap_number}")
except Exception as e:
logger.error(f"[WebSocket] Keepalive error: {e}")
break
# Start keepalive task
keepalive_task = asyncio.create_task(send_keepalive())
# Generate strategies (this is the slow part)
try:
response = await strategy_generator.generate(
enriched_telemetry=buffer_data,
race_context=current_race_context,
strategy_history=strategy_history
)
2025-10-19 14:52:08 -05:00
# Stop keepalive task
keepalive_active.set()
await keepalive_task
# Extract top strategy (first one)
top_strategy = response.strategies[0] if response.strategies else None
# Add to strategy history
if top_strategy:
strategy_history.append({
"lap": lap_number,
"strategy_name": top_strategy.strategy_name,
"risk_level": top_strategy.risk_level,
"brief_description": top_strategy.brief_description,
"reasoning": top_strategy.reasoning
})
# Keep only last 10 strategies
if len(strategy_history) > 10:
strategy_history.pop(0)
# Generate control commands based on strategy
control_command = generate_control_command(
lap_number=lap_number,
strategy=top_strategy,
enriched_telemetry=enriched_obj,
race_context=current_race_context
)
# Update global last command
last_control_command = {
"brake_bias": control_command["brake_bias"],
"differential_slip": control_command["differential_slip"]
}
# Send updated control command with strategies
await websocket.send_json({
"type": "control_command_update",
"lap": lap_number,
"brake_bias": control_command["brake_bias"],
"differential_slip": control_command["differential_slip"],
"strategy_name": top_strategy.strategy_name if top_strategy else "N/A",
2025-10-19 06:58:39 -05:00
"risk_level": top_strategy.risk_level if top_strategy else "medium",
"total_strategies": len(response.strategies),
"reasoning": control_command.get("reasoning", "")
})
# Broadcast to dashboards with strategy
await dashboard_manager.broadcast({
"type": "lap_data",
"vehicle_id": vehicle_id,
"lap_data": enriched,
"race_context": {
"position": current_race_context.driver_state.current_position,
"gap_to_leader": current_race_context.driver_state.gap_to_leader,
"gap_to_ahead": current_race_context.driver_state.gap_to_ahead
},
"control_output": {
"brake_bias": control_command["brake_bias"],
"differential_slip": control_command["differential_slip"]
},
"strategy": {
"strategy_name": top_strategy.strategy_name,
"risk_level": top_strategy.risk_level,
"brief_description": top_strategy.brief_description,
"reasoning": top_strategy.reasoning
} if top_strategy else None,
"timestamp": datetime.now().isoformat()
})
logger.info(f"{'='*60}\n")
except Exception as e:
2025-10-19 14:52:08 -05:00
# Stop keepalive task on error
keepalive_active.set()
try:
await keepalive_task
except:
pass
logger.error(f"[WebSocket] Strategy generation failed: {e}")
# Send error but keep neutral controls
await websocket.send_json({
"type": "error",
"lap": lap_number,
"message": f"Strategy generation failed: {str(e)}"
})
else:
# Not enough data yet, send neutral command
await websocket.send_json({
"type": "control_command",
"lap": lap_number,
"brake_bias": 5, # Neutral
"differential_slip": 5, # Neutral
"message": f"Collecting data ({len(buffer_data)}/3 laps)"
})
# Broadcast to dashboards (no strategy yet)
await dashboard_manager.broadcast({
"type": "lap_data",
"vehicle_id": vehicle_id,
"lap_data": enriched,
"race_context": {
"position": current_race_context.driver_state.current_position,
"gap_to_leader": current_race_context.driver_state.gap_to_leader,
"gap_to_ahead": current_race_context.driver_state.gap_to_ahead
},
"control_output": {
"brake_bias": 5,
"differential_slip": 5
},
"strategy": None,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"[WebSocket] Error processing telemetry: {e}")
await websocket.send_json({
"type": "error",
"message": str(e)
})
else:
logger.warning(f"[WebSocket] Received incomplete data from Pi")
elif message_type == "ping":
# Respond to ping
await websocket.send_json({"type": "pong"})
elif message_type == "disconnect":
# Graceful disconnect
logger.info("[WebSocket] Pi requested disconnect")
break
except WebSocketDisconnect:
logger.info("[WebSocket] Pi client disconnected")
except Exception as e:
logger.error(f"[WebSocket] Unexpected error: {e}")
finally:
websocket_manager.disconnect(websocket)
# Clear buffer when connection closes to ensure fresh start for next connection
telemetry_buffer.clear()
logger.info("[WebSocket] Telemetry buffer cleared on disconnect")
# Notify dashboards of vehicle disconnect
await dashboard_manager.broadcast({
"type": "vehicle_disconnected",
"vehicle_id": vehicle_id,
"timestamp": datetime.now().isoformat()
})
def generate_control_command(
lap_number: int,
strategy: Any,
enriched_telemetry: EnrichedTelemetryWebhook,
race_context: RaceContext
) -> Dict[str, Any]:
"""
Generate control commands for Pi based on strategy and telemetry.
Returns brake_bias and differential_slip values (0-10) with reasoning.
Logic:
- Brake bias: Adjust based on tire degradation (higher deg = more rear bias)
- Differential slip: Adjust based on pace trend and tire cliff risk
"""
# Default neutral values
brake_bias = 5
differential_slip = 5
reasoning_parts = []
# Adjust brake bias based on tire degradation
if enriched_telemetry.tire_degradation_rate > 0.7:
# High degradation: shift bias to rear (protect fronts)
brake_bias = 7
reasoning_parts.append(f"High tire degradation ({enriched_telemetry.tire_degradation_rate:.2f}) → Brake bias 7 (rear) to protect fronts")
elif enriched_telemetry.tire_degradation_rate > 0.4:
# Moderate degradation: slight rear bias
brake_bias = 6
reasoning_parts.append(f"Moderate tire degradation ({enriched_telemetry.tire_degradation_rate:.2f}) → Brake bias 6 (slight rear)")
elif enriched_telemetry.tire_degradation_rate < 0.2:
# Fresh tires: can use front bias for better turn-in
brake_bias = 4
reasoning_parts.append(f"Fresh tires ({enriched_telemetry.tire_degradation_rate:.2f}) → Brake bias 4 (front) for better turn-in")
else:
reasoning_parts.append(f"Normal tire degradation ({enriched_telemetry.tire_degradation_rate:.2f}) → Brake bias 5 (neutral)")
# Adjust differential slip based on pace and tire cliff risk
if enriched_telemetry.tire_cliff_risk > 0.7:
# High cliff risk: increase slip for gentler tire treatment
differential_slip = 7
reasoning_parts.append(f"High tire cliff risk ({enriched_telemetry.tire_cliff_risk:.2f}) → Diff slip 7 (gentle tire treatment)")
elif enriched_telemetry.pace_trend == "declining":
# Pace declining: moderate slip increase
differential_slip = 6
reasoning_parts.append(f"Pace declining → Diff slip 6 (preserve performance)")
elif enriched_telemetry.pace_trend == "improving":
# Pace improving: can be aggressive, lower slip
differential_slip = 4
reasoning_parts.append(f"Pace improving → Diff slip 4 (aggressive, lower slip)")
else:
reasoning_parts.append(f"Pace stable → Diff slip 5 (neutral)")
# Check if within pit window
pit_window = enriched_telemetry.optimal_pit_window
if pit_window and pit_window[0] <= lap_number <= pit_window[1]:
# In pit window: conservative settings to preserve tires
old_brake = brake_bias
old_diff = differential_slip
brake_bias = min(brake_bias + 1, 10)
differential_slip = min(differential_slip + 1, 10)
reasoning_parts.append(f"In pit window (laps {pit_window[0]}-{pit_window[1]}) → Conservative: brake {old_brake}{brake_bias}, diff {old_diff}{differential_slip}")
# Format reasoning for terminal output
reasoning_text = "\n".join(f"{part}" for part in reasoning_parts)
# Print reasoning to terminal
logger.info(f"CONTROL DECISION REASONING:")
logger.info(reasoning_text)
logger.info(f"FINAL COMMANDS: Brake Bias = {brake_bias}, Differential Slip = {differential_slip}")
# Also include strategy info if available
if strategy:
logger.info(f"TOP STRATEGY: {strategy.strategy_name}")
logger.info(f" Risk Level: {strategy.risk_level}")
logger.info(f" Description: {strategy.brief_description}")
return {
"brake_bias": brake_bias,
"differential_slip": differential_slip,
"reasoning": reasoning_text
}
2025-10-18 22:36:20 -05:00
if __name__ == "__main__":
import uvicorn
settings = get_settings()
uvicorn.run(
"main:app",
host=settings.ai_service_host,
port=settings.ai_service_port,
2025-10-19 14:52:08 -05:00
reload=True,
ws_ping_interval=20, # Send ping every 20 seconds
ws_ping_timeout=60, # Wait up to 60 seconds for pong response
timeout_keep_alive=75 # HTTP keepalive timeout
2025-10-18 22:36:20 -05:00
)
2025-10-18 23:56:53 -05:00