From 3c961efaff62ca263abac69819ded22dc17c887b Mon Sep 17 00:00:00 2001
From: Yahya Kousa
Date: Sat, 18 Oct 2025 20:51:16 -0500
Subject: [PATCH] telemetry file scripts

---
 scripts/fetch_race_data.py    | 301 ++++++++++++++++++++++++++++++++++
 scripts/simulate_pi_stream.py | 144 +++++++++++++++++
 2 files changed, 445 insertions(+)
 create mode 100644 scripts/fetch_race_data.py
 create mode 100644 scripts/simulate_pi_stream.py

diff --git a/scripts/fetch_race_data.py b/scripts/fetch_race_data.py
new file mode 100644
index 0000000..ed8769b
--- /dev/null
+++ b/scripts/fetch_race_data.py
@@ -0,0 +1,301 @@
+"""
+FastF1 Data Fetcher for HPC F1 AI Strategy System
+
+Downloads telemetry and race data from a specific F1 session to simulate
+live telemetry streaming from a Raspberry Pi "racecar" to the HPC layer.
+
+Usage:
+    python fetch_race_data.py --year 2024 --race "Monaco" --driver VER --output data/
+"""
+
+import argparse
+import json
+from pathlib import Path
+from typing import Dict, List, Any
+import warnings
+
+import fastf1
+import pandas as pd
+
+# Suppress FastF1 warnings
+warnings.filterwarnings('ignore')
+
+# Enable FastF1 cache for faster subsequent loads
+CACHE_DIR = Path.home() / ".cache" / "fastf1"
+CACHE_DIR.mkdir(parents=True, exist_ok=True)
+fastf1.Cache.enable_cache(str(CACHE_DIR))
+
+
+def fetch_session_data(year: int, race: str, session_type: str = "R") -> fastf1.core.Session:
+    """
+    Load a FastF1 session.
+
+    Args:
+        year: Race year (e.g., 2024)
+        race: Race name or round number (e.g., "Monaco" or 6)
+        session_type: 'R' (Race), 'Q' (Quali), 'FP1', 'FP2', 'FP3', 'S' (Sprint)
+
+    Returns:
+        Loaded FastF1 session
+    """
+    print(f"Loading {year} {race} - {session_type}...")
+    session = fastf1.get_session(year, race, session_type)
+    session.load()
+    print(f"✓ Session loaded: {session.event['EventName']} - {session.name}")
+    return session
+
+
+def extract_driver_telemetry(session: fastf1.core.Session, driver: str) -> pd.DataFrame:
+    """
+    Extract comprehensive telemetry for a specific driver.
+
+    Args:
+        session: Loaded FastF1 session
+        driver: Driver abbreviation (e.g., 'VER', 'HAM', 'LEC')
+
+    Returns:
+        DataFrame with telemetry data
+    """
+    print(f"Extracting telemetry for driver {driver}...")
+
+    # NOTE(review): Laps.pick_driver() is deprecated in newer FastF1 releases
+    driver_laps = session.laps.pick_driver(driver)
+    if driver_laps.empty:
+        raise ValueError(f"No laps found for driver {driver}")
+
+    # Get telemetry for all laps
+    telemetry_data = []
+
+    for lap_num in driver_laps['LapNumber'].unique():
+        lap = driver_laps[driver_laps['LapNumber'] == lap_num].iloc[0]
+
+        try:
+            telemetry = lap.get_telemetry()
+
+            if telemetry.empty:
+                continue
+
+            # Add lap metadata to each telemetry point
+            telemetry['LapNumber'] = lap_num
+            telemetry['Compound'] = lap['Compound']
+            telemetry['TyreLife'] = lap['TyreLife']
+            telemetry['LapTime'] = lap['LapTime'].total_seconds() if pd.notna(lap['LapTime']) else None
+            telemetry['IsPersonalBest'] = lap['IsPersonalBest']
+
+            telemetry_data.append(telemetry)
+
+        except Exception as e:
+            print(f"  ⚠ Warning: Could not get telemetry for lap {lap_num}: {e}")
+            continue
+
+    if not telemetry_data:
+        raise ValueError(f"No telemetry data extracted for {driver}")
+
+    full_telemetry = pd.concat(telemetry_data, ignore_index=True)
+    print(f"✓ Extracted {len(full_telemetry)} telemetry points across {len(driver_laps)} laps")
+
+    return full_telemetry
+
+
+def extract_race_context(session: fastf1.core.Session) -> Dict[str, Any]:
+    """
+    Extract race-level context data.
+
+    Returns:
+        Dictionary with weather, track, and competitor data
+    """
+    print("Extracting race context...")
+
+    context = {
+        "event": {
+            "name": session.event['EventName'],
+            "location": session.event['Location'],
+            "country": session.event['Country'],
+            "circuit": session.event.get('CircuitKey', 'unknown'),
+        },
+        "session": {
+            "type": session.name,
+            "date": str(session.date),
+            "total_laps": int(session.total_laps) if hasattr(session, 'total_laps') else None,
+        },
+        "weather": {},
+        "competitors": [],
+    }
+
+    # Weather data
+    try:
+        weather = session.weather_data
+        if not weather.empty:
+            # Average weather conditions
+            context["weather"] = {
+                "track_temp_avg": float(weather['TrackTemp'].mean()),
+                "track_temp_min": float(weather['TrackTemp'].min()),
+                "track_temp_max": float(weather['TrackTemp'].max()),
+                "air_temp_avg": float(weather['AirTemp'].mean()),
+                "humidity_avg": float(weather['Humidity'].mean()),
+                "pressure_avg": float(weather['Pressure'].mean()),
+                "rainfall": bool(weather['Rainfall'].any()),
+            }
+    except Exception as e:
+        print(f"  ⚠ Warning: Could not extract weather data: {e}")
+
+    # Competitor positions and pace
+    try:
+        results = session.results
+        if not results.empty:
+            for _, driver in results.iterrows():
+                context["competitors"].append({
+                    "driver": driver['Abbreviation'],
+                    "team": driver['TeamName'],
+                    "position": int(driver['Position']) if pd.notna(driver['Position']) else None,
+                    "grid_position": int(driver['GridPosition']) if pd.notna(driver['GridPosition']) else None,
+                    "status": driver.get('Status', 'Unknown'),
+                })
+    except Exception as e:
+        print(f"  ⚠ Warning: Could not extract competitor data: {e}")
+
+    print("✓ Race context extracted")
+    return context
+
+
+def prepare_telemetry_stream(telemetry: pd.DataFrame, sample_rate_hz: float = 10.0) -> List[Dict[str, Any]]:
+    """
+    Convert telemetry DataFrame to stream-ready format.
+
+    FastF1 telemetry is irregularly sampled; points closer together than
+    1/sample_rate_hz seconds are dropped so the replayed stream never
+    exceeds the target rate. A rate <= 0 disables downsampling.
+
+    Args:
+        telemetry: Raw telemetry DataFrame
+        sample_rate_hz: Maximum sampling rate (Hz) for simulation
+
+    Returns:
+        List of telemetry dictionaries ready for streaming
+    """
+    print(f"Preparing telemetry stream at {sample_rate_hz} Hz...")
+
+    telemetry = telemetry.copy()
+
+    # The 'Time' column of per-lap telemetry is relative to the start of each
+    # lap slice, so sorting the concatenated laps by it would interleave laps.
+    # Prefer the session-wide clock when it is available.
+    time_col = 'SessionTime' if 'SessionTime' in telemetry.columns else 'Time'
+    telemetry[time_col] = pd.to_timedelta(telemetry[time_col])
+    telemetry = telemetry.sort_values(time_col)
+
+    # Convert to milliseconds for easier time tracking
+    telemetry['TimeMs'] = (telemetry[time_col].dt.total_seconds() * 1000).astype(int)
+
+    # Minimum spacing between emitted points (0 disables downsampling).
+    min_gap_ms = 1000.0 / sample_rate_hz if sample_rate_hz > 0 else 0.0
+
+    stream = []
+    last_ms = None
+
+    for _, row in telemetry.iterrows():
+        ts = int(row['TimeMs'])
+
+        # Enforce the target rate by skipping points that arrive too soon.
+        if last_ms is not None and (ts - last_ms) < min_gap_ms:
+            continue
+        last_ms = ts
+
+        point = {
+            "timestamp_ms": ts,
+            "lap": int(row['LapNumber']),
+            "speed": float(row['Speed']) if pd.notna(row['Speed']) else 0.0,
+            "throttle": float(row['Throttle']) / 100.0 if pd.notna(row['Throttle']) else 0.0,
+            "brake": float(row['Brake']) if pd.notna(row['Brake']) else 0.0,
+            "gear": int(row['nGear']) if pd.notna(row['nGear']) else 0,
+            "rpm": int(row['RPM']) if pd.notna(row['RPM']) else 0,
+            "drs": int(row['DRS']) if pd.notna(row['DRS']) else 0,
+            "tire_compound": str(row['Compound']).lower() if pd.notna(row['Compound']) else "unknown",
+            "tire_life": int(row['TyreLife']) if pd.notna(row['TyreLife']) else 0,
+        }
+
+        stream.append(point)
+
+    print(f"✓ Prepared {len(stream)} telemetry points")
+    return stream
+
+
+def save_dataset(output_dir: Path, driver: str, telemetry_stream: List[Dict], context: Dict):
+    """
+    Save the dataset to disk for later replay.
+    """
+    output_dir = Path(output_dir)
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # Save telemetry stream
+    telemetry_file = output_dir / f"{driver}_telemetry.json"
+    with open(telemetry_file, 'w') as f:
+        json.dump(telemetry_stream, f, indent=2)
+    print(f"✓ Saved telemetry: {telemetry_file}")
+
+    # Save race context
+    context_file = output_dir / f"{driver}_context.json"
+    with open(context_file, 'w') as f:
+        json.dump(context, f, indent=2)
+    print(f"✓ Saved context: {context_file}")
+
+    # Save summary metadata
+    summary = {
+        "driver": driver,
+        "telemetry_points": len(telemetry_stream),
+        "laps": len(set(p['lap'] for p in telemetry_stream)),
+        "duration_seconds": telemetry_stream[-1]['timestamp_ms'] / 1000.0 if telemetry_stream else 0,
+        "event": context['event']['name'],
+        "session": context['session']['type'],
+    }
+
+    summary_file = output_dir / f"{driver}_summary.json"
+    with open(summary_file, 'w') as f:
+        json.dump(summary, f, indent=2)
+    print(f"✓ Saved summary: {summary_file}")
+
+    print(f"\n📦 Dataset ready for simulation:")
+    print(f"   Driver: {driver}")
+    print(f"   Laps: {summary['laps']}")
+    print(f"   Duration: {summary['duration_seconds']:.1f}s")
+    print(f"   Points: {summary['telemetry_points']}")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Fetch FastF1 data for HPC F1 AI Strategy System"
+    )
+    parser.add_argument("--year", type=int, default=2024, help="Race year")
+    parser.add_argument("--race", type=str, default="Monaco", help="Race name or round number")
+    parser.add_argument("--driver", type=str, default="VER", help="Driver abbreviation (VER, HAM, LEC, etc.)")
+    parser.add_argument("--session", type=str, default="R", help="Session type (R, Q, FP1, etc.)")
+    parser.add_argument("--output", type=str, default="data/race_data", help="Output directory")
+    parser.add_argument("--sample-rate", type=float, default=10.0, help="Target sampling rate (Hz)")
+
+    args = parser.parse_args()
+
+    try:
+        # Fetch session
+        session = fetch_session_data(args.year, args.race, args.session)
+
+        # Extract driver telemetry
+        telemetry = extract_driver_telemetry(session, args.driver)
+
+        # Extract race context
+        context = extract_race_context(session)
+
+        # Prepare telemetry stream
+        stream = prepare_telemetry_stream(telemetry, args.sample_rate)
+
+        # Save dataset
+        save_dataset(Path(args.output), args.driver, stream, context)
+
+        print("\n✅ Data fetch complete! Ready for Pi simulation.")
+
+    except Exception as e:
+        print(f"\n❌ Error: {e}")
+        raise
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/simulate_pi_stream.py b/scripts/simulate_pi_stream.py
new file mode 100644
index 0000000..2499c3a
--- /dev/null
+++ b/scripts/simulate_pi_stream.py
@@ -0,0 +1,144 @@
+"""
+Raspberry Pi Telemetry Stream Simulator
+
+Replays downloaded FastF1 data as if it's coming from a live Raspberry Pi sensor.
+Sends data to the HPC simulation layer via HTTP POST.
+
+Usage:
+    python simulate_pi_stream.py --data data/race_data/VER_telemetry.json --speed 1.0
+"""
+
+import argparse
+import json
+import time
+import sys
+from pathlib import Path
+from typing import Dict, List, Any, Optional
+
+import requests
+
+
+def load_telemetry(filepath: Path) -> List[Dict[str, Any]]:
+    """Load telemetry data from JSON file."""
+    with open(filepath, 'r') as f:
+        data = json.load(f)
+    print(f"✓ Loaded {len(data)} telemetry points from {filepath}")
+    return data
+
+
+def simulate_stream(
+    telemetry: List[Dict[str, Any]],
+    endpoint: str,
+    speed: float = 1.0,
+    start_lap: int = 1,
+    end_lap: Optional[int] = None
+):
+    """
+    Simulate live telemetry streaming.
+
+    Args:
+        telemetry: List of telemetry points
+        endpoint: HPC API endpoint URL
+        speed: Playback speed multiplier (1.0 = real-time, 2.0 = 2x speed)
+        start_lap: Starting lap number
+        end_lap: Ending lap number (None = all laps)
+    """
+    # Filter by lap range
+    filtered = [p for p in telemetry if p['lap'] >= start_lap]
+    if end_lap is not None:
+        filtered = [p for p in filtered if p['lap'] <= end_lap]
+
+    if not filtered:
+        print("❌ No telemetry points in specified lap range")
+        return
+
+    print(f"\n🏁 Starting telemetry stream simulation")
+    print(f"   Endpoint: {endpoint}")
+    print(f"   Laps: {start_lap} → {end_lap or 'end'}")
+    print(f"   Speed: {speed}x")
+    print(f"   Points: {len(filtered)}")
+    print(f"   Duration: {filtered[-1]['timestamp_ms'] / 1000.0:.1f}s\n")
+
+    start_time = time.time()
+    start_ts = filtered[0]['timestamp_ms']
+
+    sent_count = 0
+    error_count = 0
+
+    try:
+        for i, point in enumerate(filtered):
+            # Calculate when this point should be sent
+            point_offset = (point['timestamp_ms'] - start_ts) / 1000.0 / speed
+            target_time = start_time + point_offset
+
+            # Wait until the right time
+            sleep_time = target_time - time.time()
+            if sleep_time > 0:
+                time.sleep(sleep_time)
+
+            # Send telemetry point
+            try:
+                response = requests.post(
+                    endpoint,
+                    json=point,
+                    timeout=2.0
+                )
+
+                if response.ok:  # treat any 2xx as success, not just 200
+                    sent_count += 1
+                    if sent_count % 100 == 0:
+                        elapsed = time.time() - start_time
+                        progress = (i + 1) / len(filtered) * 100
+                        print(f"  📡 Lap {point['lap']}: {sent_count} points sent "
+                              f"({progress:.1f}% complete, {elapsed:.1f}s elapsed)")
+                else:
+                    error_count += 1
+                    print(f"  ⚠ HTTP {response.status_code}: {response.text[:50]}")
+
+            except requests.RequestException as e:
+                error_count += 1
+                if error_count % 10 == 0:
+                    print(f"  ⚠ Connection error ({error_count} total): {e}")
+
+        print(f"\n✅ Stream complete!")
+        print(f"   Sent: {sent_count} points")
+        print(f"   Errors: {error_count}")
+        print(f"   Duration: {time.time() - start_time:.1f}s")
+
+    except KeyboardInterrupt:
+        print(f"\n⏸ Stream interrupted by user")
+        print(f"   Sent: {sent_count}/{len(filtered)} points")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Simulate Raspberry Pi telemetry streaming"
+    )
+    parser.add_argument("--data", type=str, required=True, help="Path to telemetry JSON file")
+    parser.add_argument("--endpoint", type=str, default="http://localhost:8000/telemetry",
+                        help="HPC API endpoint")
+    parser.add_argument("--speed", type=float, default=1.0, help="Playback speed (1.0 = real-time)")
+    parser.add_argument("--start-lap", type=int, default=1, help="Starting lap number")
+    parser.add_argument("--end-lap", type=int, default=None, help="Ending lap number")
+
+    args = parser.parse_args()
+
+    try:
+        telemetry = load_telemetry(Path(args.data))
+        simulate_stream(
+            telemetry,
+            args.endpoint,
+            args.speed,
+            args.start_lap,
+            args.end_lap
+        )
+    except FileNotFoundError:
+        print(f"❌ File not found: {args.data}")
+        sys.exit(1)
+    except Exception as e:
+        print(f"❌ Error: {e}")
+        raise


if __name__ == "__main__":
    main()