Files
EMG_Arm/serial_stream.py

572 lines
20 KiB
Python
Raw Normal View History

"""
@file serial_stream.py
2026-01-20 00:25:52 -06:00
@brief Real serial stream for reading EMG data from ESP32 with handshake protocol.
This module provides a serial communication interface for receiving
2026-01-20 00:25:52 -06:00
EMG data from the ESP32 microcontroller over USB. It implements a
robust handshake protocol to ensure reliable connection before streaming.
@section usage Usage Example
@code{.py}
from serial_stream import RealSerialStream
# Create stream (auto-detects port, or specify manually)
stream = RealSerialStream(port='COM3')
2026-01-20 00:25:52 -06:00
# Connect with handshake (raises on timeout/failure)
stream.connect(timeout=5.0)
# Start streaming
stream.start()
2026-01-20 00:25:52 -06:00
# Read data
while True:
line = stream.readline()
if line:
print(line) # "1234,512,489,501,523"
2026-01-20 00:25:52 -06:00
# Stop streaming (device returns to CONNECTED state)
stream.stop()
2026-01-20 00:25:52 -06:00
# Disconnect (device returns to IDLE state)
stream.disconnect()
@endcode
2026-01-20 00:25:52 -06:00
@section protocol Handshake Protocol & State Machine
ESP32 States:
IDLE - Waiting for "connect" command
CONNECTED - Handshake complete, waiting for "start" command
STREAMING - Actively sending CSV data
State Transitions:
1. connect() : IDLE CONNECTED
App sends: {"cmd": "connect"}
Device responds: {"status": "ack_connect", "device": "ESP32-EMG", "channels": 4}
2. start() : CONNECTED STREAMING
App sends: {"cmd": "start"}
Device starts streaming CSV data
3. stop() : STREAMING CONNECTED
App sends: {"cmd": "stop"}
Device stops streaming, ready for new start command
4. disconnect() : ANY IDLE
App sends: {"cmd": "disconnect"}
Device returns to idle, waiting for new connection
@section format Data Format
The ESP32 sends data as CSV lines:
"timestamp_ms,ch0,ch1,ch2,ch3\\n"
Example:
"12345,512,489,501,523\\n"
@author Bucky Arm Project
"""
import serial
import serial.tools.list_ports
2026-01-20 00:25:52 -06:00
import json
import time
import threading
2026-01-20 00:25:52 -06:00
from typing import Optional, List, Dict, Any
from enum import Enum
class ConnectionState(Enum):
"""Connection states for the serial stream."""
DISCONNECTED = 0 # No serial connection
CONNECTING = 1 # Serial open, waiting for handshake
CONNECTED = 2 # Handshake complete, ready to stream
STREAMING = 3 # Actively streaming data
class RealSerialStream:
"""
2026-01-20 00:25:52 -06:00
@brief Reads EMG data from ESP32 over USB serial with handshake protocol.
2026-01-20 00:25:52 -06:00
This class implements a robust connection protocol:
- connect() : Open serial port and perform handshake
- start() : Begin streaming data
- stop() : Stop streaming (device stays connected)
- disconnect() : Close connection (device returns to idle)
- readline() : Read one line of data
2026-01-20 00:25:52 -06:00
State machine ensures reliable communication without timing dependencies.
@note Requires pyserial: pip install pyserial
"""
def __init__(self, port: str = None, baud_rate: int = 115200, timeout: float = 1.0):
"""
@brief Initialize the serial stream.
@param port Serial port name (e.g., 'COM3' on Windows, '/dev/ttyUSB0' on Linux).
If None, will attempt to auto-detect the ESP32.
@param baud_rate Communication speed in bits per second. Default 115200 matches ESP32.
2026-01-20 00:25:52 -06:00
@param timeout Read timeout in seconds for readline().
"""
self.port = port
self.baud_rate = baud_rate
self.timeout = timeout
self.serial: Optional[serial.Serial] = None
2026-01-20 00:25:52 -06:00
self.state = ConnectionState.DISCONNECTED
self.device_info: Optional[Dict[str, Any]] = None
self._auto_detect_result: Optional[Dict[str, Any]] = None # Cache for concurrent auto-detect
2026-01-20 00:25:52 -06:00
def connect(self, timeout: float = 5.0) -> Dict[str, Any]:
"""
2026-01-20 00:25:52 -06:00
@brief Connect to the ESP32 and perform handshake.
2026-01-20 00:25:52 -06:00
Opens the serial port, sends connect command, and waits for
acknowledgment from the device.
@param timeout Maximum time to wait for handshake response (seconds).
@return Device info dict from handshake response.
@throws RuntimeError If no port specified and auto-detect fails.
@throws RuntimeError If unable to open the serial port.
2026-01-20 00:25:52 -06:00
@throws TimeoutError If device doesn't respond within timeout.
@throws ValueError If device sends invalid handshake response.
"""
2026-01-20 00:25:52 -06:00
if self.state != ConnectionState.DISCONNECTED:
raise RuntimeError(f"Already in state {self.state.name}, cannot connect")
# Auto-detect port if not specified
if self.port is None:
self.port = self._auto_detect_port()
if self.port is None:
raise RuntimeError(
"No serial port specified and auto-detect failed.\n"
"Use RealSerialStream.list_ports() to see available ports."
)
# Open serial connection
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baud_rate,
timeout=self.timeout
)
2026-01-20 00:25:52 -06:00
self.state = ConnectionState.CONNECTING
# Clear any stale data in the buffer
self.serial.reset_input_buffer()
2026-01-20 00:25:52 -06:00
time.sleep(0.1) # Let device settle after port open
2026-01-20 00:25:52 -06:00
print(f"[SERIAL] Port opened: {self.port}")
except serial.SerialException as e:
2026-01-20 00:25:52 -06:00
self.state = ConnectionState.DISCONNECTED
error_msg = f"Failed to open {self.port}: {e}"
if "Permission denied" in str(e) or "Resource busy" in str(e):
2026-01-20 00:25:52 -06:00
error_msg += "\n\nThe port may still be in use. Wait a moment and try again."
raise RuntimeError(error_msg)
2026-01-20 00:25:52 -06:00
# Perform handshake
try:
# Send connect command (works from any device state - handles reconnection)
2026-01-20 00:25:52 -06:00
connect_cmd = {"cmd": "connect"}
self._send_json(connect_cmd)
print(f"[SERIAL] Sent: {connect_cmd}")
# Wait for acknowledgment
start_time = time.time()
while (time.time() - start_time) < timeout:
line = self._readline_raw()
if line:
try:
response = json.loads(line)
if response.get("status") == "ack_connect":
self.device_info = response
self.state = ConnectionState.CONNECTED
print(f"[SERIAL] Handshake complete: {response}")
return response
except json.JSONDecodeError:
# Ignore non-JSON lines (startup messages, residual CSV data from reconnection)
# This allows reconnection even if device was streaming when app crashed
if line and line[0].isdigit() and ',' in line:
# Residual CSV data - ignore silently
pass
else:
# Other non-JSON - show for debugging
print(f"[SERIAL] Ignoring: {line.strip()}")
2026-01-20 00:25:52 -06:00
continue
# Timeout reached
self.state = ConnectionState.DISCONNECTED
if self.serial:
self.serial.close()
self.serial = None
raise TimeoutError(
f"Device did not respond to connection request within {timeout}s.\n"
"Check that the correct firmware is flashed and device is powered on."
)
except Exception as e:
# Clean up on any error
self.state = ConnectionState.DISCONNECTED
if self.serial:
try:
self.serial.close()
except:
pass
self.serial = None
raise
def start(self) -> None:
"""
@brief Start streaming EMG data.
Device must be in CONNECTED state. Sends start command to ESP32,
which begins streaming CSV data.
@throws RuntimeError If not connected.
"""
if self.state != ConnectionState.CONNECTED:
raise RuntimeError(
f"Cannot start streaming from state {self.state.name}. "
"Must call connect() first."
)
# Send start command
start_cmd = {"cmd": "start"}
self._send_json(start_cmd)
self.state = ConnectionState.STREAMING
print(f"[SERIAL] Started streaming")
def stop(self) -> None:
"""
2026-01-20 00:25:52 -06:00
@brief Stop streaming EMG data.
Sends stop command to ESP32, which stops streaming and returns
to CONNECTED state. Connection remains open for restart.
Safe to call even if not streaming.
"""
if self.state == ConnectionState.STREAMING:
try:
stop_cmd = {"cmd": "stop"}
self._send_json(stop_cmd)
self.state = ConnectionState.CONNECTED
print(f"[SERIAL] Stopped streaming")
except Exception as e:
print(f"[SERIAL] Warning during stop: {e}")
def disconnect(self) -> None:
"""
@brief Disconnect from the ESP32.
2026-01-20 00:25:52 -06:00
Sends disconnect command (device returns to IDLE state),
then closes the serial port. Safe to call from any state.
"""
2026-01-20 00:25:52 -06:00
# Send disconnect command if connected
if self.state in (ConnectionState.CONNECTED, ConnectionState.STREAMING):
try:
disconnect_cmd = {"cmd": "disconnect"}
self._send_json(disconnect_cmd)
time.sleep(0.1) # Give device time to process
print(f"[SERIAL] Sent disconnect command")
except Exception as e:
print(f"[SERIAL] Warning sending disconnect: {e}")
2026-01-20 00:25:52 -06:00
# Close serial port
if self.serial:
try:
if self.serial.is_open:
self.serial.close()
2026-01-20 00:25:52 -06:00
print(f"[SERIAL] Port closed: {self.port}")
except Exception as e:
2026-01-20 00:25:52 -06:00
print(f"[SERIAL] Warning during port close: {e}")
finally:
self.serial = None
2026-01-20 00:25:52 -06:00
self.state = ConnectionState.DISCONNECTED
self.device_info = None
def readline(self) -> Optional[str]:
"""
2026-01-20 00:25:52 -06:00
@brief Read one line of CSV data from the ESP32.
2026-01-20 00:25:52 -06:00
Should only be called when in STREAMING state.
Blocks until a complete line is received or timeout occurs.
@return Line string including newline, or None if timeout/error.
@note Lines from ESP32 are in format: "timestamp_ms,ch0,ch1,ch2,ch3\\n"
"""
2026-01-20 00:25:52 -06:00
if self.state != ConnectionState.STREAMING:
return None
return self._readline_raw()
def _readline_raw(self) -> Optional[str]:
"""
@brief Read one line from serial port (internal helper).
@return Decoded line string, or None if timeout/error.
"""
if not self.serial or not self.serial.is_open:
return None
try:
line_bytes = self.serial.readline()
if line_bytes:
2026-01-20 00:25:52 -06:00
return line_bytes.decode('utf-8', errors='ignore').strip()
return None
except serial.SerialException:
return None
2026-01-20 00:25:52 -06:00
def _send_json(self, data: Dict[str, Any]) -> None:
"""
@brief Send JSON command to the device (internal helper).
@param data Dictionary to send as JSON.
@throws RuntimeError If serial port is not open.
"""
if not self.serial or not self.serial.is_open:
raise RuntimeError("Serial port not open")
json_str = json.dumps(data) + "\n"
self.serial.write(json_str.encode('utf-8'))
self.serial.flush() # Ensure data is sent immediately
def _auto_detect_port(self) -> Optional[str]:
"""
@brief Attempt to auto-detect the ESP32 serial port using concurrent handshake.
Sends connect command to all available ports simultaneously and selects
the first one that responds with valid handshake acknowledgment.
This is more reliable than USB chip detection since it verifies the
device is actually running the expected firmware.
@return Port name if found, None otherwise.
@note Stores device_info in self._auto_detect_result for reuse by connect()
"""
ports = serial.tools.list_ports.comports()
if not ports:
print("[SERIAL] No serial ports found")
return None
if len(ports) == 1:
# Only one port, no need for concurrent detection
print(f"[SERIAL] Only one port available: {ports[0].device}")
return ports[0].device
print(f"[SERIAL] Auto-detecting ESP32 across {len(ports)} ports...")
# Thread-safe result container
result_lock = threading.Lock()
result = {'port': None, 'device_info': None}
def try_handshake(port_name: str):
"""Attempt handshake on a single port (runs in thread)."""
try:
# Open serial connection with short timeout
ser = serial.Serial(
port=port_name,
baudrate=self.baud_rate,
timeout=0.5
)
# Clear buffer and settle
ser.reset_input_buffer()
time.sleep(0.05)
# Send connect command (resets device state if needed)
connect_cmd = {"cmd": "connect"}
json_str = json.dumps(connect_cmd) + "\n"
ser.write(json_str.encode('utf-8'))
ser.flush()
# Wait for acknowledgment (max 2 seconds)
start_time = time.time()
while (time.time() - start_time) < 2.0:
line_bytes = ser.readline()
if line_bytes:
try:
line = line_bytes.decode('utf-8', errors='ignore').strip()
response = json.loads(line)
if response.get("status") == "ack_connect":
# Found it! Store result if we're first
with result_lock:
if result['port'] is None:
result['port'] = port_name
result['device_info'] = response
print(f"[SERIAL] ✓ ESP32 found on {port_name}: {response.get('device', 'Unknown')}")
# Send disconnect to return device to IDLE
disconnect_cmd = {"cmd": "disconnect"}
json_str = json.dumps(disconnect_cmd) + "\n"
ser.write(json_str.encode('utf-8'))
ser.flush()
time.sleep(0.05)
ser.close()
return
except json.JSONDecodeError:
# Ignore non-JSON (residual CSV data, startup messages)
continue
# No valid response
ser.close()
except (serial.SerialException, OSError):
# Port unavailable or busy - skip silently
pass
# Launch concurrent handshake attempts
threads = []
for port in ports:
thread = threading.Thread(target=try_handshake, args=(port.device,), daemon=True)
thread.start()
threads.append(thread)
# Poll for first success or timeout (instead of sequential joins)
start_time = time.time()
max_wait = 2.5
while (time.time() - start_time) < max_wait:
# Check if we found a device
with result_lock:
if result['port'] is not None:
# Success! Return immediately
self._auto_detect_result = result
elapsed = time.time() - start_time
print(f"[SERIAL] Auto-detect complete in {elapsed:.2f}s")
return result['port']
# Check if all threads finished (no device found)
if not any(thread.is_alive() for thread in threads):
break
# Brief sleep to avoid busy-waiting CPU
time.sleep(0.05)
# Timeout or all threads finished without success
print("[SERIAL] No ESP32 responded to handshake on any port")
return None
@staticmethod
def list_ports() -> List[str]:
"""
@brief List all available serial ports on the system.
Useful for finding the correct port name for your ESP32.
@return List of port names (e.g., ['COM3', 'COM4'] on Windows).
"""
ports = serial.tools.list_ports.comports()
if not ports:
print("No serial ports found.")
return []
print("\nAvailable serial ports:")
print("-" * 50)
for port in ports:
print(f" {port.device}")
print(f" Description: {port.description}")
print()
return [p.device for p in ports]
# =============================================================================
# Standalone Test
# =============================================================================
if __name__ == "__main__":
"""
@brief Quick test to verify ESP32 serial communication.
Run this file directly to test:
python serial_stream.py [port]
If port is not specified, auto-detection is attempted.
"""
import sys
print("=" * 50)
print(" ESP32 Serial Stream Test")
print("=" * 50)
print()
# Show available ports
ports = RealSerialStream.list_ports()
if not ports:
print("No ports found. Is the ESP32 plugged in?")
sys.exit(1)
# Determine which port to use
if len(sys.argv) > 1:
port = sys.argv[1]
print(f"Using specified port: {port}")
else:
port = None # Will auto-detect
print("No port specified, will auto-detect.")
print()
print("Starting stream... (Ctrl+C to stop)")
print("-" * 50)
2026-01-20 00:25:52 -06:00
# Create stream
stream = RealSerialStream(port=port)
try:
2026-01-20 00:25:52 -06:00
# Connect with handshake
print("\nConnecting to device...")
device_info = stream.connect(timeout=5.0)
print(f"Connected! Device: {device_info.get('device', 'Unknown')}, "
f"Channels: {device_info.get('channels', '?')}")
print()
# Start streaming
print("Starting data stream...")
stream.start()
2026-01-20 00:25:52 -06:00
print()
sample_count = 0
while True:
line = stream.readline()
if line:
# Check if this is a data line (starts with digit = timestamp)
if line and line[0].isdigit():
sample_count += 1
# Print every 500th sample to avoid flooding terminal
2026-01-19 22:24:04 -06:00
#if sample_count % 500 == 0:
print(f" [{sample_count:6d} samples] Latest: {line}")
else:
2026-01-20 00:25:52 -06:00
# Print non-data messages
print(f" {line}")
except KeyboardInterrupt:
print("\n")
print("-" * 50)
print("Stopped by user (Ctrl+C)")
2026-01-20 00:25:52 -06:00
except TimeoutError as e:
print(f"\nConnection timeout: {e}")
except Exception as e:
print(f"\nError: {e}")
finally:
2026-01-20 00:25:52 -06:00
print("\nCleaning up...")
stream.stop()
2026-01-20 00:25:52 -06:00
stream.disconnect()
print(f"Total samples received: {sample_count}")