#!/usr/bin/env python3 """ A0 Calibration — standalone tkinter GUI Usage: python calibrate_a0.py Deps: pyserial numpy pandas openpyxl matplotlib """ import re import time import threading from datetime import datetime from pathlib import Path import numpy as np import tkinter as tk from tkinter import ttk, messagebox import matplotlib matplotlib.use('TkAgg') from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import serial import serial.tools.list_ports import pandas as pd # ── Configuration ───────────────────────────────────────────── BAUD_RATE = 2_000_000 MM_MIN = 0.0 MM_MAX = 20.0 BUCKET_STEP = 0.05 BUCKET_WINDOW = 0.02 MIN_SAMPLES = 50 DATA_DIR = Path(__file__).parent / 'data' DRAW_INTERVAL_MS = 300 # GUI redraw period — worker is never throttled by this DATA_DIR.mkdir(exist_ok=True) N_BUCKETS = int(round((MM_MAX - MM_MIN) / BUCKET_STEP)) + 1 bucket_centers = np.round(MM_MIN + np.arange(N_BUCKETS) * BUCKET_STEP, 4) # Bytes regex: avoids decode overhead on every line in the hot path LINE_RE = re.compile(rb'^\s*(-?\d+\.?\d*)\s*,\s*(\d+)\s*$') class CalibrationApp: def __init__(self, root: tk.Tk): self.root = root self.root.title('A0 Sensor Calibration') self.root.minsize(900, 650) # ── Shared bucket state (worker writes, GUI reads snapshots) ── self.counts = np.zeros(N_BUCKETS, dtype=np.int64) self.means = np.zeros(N_BUCKETS, dtype=np.float64) self.data_lock = threading.Lock() # ── Display stats: written by worker, read by GUI ── self._stat_parsed = 0 self._stat_bucketed = 0 self._stat_malformed = 0 self._stats_lock = threading.Lock() # ── Thread / connection state ── self.ser = None self.worker_thread = None self.worker_stop = threading.Event() self.collecting = False self._build_ui() self._refresh_ports() self._schedule_draw() # ── UI construction ─────────────────────────────────────── def _build_ui(self): # Connection bar top = ttk.Frame(self.root, padding=(6, 6, 6, 2)) top.pack(fill='x') ttk.Label(top, text='Port:').pack(side='left') self.port_var = tk.StringVar() self.port_cb = ttk.Combobox(top, textvariable=self.port_var, width=30, state='readonly') self.port_cb.pack(side='left', padx=(4, 6)) ttk.Button(top, text='↺', width=3, command=self._refresh_ports).pack(side='left', padx=2) self.conn_btn = ttk.Button(top, text='Connect', command=self._toggle_connect) self.conn_btn.pack(side='left', padx=2) self.conn_label = ttk.Label(top, text='● Disconnected', foreground='#888') self.conn_label.pack(side='left', padx=8) # Control bar ctrl = ttk.Frame(self.root, padding=(6, 2, 6, 2)) ctrl.pack(fill='x') self.start_btn = ttk.Button(ctrl, text='Start Collection', command=self._start, state='disabled') self.start_btn.pack(side='left', padx=2) self.stop_btn = ttk.Button(ctrl, text='Stop Collection', command=self._stop, state='disabled') self.stop_btn.pack(side='left', padx=2) ttk.Button(ctrl, text='Export Excel', command=self._export).pack(side='left', padx=(14, 2)) ttk.Separator(ctrl, orient='vertical').pack(side='left', fill='y', padx=8) self.progress_var = tk.IntVar(value=0) self.progress_bar = ttk.Progressbar(ctrl, variable=self.progress_var, maximum=N_BUCKETS, length=260) self.progress_bar.pack(side='left') self.progress_lbl = ttk.Label(ctrl, text=f' 0 / {N_BUCKETS} buckets') self.progress_lbl.pack(side='left') # Stats bar sbar = ttk.Frame(self.root, padding=(6, 0, 6, 2)) sbar.pack(fill='x') self.stats_lbl = ttk.Label(sbar, text='parsed=0 bucketed=0 malformed=0', foreground='#555') self.stats_lbl.pack(side='left') # Matplotlib canvas self.fig = Figure(tight_layout=True) self.ax1 = self.fig.add_subplot(2, 1, 1) self.ax2 = self.fig.add_subplot(2, 1, 2, sharex=self.ax1) self.ax1.set_ylabel('Avg A0 ADC') self.ax1.set_title('Calibration curve') self.ax1.grid(True, alpha=0.3) self.ax2.set_xlabel('Position (mm)') self.ax2.set_ylabel('Sample count') self.ax2.set_title( 'Bucket fill progress (green = done, orange = partial, red = empty)') self.ax2.axhline(MIN_SAMPLES, color='red', linestyle='--', linewidth=1, label=f'target = {MIN_SAMPLES}') self.ax2.legend(fontsize=8, loc='upper right') self.ax2.grid(True, alpha=0.3) # Create empty line and bar artists to update later self.line, = self.ax1.plot([], [], 'b.-', markersize=3) self.bars = self.ax2.bar(bucket_centers, np.zeros(N_BUCKETS), width=BUCKET_STEP * 0.9) self.canvas = FigureCanvasTkAgg(self.fig, master=self.root) self.canvas.get_tk_widget().pack(fill='both', expand=True, padx=6, pady=(2, 6)) # ── Port management ─────────────────────────────────────── def _refresh_ports(self): ports = list(serial.tools.list_ports.comports()) self.port_cb['values'] = [p.device for p in ports] if not ports: return for p in ports: d = (p.description or '').lower() if ('usbmodem' in p.device or 'usbserial' in p.device.lower() or 'arduino' in d or 'wch' in d): self.port_var.set(p.device) return if not self.port_var.get(): self.port_var.set(ports[0].device) # ── Connection ──────────────────────────────────────────── def _toggle_connect(self): if self.ser and self.ser.is_open: self._disconnect() else: self._connect() def _connect(self): port = self.port_var.get() if not port: messagebox.showerror('No port', 'Select a serial port first.') return try: # timeout=0.05: readline() returns within 50 ms when no data, # so the worker's stop-event check is always responsive. self.ser = serial.Serial(port, BAUD_RATE, timeout=0.05) except serial.SerialException as e: messagebox.showerror('Connect failed', str(e)) return self.conn_btn.config(state='disabled') self.conn_label.config(text='⏳ Connecting…', foreground='orange') threading.Thread(target=self._handshake, daemon=True).start() def _handshake(self): """Runs in a daemon thread so the GUI stays responsive during waits.""" try: time.sleep(3.0) # cover Arduino reset/boot self.ser.reset_input_buffer() self.ser.write(b'X') # stop stream if already running time.sleep(0.2) self.ser.reset_input_buffer() self.ser.write(b'S') # send wake byte deadline = time.time() + 15.0 while time.time() < deadline: raw = self.ser.readline() if raw.strip() == b'#READY': self.ser.readline() # discard potential partial first line self.root.after(0, self._on_connected) return self.root.after(0, lambda: self._on_connect_failed( 'Did not receive #READY within 15 s.\n' 'Check sketch upload and baud rate.')) except Exception as e: self.root.after(0, lambda err=str(e): self._on_connect_failed(err)) def _on_connected(self): self.conn_label.config(text='● Connected', foreground='green') self.conn_btn.config(text='Disconnect', state='normal') self.start_btn.config(state='normal') def _on_connect_failed(self, msg): messagebox.showerror('Connection failed', msg) try: self.ser.close() except Exception: pass self.ser = None self.conn_label.config(text='● Disconnected', foreground='#888') self.conn_btn.config(text='Connect', state='normal') def _disconnect(self): self._stop() try: if self.ser and self.ser.is_open: self.ser.write(b'X') self.ser.close() except Exception: pass self.ser = None self.conn_label.config(text='● Disconnected', foreground='#888') self.conn_btn.config(text='Connect', state='normal') self.start_btn.config(state='disabled') self.stop_btn.config(state='disabled') # ── Collection control ──────────────────────────────────── def _start(self): if self.collecting: return with self.data_lock: self.counts[:] = 0 self.means[:] = 0.0 with self._stats_lock: self._stat_parsed = self._stat_bucketed = self._stat_malformed = 0 self.collecting = True self.worker_stop.clear() self.worker_thread = threading.Thread( target=self._worker, daemon=True, name='serial-worker') self.worker_thread.start() self.start_btn.config(state='disabled') self.stop_btn.config(state='normal') def _stop(self): if not self.collecting: return self.collecting = False self.worker_stop.set() if self.worker_thread: self.worker_thread.join(timeout=1.0) self.start_btn.config( state='normal' if (self.ser and self.ser.is_open) else 'disabled') self.stop_btn.config(state='disabled') def _auto_complete(self): """Called from worker via root.after — runs on the GUI thread.""" self._stop() messagebox.showinfo( 'Collection complete', f'All {N_BUCKETS} buckets reached ≥{MIN_SAMPLES} samples.\n' 'Click Export Excel to save.') # ── Serial worker ───────────────────────────────────────── # # Design priorities: # 1. Never miss a byte — reads everything in the OS buffer each wake # 2. Never wait on the GUI — lock hold-time is a single array update # 3. Auto-complete in O(1) — tracks filled count incrementally # def _worker(self): ser = self.ser stop = self.worker_stop counts = self.counts means = self.means lock = self.data_lock centers = bucket_centers parsed = bucketed = malformed = filled_count = 0 buf = bytearray() STAT_EVERY = 500 # flush display stats every N parsed lines while not stop.is_set(): waiting = ser.in_waiting if waiting: try: buf += ser.read(waiting) # single syscall for all bytes except serial.SerialException: break # Process every complete line accumulated in the buffer while b'\n' in buf: line, buf = buf.split(b'\n', 1) line = line.strip() if not line or line[0:1] == b'#': continue m = LINE_RE.match(line) if not m: malformed += 1 continue parsed += 1 mm = float(m.group(1)) adc = int(m.group(2)) idx = int(round((mm - MM_MIN) / BUCKET_STEP)) if (0 <= idx < N_BUCKETS and abs(mm - centers[idx]) <= BUCKET_WINDOW + 1e-9): with lock: counts[idx] += 1 means[idx] += (adc - means[idx]) / counts[idx] just_filled = (counts[idx] == MIN_SAMPLES) if just_filled: filled_count += 1 if filled_count == N_BUCKETS: # Flush final stats then signal the GUI with self._stats_lock: self._stat_parsed = parsed self._stat_bucketed = bucketed + 1 self._stat_malformed = malformed self.root.after(0, self._auto_complete) return bucketed += 1 if parsed % STAT_EVERY == 0: with self._stats_lock: self._stat_parsed = parsed self._stat_bucketed = bucketed self._stat_malformed = malformed else: # Serial quiet — yield briefly so stop_event can be noticed time.sleep(0.0005) # Final stats flush on manual stop with self._stats_lock: self._stat_parsed = parsed self._stat_bucketed = bucketed self._stat_malformed = malformed # ── GUI draw (runs on main thread via root.after) ───────── def _schedule_draw(self): self._draw() self.root.after(DRAW_INTERVAL_MS, self._schedule_draw) def _draw(self): with self.data_lock: counts = self.counts.copy() means = self.means.copy() with self._stats_lock: parsed = self._stat_parsed bucketed = self._stat_bucketed malformed = self._stat_malformed filled = int(np.sum(counts >= MIN_SAMPLES)) self.progress_var.set(filled) self.progress_lbl.config(text=f' {filled} / {N_BUCKETS} buckets') self.stats_lbl.config( text=f'parsed={parsed} bucketed={bucketed} malformed={malformed}') # --- NEW: Update existing artists instead of clearing axes --- has = counts > 0 if has.any(): self.line.set_data(bucket_centers[has], means[has]) self.ax1.relim() # Recalculate limits self.ax1.autoscale_view() # Rescale axes colors = [ '#2ecc71' if c >= MIN_SAMPLES else '#f39c12' if c > 0 else '#e74c3c' for c in counts ] # Update each bar's height and color for bar, count, color in zip(self.bars, counts, colors): bar.set_height(count) bar.set_facecolor(color) self.ax2.relim() self.ax2.autoscale_view() self.canvas.draw_idle() # ── Export ──────────────────────────────────────────────── def _export(self): with self.data_lock: counts = self.counts.copy() means = self.means.copy() if not counts.any(): messagebox.showwarning('No data', 'No data collected yet.') return ts = datetime.now().strftime('%Y%m%d_%H%M%S') out_path = DATA_DIR / f'a0_calibration_{ts}.xlsx' has = counts > 0 pd.DataFrame({ 'mm_val': bucket_centers[has], 'avg_adc': np.round(means[has], 3), }).to_excel(out_path, index=False) short = int(np.sum((counts > 0) & (counts < MIN_SAMPLES))) empty = int(np.sum(counts == 0)) msg = f'Exported {int(has.sum())} rows to:\n{out_path}' if short: msg += f'\n\n{short} bucket(s) have data but < {MIN_SAMPLES} samples.' if empty: msg += f'\n{empty} empty bucket(s) omitted.' messagebox.showinfo('Exported', msg) # ── Cleanup ─────────────────────────────────────────────── def on_close(self): self._stop() try: if self.ser and self.ser.is_open: self.ser.write(b'X') self.ser.close() except Exception: pass self.root.destroy() if __name__ == '__main__': root = tk.Tk() app = CalibrationApp(root) root.protocol('WM_DELETE_WINDOW', app.on_close) root.mainloop()