271 lines
7.9 KiB
Python
271 lines
7.9 KiB
Python
"""
|
|
@file serial_stream.py
|
|
@brief Real serial stream for reading EMG data from ESP32.
|
|
|
|
This module provides a serial communication interface for receiving
|
|
EMG data from the ESP32 microcontroller over USB. It implements the
|
|
same interface as SimulatedEMGStream, making it a drop-in replacement.
|
|
|
|
@section usage Usage Example
|
|
@code{.py}
|
|
from serial_stream import RealSerialStream
|
|
|
|
# Create stream (auto-detects port, or specify manually)
|
|
stream = RealSerialStream(port='COM3')
|
|
stream.start()
|
|
|
|
# Read data (same interface as SimulatedEMGStream)
|
|
while True:
|
|
line = stream.readline()
|
|
if line:
|
|
print(line) # "1234,512,489,501,523"
|
|
|
|
stream.stop()
|
|
@endcode
|
|
|
|
@section format Data Format
|
|
The ESP32 sends data as CSV lines:
|
|
"timestamp_ms,ch0,ch1,ch2,ch3\\n"
|
|
|
|
Example:
|
|
"12345,512,489,501,523\\n"
|
|
|
|
@author Bucky Arm Project
|
|
"""
|
|
|
|
import serial
|
|
import serial.tools.list_ports
|
|
from typing import Optional, List
|
|
|
|
|
|
class RealSerialStream:
|
|
"""
|
|
@brief Reads EMG data from ESP32 over USB serial.
|
|
|
|
This class provides the same interface as SimulatedEMGStream:
|
|
- start() : Open serial connection
|
|
- stop() : Close serial connection
|
|
- readline() : Read one line of data
|
|
|
|
This allows it to be used as a drop-in replacement for testing
|
|
with real hardware instead of simulated data.
|
|
|
|
@note Requires pyserial: pip install pyserial
|
|
"""
|
|
|
|
def __init__(self, port: str = None, baud_rate: int = 115200, timeout: float = 1.0):
|
|
"""
|
|
@brief Initialize the serial stream.
|
|
|
|
@param port Serial port name (e.g., 'COM3' on Windows, '/dev/ttyUSB0' on Linux).
|
|
If None, will attempt to auto-detect the ESP32.
|
|
@param baud_rate Communication speed in bits per second. Default 115200 matches ESP32.
|
|
@param timeout Read timeout in seconds. Returns None if no data within this time.
|
|
"""
|
|
self.port = port
|
|
self.baud_rate = baud_rate
|
|
self.timeout = timeout
|
|
self.serial: Optional[serial.Serial] = None
|
|
self.running = False
|
|
|
|
def start(self) -> None:
|
|
"""
|
|
@brief Open the serial connection to the ESP32.
|
|
|
|
If no port was specified in __init__, attempts to auto-detect
|
|
the ESP32 by looking for common USB-UART chip identifiers.
|
|
|
|
@throws RuntimeError If no port specified and auto-detect fails.
|
|
@throws RuntimeError If unable to open the serial port.
|
|
"""
|
|
# Auto-detect port if not specified
|
|
if self.port is None:
|
|
self.port = self._auto_detect_port()
|
|
|
|
if self.port is None:
|
|
raise RuntimeError(
|
|
"No serial port specified and auto-detect failed.\n"
|
|
"Use RealSerialStream.list_ports() to see available ports."
|
|
)
|
|
|
|
# Open serial connection
|
|
try:
|
|
self.serial = serial.Serial(
|
|
port=self.port,
|
|
baudrate=self.baud_rate,
|
|
timeout=self.timeout
|
|
)
|
|
self.running = True
|
|
|
|
# Clear any stale data in the buffer
|
|
self.serial.reset_input_buffer()
|
|
|
|
print(f"[SERIAL] Connected to {self.port} at {self.baud_rate} baud")
|
|
|
|
except serial.SerialException as e:
|
|
raise RuntimeError(f"Failed to open {self.port}: {e}")
|
|
|
|
def stop(self) -> None:
|
|
"""
|
|
@brief Close the serial connection.
|
|
|
|
Safe to call even if not connected.
|
|
"""
|
|
self.running = False
|
|
|
|
if self.serial and self.serial.is_open:
|
|
self.serial.close()
|
|
print(f"[SERIAL] Disconnected from {self.port}")
|
|
|
|
def readline(self) -> Optional[str]:
|
|
"""
|
|
@brief Read one line of data from the ESP32.
|
|
|
|
Blocks until a complete line is received or timeout occurs.
|
|
This matches the interface of SimulatedEMGStream.readline().
|
|
|
|
@return Line string including newline, or None if timeout/error.
|
|
|
|
@note Lines from ESP32 are in format: "timestamp_ms,ch0,ch1,ch2,ch3\\n"
|
|
"""
|
|
if not self.serial or not self.serial.is_open:
|
|
return None
|
|
|
|
try:
|
|
line_bytes = self.serial.readline()
|
|
if line_bytes:
|
|
return line_bytes.decode('utf-8', errors='ignore')
|
|
return None
|
|
|
|
except serial.SerialException:
|
|
return None
|
|
|
|
def _auto_detect_port(self) -> Optional[str]:
|
|
"""
|
|
@brief Attempt to auto-detect the ESP32 serial port.
|
|
|
|
Looks for common USB-UART bridge chips used on ESP32 dev boards:
|
|
- CP210x (Silicon Labs)
|
|
- CH340 (WCH)
|
|
- FTDI
|
|
|
|
@return Port name if found, None otherwise.
|
|
"""
|
|
ports = serial.tools.list_ports.comports()
|
|
|
|
# Known USB-UART chip identifiers
|
|
known_chips = ['cp210', 'ch340', 'ftdi', 'usb-serial', 'usb serial']
|
|
|
|
for port in ports:
|
|
description_lower = port.description.lower()
|
|
if any(chip in description_lower for chip in known_chips):
|
|
print(f"[SERIAL] Auto-detected ESP32 on {port.device}")
|
|
return port.device
|
|
|
|
# Fallback: use first available port
|
|
if ports:
|
|
print(f"[SERIAL] No ESP32 detected, using first port: {ports[0].device}")
|
|
return ports[0].device
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def list_ports() -> List[str]:
|
|
"""
|
|
@brief List all available serial ports on the system.
|
|
|
|
Useful for finding the correct port name for your ESP32.
|
|
|
|
@return List of port names (e.g., ['COM3', 'COM4'] on Windows).
|
|
"""
|
|
ports = serial.tools.list_ports.comports()
|
|
|
|
if not ports:
|
|
print("No serial ports found.")
|
|
return []
|
|
|
|
print("\nAvailable serial ports:")
|
|
print("-" * 50)
|
|
|
|
for port in ports:
|
|
print(f" {port.device}")
|
|
print(f" Description: {port.description}")
|
|
print()
|
|
|
|
return [p.device for p in ports]
|
|
|
|
|
|
# =============================================================================
|
|
# Standalone Test
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
"""
|
|
@brief Quick test to verify ESP32 serial communication.
|
|
|
|
Run this file directly to test:
|
|
python serial_stream.py [port]
|
|
|
|
If port is not specified, auto-detection is attempted.
|
|
"""
|
|
import sys
|
|
|
|
print("=" * 50)
|
|
print(" ESP32 Serial Stream Test")
|
|
print("=" * 50)
|
|
print()
|
|
|
|
# Show available ports
|
|
ports = RealSerialStream.list_ports()
|
|
|
|
if not ports:
|
|
print("No ports found. Is the ESP32 plugged in?")
|
|
sys.exit(1)
|
|
|
|
# Determine which port to use
|
|
if len(sys.argv) > 1:
|
|
port = sys.argv[1]
|
|
print(f"Using specified port: {port}")
|
|
else:
|
|
port = None # Will auto-detect
|
|
print("No port specified, will auto-detect.")
|
|
|
|
print()
|
|
print("Starting stream... (Ctrl+C to stop)")
|
|
print("-" * 50)
|
|
|
|
# Create and start stream
|
|
stream = RealSerialStream(port=port)
|
|
|
|
try:
|
|
stream.start()
|
|
|
|
sample_count = 0
|
|
|
|
while True:
|
|
line = stream.readline()
|
|
|
|
if line:
|
|
line = line.strip()
|
|
|
|
# Check if this is a data line (starts with digit = timestamp)
|
|
if line and line[0].isdigit():
|
|
sample_count += 1
|
|
|
|
# Print every 500th sample to avoid flooding terminal
|
|
#if sample_count % 500 == 0:
|
|
print(f" [{sample_count:6d} samples] Latest: {line}")
|
|
|
|
else:
|
|
# Print startup/info messages from ESP32
|
|
print(f" {line}")
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n")
|
|
print("-" * 50)
|
|
print("Stopped by user (Ctrl+C)")
|
|
|
|
finally:
|
|
stream.stop()
|
|
print(f"Total samples received: {sample_count}")
|