pipeline works from pi simulation to control output and strategy generation.

This commit is contained in:
Aditya Pulipaka
2025-10-19 03:57:03 -05:00
parent 9f70ba7221
commit 636ddf27d4
42 changed files with 1297 additions and 4472 deletions

View File

@@ -4,147 +4,105 @@ from typing import Dict, Any
def normalize_telemetry(payload: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize Pi/FastF1-like telemetry payload to Enricher expected schema.
"""Normalize lap-level telemetry payload from Pi stream to Enricher schema.
Accepted aliases:
- speed: Speed
- throttle: Throttle
- brake: Brake, Brakes
- tire_compound: Compound, TyreCompound, Tire
- fuel_level: Fuel, FuelRel, FuelLevel
- ers: ERS, ERSCharge
- track_temp: TrackTemp, track_temperature
- rain_probability: RainProb, PrecipProb
- lap: Lap, LapNumber, lap_number
Accepted aliases for lap-level data:
- lap_number: lap, Lap, LapNumber, lap_number
- total_laps: TotalLaps, total_laps
- track_name: TrackName, track_name, Circuit
- driver_name: DriverName, driver_name, Driver
- current_position: Position, current_position
- tire_life_laps: TireAge, tire_age, tire_life_laps
- rainfall: Rainfall, rainfall, Rain
- lap_time: lap_time, LapTime, Time
- average_speed: average_speed, avg_speed, AvgSpeed
- max_speed: max_speed, MaxSpeed, max
- tire_compound: tire_compound, Compound, TyreCompound, Tire
- tire_life_laps: tire_life_laps, TireAge, tire_age
- track_temperature: track_temperature, TrackTemp, track_temp
- rainfall: rainfall, Rainfall, Rain
Values are clamped and defaulted if missing.
Returns normalized dict ready for enrichment layer.
"""
aliases = {
"lap": ["lap", "Lap", "LapNumber", "lap_number"],
"speed": ["speed", "Speed"],
"throttle": ["throttle", "Throttle"],
"brake": ["brake", "Brake", "Brakes"],
"tire_compound": ["tire_compound", "Compound", "TyreCompound", "Tire"],
"fuel_level": ["fuel_level", "Fuel", "FuelRel", "FuelLevel"],
"ers": ["ers", "ERS", "ERSCharge"],
"track_temp": ["track_temp", "TrackTemp", "track_temperature"],
"rain_probability": ["rain_probability", "RainProb", "PrecipProb"],
"lap_number": ["lap_number", "lap", "Lap", "LapNumber"],
"total_laps": ["total_laps", "TotalLaps"],
"track_name": ["track_name", "TrackName", "Circuit"],
"driver_name": ["driver_name", "DriverName", "Driver"],
"current_position": ["current_position", "Position"],
"lap_time": ["lap_time", "LapTime", "Time"],
"average_speed": ["average_speed", "avg_speed", "AvgSpeed"],
"max_speed": ["max_speed", "MaxSpeed", "max"],
"tire_compound": ["tire_compound", "Compound", "TyreCompound", "Tire"],
"tire_life_laps": ["tire_life_laps", "TireAge", "tire_age"],
"track_temperature": ["track_temperature", "TrackTemp", "track_temp"],
"rainfall": ["rainfall", "Rainfall", "Rain"],
}
out: Dict[str, Any] = {}
def pick(key: str, default=None):
"""Pick first matching alias from payload."""
for k in aliases.get(key, [key]):
if k in payload and payload[k] is not None:
return payload[k]
return default
def clamp01(x, default=0.0):
try:
v = float(x)
except (TypeError, ValueError):
return default
return max(0.0, min(1.0, v))
# Map values with sensible defaults
lap = pick("lap", 0)
# Extract and validate lap-level fields
lap_number = pick("lap_number", 0)
try:
lap = int(lap)
lap_number = int(lap_number)
except (TypeError, ValueError):
lap = 0
lap_number = 0
speed = pick("speed", 0.0)
total_laps = pick("total_laps", 51)
try:
speed = float(speed)
total_laps = int(total_laps)
except (TypeError, ValueError):
speed = 0.0
total_laps = 51
throttle = clamp01(pick("throttle", 0.0), 0.0)
brake = clamp01(pick("brake", 0.0), 0.0)
lap_time = pick("lap_time", None)
if lap_time:
out["lap_time"] = str(lap_time)
average_speed = pick("average_speed", 0.0)
try:
average_speed = float(average_speed)
except (TypeError, ValueError):
average_speed = 0.0
max_speed = pick("max_speed", 0.0)
try:
max_speed = float(max_speed)
except (TypeError, ValueError):
max_speed = 0.0
tire_compound = pick("tire_compound", "medium")
if isinstance(tire_compound, str):
tire_compound = tire_compound.lower()
tire_compound = tire_compound.upper() # Keep uppercase for consistency
else:
tire_compound = "medium"
tire_compound = "MEDIUM"
fuel_level = clamp01(pick("fuel_level", 0.5), 0.5)
ers = pick("ers", None)
if ers is not None:
ers = clamp01(ers, None)
track_temp = pick("track_temp", None)
tire_life_laps = pick("tire_life_laps", 0)
try:
track_temp = float(track_temp) if track_temp is not None else None
tire_life_laps = int(tire_life_laps)
except (TypeError, ValueError):
track_temp = None
tire_life_laps = 0
rain_prob = pick("rain_probability", None)
track_temperature = pick("track_temperature", 25.0)
try:
rain_prob = clamp01(rain_prob, None) if rain_prob is not None else None
except Exception:
rain_prob = None
track_temperature = float(track_temperature)
except (TypeError, ValueError):
track_temperature = 25.0
rainfall = pick("rainfall", False)
try:
rainfall = bool(rainfall)
except (TypeError, ValueError):
rainfall = False
# Build normalized output
out.update({
"lap": lap,
"speed": speed,
"throttle": throttle,
"brake": brake,
"lap_number": lap_number,
"total_laps": total_laps,
"average_speed": average_speed,
"max_speed": max_speed,
"tire_compound": tire_compound,
"fuel_level": fuel_level,
"tire_life_laps": tire_life_laps,
"track_temperature": track_temperature,
"rainfall": rainfall,
})
if ers is not None:
out["ers"] = ers
if track_temp is not None:
out["track_temp"] = track_temp
if rain_prob is not None:
out["rain_probability"] = rain_prob
# Add race context fields if present
total_laps = pick("total_laps", None)
if total_laps is not None:
try:
out["total_laps"] = int(total_laps)
except (TypeError, ValueError):
pass
track_name = pick("track_name", None)
if track_name:
out["track_name"] = str(track_name)
driver_name = pick("driver_name", None)
if driver_name:
out["driver_name"] = str(driver_name)
current_position = pick("current_position", None)
if current_position is not None:
try:
out["current_position"] = int(current_position)
except (TypeError, ValueError):
pass
tire_life_laps = pick("tire_life_laps", None)
if tire_life_laps is not None:
try:
out["tire_life_laps"] = int(tire_life_laps)
except (TypeError, ValueError):
pass
rainfall = pick("rainfall", None)
if rainfall is not None:
out["rainfall"] = bool(rainfall)
return out

View File

@@ -25,24 +25,24 @@ _CALLBACK_URL = os.getenv("NEXT_STAGE_CALLBACK_URL")
class EnrichedRecord(BaseModel):
"""Lap-level enriched telemetry model."""
lap: int
aero_efficiency: float
tire_degradation_index: float
ers_charge: float
fuel_optimization_score: float
driver_consistency: float
weather_impact: str
tire_degradation_rate: float
pace_trend: str
tire_cliff_risk: float
optimal_pit_window: List[int]
performance_delta: float
@app.post("/ingest/telemetry")
async def ingest_telemetry(payload: Dict[str, Any] = Body(...)):
"""Receive raw telemetry (from Pi), normalize, enrich, return enriched with race context.
"""Receive raw lap-level telemetry (from Pi), normalize, enrich, return enriched with race context.
Optionally forwards to NEXT_STAGE_CALLBACK_URL if set.
"""
try:
normalized = normalize_telemetry(payload)
result = _enricher.enrich_with_context(normalized)
result = _enricher.enrich_lap_data(normalized)
enriched = result["enriched_telemetry"]
race_context = result["race_context"]
except Exception as e:
@@ -85,3 +85,12 @@ async def list_enriched(limit: int = 50):
@app.get("/healthz")
async def healthz():
return {"status": "ok", "stored": len(_recent)}
@app.post("/reset")
async def reset_enricher():
"""Reset enricher state for a new session/race."""
global _enricher
_enricher = Enricher()
_recent.clear()
return {"status": "reset", "message": "Enricher state and buffer cleared"}

View File

@@ -2,370 +2,254 @@ from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
import math
import pandas as pd
# --- Contracts ---
# Input telemetry (example, extensible):
# --- LAP-LEVEL TELEMETRY CONTRACT ---
# Input from Raspberry Pi (lap-level data):
# {
# "lap": 27,
# "speed": 282, # km/h
# "throttle": 0.91, # 0..1
# "brake": 0.05, # 0..1
# "tire_compound": "medium",# soft|medium|hard|inter|wet
# "fuel_level": 0.47, # 0..1 (fraction of race fuel)
# "ers": 0.72, # optional 0..1
# "track_temp": 38, # optional Celsius
# "rain_probability": 0.2 # optional 0..1
#
# # Additional fields for race context:
# "track_name": "Monza", # optional
# "total_laps": 51, # optional
# "driver_name": "Alonso", # optional
# "current_position": 5, # optional
# "tire_life_laps": 12, # optional (tire age)
# "rainfall": False # optional (boolean)
# }
#
# Output enrichment + race context:
# {
# "enriched_telemetry": {
# "lap": 27,
# "aero_efficiency": 0.83,
# "tire_degradation_index": 0.65,
# "ers_charge": 0.72,
# "fuel_optimization_score": 0.91,
# "driver_consistency": 0.89,
# "weather_impact": "low|medium|high"
# },
# "race_context": {
# "race_info": {...},
# "driver_state": {...},
# "competitors": [...]
# }
# "lap_number": 27,
# "total_laps": 51,
# "lap_time": "0 days 00:01:27.318000",
# "average_speed": 234.62,
# "max_speed": 333.0,
# "tire_compound": "MEDIUM",
# "tire_life_laps": 19,
# "track_temperature": 43.6,
# "rainfall": false
# }
_TIRES_BASE_WEAR = {
"soft": 0.012,
"medium": 0.008,
"hard": 0.006,
"inter": 0.015,
"wet": 0.02,
_TIRE_DEGRADATION_RATES = {
"soft": 0.030, # Fast degradation
"medium": 0.020, # Moderate degradation
"hard": 0.015, # Slow degradation
"inter": 0.025,
"wet": 0.022,
}
_TIRE_CLIFF_THRESHOLD = 25 # Laps before cliff risk increases significantly
@dataclass
class EnricherState:
last_lap: Optional[int] = None
lap_speeds: Dict[int, float] = field(default_factory=dict)
lap_throttle_avg: Dict[int, float] = field(default_factory=dict)
cumulative_wear: float = 0.0 # 0..1 approx
# Race context state
track_name: str = "Unknown Circuit"
total_laps: int = 50
driver_name: str = "Driver"
current_position: int = 10
tire_compound_history: List[str] = field(default_factory=list)
"""Maintains race state across laps for trend analysis."""
lap_times: List[float] = field(default_factory=list) # Recent lap times in seconds
lap_speeds: List[float] = field(default_factory=list) # Recent average speeds
current_tire_age: int = 0
current_tire_compound: str = "medium"
tire_stint_start_lap: int = 1
total_laps: int = 51
track_name: str = "Monza"
class Enricher:
"""Heuristic enrichment engine to simulate HPC analytics on telemetry.
Stateless inputs are enriched with stateful estimates (wear, consistency, etc.).
Designed for predictable, dependency-free behavior.
"""
HPC-simulated enrichment for lap-level F1 telemetry.
Accepts lap-level data from Raspberry Pi and generates performance insights
that simulate HPC computational analysis.
"""
def __init__(self):
self.state = EnricherState()
self._baseline_lap_time: Optional[float] = None # Best lap time as baseline
# --- Public API ---
def enrich(self, telemetry: Dict[str, Any]) -> Dict[str, Any]:
"""Legacy method - returns only enriched telemetry metrics."""
lap = int(telemetry.get("lap", 0))
speed = float(telemetry.get("speed", 0.0))
throttle = float(telemetry.get("throttle", 0.0))
brake = float(telemetry.get("brake", 0.0))
tire_compound = str(telemetry.get("tire_compound", "medium")).lower()
fuel_level = float(telemetry.get("fuel_level", 0.5))
ers = telemetry.get("ers")
track_temp = telemetry.get("track_temp")
rain_prob = telemetry.get("rain_probability")
# Update per-lap aggregates
self._update_lap_stats(lap, speed, throttle)
# Metrics
aero_eff = self._compute_aero_efficiency(speed, throttle, brake)
tire_deg = self._compute_tire_degradation(lap, speed, throttle, tire_compound, track_temp)
ers_charge = self._compute_ers_charge(ers, throttle, brake)
fuel_opt = self._compute_fuel_optimization(fuel_level, throttle)
consistency = self._compute_driver_consistency()
weather_impact = self._compute_weather_impact(rain_prob, track_temp)
return {
"lap": lap,
"aero_efficiency": round(aero_eff, 3),
"tire_degradation_index": round(tire_deg, 3),
"ers_charge": round(ers_charge, 3),
"fuel_optimization_score": round(fuel_opt, 3),
"driver_consistency": round(consistency, 3),
"weather_impact": weather_impact,
}
def enrich_with_context(self, telemetry: Dict[str, Any]) -> Dict[str, Any]:
"""Enrich telemetry and build complete race context for AI layer."""
# Extract all fields
lap = int(telemetry.get("lap", telemetry.get("lap_number", 0)))
speed = float(telemetry.get("speed", 0.0))
throttle = float(telemetry.get("throttle", 0.0))
brake = float(telemetry.get("brake", 0.0))
tire_compound = str(telemetry.get("tire_compound", "medium")).lower()
fuel_level = float(telemetry.get("fuel_level", 0.5))
ers = telemetry.get("ers")
track_temp = telemetry.get("track_temp", telemetry.get("track_temperature"))
rain_prob = telemetry.get("rain_probability")
rainfall = telemetry.get("rainfall", False)
def enrich_lap_data(self, lap_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Main enrichment method for lap-level data.
Returns enriched telemetry + race context for AI layer.
"""
# Extract lap data
lap_number = int(lap_data.get("lap_number", 0))
total_laps = int(lap_data.get("total_laps", 51))
lap_time_str = lap_data.get("lap_time")
average_speed = float(lap_data.get("average_speed", 0.0))
max_speed = float(lap_data.get("max_speed", 0.0))
tire_compound = str(lap_data.get("tire_compound", "MEDIUM")).lower()
tire_life_laps = int(lap_data.get("tire_life_laps", 0))
track_temperature = float(lap_data.get("track_temperature", 25.0))
rainfall = bool(lap_data.get("rainfall", False))
# Race context fields
track_name = telemetry.get("track_name", self.state.track_name)
total_laps = int(telemetry.get("total_laps", self.state.total_laps))
driver_name = telemetry.get("driver_name", self.state.driver_name)
current_position = int(telemetry.get("current_position", self.state.current_position))
tire_life_laps = int(telemetry.get("tire_life_laps", 0))
# Convert lap time to seconds
lap_time_seconds = self._parse_lap_time(lap_time_str)
# Update state
self.state.lap_times.append(lap_time_seconds)
self.state.lap_speeds.append(average_speed)
self.state.current_tire_age = tire_life_laps
self.state.current_tire_compound = tire_compound
self.state.total_laps = total_laps
# Keep only last 10 laps for analysis
if len(self.state.lap_times) > 10:
self.state.lap_times = self.state.lap_times[-10:]
self.state.lap_speeds = self.state.lap_speeds[-10:]
# Set baseline (best lap time)
if self._baseline_lap_time is None or lap_time_seconds < self._baseline_lap_time:
self._baseline_lap_time = lap_time_seconds
# Compute HPC-simulated insights
tire_deg_rate = self._compute_tire_degradation_rate(tire_compound, tire_life_laps, track_temperature)
pace_trend = self._compute_pace_trend()
tire_cliff_risk = self._compute_tire_cliff_risk(tire_compound, tire_life_laps)
pit_window = self._compute_optimal_pit_window(lap_number, total_laps, tire_life_laps, tire_compound)
performance_delta = self._compute_performance_delta(lap_time_seconds)
# Update state with race context
if track_name:
self.state.track_name = track_name
if total_laps:
self.state.total_laps = total_laps
if driver_name:
self.state.driver_name = driver_name
if current_position:
self.state.current_position = current_position
# Track tire compound changes
if tire_compound and (not self.state.tire_compound_history or
self.state.tire_compound_history[-1] != tire_compound):
self.state.tire_compound_history.append(tire_compound)
# Update per-lap aggregates
self._update_lap_stats(lap, speed, throttle)
# Compute enriched metrics
aero_eff = self._compute_aero_efficiency(speed, throttle, brake)
tire_deg = self._compute_tire_degradation(lap, speed, throttle, tire_compound, track_temp)
ers_charge = self._compute_ers_charge(ers, throttle, brake)
fuel_opt = self._compute_fuel_optimization(fuel_level, throttle)
consistency = self._compute_driver_consistency()
weather_impact = self._compute_weather_impact(rain_prob, track_temp)
# Build enriched telemetry
enriched_telemetry = {
"lap": lap,
"aero_efficiency": round(aero_eff, 3),
"tire_degradation_index": round(tire_deg, 3),
"ers_charge": round(ers_charge, 3),
"fuel_optimization_score": round(fuel_opt, 3),
"driver_consistency": round(consistency, 3),
"weather_impact": weather_impact,
"lap": lap_number,
"tire_degradation_rate": round(tire_deg_rate, 3),
"pace_trend": pace_trend,
"tire_cliff_risk": round(tire_cliff_risk, 3),
"optimal_pit_window": pit_window,
"performance_delta": round(performance_delta, 2)
}
# Build race context
race_context = self._build_race_context(
lap=lap,
total_laps=total_laps,
track_name=track_name,
track_temp=track_temp,
rainfall=rainfall,
driver_name=driver_name,
current_position=current_position,
tire_compound=tire_compound,
tire_life_laps=tire_life_laps,
fuel_level=fuel_level
)
race_context = {
"race_info": {
"track_name": self.state.track_name,
"total_laps": total_laps,
"current_lap": lap_number,
"weather_condition": "Wet" if rainfall else "Dry",
"track_temp_celsius": track_temperature
},
"driver_state": {
"driver_name": "Alonso",
"current_position": 5, # Mock - could be passed in
"current_tire_compound": tire_compound,
"tire_age_laps": tire_life_laps,
"fuel_remaining_percent": self._estimate_fuel(lap_number, total_laps)
}
}
return {
"enriched_telemetry": enriched_telemetry,
"race_context": race_context
}
def _build_race_context(
self,
lap: int,
total_laps: int,
track_name: str,
track_temp: Optional[float],
rainfall: bool,
driver_name: str,
current_position: int,
tire_compound: str,
tire_life_laps: int,
fuel_level: float
) -> Dict[str, Any]:
"""Build complete race context structure for AI layer."""
# Normalize tire compound for output
tire_map = {
"soft": "soft",
"medium": "medium",
"hard": "hard",
"inter": "intermediate",
"intermediate": "intermediate",
"wet": "wet"
}
normalized_tire = tire_map.get(tire_compound.lower(), "medium")
# Determine weather condition
if rainfall:
weather_condition = "Wet"
else:
weather_condition = "Dry"
race_context = {
"race_info": {
"track_name": track_name,
"total_laps": total_laps,
"current_lap": lap,
"weather_condition": weather_condition,
"track_temp_celsius": float(track_temp) if track_temp is not None else 25.0
},
"driver_state": {
"driver_name": driver_name,
"current_position": current_position,
"current_tire_compound": normalized_tire,
"tire_age_laps": tire_life_laps,
"fuel_remaining_percent": fuel_level * 100.0 # Convert 0..1 to 0..100
},
"competitors": self._generate_mock_competitors(current_position, normalized_tire, tire_life_laps)
}
return race_context
# --- HPC-Simulated Computation Methods ---
def _generate_mock_competitors(
self,
current_position: int,
current_tire: str,
current_tire_age: int
) -> List[Dict[str, Any]]:
"""Generate realistic mock competitor data for race context."""
competitors = []
def _compute_tire_degradation_rate(self, tire_compound: str, tire_age: int, track_temp: float) -> float:
"""
Simulate HPC computation of tire degradation rate.
Returns 0-1 value (higher = worse degradation).
"""
base_rate = _TIRE_DEGRADATION_RATES.get(tire_compound, 0.020)
# Driver names pool
driver_names = [
"Verstappen", "Hamilton", "Leclerc", "Perez", "Sainz",
"Russell", "Norris", "Piastri", "Alonso", "Stroll",
"Gasly", "Ocon", "Tsunoda", "Ricciardo", "Bottas",
"Zhou", "Magnussen", "Hulkenberg", "Albon", "Sargeant"
]
# Temperature effect: higher temp = more degradation
temp_multiplier = 1.0
if track_temp > 45:
temp_multiplier = 1.3
elif track_temp > 40:
temp_multiplier = 1.15
elif track_temp < 20:
temp_multiplier = 0.9
tire_compounds = ["soft", "medium", "hard"]
# Age effect: exponential increase after certain threshold
age_multiplier = 1.0
if tire_age > 20:
age_multiplier = 1.0 + ((tire_age - 20) * 0.05) # +5% per lap over 20
# Generate positions around the current driver (±3 positions)
positions_to_show = []
for offset in [-3, -2, -1, 1, 2, 3]:
pos = current_position + offset
if 1 <= pos <= 20 and pos != current_position:
positions_to_show.append(pos)
degradation = base_rate * tire_age * temp_multiplier * age_multiplier
return min(1.0, degradation)
def _compute_pace_trend(self) -> str:
"""
Analyze recent lap times to determine pace trend.
Returns: "improving", "stable", or "declining"
"""
if len(self.state.lap_times) < 3:
return "stable"
for pos in sorted(positions_to_show):
# Calculate gap (negative if ahead, positive if behind)
gap_base = (pos - current_position) * 2.5 # ~2.5s per position
gap_variation = (hash(str(pos)) % 100) / 50.0 - 1.0 # -1 to +1 variation
gap = gap_base + gap_variation
# Choose tire compound (bias towards similar strategy)
tire_choice = current_tire
if abs(hash(str(pos)) % 3) == 0: # 33% different strategy
tire_choice = tire_compounds[pos % 3]
# Tire age variation
tire_age = max(0, current_tire_age + (hash(str(pos * 7)) % 11) - 5)
competitor = {
"position": pos,
"driver": driver_names[(pos - 1) % len(driver_names)],
"tire_compound": tire_choice,
"tire_age_laps": tire_age,
"gap_seconds": round(gap, 2)
}
competitors.append(competitor)
recent_laps = self.state.lap_times[-5:] # Last 5 laps
return competitors
# --- Internals ---
def _update_lap_stats(self, lap: int, speed: float, throttle: float) -> None:
if lap <= 0:
return
# Store simple aggregates for consistency metrics
self.state.lap_speeds[lap] = speed
self.state.lap_throttle_avg[lap] = 0.8 * self.state.lap_throttle_avg.get(lap, throttle) + 0.2 * throttle
self.state.last_lap = lap
def _compute_aero_efficiency(self, speed: float, throttle: float, brake: float) -> float:
# Heuristic: favor high speed with low throttle variance (efficiency) and minimal braking at high speeds
# Normalize speed into 0..1 assuming 0..330 km/h typical
speed_n = max(0.0, min(1.0, speed / 330.0))
brake_penalty = 0.4 * brake
throttle_bonus = 0.2 * throttle
base = 0.5 * speed_n + throttle_bonus - brake_penalty
return max(0.0, min(1.0, base))
def _compute_tire_degradation(self, lap: int, speed: float, throttle: float, tire_compound: str, track_temp: Optional[float]) -> float:
base_wear = _TIRES_BASE_WEAR.get(tire_compound, _TIRES_BASE_WEAR["medium"]) # per lap
temp_factor = 1.0
if isinstance(track_temp, (int, float)):
if track_temp > 42:
temp_factor = 1.25
elif track_temp < 15:
temp_factor = 0.9
stress = 0.5 + 0.5 * throttle + 0.2 * max(0.0, (speed - 250.0) / 100.0)
wear_this_lap = base_wear * stress * temp_factor
# Update cumulative wear but cap at 1.0
self.state.cumulative_wear = min(1.0, self.state.cumulative_wear + wear_this_lap)
return self.state.cumulative_wear
def _compute_ers_charge(self, ers: Optional[float], throttle: float, brake: float) -> float:
if isinstance(ers, (int, float)):
# simple recovery under braking, depletion under throttle
ers_level = float(ers) + 0.1 * brake - 0.05 * throttle
# Calculate trend (simple linear regression)
avg_first_half = sum(recent_laps[:len(recent_laps)//2]) / max(1, len(recent_laps)//2)
avg_second_half = sum(recent_laps[len(recent_laps)//2:]) / max(1, len(recent_laps) - len(recent_laps)//2)
diff = avg_second_half - avg_first_half
if diff < -0.5: # Getting faster by more than 0.5s
return "improving"
elif diff > 0.5: # Getting slower by more than 0.5s
return "declining"
else:
# infer ers trend if not provided
ers_level = 0.6 + 0.05 * brake - 0.03 * throttle
return max(0.0, min(1.0, ers_level))
def _compute_fuel_optimization(self, fuel_level: float, throttle: float) -> float:
# Reward keeping throttle moderate when fuel is low and pushing when fuel is high
fuel_n = max(0.0, min(1.0, fuel_level))
ideal_throttle = 0.5 + 0.4 * fuel_n # higher fuel -> higher ideal throttle
penalty = abs(throttle - ideal_throttle)
score = 1.0 - penalty
return max(0.0, min(1.0, score))
def _compute_driver_consistency(self) -> float:
# Use last up to 5 laps speed variance to estimate consistency (lower variance -> higher consistency)
laps = sorted(self.state.lap_speeds.keys())[-5:]
if not laps:
return 0.5
speeds = [self.state.lap_speeds[l] for l in laps]
mean = sum(speeds) / len(speeds)
var = sum((s - mean) ** 2 for s in speeds) / len(speeds)
# Map variance to 0..1; assume 0..(30 km/h)^2 typical range
norm = min(1.0, var / (30.0 ** 2))
return max(0.0, 1.0 - norm)
def _compute_weather_impact(self, rain_prob: Optional[float], track_temp: Optional[float]) -> str:
score = 0.0
if isinstance(rain_prob, (int, float)):
score += 0.7 * float(rain_prob)
if isinstance(track_temp, (int, float)):
if track_temp < 12: # cold tires harder
score += 0.2
if track_temp > 45: # overheating
score += 0.2
if score < 0.3:
return "low"
if score < 0.6:
return "medium"
return "high"
return "stable"
def _compute_tire_cliff_risk(self, tire_compound: str, tire_age: int) -> float:
"""
Compute probability of hitting tire performance cliff.
Returns 0-1 (0 = no risk, 1 = imminent cliff).
"""
# Different compounds have different cliff points
cliff_points = {
"soft": 15,
"medium": 25,
"hard": 35,
"inter": 20,
"wet": 18
}
cliff_point = cliff_points.get(tire_compound, 25)
if tire_age < cliff_point - 5:
return 0.0
elif tire_age >= cliff_point + 5:
return 1.0
else:
# Linear risk increase in 10-lap window around cliff point
return (tire_age - (cliff_point - 5)) / 10.0
def _compute_optimal_pit_window(self, current_lap: int, total_laps: int, tire_age: int, tire_compound: str) -> List[int]:
"""
Calculate optimal pit stop window based on tire degradation.
Returns [start_lap, end_lap] for pit window.
"""
cliff_risk = self._compute_tire_cliff_risk(tire_compound, tire_age)
if cliff_risk > 0.7:
# Urgent pit needed
return [current_lap + 1, current_lap + 3]
elif cliff_risk > 0.4:
# Pit soon
return [current_lap + 3, current_lap + 6]
else:
# Tire still good, estimate based on compound
if tire_compound == "soft":
laps_remaining = max(0, 18 - tire_age)
elif tire_compound == "medium":
laps_remaining = max(0, 28 - tire_age)
else: # hard
laps_remaining = max(0, 38 - tire_age)
pit_lap = min(current_lap + laps_remaining, total_laps - 5)
return [max(current_lap + 1, pit_lap - 2), pit_lap + 2]
def _compute_performance_delta(self, current_lap_time: float) -> float:
"""
Calculate performance delta vs baseline lap time.
Negative = slower than baseline, Positive = faster.
"""
if self._baseline_lap_time is None:
return 0.0
return self._baseline_lap_time - current_lap_time # Negative if slower
def _estimate_fuel(self, current_lap: int, total_laps: int) -> float:
"""Estimate remaining fuel percentage based on lap progression."""
return max(0.0, 100.0 * (1.0 - (current_lap / total_laps)))
def _parse_lap_time(self, lap_time_str: Optional[str]) -> float:
"""Convert lap time string to seconds."""
if not lap_time_str:
return 90.0 # Default ~1:30
try:
# Handle pandas Timedelta string format
td = pd.to_timedelta(lap_time_str)
return td.total_seconds()
except:
return 90.0