Files

448 lines
17 KiB
Python
Raw Permalink Normal View History

2026-04-11 21:15:01 -05:00
#!/usr/bin/env python3
"""
A0 Calibration standalone tkinter GUI
Usage: python calibrate_a0.py
Deps: pyserial numpy pandas openpyxl matplotlib
"""
import re
import time
import threading
from datetime import datetime
from pathlib import Path
import numpy as np
import tkinter as tk
from tkinter import ttk, messagebox
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import serial
import serial.tools.list_ports
import pandas as pd
# ── Configuration ─────────────────────────────────────────────
BAUD_RATE = 2_000_000
MM_MIN = 0.0
MM_MAX = 20.0
BUCKET_STEP = 0.05
BUCKET_WINDOW = 0.02
MIN_SAMPLES = 50
DATA_DIR = Path(__file__).parent / 'data'
DRAW_INTERVAL_MS = 300 # GUI redraw period — worker is never throttled by this
DATA_DIR.mkdir(exist_ok=True)
N_BUCKETS = int(round((MM_MAX - MM_MIN) / BUCKET_STEP)) + 1
bucket_centers = np.round(MM_MIN + np.arange(N_BUCKETS) * BUCKET_STEP, 4)
# Bytes regex: avoids decode overhead on every line in the hot path
LINE_RE = re.compile(rb'^\s*(-?\d+\.?\d*)\s*,\s*(\d+)\s*$')
class CalibrationApp:
def __init__(self, root: tk.Tk):
self.root = root
self.root.title('A0 Sensor Calibration')
self.root.minsize(900, 650)
# ── Shared bucket state (worker writes, GUI reads snapshots) ──
self.counts = np.zeros(N_BUCKETS, dtype=np.int64)
self.means = np.zeros(N_BUCKETS, dtype=np.float64)
self.data_lock = threading.Lock()
# ── Display stats: written by worker, read by GUI ──
self._stat_parsed = 0
self._stat_bucketed = 0
self._stat_malformed = 0
self._stats_lock = threading.Lock()
# ── Thread / connection state ──
self.ser = None
self.worker_thread = None
self.worker_stop = threading.Event()
self.collecting = False
self._build_ui()
self._refresh_ports()
self._schedule_draw()
# ── UI construction ───────────────────────────────────────
def _build_ui(self):
# Connection bar
top = ttk.Frame(self.root, padding=(6, 6, 6, 2))
top.pack(fill='x')
ttk.Label(top, text='Port:').pack(side='left')
self.port_var = tk.StringVar()
self.port_cb = ttk.Combobox(top, textvariable=self.port_var,
width=30, state='readonly')
self.port_cb.pack(side='left', padx=(4, 6))
ttk.Button(top, text='', width=3,
command=self._refresh_ports).pack(side='left', padx=2)
self.conn_btn = ttk.Button(top, text='Connect',
command=self._toggle_connect)
self.conn_btn.pack(side='left', padx=2)
self.conn_label = ttk.Label(top, text='● Disconnected',
foreground='#888')
self.conn_label.pack(side='left', padx=8)
# Control bar
ctrl = ttk.Frame(self.root, padding=(6, 2, 6, 2))
ctrl.pack(fill='x')
self.start_btn = ttk.Button(ctrl, text='Start Collection',
command=self._start, state='disabled')
self.start_btn.pack(side='left', padx=2)
self.stop_btn = ttk.Button(ctrl, text='Stop Collection',
command=self._stop, state='disabled')
self.stop_btn.pack(side='left', padx=2)
ttk.Button(ctrl, text='Export Excel',
command=self._export).pack(side='left', padx=(14, 2))
ttk.Separator(ctrl, orient='vertical').pack(side='left',
fill='y', padx=8)
self.progress_var = tk.IntVar(value=0)
self.progress_bar = ttk.Progressbar(ctrl, variable=self.progress_var,
maximum=N_BUCKETS, length=260)
self.progress_bar.pack(side='left')
self.progress_lbl = ttk.Label(ctrl, text=f' 0 / {N_BUCKETS} buckets')
self.progress_lbl.pack(side='left')
# Stats bar
sbar = ttk.Frame(self.root, padding=(6, 0, 6, 2))
sbar.pack(fill='x')
self.stats_lbl = ttk.Label(sbar,
text='parsed=0 bucketed=0 malformed=0',
foreground='#555')
self.stats_lbl.pack(side='left')
# Matplotlib canvas
self.fig = Figure(tight_layout=True)
self.ax1 = self.fig.add_subplot(2, 1, 1)
self.ax2 = self.fig.add_subplot(2, 1, 2, sharex=self.ax1)
self.ax1.set_ylabel('Avg A0 ADC')
self.ax1.set_title('Calibration curve')
self.ax1.grid(True, alpha=0.3)
self.ax2.set_xlabel('Position (mm)')
self.ax2.set_ylabel('Sample count')
self.ax2.set_title(
'Bucket fill progress (green = done, orange = partial, red = empty)')
self.ax2.axhline(MIN_SAMPLES, color='red', linestyle='--',
linewidth=1, label=f'target = {MIN_SAMPLES}')
self.ax2.legend(fontsize=8, loc='upper right')
self.ax2.grid(True, alpha=0.3)
# Create empty line and bar artists to update later
self.line, = self.ax1.plot([], [], 'b.-', markersize=3)
self.bars = self.ax2.bar(bucket_centers, np.zeros(N_BUCKETS), width=BUCKET_STEP * 0.9)
self.canvas = FigureCanvasTkAgg(self.fig, master=self.root)
self.canvas.get_tk_widget().pack(fill='both', expand=True,
padx=6, pady=(2, 6))
# ── Port management ───────────────────────────────────────
def _refresh_ports(self):
ports = list(serial.tools.list_ports.comports())
self.port_cb['values'] = [p.device for p in ports]
if not ports:
return
for p in ports:
d = (p.description or '').lower()
if ('usbmodem' in p.device or 'usbserial' in p.device.lower()
or 'arduino' in d or 'wch' in d):
self.port_var.set(p.device)
return
if not self.port_var.get():
self.port_var.set(ports[0].device)
# ── Connection ────────────────────────────────────────────
def _toggle_connect(self):
if self.ser and self.ser.is_open:
self._disconnect()
else:
self._connect()
def _connect(self):
port = self.port_var.get()
if not port:
messagebox.showerror('No port', 'Select a serial port first.')
return
try:
# timeout=0.05: readline() returns within 50 ms when no data,
# so the worker's stop-event check is always responsive.
self.ser = serial.Serial(port, BAUD_RATE, timeout=0.05)
except serial.SerialException as e:
messagebox.showerror('Connect failed', str(e))
return
self.conn_btn.config(state='disabled')
self.conn_label.config(text='⏳ Connecting…', foreground='orange')
threading.Thread(target=self._handshake, daemon=True).start()
def _handshake(self):
"""Runs in a daemon thread so the GUI stays responsive during waits."""
try:
time.sleep(3.0) # cover Arduino reset/boot
self.ser.reset_input_buffer()
self.ser.write(b'X') # stop stream if already running
time.sleep(0.2)
self.ser.reset_input_buffer()
self.ser.write(b'S') # send wake byte
deadline = time.time() + 15.0
while time.time() < deadline:
raw = self.ser.readline()
if raw.strip() == b'#READY':
self.ser.readline() # discard potential partial first line
self.root.after(0, self._on_connected)
return
self.root.after(0, lambda: self._on_connect_failed(
'Did not receive #READY within 15 s.\n'
'Check sketch upload and baud rate.'))
except Exception as e:
self.root.after(0, lambda err=str(e): self._on_connect_failed(err))
def _on_connected(self):
self.conn_label.config(text='● Connected', foreground='green')
self.conn_btn.config(text='Disconnect', state='normal')
self.start_btn.config(state='normal')
def _on_connect_failed(self, msg):
messagebox.showerror('Connection failed', msg)
try:
self.ser.close()
except Exception:
pass
self.ser = None
self.conn_label.config(text='● Disconnected', foreground='#888')
self.conn_btn.config(text='Connect', state='normal')
def _disconnect(self):
self._stop()
try:
if self.ser and self.ser.is_open:
self.ser.write(b'X')
self.ser.close()
except Exception:
pass
self.ser = None
self.conn_label.config(text='● Disconnected', foreground='#888')
self.conn_btn.config(text='Connect', state='normal')
self.start_btn.config(state='disabled')
self.stop_btn.config(state='disabled')
# ── Collection control ────────────────────────────────────
def _start(self):
if self.collecting:
return
with self.data_lock:
self.counts[:] = 0
self.means[:] = 0.0
with self._stats_lock:
self._stat_parsed = self._stat_bucketed = self._stat_malformed = 0
self.collecting = True
self.worker_stop.clear()
self.worker_thread = threading.Thread(
target=self._worker, daemon=True, name='serial-worker')
self.worker_thread.start()
self.start_btn.config(state='disabled')
self.stop_btn.config(state='normal')
def _stop(self):
if not self.collecting:
return
self.collecting = False
self.worker_stop.set()
if self.worker_thread:
self.worker_thread.join(timeout=1.0)
self.start_btn.config(
state='normal' if (self.ser and self.ser.is_open) else 'disabled')
self.stop_btn.config(state='disabled')
def _auto_complete(self):
"""Called from worker via root.after — runs on the GUI thread."""
self._stop()
messagebox.showinfo(
'Collection complete',
f'All {N_BUCKETS} buckets reached ≥{MIN_SAMPLES} samples.\n'
'Click Export Excel to save.')
# ── Serial worker ─────────────────────────────────────────
#
# Design priorities:
# 1. Never miss a byte — reads everything in the OS buffer each wake
# 2. Never wait on the GUI — lock hold-time is a single array update
# 3. Auto-complete in O(1) — tracks filled count incrementally
#
def _worker(self):
ser = self.ser
stop = self.worker_stop
counts = self.counts
means = self.means
lock = self.data_lock
centers = bucket_centers
parsed = bucketed = malformed = filled_count = 0
buf = bytearray()
STAT_EVERY = 500 # flush display stats every N parsed lines
while not stop.is_set():
waiting = ser.in_waiting
if waiting:
try:
buf += ser.read(waiting) # single syscall for all bytes
except serial.SerialException:
break
# Process every complete line accumulated in the buffer
while b'\n' in buf:
line, buf = buf.split(b'\n', 1)
line = line.strip()
if not line or line[0:1] == b'#':
continue
m = LINE_RE.match(line)
if not m:
malformed += 1
continue
parsed += 1
mm = float(m.group(1))
adc = int(m.group(2))
idx = int(round((mm - MM_MIN) / BUCKET_STEP))
if (0 <= idx < N_BUCKETS
and abs(mm - centers[idx]) <= BUCKET_WINDOW + 1e-9):
with lock:
counts[idx] += 1
means[idx] += (adc - means[idx]) / counts[idx]
just_filled = (counts[idx] == MIN_SAMPLES)
if just_filled:
filled_count += 1
if filled_count == N_BUCKETS:
# Flush final stats then signal the GUI
with self._stats_lock:
self._stat_parsed = parsed
self._stat_bucketed = bucketed + 1
self._stat_malformed = malformed
self.root.after(0, self._auto_complete)
return
bucketed += 1
if parsed % STAT_EVERY == 0:
with self._stats_lock:
self._stat_parsed = parsed
self._stat_bucketed = bucketed
self._stat_malformed = malformed
else:
# Serial quiet — yield briefly so stop_event can be noticed
time.sleep(0.0005)
# Final stats flush on manual stop
with self._stats_lock:
self._stat_parsed = parsed
self._stat_bucketed = bucketed
self._stat_malformed = malformed
# ── GUI draw (runs on main thread via root.after) ─────────
def _schedule_draw(self):
self._draw()
self.root.after(DRAW_INTERVAL_MS, self._schedule_draw)
def _draw(self):
with self.data_lock:
counts = self.counts.copy()
means = self.means.copy()
with self._stats_lock:
parsed = self._stat_parsed
bucketed = self._stat_bucketed
malformed = self._stat_malformed
filled = int(np.sum(counts >= MIN_SAMPLES))
self.progress_var.set(filled)
self.progress_lbl.config(text=f' {filled} / {N_BUCKETS} buckets')
self.stats_lbl.config(
text=f'parsed={parsed} bucketed={bucketed} malformed={malformed}')
# --- NEW: Update existing artists instead of clearing axes ---
has = counts > 0
if has.any():
self.line.set_data(bucket_centers[has], means[has])
self.ax1.relim() # Recalculate limits
self.ax1.autoscale_view() # Rescale axes
colors = [
'#2ecc71' if c >= MIN_SAMPLES
else '#f39c12' if c > 0
else '#e74c3c'
for c in counts
]
# Update each bar's height and color
for bar, count, color in zip(self.bars, counts, colors):
bar.set_height(count)
bar.set_facecolor(color)
self.ax2.relim()
self.ax2.autoscale_view()
self.canvas.draw_idle()
# ── Export ────────────────────────────────────────────────
def _export(self):
with self.data_lock:
counts = self.counts.copy()
means = self.means.copy()
if not counts.any():
messagebox.showwarning('No data', 'No data collected yet.')
return
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
out_path = DATA_DIR / f'a0_calibration_{ts}.xlsx'
has = counts > 0
pd.DataFrame({
'mm_val': bucket_centers[has],
'avg_adc': np.round(means[has], 3),
}).to_excel(out_path, index=False)
short = int(np.sum((counts > 0) & (counts < MIN_SAMPLES)))
empty = int(np.sum(counts == 0))
msg = f'Exported {int(has.sum())} rows to:\n{out_path}'
if short:
msg += f'\n\n{short} bucket(s) have data but < {MIN_SAMPLES} samples.'
if empty:
msg += f'\n{empty} empty bucket(s) omitted.'
messagebox.showinfo('Exported', msg)
# ── Cleanup ───────────────────────────────────────────────
def on_close(self):
self._stop()
try:
if self.ser and self.ser.is_open:
self.ser.write(b'X')
self.ser.close()
except Exception:
pass
self.root.destroy()
if __name__ == '__main__':
root = tk.Tk()
app = CalibrationApp(root)
root.protocol('WM_DELETE_WINDOW', app.on_close)
root.mainloop()