Files
guadaloop_lev_control/RL Testing/optuna_pid_tune.py
2026-02-21 23:53:53 -06:00

338 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Optuna (TPE / Bayesian-style) optimization for the three-PID maglev controller.
Tunes all nine gains (height, roll, pitch × Kp, Ki, Kd) jointly so coupling is respected.
Noisy optimization: set disturbance_force_std > 0 so the env applies random force/torque
each step (system changes slightly each run). To keep Bayesian optimization effective,
use n_objective_repeats > 1: each candidate is evaluated multiple times and the mean cost
is returned, reducing variance so TPE can compare trials reliably.
Run from the command line or import and call run_optimization() from a notebook.
"""
import json
import os
import sys
import numpy as np
import optuna
from optuna.samplers import TPESampler
from lev_pod_env import TARGET_GAP
from pid_controller import DEFAULT_GAINS
from pid_simulation import run_pid_simulation, build_feedforward_lut
# Save pid_best_params.json next to this script so notebook and CLI find it regardless of cwd
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Default optimization config: long enough to see late instability (~8s+), not so long that optimizer goes ultra-conservative
DEFAULT_MAX_STEPS = 1500
DEFAULT_INITIAL_GAPS_MM = [12.0, 18.0] # Two conditions for robustness
DEFAULT_N_TRIALS = 200
DEFAULT_TIMEOUT_S = 3600
TARGET_GAP_MM = TARGET_GAP * 1000
def _cost_from_data(
data: dict,
target_gap_mm: float,
penalty_early: float = 500.0,
weight_late_osc: float = 3.0,
weight_steady_state: float = 2.0,
) -> float:
"""
Single scalar cost from one run. Lower is better.
- ITAE for gap error and |roll|/|pitch| (tracking over time).
- Late-oscillation penalty: std(roll) and std(pitch) over the *last 50%* of the run,
plus max |roll|/|pitch| in that window, so gains that go unstable after ~half the run are penalized.
- Steady-state term: mean absolute gap error and mean |roll|/|pitch| over the *last 20%*,
so we reward settling at target with small angles.
- Penalty if episode ended early (crash/instability), regardless of run length.
- Small penalty on mean total current (efficiency).
"""
t = np.asarray(data["time"])
dt = np.diff(t, prepend=0.0)
gap_err_mm = np.abs(np.asarray(data["gap_avg"]) - target_gap_mm)
itae_gap = np.sum(t * gap_err_mm * dt)
roll_deg = np.asarray(data["roll_deg"])
pitch_deg = np.asarray(data["pitch_deg"])
roll_abs = np.abs(roll_deg)
pitch_abs = np.abs(pitch_deg)
itae_roll = np.sum(t * roll_abs * dt)
itae_pitch = np.sum(t * pitch_abs * dt)
n = len(t)
early_penalty = penalty_early if data.get("terminated_early", False) else 0.0
# Late half: penalize oscillation (std) and large angles (max) so "good for 6s then violent roll" is expensive
half = max(1, n // 2)
roll_last = roll_deg[-half:]
pitch_last = pitch_deg[-half:]
late_osc_roll = np.std(roll_last) + np.max(np.abs(roll_last))
late_osc_pitch = np.std(pitch_last) + np.max(np.abs(pitch_last))
late_osc_penalty = weight_late_osc * (late_osc_roll + late_osc_pitch)
# Last 20%: steady-state error — want small gap error and small roll/pitch at end
tail = max(1, n // 5)
ss_gap = np.mean(np.abs(np.asarray(data["gap_avg"])[-tail:] - target_gap_mm))
ss_roll = np.mean(roll_abs[-tail:])
ss_pitch = np.mean(pitch_abs[-tail:])
steady_state_penalty = weight_steady_state * (ss_gap + ss_roll + ss_pitch)
mean_current = np.mean(data["current_total"])
current_penalty = 0.01 * mean_current
return (
itae_gap
+ 2.0 * (itae_roll + itae_pitch)
+ early_penalty
+ late_osc_penalty
+ steady_state_penalty
+ current_penalty
)
def objective(
trial: optuna.Trial,
initial_gaps_mm: list,
max_steps: int,
use_feedforward: bool,
penalty_early: float,
disturbance_force_std: float = 0.0,
n_objective_repeats: int = 1,
) -> float:
"""
Optuna objective: suggest 9 gains, run simulation(s), return cost.
With disturbance_force_std > 0, the env applies random force/torque disturbances each step,
so the same gains yield different costs across runs. For Bayesian optimization under noise,
set n_objective_repeats > 1 to evaluate each candidate multiple times and return the mean cost,
reducing variance so TPE can compare trials reliably.
"""
# All three loops tuned together (recommended: they interact)
height_kp = trial.suggest_float("height_kp", 5.0, 200.0, log=True)
height_ki = trial.suggest_float("height_ki", 0.5, 50.0, log=True)
height_kd = trial.suggest_float("height_kd", 1.0, 50.0, log=True)
roll_kp = trial.suggest_float("roll_kp", 0.1, 20.0, log=True)
roll_ki = trial.suggest_float("roll_ki", 0.01, 5.0, log=True)
roll_kd = trial.suggest_float("roll_kd", 0.01, 5.0, log=True)
pitch_kp = trial.suggest_float("pitch_kp", 0.1, 20.0, log=True)
pitch_ki = trial.suggest_float("pitch_ki", 0.01, 5.0, log=True)
pitch_kd = trial.suggest_float("pitch_kd", 0.01, 5.0, log=True)
gains = {
"height_kp": height_kp, "height_ki": height_ki, "height_kd": height_kd,
"roll_kp": roll_kp, "roll_ki": roll_ki, "roll_kd": roll_kd,
"pitch_kp": pitch_kp, "pitch_ki": pitch_ki, "pitch_kd": pitch_kd,
}
cost_sum = 0.0
n_evals = 0
for _ in range(n_objective_repeats):
for initial_gap in initial_gaps_mm:
data = run_pid_simulation(
initial_gap_mm=initial_gap,
max_steps=max_steps,
use_gui=False,
disturbance_force_std=disturbance_force_std,
use_feedforward=use_feedforward,
record_video=False,
record_telemetry=False,
gains=gains,
verbose=False,
)
n = len(data["time"])
data["terminated_early"] = n < max_steps - 10
cost_sum += _cost_from_data(data, TARGET_GAP_MM, penalty_early=penalty_early)
n_evals += 1
return cost_sum / n_evals
def run_optimization(
n_trials: int = DEFAULT_N_TRIALS,
timeout: int = DEFAULT_TIMEOUT_S,
initial_gaps_mm: list = None,
max_steps: int = DEFAULT_MAX_STEPS,
use_feedforward: bool = True,
penalty_early: float = 500.0,
disturbance_force_std: float = 0.0,
n_objective_repeats: int = 1,
out_dir: str = None,
study_name: str = "pid_maglev",
) -> optuna.Study:
"""
Run Optuna study (TPE sampler) and save best params to JSON.
disturbance_force_std: passed to env so each step gets random force/torque noise (N).
n_objective_repeats: when > 1, each trial is evaluated this many times (different noise)
and the mean cost is returned, so Bayesian optimization sees a less noisy objective.
Returns:
study: Optuna Study (study.best_params, study.best_value).
"""
if initial_gaps_mm is None:
initial_gaps_mm = DEFAULT_INITIAL_GAPS_MM
if out_dir is None:
out_dir = _SCRIPT_DIR
# Build feedforward LUT once so first trial doesn't do it inside run
build_feedforward_lut()
sampler = TPESampler(
n_startup_trials=min(20, n_trials // 4),
n_ei_candidates=24,
seed=42,
)
study = optuna.create_study(
direction="minimize",
study_name=study_name,
sampler=sampler,
)
study.optimize(
lambda t: objective(
t,
initial_gaps_mm=initial_gaps_mm,
max_steps=max_steps,
use_feedforward=use_feedforward,
penalty_early=penalty_early,
disturbance_force_std=disturbance_force_std,
n_objective_repeats=n_objective_repeats,
),
n_trials=n_trials,
timeout=timeout,
show_progress_bar=True,
)
out_path = os.path.join(out_dir, "pid_best_params.json")
with open(out_path, "w") as f:
json.dump(study.best_params, f, indent=2)
print(f"Best cost: {study.best_value:.4f}")
print(f"Best params saved to {out_path}")
return study
def run_staged_optimization(
stage_steps: list = None,
n_trials_per_stage: int = 50,
timeout_per_stage: int = None,
initial_gaps_mm: list = None,
use_feedforward: bool = True,
penalty_early: float = 500.0,
disturbance_force_std: float = 0.0,
n_objective_repeats: int = 1,
out_dir: str = None,
) -> list:
"""
Run optimization in stages with increasing max_steps, warm-starting each stage from the previous best.
disturbance_force_std: passed to env for stochastic force/torque noise (N).
n_objective_repeats: mean over this many evaluations per trial for a less noisy objective.
Example: stage_steps=[1500, 3000, 6000]
- Stage 1: optimize with 1500 steps (finds good gap/initial roll), save best.
- Stage 2: optimize with 3000 steps; first trial is the 1500-step best (evaluated at 3000 steps), then TPE suggests improvements.
- Stage 3: same with 6000 steps starting from stage 2's best.
This keeps good lift-off and gap control from the short-horizon run while refining for late-horizon roll stability.
"""
if stage_steps is None:
stage_steps = [1500, 3000, 6000]
if initial_gaps_mm is None:
initial_gaps_mm = DEFAULT_INITIAL_GAPS_MM
if out_dir is None:
out_dir = _SCRIPT_DIR
if timeout_per_stage is None:
timeout_per_stage = DEFAULT_TIMEOUT_S
build_feedforward_lut()
best_params_path = os.path.join(out_dir, "pid_best_params.json")
studies = []
for stage_idx, max_steps in enumerate(stage_steps):
print(f"\n--- Stage {stage_idx + 1}/{len(stage_steps)}: max_steps={max_steps} ---")
study = optuna.create_study(
direction="minimize",
study_name=f"pid_maglev_stage_{max_steps}",
sampler=TPESampler(
n_startup_trials=min(20, n_trials_per_stage // 4),
n_ei_candidates=24,
seed=42 + stage_idx,
),
)
# Warm start: enqueue previous stage's best so we refine from it (stage 0 has no previous)
if stage_idx > 0 and os.path.isfile(best_params_path):
with open(best_params_path) as f:
prev_best = json.load(f)
study.enqueue_trial(prev_best)
print(f"Enqueued previous best params (from stage {stage_steps[stage_idx - 1]} steps) as first trial.")
study.optimize(
lambda t: objective(
t,
initial_gaps_mm=initial_gaps_mm,
max_steps=max_steps,
use_feedforward=use_feedforward,
penalty_early=penalty_early,
disturbance_force_std=disturbance_force_std,
n_objective_repeats=n_objective_repeats,
),
n_trials=n_trials_per_stage,
timeout=timeout_per_stage,
show_progress_bar=True,
)
with open(best_params_path, "w") as f:
json.dump(study.best_params, f, indent=2)
stage_path = os.path.join(out_dir, f"pid_best_params_{max_steps}.json")
with open(stage_path, "w") as f:
json.dump(study.best_params, f, indent=2)
print(f"Stage best cost: {study.best_value:.4f} -> saved to {best_params_path} and {stage_path}")
studies.append(study)
print(f"\nStaged optimization done. Final best params in {best_params_path}")
return studies
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Optuna PID tuning for maglev three-loop controller")
parser.add_argument("--n_trials", type=int, default=DEFAULT_N_TRIALS)
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_S)
parser.add_argument("--max_steps", type=int, default=DEFAULT_MAX_STEPS)
parser.add_argument("--gaps", type=float, nargs="+", default=DEFAULT_INITIAL_GAPS_MM)
parser.add_argument("--out_dir", type=str, default=_SCRIPT_DIR, help="Directory for pid_best_params.json (default: same as script)")
parser.add_argument("--no_feedforward", action="store_true")
parser.add_argument("--staged", action="store_true", help="Staged optimization: 1500 -> 3000 -> 6000 steps, each stage warm-starts from previous best")
parser.add_argument("--stage_steps", type=int, nargs="+", default=[1500, 3000, 6000], help="Steps per stage when using --staged (default: 1500 3000 6000)")
parser.add_argument("--n_trials_per_stage", type=int, default=DEFAULT_N_TRIALS, help="Trials per stage when using --staged")
parser.add_argument("--disturbance_force_std", type=float, default=0.0, help="Env disturbance force std (N); roll/pitch torque scale with this. Use >0 for noisy optimization.")
parser.add_argument("--n_objective_repeats", type=int, default=1, help="Evaluate each trial this many times and report mean cost (reduces noise for Bayesian optimization)")
args = parser.parse_args()
if args.staged:
run_staged_optimization(
stage_steps=args.stage_steps,
n_trials_per_stage=args.n_trials_per_stage,
timeout_per_stage=args.timeout,
initial_gaps_mm=args.gaps,
use_feedforward=not args.no_feedforward,
disturbance_force_std=args.disturbance_force_std,
n_objective_repeats=args.n_objective_repeats,
out_dir=args.out_dir,
)
else:
run_optimization(
n_trials=args.n_trials,
timeout=args.timeout,
initial_gaps_mm=args.gaps,
max_steps=args.max_steps,
use_feedforward=not args.no_feedforward,
disturbance_force_std=args.disturbance_force_std,
n_objective_repeats=args.n_objective_repeats,
out_dir=args.out_dir,
)