heave control + plotter
This commit is contained in:
179
heave_plotter.py
Normal file
179
heave_plotter.py
Normal file
@@ -0,0 +1,179 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Minimal serial plotter for the HeaveOnly sketch.
|
||||
|
||||
Expects CSV lines at 2_000_000 baud: Front,Back,Avg,PWM,outputOn
|
||||
Key commands (focus the plot window):
|
||||
0 → output off
|
||||
1 → output on, PID
|
||||
2 → output on, full attract
|
||||
q → quit
|
||||
|
||||
Text boxes at the bottom of the window send:
|
||||
Ref (mm) → R<value>
|
||||
PID (kp,ki,kd) → P<kp>,<ki>,<kd>
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
import matplotlib
|
||||
matplotlib.use('TkAgg')
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.animation import FuncAnimation
|
||||
from matplotlib.widgets import TextBox, CheckButtons
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
|
||||
BAUD_RATE = 2_000_000
|
||||
MAX_POINTS = 400 # rolling window length
|
||||
TIMEOUT_S = 0.02
|
||||
|
||||
|
||||
def pick_port():
|
||||
ports = list(serial.tools.list_ports.comports())
|
||||
if not ports:
|
||||
sys.exit('No serial ports found.')
|
||||
for p in ports:
|
||||
d = (p.description or '').lower()
|
||||
if 'usbmodem' in p.device or 'arduino' in d or 'usbserial' in p.device.lower():
|
||||
return p.device
|
||||
return ports[0].device
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument('--port', default=None)
|
||||
ap.add_argument('--window', type=int, default=MAX_POINTS)
|
||||
args = ap.parse_args()
|
||||
|
||||
port = args.port or pick_port()
|
||||
print(f'Opening {port} @ {BAUD_RATE} baud')
|
||||
ser = serial.Serial(port, BAUD_RATE, timeout=TIMEOUT_S)
|
||||
time.sleep(2.0) # let the Arduino reset
|
||||
ser.reset_input_buffer()
|
||||
|
||||
t0 = time.time()
|
||||
N = args.window
|
||||
t_buf = deque(maxlen=N)
|
||||
front = deque(maxlen=N)
|
||||
back = deque(maxlen=N)
|
||||
avg = deque(maxlen=N)
|
||||
pwm = deque(maxlen=N)
|
||||
on_buf = deque(maxlen=N)
|
||||
|
||||
fig, (ax_mm, ax_pwm) = plt.subplots(2, 1, sharex=True, figsize=(10, 6))
|
||||
fig.subplots_adjust(bottom=0.18)
|
||||
l_front, = ax_mm.plot([], [], label='Front', color='tab:blue')
|
||||
l_back, = ax_mm.plot([], [], label='Back', color='tab:orange')
|
||||
l_avg, = ax_mm.plot([], [], label='Avg', color='k', lw=2)
|
||||
ax_mm.set_ylabel('Gap (mm)')
|
||||
ax_mm.grid(True, alpha=0.3)
|
||||
ax_mm.legend(loc='upper right')
|
||||
|
||||
l_pwm, = ax_pwm.plot([], [], label='PWM', color='tab:red')
|
||||
ax_pwm.axhline(0, color='gray', lw=0.5)
|
||||
ax_pwm.set_ylabel('PWM')
|
||||
ax_pwm.set_xlabel('Time (s)')
|
||||
ax_pwm.set_ylim(-260, 260)
|
||||
ax_pwm.grid(True, alpha=0.3)
|
||||
|
||||
mode_txt = fig.text(0.01, 0.97, 'mode: ?', fontsize=10,
|
||||
family='monospace', va='top')
|
||||
fig.suptitle('HeaveOnly — keys: 0=off 1=PID 2=attract q=quit')
|
||||
|
||||
def send_mode(cmd_char):
|
||||
ser.write((cmd_char + '\n').encode())
|
||||
mode_txt.set_text({'0': 'mode: OFF',
|
||||
'1': 'mode: PID',
|
||||
'2': 'mode: ATTRACT'}[cmd_char])
|
||||
|
||||
def on_key(event):
|
||||
# Suppress mode keys while either TextBox is actively editing,
|
||||
# so typing digits into Ref/PID fields doesn't ping-pong the coils.
|
||||
if getattr(tb_ref, 'capturekeystrokes', False) or \
|
||||
getattr(tb_pid, 'capturekeystrokes', False):
|
||||
return
|
||||
if event.key in ('0', '1', '2'):
|
||||
send_mode(event.key)
|
||||
elif event.key == 'q':
|
||||
plt.close(fig)
|
||||
fig.canvas.mpl_connect('key_press_event', on_key)
|
||||
|
||||
ax_ref = fig.add_axes([0.10, 0.04, 0.15, 0.05])
|
||||
tb_ref = TextBox(ax_ref, 'Ref (mm) ', initial='12.36')
|
||||
ax_pid = fig.add_axes([0.50, 0.04, 0.30, 0.05])
|
||||
tb_pid = TextBox(ax_pid, 'PID (kp,ki,kd) ', initial='10,0,8')
|
||||
ax_ff = fig.add_axes([0.88, 0.03, 0.08, 0.07])
|
||||
cb_ff = CheckButtons(ax_ff, ['FF'], [True])
|
||||
|
||||
def on_ref(text):
|
||||
try:
|
||||
float(text)
|
||||
except ValueError:
|
||||
return
|
||||
ser.write(f'R{text}\n'.encode())
|
||||
|
||||
def on_pid(text):
|
||||
parts = [p.strip() for p in text.split(',')]
|
||||
if len(parts) != 3:
|
||||
return
|
||||
try:
|
||||
[float(p) for p in parts]
|
||||
except ValueError:
|
||||
return
|
||||
ser.write(f'P{",".join(parts)}\n'.encode())
|
||||
|
||||
def on_ff(_label):
|
||||
ser.write(b'F1\n' if cb_ff.get_status()[0] else b'F0\n')
|
||||
|
||||
tb_ref.on_submit(on_ref)
|
||||
tb_pid.on_submit(on_pid)
|
||||
cb_ff.on_clicked(on_ff)
|
||||
|
||||
def poll_serial():
|
||||
# Drain everything in the OS buffer each frame
|
||||
while ser.in_waiting:
|
||||
raw = ser.readline()
|
||||
try:
|
||||
parts = raw.decode('ascii', 'ignore').strip().split(',')
|
||||
if len(parts) != 5:
|
||||
continue
|
||||
f, b, a = float(parts[0]), float(parts[1]), float(parts[2])
|
||||
p = int(parts[3])
|
||||
on = int(parts[4])
|
||||
except ValueError:
|
||||
continue
|
||||
t_buf.append(time.time() - t0)
|
||||
front.append(f); back.append(b); avg.append(a)
|
||||
pwm.append(p); on_buf.append(on)
|
||||
|
||||
def update(_frame):
|
||||
poll_serial()
|
||||
if not t_buf:
|
||||
return l_front, l_back, l_avg, l_pwm, mode_txt
|
||||
xs = list(t_buf)
|
||||
l_front.set_data(xs, list(front))
|
||||
l_back .set_data(xs, list(back))
|
||||
l_avg .set_data(xs, list(avg))
|
||||
l_pwm .set_data(xs, list(pwm))
|
||||
ax_mm.relim(); ax_mm.autoscale_view(scalex=True, scaley=True)
|
||||
ax_pwm.set_xlim(xs[0], max(xs[-1], xs[0] + 1e-3))
|
||||
return l_front, l_back, l_avg, l_pwm, mode_txt
|
||||
|
||||
ani = FuncAnimation(fig, update, interval=50, blit=False,
|
||||
cache_frame_data=False)
|
||||
try:
|
||||
plt.show()
|
||||
finally:
|
||||
try:
|
||||
ser.write(b'0\n') # safety: turn output off on exit
|
||||
ser.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user