Fixed loophole where device left in streaming state on app closure

This commit is contained in:
2026-01-20 00:52:07 -06:00
parent c8c15df889
commit 5bab86c703
2 changed files with 151 additions and 51 deletions

View File

@@ -70,6 +70,7 @@ import serial
import serial.tools.list_ports
import json
import time
import threading
from typing import Optional, List, Dict, Any
from enum import Enum
@@ -113,6 +114,7 @@ class RealSerialStream:
self.serial: Optional[serial.Serial] = None
self.state = ConnectionState.DISCONNECTED
self.device_info: Optional[Dict[str, Any]] = None
self._auto_detect_result: Optional[Dict[str, Any]] = None # Cache for concurrent auto-detect
def connect(self, timeout: float = 5.0) -> Dict[str, Any]:
"""
@@ -167,7 +169,7 @@ class RealSerialStream:
# Perform handshake
try:
# Send connect command
# Send connect command (works from any device state - handles reconnection)
connect_cmd = {"cmd": "connect"}
self._send_json(connect_cmd)
print(f"[SERIAL] Sent: {connect_cmd}")
@@ -185,8 +187,14 @@ class RealSerialStream:
print(f"[SERIAL] Handshake complete: {response}")
return response
except json.JSONDecodeError:
# Ignore non-JSON lines (might be startup messages)
print(f"[SERIAL] Ignoring: {line.strip()}")
# Ignore non-JSON lines (startup messages, residual CSV data from reconnection)
# This allows reconnection even if device was streaming when app crashed
if line and line[0].isdigit() and ',' in line:
# Residual CSV data - ignore silently
pass
else:
# Other non-JSON - show for debugging
print(f"[SERIAL] Ignoring: {line.strip()}")
continue
# Timeout reached
@@ -331,31 +339,119 @@ class RealSerialStream:
def _auto_detect_port(self) -> Optional[str]:
"""
@brief Attempt to auto-detect the ESP32 serial port.
@brief Attempt to auto-detect the ESP32 serial port using concurrent handshake.
Looks for common USB-UART bridge chips used on ESP32 dev boards:
- CP210x (Silicon Labs)
- CH340 (WCH)
- FTDI
Sends connect command to all available ports simultaneously and selects
the first one that responds with valid handshake acknowledgment.
This is more reliable than USB chip detection since it verifies the
device is actually running the expected firmware.
@return Port name if found, None otherwise.
@note Stores device_info in self._auto_detect_result for reuse by connect()
"""
ports = serial.tools.list_ports.comports()
# Known USB-UART chip identifiers
known_chips = ['cp210', 'ch340', 'ftdi', 'usb-serial', 'usb serial']
if not ports:
print("[SERIAL] No serial ports found")
return None
for port in ports:
description_lower = port.description.lower()
if any(chip in description_lower for chip in known_chips):
print(f"[SERIAL] Auto-detected ESP32 on {port.device}")
return port.device
# Fallback: use first available port
if ports:
print(f"[SERIAL] No ESP32 detected, using first port: {ports[0].device}")
if len(ports) == 1:
# Only one port, no need for concurrent detection
print(f"[SERIAL] Only one port available: {ports[0].device}")
return ports[0].device
print(f"[SERIAL] Auto-detecting ESP32 across {len(ports)} ports...")
# Thread-safe result container
result_lock = threading.Lock()
result = {'port': None, 'device_info': None}
def try_handshake(port_name: str):
"""Attempt handshake on a single port (runs in thread)."""
try:
# Open serial connection with short timeout
ser = serial.Serial(
port=port_name,
baudrate=self.baud_rate,
timeout=0.5
)
# Clear buffer and settle
ser.reset_input_buffer()
time.sleep(0.05)
# Send connect command (resets device state if needed)
connect_cmd = {"cmd": "connect"}
json_str = json.dumps(connect_cmd) + "\n"
ser.write(json_str.encode('utf-8'))
ser.flush()
# Wait for acknowledgment (max 2 seconds)
start_time = time.time()
while (time.time() - start_time) < 2.0:
line_bytes = ser.readline()
if line_bytes:
try:
line = line_bytes.decode('utf-8', errors='ignore').strip()
response = json.loads(line)
if response.get("status") == "ack_connect":
# Found it! Store result if we're first
with result_lock:
if result['port'] is None:
result['port'] = port_name
result['device_info'] = response
print(f"[SERIAL] ✓ ESP32 found on {port_name}: {response.get('device', 'Unknown')}")
# Send disconnect to return device to IDLE
disconnect_cmd = {"cmd": "disconnect"}
json_str = json.dumps(disconnect_cmd) + "\n"
ser.write(json_str.encode('utf-8'))
ser.flush()
time.sleep(0.05)
ser.close()
return
except json.JSONDecodeError:
# Ignore non-JSON (residual CSV data, startup messages)
continue
# No valid response
ser.close()
except (serial.SerialException, OSError):
# Port unavailable or busy - skip silently
pass
# Launch concurrent handshake attempts
threads = []
for port in ports:
thread = threading.Thread(target=try_handshake, args=(port.device,), daemon=True)
thread.start()
threads.append(thread)
# Poll for first success or timeout (instead of sequential joins)
start_time = time.time()
max_wait = 2.5
while (time.time() - start_time) < max_wait:
# Check if we found a device
with result_lock:
if result['port'] is not None:
# Success! Return immediately
self._auto_detect_result = result
elapsed = time.time() - start_time
print(f"[SERIAL] Auto-detect complete in {elapsed:.2f}s")
return result['port']
# Check if all threads finished (no device found)
if not any(thread.is_alive() for thread in threads):
break
# Brief sleep to avoid busy-waiting CPU
time.sleep(0.05)
# Timeout or all threads finished without success
print("[SERIAL] No ESP32 responded to handshake on any port")
return None
@staticmethod