Files
guadaloop_lev_control/serial_plotter.py

654 lines
23 KiB
Python
Raw Normal View History

2025-11-30 19:05:57 -06:00
import serial
import serial.tools.list_ports
import matplotlib
matplotlib.use('TkAgg') # Use TkAgg backend explicitly
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from collections import deque
import argparse
import time
import random
import threading
import sys
from datetime import datetime
import queue
import tkinter as tk
from tkinter import ttk
# Constants
BAUD_RATE = 2000000
MAX_POINTS = 100 # Number of points to display on the plot
# Save queue for handling saves in main thread
save_queue = queue.Queue()
# Data storage
data = {
"Left": deque(maxlen=MAX_POINTS),
"Right": deque(maxlen=MAX_POINTS),
"Front": deque(maxlen=MAX_POINTS),
"Back": deque(maxlen=MAX_POINTS),
"Avg": deque(maxlen=MAX_POINTS),
"FLPWM": deque(maxlen=MAX_POINTS),
"BLPWM": deque(maxlen=MAX_POINTS),
"FRPWM": deque(maxlen=MAX_POINTS),
"BRPWM": deque(maxlen=MAX_POINTS),
"ControlOn": deque(maxlen=MAX_POINTS),
"Time": deque(maxlen=MAX_POINTS)
}
# Recording data storage (no max length limit)
recording_data = {
"Left": [],
"Right": [],
"Front": [],
"Back": [],
"Avg": [],
"FLPWM": [],
"BLPWM": [],
"FRPWM": [],
"BRPWM": [],
"ControlOn": [],
"Time": []
}
start_time = time.time()
recording = False
recording_start_time = None
def get_serial_port():
ports = list(serial.tools.list_ports.comports())
if not ports:
return None
print("Available ports:")
for i, p in enumerate(ports):
print(f"{i}: {p.device} - {p.description}")
if len(ports) == 1:
return ports[0].device
try:
selection = int(input("Select port index: "))
return ports[selection].device
except (ValueError, IndexError):
print("Invalid selection.")
return None
def mock_data_generator():
"""Generates mock data in the expected CSV format."""
while True:
# Simulate sensor readings (around 12.0mm with noise)
left = 12.0 + random.uniform(-0.5, 0.5)
right = 12.0 + random.uniform(-0.5, 0.5)
front = 12.0 + random.uniform(-0.5, 0.5)
back = 12.0 + random.uniform(-0.5, 0.5)
avg = (left + right + front + back) / 4.0
# Simulate PWM values (around 0 with noise)
fl_pwm = random.randint(-50, 50)
bl_pwm = random.randint(-50, 50)
fr_pwm = random.randint(-50, 50)
br_pwm = random.randint(-50, 50)
control_on = 1
# CSV Format: Left,Right,Front,Back,Avg,FLPWM,BLPWM,FRPWM,BRPWM,ControlOn
line = f"{left:.2f},{right:.2f},{front:.2f},{back:.2f},{avg:.2f},{fl_pwm},{bl_pwm},{fr_pwm},{br_pwm},{control_on}"
yield line
time.sleep(0.01) # 100Hz
def read_serial(ser, mock=False):
"""Reads data from serial port or mock generator."""
global recording, recording_start_time
generator = mock_data_generator() if mock else None
while True:
try:
if mock:
line = next(generator)
else:
if ser.in_waiting:
line = ser.readline().decode('utf-8').strip()
else:
continue
parts = line.split(',')
if len(parts) == 10:
current_time = time.time() - start_time
left = float(parts[0])
right = float(parts[1])
front = float(parts[2])
back = float(parts[3])
avg = float(parts[4])
flpwm = float(parts[5])
blpwm = float(parts[6])
frpwm = float(parts[7])
brpwm = float(parts[8])
control_on = int(parts[9])
# Update live display data
data["Left"].append(left)
data["Right"].append(right)
data["Front"].append(front)
data["Back"].append(back)
data["Avg"].append(avg)
data["FLPWM"].append(flpwm)
data["BLPWM"].append(blpwm)
data["FRPWM"].append(frpwm)
data["BRPWM"].append(brpwm)
data["ControlOn"].append(control_on)
data["Time"].append(current_time)
# If recording, store in unlimited buffer
if recording:
recording_time = current_time - recording_start_time
recording_data["Left"].append(left)
recording_data["Right"].append(right)
recording_data["Front"].append(front)
recording_data["Back"].append(back)
recording_data["Avg"].append(avg)
recording_data["FLPWM"].append(flpwm)
recording_data["BLPWM"].append(blpwm)
recording_data["FRPWM"].append(frpwm)
recording_data["BRPWM"].append(brpwm)
recording_data["ControlOn"].append(control_on)
recording_data["Time"].append(recording_time)
except ValueError:
pass # Ignore parse errors
except Exception as e:
print(f"Error reading data: {e}")
if not mock:
break
def save_recording(metadata=None):
"""Save the recorded data to a high-quality PNG file."""
global recording_data
if not recording_data["Time"]:
print("No data recorded to save.")
return
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create directory if it doesn't exist
import os
save_dir = "tuningTrials"
os.makedirs(save_dir, exist_ok=True)
filename = f"{save_dir}/trial_{timestamp}.png"
# Create a completely independent figure with Agg backend
import matplotlib
from matplotlib.figure import Figure
from matplotlib.backends.backend_agg import FigureCanvasAgg
save_fig = Figure(figsize=(14, 10))
canvas = FigureCanvasAgg(save_fig)
save_fig.suptitle(f'Levitation Control Trial - {timestamp}', fontsize=16, fontweight='bold')
# Plot Sensors
ax1 = save_fig.add_subplot(2, 1, 1)
ax1.plot(recording_data["Time"], recording_data["Left"], label="Left", linewidth=2)
ax1.plot(recording_data["Time"], recording_data["Right"], label="Right", linewidth=2)
ax1.plot(recording_data["Time"], recording_data["Front"], label="Front", linewidth=2)
ax1.plot(recording_data["Time"], recording_data["Back"], label="Back", linewidth=2)
ax1.plot(recording_data["Time"], recording_data["Avg"], label="Avg", linestyle='--', color='black', linewidth=2.5)
ax1.set_ylabel("Distance (mm)", fontsize=12, fontweight='bold')
ax1.legend(loc='upper right', fontsize=10)
ax1.set_title("Sensor Readings", fontsize=14, fontweight='bold')
ax1.grid(True, alpha=0.3)
ax1.set_xlim([0, recording_data["Time"][-1]])
# Plot PWMs
ax2 = save_fig.add_subplot(2, 1, 2)
ax2.plot(recording_data["Time"], recording_data["FLPWM"], label="FL", linewidth=2)
ax2.plot(recording_data["Time"], recording_data["BLPWM"], label="BL", linewidth=2)
ax2.plot(recording_data["Time"], recording_data["FRPWM"], label="FR", linewidth=2)
ax2.plot(recording_data["Time"], recording_data["BRPWM"], label="BR", linewidth=2)
ax2.set_ylabel("PWM Value", fontsize=12, fontweight='bold')
ax2.set_xlabel("Time (s)", fontsize=12, fontweight='bold')
ax2.legend(loc='upper right', fontsize=10)
ax2.set_title("PWM Outputs", fontsize=14, fontweight='bold')
ax2.grid(True, alpha=0.3)
ax2.set_xlim([0, recording_data["Time"][-1]])
save_fig.tight_layout()
# Save with high DPI using the canvas
canvas.print_figure(filename, dpi=300, bbox_inches='tight')
print(f"\n{'='*60}")
print(f"Trial data saved to: {filename}")
print(f"Duration: {recording_data['Time'][-1]:.2f} seconds")
print(f"Data points: {len(recording_data['Time'])}")
# Save metadata if provided
if metadata:
txt_filename = f"{save_dir}/trial_{timestamp}.txt"
try:
with open(txt_filename, 'w') as f:
f.write(f"Trial Timestamp: {timestamp}\n")
f.write(f"Duration: {recording_data['Time'][-1]:.2f} seconds\n")
f.write(f"Data Points: {len(recording_data['Time'])}\n\n")
f.write("=== Reference Values ===\n")
if "references" in metadata:
for key, value in metadata["references"].items():
f.write(f"{key}: {value}\n")
f.write("\n")
f.write("=== PID Parameters ===\n")
if "pid" in metadata:
for mode, values in metadata["pid"].items():
f.write(f"[{mode}]\n")
f.write(f" Kp: {values['Kp']}\n")
f.write(f" Ki: {values['Ki']}\n")
f.write(f" Kd: {values['Kd']}\n")
f.write("\n")
print(f"Metadata saved to: {txt_filename}")
except Exception as e:
print(f"Failed to save metadata: {e}")
print(f"{'='*60}\n")
def update_plot(frame):
global recording
# Check if there's a save request
try:
while not save_queue.empty():
item = save_queue.get_nowait()
if isinstance(item, dict):
save_recording(metadata=item)
else:
save_recording()
except:
pass
plt.clf()
# Add recording indicator
fig = plt.gcf()
if recording:
fig.patch.set_facecolor('#ffe6e6') # Light red background when recording
else:
fig.patch.set_facecolor('white')
# Plot Sensors
plt.subplot(2, 1, 1)
plt.plot(data["Time"], data["Left"], label="Left")
plt.plot(data["Time"], data["Right"], label="Right")
plt.plot(data["Time"], data["Front"], label="Front")
plt.plot(data["Time"], data["Back"], label="Back")
plt.plot(data["Time"], data["Avg"], label="Avg", linestyle='--', color='black')
plt.ylabel("Distance (mm)")
plt.legend(loc='upper right')
title = "Sensor Readings"
if recording:
title += " [RECORDING]"
plt.title(title)
plt.grid(True)
# Plot PWMs
plt.subplot(2, 1, 2)
plt.plot(data["Time"], data["FLPWM"], label="FL")
plt.plot(data["Time"], data["BLPWM"], label="BL")
plt.plot(data["Time"], data["FRPWM"], label="FR")
plt.plot(data["Time"], data["BRPWM"], label="BR")
plt.ylabel("PWM Value")
plt.xlabel("Time (s)")
plt.legend(loc='upper right')
title = "PWM Outputs"
if recording:
title += " [RECORDING]"
plt.title(title)
plt.grid(True)
plt.tight_layout()
def main():
parser = argparse.ArgumentParser(description='Serial Plotter for Levitation Control')
parser.add_argument('--port', type=str, help='Serial port to connect to')
parser.add_argument('--mock', action='store_true', help='Use mock data instead of serial')
args = parser.parse_args()
ser = None
if not args.mock:
port = args.port or get_serial_port()
if not port:
print("No serial port found or selected. Use --mock to test without hardware.")
return
try:
ser = serial.Serial(port, BAUD_RATE, timeout=1)
print(f"Connected to {port} at {BAUD_RATE} baud.")
except serial.SerialException as e:
print(f"Error connecting to serial port: {e}")
return
# Start data reading thread
thread = threading.Thread(target=read_serial, args=(ser, args.mock), daemon=True)
thread.start()
# Send initial start command
if ser:
try:
print("Sending start command...")
ser.write(b'1')
except Exception as e:
print(f"Failed to send start command: {e}")
# Create Tkinter window
root = tk.Tk()
root.title("Levitation Control - Serial Plotter")
# Create main container
main_container = ttk.Frame(root)
main_container.pack(fill=tk.BOTH, expand=True)
# Left panel for controls
control_panel = ttk.Frame(main_container, width=400)
control_panel.pack(side=tk.LEFT, fill=tk.BOTH, padx=5, pady=5)
# Right panel for plot
plot_panel = ttk.Frame(main_container)
plot_panel.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5)
# === CONTROL BUTTONS ===
btn_frame = ttk.LabelFrame(control_panel, text="Control", padding=10)
btn_frame.pack(fill=tk.X, pady=5)
def start_control():
if ser:
ser.write(b'1')
print("Control started")
global recording, recording_start_time, recording_data
recording = True
recording_start_time = time.time() - start_time
for key in recording_data:
recording_data[key] = []
def stop_control():
if ser:
ser.write(b'0')
print("Control stopped")
global recording
if recording:
recording = False
# Collect metadata
metadata = {
"references": {
"Avg Height": avg_ref_entry.get(),
"LR Diff": lr_diff_entry.get(),
"FB Diff": fb_diff_entry.get()
},
"pid": {}
}
# Collect PID values
for name, _, _, _, _ in pid_modes:
kp_e, ki_e, kd_e = pid_entries[name]
metadata["pid"][name] = {
"Kp": kp_e.get(),
"Ki": ki_e.get(),
"Kd": kd_e.get()
}
save_queue.put(metadata)
ttk.Button(btn_frame, text="Start Control & Record", command=start_control, width=25).pack(pady=2)
ttk.Button(btn_frame, text="Stop Control & Save", command=stop_control, width=25).pack(pady=2)
# === REFERENCE VALUES ===
ref_frame = ttk.LabelFrame(control_panel, text="Reference Values", padding=10)
ref_frame.pack(fill=tk.X, pady=5)
# Average Reference
ttk.Label(ref_frame, text="Avg Height (mm):", width=15).grid(row=0, column=0, sticky=tk.W, pady=2)
avg_ref_entry = ttk.Entry(ref_frame, width=12)
avg_ref_entry.insert(0, "11.0")
avg_ref_entry.grid(row=0, column=1, padx=5, pady=2)
# Left-Right Diff Reference
ttk.Label(ref_frame, text="LR Diff (mm):", width=15).grid(row=1, column=0, sticky=tk.W, pady=2)
lr_diff_entry = ttk.Entry(ref_frame, width=12)
lr_diff_entry.insert(0, "-2.0")
lr_diff_entry.grid(row=1, column=1, padx=5, pady=2)
# Front-Back Diff Reference
ttk.Label(ref_frame, text="FB Diff (mm):", width=15).grid(row=2, column=0, sticky=tk.W, pady=2)
fb_diff_entry = ttk.Entry(ref_frame, width=12)
fb_diff_entry.insert(0, "0.0")
fb_diff_entry.grid(row=2, column=1, padx=5, pady=2)
def send_references():
try:
avg_ref = float(avg_ref_entry.get())
lr_diff = float(lr_diff_entry.get())
fb_diff = float(fb_diff_entry.get())
cmd = f"REF,{avg_ref},{lr_diff},{fb_diff}\n"
if ser:
ser.write(cmd.encode('utf-8'))
print(f"Sent References: Avg={avg_ref}, LR={lr_diff}, FB={fb_diff}")
else:
print(f"Mock mode - would send: {cmd.strip()}")
except ValueError as e:
print(f"Error: Invalid reference values - {e}")
ttk.Button(ref_frame, text="Send References", command=send_references, width=25).grid(row=3, column=0, columnspan=2, pady=10)
# === PID TUNING ===
pid_frame = ttk.LabelFrame(control_panel, text="PID Tuning", padding=10)
pid_frame.pack(fill=tk.BOTH, expand=True, pady=5)
# Create scrollable frame
canvas = tk.Canvas(pid_frame, height=500)
scrollbar = ttk.Scrollbar(pid_frame, orient="vertical", command=canvas.yview)
scrollable_frame = ttk.Frame(canvas)
scrollable_frame.bind(
"<Configure>",
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
)
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
# PID modes with default values from the .ino file
pid_modes = [
("Repelling", 0, 250, 0, 1000),
("Attracting", 1, 250, 0, 1000),
("RollLeftDown", 2, 0, 0, 100),
("RollLeftUp", 3, 0, 0, 100),
("RollFrontDown", 4, 0, 0, 500),
("RollFrontUp", 5, 0, 0, 500),
]
pid_entries = {}
for name, mode, default_kp, default_ki, default_kd in pid_modes:
mode_frame = ttk.LabelFrame(scrollable_frame, text=name, padding=5)
mode_frame.pack(fill=tk.X, pady=5, padx=5)
# Kp
ttk.Label(mode_frame, text="Kp:", width=5).grid(row=0, column=0, sticky=tk.W)
kp_entry = ttk.Entry(mode_frame, width=12)
kp_entry.insert(0, str(default_kp))
kp_entry.grid(row=0, column=1, padx=5)
# Ki
ttk.Label(mode_frame, text="Ki:", width=5).grid(row=1, column=0, sticky=tk.W)
ki_entry = ttk.Entry(mode_frame, width=12)
ki_entry.insert(0, str(default_ki))
ki_entry.grid(row=1, column=1, padx=5)
# Kd
ttk.Label(mode_frame, text="Kd:", width=5).grid(row=2, column=0, sticky=tk.W)
kd_entry = ttk.Entry(mode_frame, width=12)
kd_entry.insert(0, str(default_kd))
kd_entry.grid(row=2, column=1, padx=5)
# Send button
def make_send_func(mode_num, kp_e, ki_e, kd_e, mode_name):
def send_pid():
try:
kp = float(kp_e.get())
ki = float(ki_e.get())
kd = float(kd_e.get())
cmd = f"PID,{mode_num},{kp},{ki},{kd}\n"
if ser:
ser.write(cmd.encode('utf-8'))
print(f"Sent {mode_name}: Kp={kp}, Ki={ki}, Kd={kd}")
else:
print(f"Mock mode - would send: {cmd.strip()}")
except ValueError as e:
print(f"Error: Invalid PID values - {e}")
return send_pid
send_btn = ttk.Button(mode_frame, text="Send",
command=make_send_func(mode, kp_entry, ki_entry, kd_entry, name),
width=10)
send_btn.grid(row=3, column=0, columnspan=2, pady=5)
pid_entries[name] = (kp_entry, ki_entry, kd_entry)
# Send All button
def send_all_pid():
for name, mode, _, _, _ in pid_modes:
kp_e, ki_e, kd_e = pid_entries[name]
try:
kp = float(kp_e.get())
ki = float(ki_e.get())
kd = float(kd_e.get())
cmd = f"PID,{mode},{kp},{ki},{kd}\n"
if ser:
ser.write(cmd.encode('utf-8'))
time.sleep(0.05) # Small delay between commands
print(f"Sent {name}: Kp={kp}, Ki={ki}, Kd={kd}")
except ValueError as e:
print(f"Error in {name}: {e}")
send_all_frame = ttk.Frame(scrollable_frame)
send_all_frame.pack(fill=tk.X, pady=10, padx=5)
ttk.Button(send_all_frame, text="Send All PID Values", command=send_all_pid, width=25).pack()
canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# === PLOT SETUP ===
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
fig.tight_layout(pad=3.0)
canvas_plot = FigureCanvasTkAgg(fig, master=plot_panel)
canvas_plot.get_tk_widget().pack(fill=tk.BOTH, expand=True)
# Setup axes once
ax1.set_ylabel("Distance (mm)")
ax1.grid(True)
ax1.set_title("Sensor Readings")
ax2.set_ylabel("PWM Value")
ax2.set_xlabel("Time (s)")
ax2.grid(True)
ax2.set_title("PWM Outputs")
# Initialize line objects for efficient updating
line_left, = ax1.plot([], [], label="Left")
line_right, = ax1.plot([], [], label="Right")
line_front, = ax1.plot([], [], label="Front")
line_back, = ax1.plot([], [], label="Back")
line_avg, = ax1.plot([], [], label="Avg", linestyle='--', color='black', linewidth=1.5)
line_fl, = ax2.plot([], [], label="FL")
line_bl, = ax2.plot([], [], label="BL")
line_fr, = ax2.plot([], [], label="FR")
line_br, = ax2.plot([], [], label="BR")
# Create legends after line objects
ax1.legend(loc='upper right')
ax2.legend(loc='upper right')
# Track recording state to minimize updates
_was_recording = False
def update_plot(frame):
global recording
nonlocal _was_recording
# Check if there's a save request
try:
while not save_queue.empty():
item = save_queue.get_nowait()
if isinstance(item, dict):
save_recording(metadata=item)
else:
save_recording()
except:
pass
# Only update background color when recording state changes
if recording != _was_recording:
if recording:
fig.patch.set_facecolor('#ffe6e6')
ax1.set_title("Sensor Readings [RECORDING]")
ax2.set_title("PWM Outputs [RECORDING]")
else:
fig.patch.set_facecolor('white')
ax1.set_title("Sensor Readings")
ax2.set_title("PWM Outputs")
_was_recording = recording
# Update line data efficiently (no clear/replot)
if len(data["Time"]) > 0:
time_data = list(data["Time"])
line_left.set_data(time_data, list(data["Left"]))
line_right.set_data(time_data, list(data["Right"]))
line_front.set_data(time_data, list(data["Front"]))
line_back.set_data(time_data, list(data["Back"]))
line_avg.set_data(time_data, list(data["Avg"]))
line_fl.set_data(time_data, list(data["FLPWM"]))
line_bl.set_data(time_data, list(data["BLPWM"]))
line_fr.set_data(time_data, list(data["FRPWM"]))
line_br.set_data(time_data, list(data["BRPWM"]))
# Auto-scale axes
ax1.relim()
ax1.autoscale_view()
ax2.relim()
ax2.autoscale_view()
canvas_plot.draw_idle()
def on_close():
print("Window closed. Exiting script.")
if ser:
ser.close()
root.quit()
root.destroy()
sys.exit()
root.protocol("WM_DELETE_WINDOW", on_close)
# Start animation with slower interval for smoother GUI (100ms = 10 FPS is sufficient)
ani = animation.FuncAnimation(fig, update_plot, interval=100, cache_frame_data=False, blit=False)
print("\n" + "="*60)
print("SERIAL PLOTTER - GUI MODE")
print("="*60)
print(" Use buttons to start/stop control and recording")
print(" Adjust PID values and click 'Send' to update Arduino")
print("="*60 + "\n")
root.mainloop()
if __name__ == "__main__":
main()