App-Device handshake complete

This commit is contained in:
2026-01-20 00:25:52 -06:00
parent 491117fea8
commit c8c15df889
3 changed files with 746 additions and 131 deletions

View File

@@ -2,16 +2,21 @@
* @file main.c
* @brief Application entry point for the EMG-controlled robotic hand.
*
* This is the top-level application that initializes all subsystems
* and runs the main loop. Currently configured to stream EMG data
* over USB serial for Python to receive.
* Implements a robust handshake protocol with the host computer:
* 1. Wait for "connect" command
* 2. Acknowledge connection
* 3. Wait for "start" command
* 4. Stream EMG data
* 5. Handle "stop" and "disconnect" commands
*
* @note This is Layer 4 (Application).
*/
#include <stdio.h>
#include <string.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <freertos/queue.h>
#include "esp_timer.h"
#include "config/config.h"
@@ -20,29 +25,206 @@
#include "core/gestures.h"
/*******************************************************************************
* Private Functions
* Constants
******************************************************************************/
#define CMD_BUFFER_SIZE 128
#define JSON_RESPONSE_SIZE 128
/*******************************************************************************
* Types
******************************************************************************/
/**
* @brief Stream EMG data over USB serial.
* @brief Device state machine.
*/
typedef enum {
STATE_IDLE = 0, /**< Waiting for connect command */
STATE_CONNECTED, /**< Connected, waiting for start command */
STATE_STREAMING, /**< Actively streaming EMG data */
} device_state_t;
/**
* @brief Commands from host.
*/
typedef enum {
CMD_NONE = 0,
CMD_CONNECT,
CMD_START,
CMD_STOP,
CMD_DISCONNECT,
} command_t;
/*******************************************************************************
* Global State
******************************************************************************/
static volatile device_state_t g_device_state = STATE_IDLE;
static QueueHandle_t g_cmd_queue = NULL;
/*******************************************************************************
* Forward Declarations
******************************************************************************/
static void send_ack_connect(void);
/*******************************************************************************
* Command Parsing
******************************************************************************/
/**
* @brief Parse incoming command from JSON.
*
* Outputs data in format: "timestamp_ms,ch0,ch1,ch2,ch3\n"
* This matches what Python's SimulatedEMGStream produces.
* Expected format: {"cmd": "connect"}
*
* @param line Input line from serial
* @return Parsed command
*/
static command_t parse_command(const char* line)
{
/* Simple JSON parsing - look for "cmd" field */
const char* cmd_start = strstr(line, "\"cmd\"");
if (!cmd_start) {
return CMD_NONE;
}
/* Find the value after "cmd": */
const char* value_start = strchr(cmd_start, ':');
if (!value_start) {
return CMD_NONE;
}
/* Skip whitespace and opening quote */
value_start++;
while (*value_start == ' ' || *value_start == '"') {
value_start++;
}
/* Match command strings */
if (strncmp(value_start, "connect", 7) == 0) {
return CMD_CONNECT;
} else if (strncmp(value_start, "start", 5) == 0) {
return CMD_START;
} else if (strncmp(value_start, "stop", 4) == 0) {
return CMD_STOP;
} else if (strncmp(value_start, "disconnect", 10) == 0) {
return CMD_DISCONNECT;
}
return CMD_NONE;
}
/*******************************************************************************
* Serial Input Task
******************************************************************************/
/**
* @brief FreeRTOS task to read serial input and parse commands.
*
* This task runs continuously, reading lines from stdin (USB serial)
* and updating device state directly. This allows commands to interrupt
* streaming immediately via the volatile state variable.
*
* @param pvParameters Unused
*/
static void serial_input_task(void* pvParameters)
{
char line_buffer[CMD_BUFFER_SIZE];
int line_idx = 0;
while (1) {
/* Read one character at a time */
int c = getchar();
if (c == EOF || c == 0xFF) {
/* No data available, yield to other tasks */
vTaskDelay(pdMS_TO_TICKS(10));
continue;
}
if (c == '\n' || c == '\r') {
/* End of line - process command */
if (line_idx > 0) {
line_buffer[line_idx] = '\0';
command_t cmd = parse_command(line_buffer);
if (cmd != CMD_NONE) {
/* Handle state transitions directly */
/* This allows streaming loop to see state changes immediately */
switch (g_device_state) {
case STATE_IDLE:
if (cmd == CMD_CONNECT) {
g_device_state = STATE_CONNECTED;
send_ack_connect();
printf("[STATE] IDLE -> CONNECTED\n");
}
break;
case STATE_CONNECTED:
if (cmd == CMD_START) {
g_device_state = STATE_STREAMING;
printf("[STATE] CONNECTED -> STREAMING\n");
/* Signal state machine to start streaming */
xQueueSend(g_cmd_queue, &cmd, 0);
} else if (cmd == CMD_DISCONNECT) {
g_device_state = STATE_IDLE;
printf("[STATE] CONNECTED -> IDLE\n");
}
break;
case STATE_STREAMING:
if (cmd == CMD_STOP) {
g_device_state = STATE_CONNECTED;
printf("[STATE] STREAMING -> CONNECTED\n");
/* Streaming loop will exit when it sees state change */
} else if (cmd == CMD_DISCONNECT) {
g_device_state = STATE_IDLE;
printf("[STATE] STREAMING -> IDLE\n");
/* Streaming loop will exit when it sees state change */
}
break;
}
}
line_idx = 0;
}
} else if (line_idx < CMD_BUFFER_SIZE - 1) {
/* Add character to buffer */
line_buffer[line_idx++] = (char)c;
} else {
/* Buffer overflow - reset */
line_idx = 0;
}
}
}
/*******************************************************************************
* State Machine
******************************************************************************/
/**
* @brief Send JSON acknowledgment for connection.
*/
static void send_ack_connect(void)
{
printf("{\"status\":\"ack_connect\",\"device\":\"ESP32-EMG\",\"channels\":%d}\n",
EMG_NUM_CHANNELS);
fflush(stdout);
}
/**
* @brief Stream EMG data continuously until stopped.
*
* This function blocks and streams data at the configured sample rate.
* Returns when state changes from STREAMING.
*/
static void stream_emg_data(void)
{
emg_sample_t sample;
printf("\n[EMG] Starting data stream at %d Hz...\n", EMG_SAMPLE_RATE_HZ);
printf("[EMG] Format: timestamp_ms,ch0,ch1,ch2,ch3\n\n");
/*
* FreeRTOS tick rate is set to 1000 Hz in sdkconfig.defaults (1ms per tick).
* Delay of 1 tick = 1ms, giving us the full 1000 Hz sample rate.
*/
const TickType_t delay_ticks = 1; /* 1 tick = 1ms at 1000 Hz tick rate */
while (1) {
while (g_device_state == STATE_STREAMING) {
/* Read EMG (fake or real depending on FEATURE_FAKE_EMG) */
emg_sensor_read(&sample);
@@ -60,14 +242,30 @@ static void stream_emg_data(void)
}
/**
* @brief Run demo mode - cycle through gestures.
* @brief Main state machine loop.
*
* Monitors device state and starts streaming when requested.
* Serial input task handles all state transitions directly.
*/
static void run_demo(void)
static void state_machine_loop(void)
{
printf("\n[DEMO] Running gesture demo...\n");
command_t cmd;
const TickType_t poll_interval = pdMS_TO_TICKS(50);
while (1) {
gestures_demo_fist(1000);
/* Check if we should start streaming */
if (g_device_state == STATE_STREAMING) {
/* Stream until state changes (via serial input task) */
stream_emg_data();
/* Returns when state is no longer STREAMING */
}
/* Wait for start command or just poll state */
/* Timeout allows checking state even if queue is empty */
xQueueReceive(g_cmd_queue, &cmd, poll_interval);
/* Note: State transitions are handled by serial_input_task */
/* This loop only triggers streaming when state becomes STREAMING */
}
}
@@ -80,7 +278,7 @@ void app_main(void)
printf("\n");
printf("========================================\n");
printf(" Bucky Arm - EMG Robotic Hand\n");
printf(" Firmware v1.0.0\n");
printf(" Firmware v2.0.0 (Handshake Protocol)\n");
printf("========================================\n\n");
/* Initialize subsystems */
@@ -98,16 +296,26 @@ void app_main(void)
printf("[INIT] Done!\n\n");
/*
* Choose what to run:
* - stream_emg_data(): Send EMG data to laptop (Phase 1)
* - run_demo(): Test servo movement
*
* For now, we stream EMG data.
* Comment out and use run_demo() to test servos.
*/
stream_emg_data();
/* Alternative: run servo demo */
// run_demo();
/* Create command queue */
g_cmd_queue = xQueueCreate(10, sizeof(command_t));
if (g_cmd_queue == NULL) {
printf("[ERROR] Failed to create command queue!\n");
return;
}
/* Launch serial input task */
xTaskCreate(
serial_input_task,
"serial_input",
4096, /* Stack size */
NULL, /* Parameters */
5, /* Priority */
NULL /* Task handle */
);
printf("[PROTOCOL] Waiting for host to connect...\n");
printf("[PROTOCOL] Send: {\"cmd\": \"connect\"}\n\n");
/* Run main state machine */
state_machine_loop();
}

View File

@@ -310,6 +310,7 @@ class CollectionPage(BasePage):
# Collection state
self.is_collecting = False
self.is_connected = False
self.using_real_hardware = False
self.stream = None
self.parser = None
@@ -377,12 +378,23 @@ class CollectionPage(BasePage):
)
self.refresh_ports_btn.pack(side="left")
# Connection status indicator
# Connection status and button
connect_frame = ctk.CTkFrame(self.port_frame, fg_color="transparent")
connect_frame.pack(fill="x", pady=(5, 0))
self.connect_button = ctk.CTkButton(
connect_frame, text="Connect",
width=100, height=28,
command=self._toggle_connection,
state="disabled" # Disabled until "Real ESP32" selected
)
self.connect_button.pack(side="left", padx=(0, 10))
self.connection_status = ctk.CTkLabel(
self.port_frame, text="Not connected",
connect_frame, text="Disconnected",
font=ctk.CTkFont(size=11), text_color="gray"
)
self.connection_status.pack(anchor="w", pady=(5, 0))
self.connection_status.pack(side="left")
# Gesture selection
gesture_frame = ctk.CTkFrame(self.controls_panel, fg_color="transparent")
@@ -529,7 +541,6 @@ class CollectionPage(BasePage):
def start_collection(self):
"""Start data collection."""
self.is_collecting = True
# Get selected gestures
gestures = [g for g, var in self.gesture_vars.items() if var.get()]
if not gestures:
@@ -540,18 +551,16 @@ class CollectionPage(BasePage):
self.using_real_hardware = (self.source_var.get() == "real")
if self.using_real_hardware:
# Real ESP32 serial stream
port = self._get_serial_port()
# Must be connected for real hardware
if not self.is_connected or not self.stream:
messagebox.showerror("Not Connected", "Please connect to the ESP32 first.")
return
# Send start command to begin streaming
try:
self.stream = RealSerialStream(port=port)
self._update_connection_status("orange", "Connecting...")
self.stream.start()
except Exception as e:
error_msg = f"Failed to create serial stream:\n{e}"
if "PermissionError" in str(type(e).__name__) or "Permission denied" in str(e):
error_msg += "\n\nThe port may still be in use. Wait a few seconds and try again."
elif "FileNotFoundError" in str(type(e).__name__):
error_msg += f"\n\nPort '{port}' not found. Try refreshing the port list."
messagebox.showerror("Connection Error", error_msg)
messagebox.showerror("Start Error", f"Failed to start streaming:\n{e}")
return
else:
# Simulated stream (gesture-aware for realistic testing)
@@ -577,14 +586,19 @@ class CollectionPage(BasePage):
self.collected_labels = []
self.sample_buffer = []
# Mark as collecting
self.is_collecting = True
# Update UI
self.start_button.configure(text="Stop Collection", fg_color="red")
self.save_button.configure(state="disabled")
self.status_label.configure(text="Starting...")
# Disable source selection during collection
# Disable source selection and connection during collection
self.sim_radio.configure(state="disabled")
self.real_radio.configure(state="disabled")
if self.using_real_hardware:
self.connect_button.configure(state="disabled")
# Start collection thread
self.collection_thread = threading.Thread(target=self.collection_loop, daemon=True)
@@ -600,16 +614,16 @@ class CollectionPage(BasePage):
# Safe cleanup - stream might already be in error state
try:
if self.stream:
self.stream.stop()
# Give OS time to release the port (important for macOS)
if self.using_real_hardware:
time.sleep(0.5)
# Send stop command (returns to CONNECTED state)
self.stream.stop()
else:
# For simulated stream, just stop it
self.stream.stop()
self.stream = None
except Exception:
pass # Ignore cleanup errors
# Clear stream reference
self.stream = None
# Drain any pending messages from queue to prevent stale data
try:
while True:
@@ -622,27 +636,25 @@ class CollectionPage(BasePage):
self.prompt_label.configure(text="DONE", text_color="green")
self.countdown_label.configure(text="")
# Re-enable source selection
# Re-enable source selection and connection button
self.sim_radio.configure(state="normal")
self.real_radio.configure(state="normal")
# Update connection status
if self.using_real_hardware:
self._update_connection_status("gray", "Disconnected")
self.connect_button.configure(state="normal")
# Still connected, just not streaming
if self.is_connected:
device_name = self.stream.device_info.get('device', 'ESP32') if self.stream and self.stream.device_info else 'ESP32'
self._update_connection_status("green", f"Connected ({device_name})")
if self.collected_windows:
self.save_button.configure(state="normal")
def collection_loop(self):
"""Background collection loop."""
# Try to start the stream (may fail for real hardware)
try:
self.stream.start()
# Stream is already started (either via handshake for real HW or created for simulated)
# Just mark as ready
if self.using_real_hardware:
self.data_queue.put(('connection_status', ('green', 'Connected')))
except Exception as e:
self.data_queue.put(('error', f"Failed to connect: {e}"))
return
self.data_queue.put(('connection_status', ('green', 'Streaming')))
self.scheduler.start_session()
@@ -861,9 +873,13 @@ class CollectionPage(BasePage):
if self.source_var.get() == "real":
self.port_frame.pack(fill="x", pady=(5, 0))
self._refresh_ports()
self.connect_button.configure(state="normal")
self.start_button.configure(state="disabled") # Must connect first
else:
self.port_frame.pack_forget()
self._update_connection_status("gray", "Not using hardware")
self.connect_button.configure(state="disabled")
self.start_button.configure(state="normal") # Simulated mode doesn't need connect
def _refresh_ports(self):
"""Scan and populate available serial ports."""
@@ -888,6 +904,91 @@ class CollectionPage(BasePage):
"""Update the connection status indicator."""
self.connection_status.configure(text=f"{text}", text_color=color)
def _toggle_connection(self):
"""Connect or disconnect from ESP32."""
if self.is_connected:
self._disconnect_device()
else:
self._connect_device()
def _connect_device(self):
"""Connect to ESP32 with handshake."""
port = self._get_serial_port()
try:
# Update UI to show connecting
self._update_connection_status("orange", "Connecting...")
self.connect_button.configure(state="disabled")
self.update() # Force UI update
# Create stream and connect
self.stream = RealSerialStream(port=port)
device_info = self.stream.connect(timeout=5.0)
# Success!
self.is_connected = True
self._update_connection_status("green", f"Connected ({device_info.get('device', 'ESP32')})")
self.connect_button.configure(text="Disconnect", state="normal")
self.start_button.configure(state="normal")
except TimeoutError as e:
messagebox.showerror(
"Connection Timeout",
f"Device did not respond within 5 seconds.\n\n"
f"Check that:\n"
f"• ESP32 is powered on\n"
f"• Correct firmware is flashed\n"
f"• USB cable is properly connected"
)
self._update_connection_status("red", "Timeout")
self.connect_button.configure(state="normal")
if self.stream:
try:
self.stream.disconnect()
except:
pass
self.stream = None
except Exception as e:
error_msg = f"Failed to connect:\n{e}"
if "Permission denied" in str(e) or "Resource busy" in str(e):
error_msg += "\n\nThe port may still be in use. Wait a few seconds and try again."
elif "FileNotFoundError" in str(type(e).__name__):
error_msg += f"\n\nPort not found. Try refreshing the port list."
messagebox.showerror("Connection Error", error_msg)
self._update_connection_status("red", "Failed")
self.connect_button.configure(state="normal")
if self.stream:
try:
self.stream.disconnect()
except:
pass
self.stream = None
def _disconnect_device(self):
"""Disconnect from ESP32."""
try:
if self.stream:
self.stream.disconnect()
# Give OS time to release the port
time.sleep(0.5)
self.is_connected = False
self.stream = None
self._update_connection_status("gray", "Disconnected")
self.connect_button.configure(text="Connect")
self.start_button.configure(state="disabled")
except Exception as e:
messagebox.showwarning("Disconnect Warning", f"Error during disconnect: {e}")
# Still mark as disconnected even if there was an error
self.is_connected = False
self.stream = None
self._update_connection_status("gray", "Disconnected")
self.connect_button.configure(text="Connect")
self.start_button.configure(state="disabled")
def on_hide(self):
"""Stop collection when leaving page."""
if self.is_collecting:
@@ -1280,11 +1381,23 @@ class PredictionPage(BasePage):
)
self.refresh_ports_btn.pack(side="left")
# Connection status and button
connect_frame = ctk.CTkFrame(self.port_frame, fg_color="transparent")
connect_frame.pack(fill="x", pady=(5, 0))
self.connect_button = ctk.CTkButton(
connect_frame, text="Connect",
width=100, height=28,
command=self._toggle_connection,
state="disabled" # Disabled until "Real ESP32" selected
)
self.connect_button.pack(side="left", padx=(0, 10))
self.connection_status = ctk.CTkLabel(
self.port_frame, text="Not connected",
connect_frame, text="Disconnected",
font=ctk.CTkFont(size=11), text_color="gray"
)
self.connection_status.pack(anchor="w", pady=(5, 0))
self.connection_status.pack(side="left")
# Start button
self.start_button = ctk.CTkButton(
@@ -1346,6 +1459,7 @@ class PredictionPage(BasePage):
# State
self.is_predicting = False
self.is_connected = False
self.using_real_hardware = False
self.classifier = None
self.smoother = None
@@ -1392,6 +1506,19 @@ class PredictionPage(BasePage):
# Determine data source
self.using_real_hardware = (self.source_var.get() == "real")
# For real hardware, must be connected
if self.using_real_hardware:
if not self.is_connected or not self.stream:
messagebox.showerror("Not Connected", "Please connect to the ESP32 first.")
return
# Send start command to begin streaming
try:
self.stream.start()
except Exception as e:
messagebox.showerror("Start Error", f"Failed to start streaming:\n{e}")
return
# Create prediction smoother
self.smoother = PredictionSmoother(
label_names=self.classifier.label_names,
@@ -1403,9 +1530,11 @@ class PredictionPage(BasePage):
self.is_predicting = True
self.start_button.configure(text="Stop", fg_color="red")
# Disable source selection during prediction
# Disable source selection and connection during prediction
self.sim_radio.configure(state="disabled")
self.real_radio.configure(state="disabled")
if self.using_real_hardware:
self.connect_button.configure(state="disabled")
# Start prediction thread
thread = threading.Thread(target=self._prediction_thread, daemon=True)
@@ -1421,10 +1550,13 @@ class PredictionPage(BasePage):
# Safe cleanup - stream might already be in error state
try:
if self.stream:
self.stream.stop()
# Give OS time to release the port (important for macOS)
if self.using_real_hardware:
time.sleep(0.5)
# Send stop command (returns to CONNECTED state)
self.stream.stop()
else:
# For simulated stream, just stop it
self.stream.stop()
self.stream = None
except Exception:
pass # Ignore cleanup errors
@@ -1435,22 +1567,27 @@ class PredictionPage(BasePage):
self.sim_label.configure(text="")
self.raw_label.configure(text="", text_color="gray")
# Re-enable source selection
# Re-enable source selection and connection button
self.sim_radio.configure(state="normal")
self.real_radio.configure(state="normal")
# Update connection status
if self.using_real_hardware:
self._update_connection_status("gray", "Disconnected")
self.connect_button.configure(state="normal")
# Still connected, just not streaming
if self.is_connected:
device_name = self.stream.device_info.get('device', 'ESP32') if self.stream and self.stream.device_info else 'ESP32'
self._update_connection_status("green", f"Connected ({device_name})")
def _on_source_change(self):
"""Show/hide port selection based on data source."""
if self.source_var.get() == "real":
self.port_frame.pack(fill="x", pady=(5, 0))
self._refresh_ports()
self.connect_button.configure(state="normal")
# Start button will be enabled after connection
else:
self.port_frame.pack_forget()
self._update_connection_status("gray", "Not using hardware")
self.connect_button.configure(state="disabled")
def _refresh_ports(self):
"""Scan and populate available serial ports."""
@@ -1472,22 +1609,95 @@ class PredictionPage(BasePage):
"""Update the connection status indicator."""
self.connection_status.configure(text=f"{text}", text_color=color)
def _toggle_connection(self):
"""Connect or disconnect from ESP32."""
if self.is_connected:
self._disconnect_device()
else:
self._connect_device()
def _connect_device(self):
"""Connect to ESP32 with handshake."""
port = self._get_serial_port()
try:
# Update UI to show connecting
self._update_connection_status("orange", "Connecting...")
self.connect_button.configure(state="disabled")
self.update() # Force UI update
# Create stream and connect
self.stream = RealSerialStream(port=port)
device_info = self.stream.connect(timeout=5.0)
# Success!
self.is_connected = True
self._update_connection_status("green", f"Connected ({device_info.get('device', 'ESP32')})")
self.connect_button.configure(text="Disconnect", state="normal")
except TimeoutError as e:
messagebox.showerror(
"Connection Timeout",
f"Device did not respond within 5 seconds.\n\n"
f"Check that:\n"
f"• ESP32 is powered on\n"
f"• Correct firmware is flashed\n"
f"• USB cable is properly connected"
)
self._update_connection_status("red", "Timeout")
self.connect_button.configure(state="normal")
if self.stream:
try:
self.stream.disconnect()
except:
pass
self.stream = None
except Exception as e:
error_msg = f"Failed to connect:\n{e}"
if "Permission denied" in str(e) or "Resource busy" in str(e):
error_msg += "\n\nThe port may still be in use. Wait a few seconds and try again."
elif "FileNotFoundError" in str(type(e).__name__):
error_msg += f"\n\nPort not found. Try refreshing the port list."
messagebox.showerror("Connection Error", error_msg)
self._update_connection_status("red", "Failed")
self.connect_button.configure(state="normal")
if self.stream:
try:
self.stream.disconnect()
except:
pass
self.stream = None
def _disconnect_device(self):
"""Disconnect from ESP32."""
try:
if self.stream:
self.stream.disconnect()
# Give OS time to release the port
time.sleep(0.5)
self.is_connected = False
self.stream = None
self._update_connection_status("gray", "Disconnected")
self.connect_button.configure(text="Connect")
except Exception as e:
messagebox.showwarning("Disconnect Warning", f"Error during disconnect: {e}")
# Still mark as disconnected even if there was an error
self.is_connected = False
self.stream = None
self._update_connection_status("gray", "Disconnected")
self.connect_button.configure(text="Connect")
def _prediction_thread(self):
"""Background prediction thread."""
# Create appropriate stream based on source selection
if self.using_real_hardware:
port = self._get_serial_port()
try:
self.stream = RealSerialStream(port=port)
except Exception as e:
error_msg = f"Failed to create serial stream: {e}"
if "Permission denied" in str(e) or "Resource busy" in str(e):
error_msg += "\n\nThe port may still be in use. Wait a moment and try again."
self.data_queue.put(('error', error_msg))
return
else:
# For simulated mode, create new stream
if not self.using_real_hardware:
self.stream = GestureAwareEMGStream(num_channels=NUM_CHANNELS, sample_rate=SAMPLING_RATE_HZ)
# Stream is already started (either via handshake for real HW or will be started for simulated)
parser = EMGParser(num_channels=NUM_CHANNELS)
windower = Windower(window_size_ms=WINDOW_SIZE_MS, sample_rate=SAMPLING_RATE_HZ, overlap=0.0)
@@ -1498,16 +1708,18 @@ class PredictionPage(BasePage):
gesture_start = time.perf_counter()
current_gesture = gesture_cycle[0]
# Start the stream
# Start simulated stream if needed
if not self.using_real_hardware:
try:
if hasattr(self.stream, 'set_gesture'):
self.stream.set_gesture(current_gesture)
self.stream.start()
if self.using_real_hardware:
self.data_queue.put(('connection_status', ('green', 'Connected')))
except Exception as e:
self.data_queue.put(('error', f"Failed to connect: {e}"))
self.data_queue.put(('error', f"Failed to start simulated stream: {e}"))
return
else:
# Real hardware is already streaming
self.data_queue.put(('connection_status', ('green', 'Streaming')))
return
while self.is_predicting:

View File

@@ -1,10 +1,10 @@
"""
@file serial_stream.py
@brief Real serial stream for reading EMG data from ESP32.
@brief Real serial stream for reading EMG data from ESP32 with handshake protocol.
This module provides a serial communication interface for receiving
EMG data from the ESP32 microcontroller over USB. It implements the
same interface as SimulatedEMGStream, making it a drop-in replacement.
EMG data from the ESP32 microcontroller over USB. It implements a
robust handshake protocol to ensure reliable connection before streaming.
@section usage Usage Example
@code{.py}
@@ -12,17 +12,50 @@ same interface as SimulatedEMGStream, making it a drop-in replacement.
# Create stream (auto-detects port, or specify manually)
stream = RealSerialStream(port='COM3')
# Connect with handshake (raises on timeout/failure)
stream.connect(timeout=5.0)
# Start streaming
stream.start()
# Read data (same interface as SimulatedEMGStream)
# Read data
while True:
line = stream.readline()
if line:
print(line) # "1234,512,489,501,523"
# Stop streaming (device returns to CONNECTED state)
stream.stop()
# Disconnect (device returns to IDLE state)
stream.disconnect()
@endcode
@section protocol Handshake Protocol & State Machine
ESP32 States:
IDLE - Waiting for "connect" command
CONNECTED - Handshake complete, waiting for "start" command
STREAMING - Actively sending CSV data
State Transitions:
1. connect() : IDLE → CONNECTED
App sends: {"cmd": "connect"}
Device responds: {"status": "ack_connect", "device": "ESP32-EMG", "channels": 4}
2. start() : CONNECTED → STREAMING
App sends: {"cmd": "start"}
Device starts streaming CSV data
3. stop() : STREAMING → CONNECTED
App sends: {"cmd": "stop"}
Device stops streaming, ready for new start command
4. disconnect() : ANY → IDLE
App sends: {"cmd": "disconnect"}
Device returns to idle, waiting for new connection
@section format Data Format
The ESP32 sends data as CSV lines:
"timestamp_ms,ch0,ch1,ch2,ch3\\n"
@@ -35,20 +68,32 @@ same interface as SimulatedEMGStream, making it a drop-in replacement.
import serial
import serial.tools.list_ports
from typing import Optional, List
import json
import time
from typing import Optional, List, Dict, Any
from enum import Enum
class ConnectionState(Enum):
"""Connection states for the serial stream."""
DISCONNECTED = 0 # No serial connection
CONNECTING = 1 # Serial open, waiting for handshake
CONNECTED = 2 # Handshake complete, ready to stream
STREAMING = 3 # Actively streaming data
class RealSerialStream:
"""
@brief Reads EMG data from ESP32 over USB serial.
@brief Reads EMG data from ESP32 over USB serial with handshake protocol.
This class provides the same interface as SimulatedEMGStream:
- start() : Open serial connection
- stop() : Close serial connection
This class implements a robust connection protocol:
- connect() : Open serial port and perform handshake
- start() : Begin streaming data
- stop() : Stop streaming (device stays connected)
- disconnect() : Close connection (device returns to idle)
- readline() : Read one line of data
This allows it to be used as a drop-in replacement for testing
with real hardware instead of simulated data.
State machine ensures reliable communication without timing dependencies.
@note Requires pyserial: pip install pyserial
"""
@@ -60,24 +105,34 @@ class RealSerialStream:
@param port Serial port name (e.g., 'COM3' on Windows, '/dev/ttyUSB0' on Linux).
If None, will attempt to auto-detect the ESP32.
@param baud_rate Communication speed in bits per second. Default 115200 matches ESP32.
@param timeout Read timeout in seconds. Returns None if no data within this time.
@param timeout Read timeout in seconds for readline().
"""
self.port = port
self.baud_rate = baud_rate
self.timeout = timeout
self.serial: Optional[serial.Serial] = None
self.running = False
self.state = ConnectionState.DISCONNECTED
self.device_info: Optional[Dict[str, Any]] = None
def start(self) -> None:
def connect(self, timeout: float = 5.0) -> Dict[str, Any]:
"""
@brief Open the serial connection to the ESP32.
@brief Connect to the ESP32 and perform handshake.
If no port was specified in __init__, attempts to auto-detect
the ESP32 by looking for common USB-UART chip identifiers.
Opens the serial port, sends connect command, and waits for
acknowledgment from the device.
@param timeout Maximum time to wait for handshake response (seconds).
@return Device info dict from handshake response.
@throws RuntimeError If no port specified and auto-detect fails.
@throws RuntimeError If unable to open the serial port.
@throws TimeoutError If device doesn't respond within timeout.
@throws ValueError If device sends invalid handshake response.
"""
if self.state != ConnectionState.DISCONNECTED:
raise RuntimeError(f"Already in state {self.state.name}, cannot connect")
# Auto-detect port if not specified
if self.port is None:
self.port = self._auto_detect_port()
@@ -95,61 +150,185 @@ class RealSerialStream:
baudrate=self.baud_rate,
timeout=self.timeout
)
self.running = True
self.state = ConnectionState.CONNECTING
# Clear any stale data in the buffer
self.serial.reset_input_buffer()
time.sleep(0.1) # Let device settle after port open
print(f"[SERIAL] Connected to {self.port} at {self.baud_rate} baud")
print(f"[SERIAL] Port opened: {self.port}")
except serial.SerialException as e:
self.state = ConnectionState.DISCONNECTED
error_msg = f"Failed to open {self.port}: {e}"
# Add helpful context for common errors
if "Permission denied" in str(e) or "Resource busy" in str(e):
error_msg += "\n\nThe port may still be in use from a previous connection. Wait a moment and try again."
error_msg += "\n\nThe port may still be in use. Wait a moment and try again."
raise RuntimeError(error_msg)
# Perform handshake
try:
# Send connect command
connect_cmd = {"cmd": "connect"}
self._send_json(connect_cmd)
print(f"[SERIAL] Sent: {connect_cmd}")
# Wait for acknowledgment
start_time = time.time()
while (time.time() - start_time) < timeout:
line = self._readline_raw()
if line:
try:
response = json.loads(line)
if response.get("status") == "ack_connect":
self.device_info = response
self.state = ConnectionState.CONNECTED
print(f"[SERIAL] Handshake complete: {response}")
return response
except json.JSONDecodeError:
# Ignore non-JSON lines (might be startup messages)
print(f"[SERIAL] Ignoring: {line.strip()}")
continue
# Timeout reached
self.state = ConnectionState.DISCONNECTED
if self.serial:
self.serial.close()
self.serial = None
raise TimeoutError(
f"Device did not respond to connection request within {timeout}s.\n"
"Check that the correct firmware is flashed and device is powered on."
)
except Exception as e:
# Clean up on any error
self.state = ConnectionState.DISCONNECTED
if self.serial:
try:
self.serial.close()
except:
pass
self.serial = None
raise
def start(self) -> None:
"""
@brief Start streaming EMG data.
Device must be in CONNECTED state. Sends start command to ESP32,
which begins streaming CSV data.
@throws RuntimeError If not connected.
"""
if self.state != ConnectionState.CONNECTED:
raise RuntimeError(
f"Cannot start streaming from state {self.state.name}. "
"Must call connect() first."
)
# Send start command
start_cmd = {"cmd": "start"}
self._send_json(start_cmd)
self.state = ConnectionState.STREAMING
print(f"[SERIAL] Started streaming")
def stop(self) -> None:
"""
@brief Close the serial connection.
@brief Stop streaming EMG data.
Safe to call even if not connected.
Sends stop command to ESP32, which stops streaming and returns
to CONNECTED state. Connection remains open for restart.
Safe to call even if not streaming.
"""
self.running = False
if self.state == ConnectionState.STREAMING:
try:
stop_cmd = {"cmd": "stop"}
self._send_json(stop_cmd)
self.state = ConnectionState.CONNECTED
print(f"[SERIAL] Stopped streaming")
except Exception as e:
print(f"[SERIAL] Warning during stop: {e}")
def disconnect(self) -> None:
"""
@brief Disconnect from the ESP32.
Sends disconnect command (device returns to IDLE state),
then closes the serial port. Safe to call from any state.
"""
# Send disconnect command if connected
if self.state in (ConnectionState.CONNECTED, ConnectionState.STREAMING):
try:
disconnect_cmd = {"cmd": "disconnect"}
self._send_json(disconnect_cmd)
time.sleep(0.1) # Give device time to process
print(f"[SERIAL] Sent disconnect command")
except Exception as e:
print(f"[SERIAL] Warning sending disconnect: {e}")
# Close serial port
if self.serial:
try:
if self.serial.is_open:
self.serial.close()
print(f"[SERIAL] Disconnected from {self.port}")
print(f"[SERIAL] Port closed: {self.port}")
except Exception as e:
print(f"[SERIAL] Warning during disconnect: {e}")
print(f"[SERIAL] Warning during port close: {e}")
finally:
self.serial = None
self.state = ConnectionState.DISCONNECTED
self.device_info = None
def readline(self) -> Optional[str]:
"""
@brief Read one line of data from the ESP32.
@brief Read one line of CSV data from the ESP32.
Should only be called when in STREAMING state.
Blocks until a complete line is received or timeout occurs.
This matches the interface of SimulatedEMGStream.readline().
@return Line string including newline, or None if timeout/error.
@note Lines from ESP32 are in format: "timestamp_ms,ch0,ch1,ch2,ch3\\n"
"""
if self.state != ConnectionState.STREAMING:
return None
return self._readline_raw()
def _readline_raw(self) -> Optional[str]:
"""
@brief Read one line from serial port (internal helper).
@return Decoded line string, or None if timeout/error.
"""
if not self.serial or not self.serial.is_open:
return None
try:
line_bytes = self.serial.readline()
if line_bytes:
return line_bytes.decode('utf-8', errors='ignore')
return line_bytes.decode('utf-8', errors='ignore').strip()
return None
except serial.SerialException:
return None
def _send_json(self, data: Dict[str, Any]) -> None:
"""
@brief Send JSON command to the device (internal helper).
@param data Dictionary to send as JSON.
@throws RuntimeError If serial port is not open.
"""
if not self.serial or not self.serial.is_open:
raise RuntimeError("Serial port not open")
json_str = json.dumps(data) + "\n"
self.serial.write(json_str.encode('utf-8'))
self.serial.flush() # Ensure data is sent immediately
def _auto_detect_port(self) -> Optional[str]:
"""
@brief Attempt to auto-detect the ESP32 serial port.
@@ -244,11 +423,21 @@ if __name__ == "__main__":
print("Starting stream... (Ctrl+C to stop)")
print("-" * 50)
# Create and start stream
# Create stream
stream = RealSerialStream(port=port)
try:
# Connect with handshake
print("\nConnecting to device...")
device_info = stream.connect(timeout=5.0)
print(f"Connected! Device: {device_info.get('device', 'Unknown')}, "
f"Channels: {device_info.get('channels', '?')}")
print()
# Start streaming
print("Starting data stream...")
stream.start()
print()
sample_count = 0
@@ -256,8 +445,6 @@ if __name__ == "__main__":
line = stream.readline()
if line:
line = line.strip()
# Check if this is a data line (starts with digit = timestamp)
if line and line[0].isdigit():
sample_count += 1
@@ -267,7 +454,7 @@ if __name__ == "__main__":
print(f" [{sample_count:6d} samples] Latest: {line}")
else:
# Print startup/info messages from ESP32
# Print non-data messages
print(f" {line}")
except KeyboardInterrupt:
@@ -275,6 +462,14 @@ if __name__ == "__main__":
print("-" * 50)
print("Stopped by user (Ctrl+C)")
except TimeoutError as e:
print(f"\nConnection timeout: {e}")
except Exception as e:
print(f"\nError: {e}")
finally:
print("\nCleaning up...")
stream.stop()
stream.disconnect()
print(f"Total samples received: {sample_count}")