diff --git a/EMG_Arm/src/app/main.c b/EMG_Arm/src/app/main.c index 646fee7..3e6bb6f 100644 --- a/EMG_Arm/src/app/main.c +++ b/EMG_Arm/src/app/main.c @@ -2,16 +2,21 @@ * @file main.c * @brief Application entry point for the EMG-controlled robotic hand. * - * This is the top-level application that initializes all subsystems - * and runs the main loop. Currently configured to stream EMG data - * over USB serial for Python to receive. + * Implements a robust handshake protocol with the host computer: + * 1. Wait for "connect" command + * 2. Acknowledge connection + * 3. Wait for "start" command + * 4. Stream EMG data + * 5. Handle "stop" and "disconnect" commands * * @note This is Layer 4 (Application). */ #include +#include #include #include +#include #include "esp_timer.h" #include "config/config.h" @@ -20,29 +25,206 @@ #include "core/gestures.h" /******************************************************************************* - * Private Functions + * Constants + ******************************************************************************/ + +#define CMD_BUFFER_SIZE 128 +#define JSON_RESPONSE_SIZE 128 + +/******************************************************************************* + * Types ******************************************************************************/ /** - * @brief Stream EMG data over USB serial. + * @brief Device state machine. + */ +typedef enum { + STATE_IDLE = 0, /**< Waiting for connect command */ + STATE_CONNECTED, /**< Connected, waiting for start command */ + STATE_STREAMING, /**< Actively streaming EMG data */ +} device_state_t; + +/** + * @brief Commands from host. + */ +typedef enum { + CMD_NONE = 0, + CMD_CONNECT, + CMD_START, + CMD_STOP, + CMD_DISCONNECT, +} command_t; + +/******************************************************************************* + * Global State + ******************************************************************************/ + +static volatile device_state_t g_device_state = STATE_IDLE; +static QueueHandle_t g_cmd_queue = NULL; + +/******************************************************************************* + * Forward Declarations + ******************************************************************************/ + +static void send_ack_connect(void); + +/******************************************************************************* + * Command Parsing + ******************************************************************************/ + +/** + * @brief Parse incoming command from JSON. * - * Outputs data in format: "timestamp_ms,ch0,ch1,ch2,ch3\n" - * This matches what Python's SimulatedEMGStream produces. + * Expected format: {"cmd": "connect"} + * + * @param line Input line from serial + * @return Parsed command + */ +static command_t parse_command(const char* line) +{ + /* Simple JSON parsing - look for "cmd" field */ + const char* cmd_start = strstr(line, "\"cmd\""); + if (!cmd_start) { + return CMD_NONE; + } + + /* Find the value after "cmd": */ + const char* value_start = strchr(cmd_start, ':'); + if (!value_start) { + return CMD_NONE; + } + + /* Skip whitespace and opening quote */ + value_start++; + while (*value_start == ' ' || *value_start == '"') { + value_start++; + } + + /* Match command strings */ + if (strncmp(value_start, "connect", 7) == 0) { + return CMD_CONNECT; + } else if (strncmp(value_start, "start", 5) == 0) { + return CMD_START; + } else if (strncmp(value_start, "stop", 4) == 0) { + return CMD_STOP; + } else if (strncmp(value_start, "disconnect", 10) == 0) { + return CMD_DISCONNECT; + } + + return CMD_NONE; +} + +/******************************************************************************* + * Serial Input Task + ******************************************************************************/ + +/** + * @brief FreeRTOS task to read serial input and parse commands. + * + * This task runs continuously, reading lines from stdin (USB serial) + * and updating device state directly. This allows commands to interrupt + * streaming immediately via the volatile state variable. + * + * @param pvParameters Unused + */ +static void serial_input_task(void* pvParameters) +{ + char line_buffer[CMD_BUFFER_SIZE]; + int line_idx = 0; + + while (1) { + /* Read one character at a time */ + int c = getchar(); + + if (c == EOF || c == 0xFF) { + /* No data available, yield to other tasks */ + vTaskDelay(pdMS_TO_TICKS(10)); + continue; + } + + if (c == '\n' || c == '\r') { + /* End of line - process command */ + if (line_idx > 0) { + line_buffer[line_idx] = '\0'; + + command_t cmd = parse_command(line_buffer); + + if (cmd != CMD_NONE) { + /* Handle state transitions directly */ + /* This allows streaming loop to see state changes immediately */ + switch (g_device_state) { + case STATE_IDLE: + if (cmd == CMD_CONNECT) { + g_device_state = STATE_CONNECTED; + send_ack_connect(); + printf("[STATE] IDLE -> CONNECTED\n"); + } + break; + + case STATE_CONNECTED: + if (cmd == CMD_START) { + g_device_state = STATE_STREAMING; + printf("[STATE] CONNECTED -> STREAMING\n"); + /* Signal state machine to start streaming */ + xQueueSend(g_cmd_queue, &cmd, 0); + } else if (cmd == CMD_DISCONNECT) { + g_device_state = STATE_IDLE; + printf("[STATE] CONNECTED -> IDLE\n"); + } + break; + + case STATE_STREAMING: + if (cmd == CMD_STOP) { + g_device_state = STATE_CONNECTED; + printf("[STATE] STREAMING -> CONNECTED\n"); + /* Streaming loop will exit when it sees state change */ + } else if (cmd == CMD_DISCONNECT) { + g_device_state = STATE_IDLE; + printf("[STATE] STREAMING -> IDLE\n"); + /* Streaming loop will exit when it sees state change */ + } + break; + } + } + + line_idx = 0; + } + } else if (line_idx < CMD_BUFFER_SIZE - 1) { + /* Add character to buffer */ + line_buffer[line_idx++] = (char)c; + } else { + /* Buffer overflow - reset */ + line_idx = 0; + } + } +} + +/******************************************************************************* + * State Machine + ******************************************************************************/ + +/** + * @brief Send JSON acknowledgment for connection. + */ +static void send_ack_connect(void) +{ + printf("{\"status\":\"ack_connect\",\"device\":\"ESP32-EMG\",\"channels\":%d}\n", + EMG_NUM_CHANNELS); + fflush(stdout); +} + +/** + * @brief Stream EMG data continuously until stopped. + * + * This function blocks and streams data at the configured sample rate. + * Returns when state changes from STREAMING. */ static void stream_emg_data(void) { emg_sample_t sample; - - printf("\n[EMG] Starting data stream at %d Hz...\n", EMG_SAMPLE_RATE_HZ); - printf("[EMG] Format: timestamp_ms,ch0,ch1,ch2,ch3\n\n"); - - /* - * FreeRTOS tick rate is set to 1000 Hz in sdkconfig.defaults (1ms per tick). - * Delay of 1 tick = 1ms, giving us the full 1000 Hz sample rate. - */ const TickType_t delay_ticks = 1; /* 1 tick = 1ms at 1000 Hz tick rate */ - while (1) { + while (g_device_state == STATE_STREAMING) { /* Read EMG (fake or real depending on FEATURE_FAKE_EMG) */ emg_sensor_read(&sample); @@ -60,14 +242,30 @@ static void stream_emg_data(void) } /** - * @brief Run demo mode - cycle through gestures. + * @brief Main state machine loop. + * + * Monitors device state and starts streaming when requested. + * Serial input task handles all state transitions directly. */ -static void run_demo(void) +static void state_machine_loop(void) { - printf("\n[DEMO] Running gesture demo...\n"); + command_t cmd; + const TickType_t poll_interval = pdMS_TO_TICKS(50); while (1) { - gestures_demo_fist(1000); + /* Check if we should start streaming */ + if (g_device_state == STATE_STREAMING) { + /* Stream until state changes (via serial input task) */ + stream_emg_data(); + /* Returns when state is no longer STREAMING */ + } + + /* Wait for start command or just poll state */ + /* Timeout allows checking state even if queue is empty */ + xQueueReceive(g_cmd_queue, &cmd, poll_interval); + + /* Note: State transitions are handled by serial_input_task */ + /* This loop only triggers streaming when state becomes STREAMING */ } } @@ -80,7 +278,7 @@ void app_main(void) printf("\n"); printf("========================================\n"); printf(" Bucky Arm - EMG Robotic Hand\n"); - printf(" Firmware v1.0.0\n"); + printf(" Firmware v2.0.0 (Handshake Protocol)\n"); printf("========================================\n\n"); /* Initialize subsystems */ @@ -98,16 +296,26 @@ void app_main(void) printf("[INIT] Done!\n\n"); - /* - * Choose what to run: - * - stream_emg_data(): Send EMG data to laptop (Phase 1) - * - run_demo(): Test servo movement - * - * For now, we stream EMG data. - * Comment out and use run_demo() to test servos. - */ - stream_emg_data(); + /* Create command queue */ + g_cmd_queue = xQueueCreate(10, sizeof(command_t)); + if (g_cmd_queue == NULL) { + printf("[ERROR] Failed to create command queue!\n"); + return; + } - /* Alternative: run servo demo */ - // run_demo(); + /* Launch serial input task */ + xTaskCreate( + serial_input_task, + "serial_input", + 4096, /* Stack size */ + NULL, /* Parameters */ + 5, /* Priority */ + NULL /* Task handle */ + ); + + printf("[PROTOCOL] Waiting for host to connect...\n"); + printf("[PROTOCOL] Send: {\"cmd\": \"connect\"}\n\n"); + + /* Run main state machine */ + state_machine_loop(); } diff --git a/emg_gui.py b/emg_gui.py index aa019fd..5cb61ed 100644 --- a/emg_gui.py +++ b/emg_gui.py @@ -310,6 +310,7 @@ class CollectionPage(BasePage): # Collection state self.is_collecting = False + self.is_connected = False self.using_real_hardware = False self.stream = None self.parser = None @@ -377,12 +378,23 @@ class CollectionPage(BasePage): ) self.refresh_ports_btn.pack(side="left") - # Connection status indicator + # Connection status and button + connect_frame = ctk.CTkFrame(self.port_frame, fg_color="transparent") + connect_frame.pack(fill="x", pady=(5, 0)) + + self.connect_button = ctk.CTkButton( + connect_frame, text="Connect", + width=100, height=28, + command=self._toggle_connection, + state="disabled" # Disabled until "Real ESP32" selected + ) + self.connect_button.pack(side="left", padx=(0, 10)) + self.connection_status = ctk.CTkLabel( - self.port_frame, text="● Not connected", + connect_frame, text="● Disconnected", font=ctk.CTkFont(size=11), text_color="gray" ) - self.connection_status.pack(anchor="w", pady=(5, 0)) + self.connection_status.pack(side="left") # Gesture selection gesture_frame = ctk.CTkFrame(self.controls_panel, fg_color="transparent") @@ -529,7 +541,6 @@ class CollectionPage(BasePage): def start_collection(self): """Start data collection.""" - self.is_collecting = True # Get selected gestures gestures = [g for g, var in self.gesture_vars.items() if var.get()] if not gestures: @@ -540,18 +551,16 @@ class CollectionPage(BasePage): self.using_real_hardware = (self.source_var.get() == "real") if self.using_real_hardware: - # Real ESP32 serial stream - port = self._get_serial_port() + # Must be connected for real hardware + if not self.is_connected or not self.stream: + messagebox.showerror("Not Connected", "Please connect to the ESP32 first.") + return + + # Send start command to begin streaming try: - self.stream = RealSerialStream(port=port) - self._update_connection_status("orange", "Connecting...") + self.stream.start() except Exception as e: - error_msg = f"Failed to create serial stream:\n{e}" - if "PermissionError" in str(type(e).__name__) or "Permission denied" in str(e): - error_msg += "\n\nThe port may still be in use. Wait a few seconds and try again." - elif "FileNotFoundError" in str(type(e).__name__): - error_msg += f"\n\nPort '{port}' not found. Try refreshing the port list." - messagebox.showerror("Connection Error", error_msg) + messagebox.showerror("Start Error", f"Failed to start streaming:\n{e}") return else: # Simulated stream (gesture-aware for realistic testing) @@ -577,14 +586,19 @@ class CollectionPage(BasePage): self.collected_labels = [] self.sample_buffer = [] + # Mark as collecting + self.is_collecting = True + # Update UI self.start_button.configure(text="Stop Collection", fg_color="red") self.save_button.configure(state="disabled") self.status_label.configure(text="Starting...") - # Disable source selection during collection + # Disable source selection and connection during collection self.sim_radio.configure(state="disabled") self.real_radio.configure(state="disabled") + if self.using_real_hardware: + self.connect_button.configure(state="disabled") # Start collection thread self.collection_thread = threading.Thread(target=self.collection_loop, daemon=True) @@ -600,16 +614,16 @@ class CollectionPage(BasePage): # Safe cleanup - stream might already be in error state try: if self.stream: - self.stream.stop() - # Give OS time to release the port (important for macOS) if self.using_real_hardware: - time.sleep(0.5) + # Send stop command (returns to CONNECTED state) + self.stream.stop() + else: + # For simulated stream, just stop it + self.stream.stop() + self.stream = None except Exception: pass # Ignore cleanup errors - # Clear stream reference - self.stream = None - # Drain any pending messages from queue to prevent stale data try: while True: @@ -622,27 +636,25 @@ class CollectionPage(BasePage): self.prompt_label.configure(text="DONE", text_color="green") self.countdown_label.configure(text="") - # Re-enable source selection + # Re-enable source selection and connection button self.sim_radio.configure(state="normal") self.real_radio.configure(state="normal") - - # Update connection status if self.using_real_hardware: - self._update_connection_status("gray", "Disconnected") + self.connect_button.configure(state="normal") + # Still connected, just not streaming + if self.is_connected: + device_name = self.stream.device_info.get('device', 'ESP32') if self.stream and self.stream.device_info else 'ESP32' + self._update_connection_status("green", f"Connected ({device_name})") if self.collected_windows: self.save_button.configure(state="normal") def collection_loop(self): """Background collection loop.""" - # Try to start the stream (may fail for real hardware) - try: - self.stream.start() - if self.using_real_hardware: - self.data_queue.put(('connection_status', ('green', 'Connected'))) - except Exception as e: - self.data_queue.put(('error', f"Failed to connect: {e}")) - return + # Stream is already started (either via handshake for real HW or created for simulated) + # Just mark as ready + if self.using_real_hardware: + self.data_queue.put(('connection_status', ('green', 'Streaming'))) self.scheduler.start_session() @@ -861,9 +873,13 @@ class CollectionPage(BasePage): if self.source_var.get() == "real": self.port_frame.pack(fill="x", pady=(5, 0)) self._refresh_ports() + self.connect_button.configure(state="normal") + self.start_button.configure(state="disabled") # Must connect first else: self.port_frame.pack_forget() self._update_connection_status("gray", "Not using hardware") + self.connect_button.configure(state="disabled") + self.start_button.configure(state="normal") # Simulated mode doesn't need connect def _refresh_ports(self): """Scan and populate available serial ports.""" @@ -888,6 +904,91 @@ class CollectionPage(BasePage): """Update the connection status indicator.""" self.connection_status.configure(text=f"● {text}", text_color=color) + def _toggle_connection(self): + """Connect or disconnect from ESP32.""" + if self.is_connected: + self._disconnect_device() + else: + self._connect_device() + + def _connect_device(self): + """Connect to ESP32 with handshake.""" + port = self._get_serial_port() + + try: + # Update UI to show connecting + self._update_connection_status("orange", "Connecting...") + self.connect_button.configure(state="disabled") + self.update() # Force UI update + + # Create stream and connect + self.stream = RealSerialStream(port=port) + device_info = self.stream.connect(timeout=5.0) + + # Success! + self.is_connected = True + self._update_connection_status("green", f"Connected ({device_info.get('device', 'ESP32')})") + self.connect_button.configure(text="Disconnect", state="normal") + self.start_button.configure(state="normal") + + except TimeoutError as e: + messagebox.showerror( + "Connection Timeout", + f"Device did not respond within 5 seconds.\n\n" + f"Check that:\n" + f"• ESP32 is powered on\n" + f"• Correct firmware is flashed\n" + f"• USB cable is properly connected" + ) + self._update_connection_status("red", "Timeout") + self.connect_button.configure(state="normal") + if self.stream: + try: + self.stream.disconnect() + except: + pass + self.stream = None + + except Exception as e: + error_msg = f"Failed to connect:\n{e}" + if "Permission denied" in str(e) or "Resource busy" in str(e): + error_msg += "\n\nThe port may still be in use. Wait a few seconds and try again." + elif "FileNotFoundError" in str(type(e).__name__): + error_msg += f"\n\nPort not found. Try refreshing the port list." + + messagebox.showerror("Connection Error", error_msg) + self._update_connection_status("red", "Failed") + self.connect_button.configure(state="normal") + if self.stream: + try: + self.stream.disconnect() + except: + pass + self.stream = None + + def _disconnect_device(self): + """Disconnect from ESP32.""" + try: + if self.stream: + self.stream.disconnect() + # Give OS time to release the port + time.sleep(0.5) + + self.is_connected = False + self.stream = None + self._update_connection_status("gray", "Disconnected") + self.connect_button.configure(text="Connect") + self.start_button.configure(state="disabled") + + except Exception as e: + messagebox.showwarning("Disconnect Warning", f"Error during disconnect: {e}") + # Still mark as disconnected even if there was an error + self.is_connected = False + self.stream = None + self._update_connection_status("gray", "Disconnected") + self.connect_button.configure(text="Connect") + self.start_button.configure(state="disabled") + def on_hide(self): """Stop collection when leaving page.""" if self.is_collecting: @@ -1280,11 +1381,23 @@ class PredictionPage(BasePage): ) self.refresh_ports_btn.pack(side="left") + # Connection status and button + connect_frame = ctk.CTkFrame(self.port_frame, fg_color="transparent") + connect_frame.pack(fill="x", pady=(5, 0)) + + self.connect_button = ctk.CTkButton( + connect_frame, text="Connect", + width=100, height=28, + command=self._toggle_connection, + state="disabled" # Disabled until "Real ESP32" selected + ) + self.connect_button.pack(side="left", padx=(0, 10)) + self.connection_status = ctk.CTkLabel( - self.port_frame, text="● Not connected", + connect_frame, text="● Disconnected", font=ctk.CTkFont(size=11), text_color="gray" ) - self.connection_status.pack(anchor="w", pady=(5, 0)) + self.connection_status.pack(side="left") # Start button self.start_button = ctk.CTkButton( @@ -1346,6 +1459,7 @@ class PredictionPage(BasePage): # State self.is_predicting = False + self.is_connected = False self.using_real_hardware = False self.classifier = None self.smoother = None @@ -1392,6 +1506,19 @@ class PredictionPage(BasePage): # Determine data source self.using_real_hardware = (self.source_var.get() == "real") + # For real hardware, must be connected + if self.using_real_hardware: + if not self.is_connected or not self.stream: + messagebox.showerror("Not Connected", "Please connect to the ESP32 first.") + return + + # Send start command to begin streaming + try: + self.stream.start() + except Exception as e: + messagebox.showerror("Start Error", f"Failed to start streaming:\n{e}") + return + # Create prediction smoother self.smoother = PredictionSmoother( label_names=self.classifier.label_names, @@ -1403,9 +1530,11 @@ class PredictionPage(BasePage): self.is_predicting = True self.start_button.configure(text="Stop", fg_color="red") - # Disable source selection during prediction + # Disable source selection and connection during prediction self.sim_radio.configure(state="disabled") self.real_radio.configure(state="disabled") + if self.using_real_hardware: + self.connect_button.configure(state="disabled") # Start prediction thread thread = threading.Thread(target=self._prediction_thread, daemon=True) @@ -1421,10 +1550,13 @@ class PredictionPage(BasePage): # Safe cleanup - stream might already be in error state try: if self.stream: - self.stream.stop() - # Give OS time to release the port (important for macOS) if self.using_real_hardware: - time.sleep(0.5) + # Send stop command (returns to CONNECTED state) + self.stream.stop() + else: + # For simulated stream, just stop it + self.stream.stop() + self.stream = None except Exception: pass # Ignore cleanup errors @@ -1435,22 +1567,27 @@ class PredictionPage(BasePage): self.sim_label.configure(text="") self.raw_label.configure(text="", text_color="gray") - # Re-enable source selection + # Re-enable source selection and connection button self.sim_radio.configure(state="normal") self.real_radio.configure(state="normal") - - # Update connection status if self.using_real_hardware: - self._update_connection_status("gray", "Disconnected") + self.connect_button.configure(state="normal") + # Still connected, just not streaming + if self.is_connected: + device_name = self.stream.device_info.get('device', 'ESP32') if self.stream and self.stream.device_info else 'ESP32' + self._update_connection_status("green", f"Connected ({device_name})") def _on_source_change(self): """Show/hide port selection based on data source.""" if self.source_var.get() == "real": self.port_frame.pack(fill="x", pady=(5, 0)) self._refresh_ports() + self.connect_button.configure(state="normal") + # Start button will be enabled after connection else: self.port_frame.pack_forget() self._update_connection_status("gray", "Not using hardware") + self.connect_button.configure(state="disabled") def _refresh_ports(self): """Scan and populate available serial ports.""" @@ -1472,22 +1609,95 @@ class PredictionPage(BasePage): """Update the connection status indicator.""" self.connection_status.configure(text=f"● {text}", text_color=color) + def _toggle_connection(self): + """Connect or disconnect from ESP32.""" + if self.is_connected: + self._disconnect_device() + else: + self._connect_device() + + def _connect_device(self): + """Connect to ESP32 with handshake.""" + port = self._get_serial_port() + + try: + # Update UI to show connecting + self._update_connection_status("orange", "Connecting...") + self.connect_button.configure(state="disabled") + self.update() # Force UI update + + # Create stream and connect + self.stream = RealSerialStream(port=port) + device_info = self.stream.connect(timeout=5.0) + + # Success! + self.is_connected = True + self._update_connection_status("green", f"Connected ({device_info.get('device', 'ESP32')})") + self.connect_button.configure(text="Disconnect", state="normal") + + except TimeoutError as e: + messagebox.showerror( + "Connection Timeout", + f"Device did not respond within 5 seconds.\n\n" + f"Check that:\n" + f"• ESP32 is powered on\n" + f"• Correct firmware is flashed\n" + f"• USB cable is properly connected" + ) + self._update_connection_status("red", "Timeout") + self.connect_button.configure(state="normal") + if self.stream: + try: + self.stream.disconnect() + except: + pass + self.stream = None + + except Exception as e: + error_msg = f"Failed to connect:\n{e}" + if "Permission denied" in str(e) or "Resource busy" in str(e): + error_msg += "\n\nThe port may still be in use. Wait a few seconds and try again." + elif "FileNotFoundError" in str(type(e).__name__): + error_msg += f"\n\nPort not found. Try refreshing the port list." + + messagebox.showerror("Connection Error", error_msg) + self._update_connection_status("red", "Failed") + self.connect_button.configure(state="normal") + if self.stream: + try: + self.stream.disconnect() + except: + pass + self.stream = None + + def _disconnect_device(self): + """Disconnect from ESP32.""" + try: + if self.stream: + self.stream.disconnect() + # Give OS time to release the port + time.sleep(0.5) + + self.is_connected = False + self.stream = None + self._update_connection_status("gray", "Disconnected") + self.connect_button.configure(text="Connect") + + except Exception as e: + messagebox.showwarning("Disconnect Warning", f"Error during disconnect: {e}") + # Still mark as disconnected even if there was an error + self.is_connected = False + self.stream = None + self._update_connection_status("gray", "Disconnected") + self.connect_button.configure(text="Connect") + def _prediction_thread(self): """Background prediction thread.""" - # Create appropriate stream based on source selection - if self.using_real_hardware: - port = self._get_serial_port() - try: - self.stream = RealSerialStream(port=port) - except Exception as e: - error_msg = f"Failed to create serial stream: {e}" - if "Permission denied" in str(e) or "Resource busy" in str(e): - error_msg += "\n\nThe port may still be in use. Wait a moment and try again." - self.data_queue.put(('error', error_msg)) - return - else: + # For simulated mode, create new stream + if not self.using_real_hardware: self.stream = GestureAwareEMGStream(num_channels=NUM_CHANNELS, sample_rate=SAMPLING_RATE_HZ) + # Stream is already started (either via handshake for real HW or will be started for simulated) parser = EMGParser(num_channels=NUM_CHANNELS) windower = Windower(window_size_ms=WINDOW_SIZE_MS, sample_rate=SAMPLING_RATE_HZ, overlap=0.0) @@ -1498,16 +1708,18 @@ class PredictionPage(BasePage): gesture_start = time.perf_counter() current_gesture = gesture_cycle[0] - # Start the stream - try: - if hasattr(self.stream, 'set_gesture'): - self.stream.set_gesture(current_gesture) - self.stream.start() - - if self.using_real_hardware: - self.data_queue.put(('connection_status', ('green', 'Connected'))) - except Exception as e: - self.data_queue.put(('error', f"Failed to connect: {e}")) + # Start simulated stream if needed + if not self.using_real_hardware: + try: + if hasattr(self.stream, 'set_gesture'): + self.stream.set_gesture(current_gesture) + self.stream.start() + except Exception as e: + self.data_queue.put(('error', f"Failed to start simulated stream: {e}")) + return + else: + # Real hardware is already streaming + self.data_queue.put(('connection_status', ('green', 'Streaming'))) return while self.is_predicting: diff --git a/serial_stream.py b/serial_stream.py index 713dd63..08f9fa8 100644 --- a/serial_stream.py +++ b/serial_stream.py @@ -1,10 +1,10 @@ """ @file serial_stream.py -@brief Real serial stream for reading EMG data from ESP32. +@brief Real serial stream for reading EMG data from ESP32 with handshake protocol. This module provides a serial communication interface for receiving -EMG data from the ESP32 microcontroller over USB. It implements the -same interface as SimulatedEMGStream, making it a drop-in replacement. +EMG data from the ESP32 microcontroller over USB. It implements a +robust handshake protocol to ensure reliable connection before streaming. @section usage Usage Example @code{.py} @@ -12,17 +12,50 @@ same interface as SimulatedEMGStream, making it a drop-in replacement. # Create stream (auto-detects port, or specify manually) stream = RealSerialStream(port='COM3') + + # Connect with handshake (raises on timeout/failure) + stream.connect(timeout=5.0) + + # Start streaming stream.start() - # Read data (same interface as SimulatedEMGStream) + # Read data while True: line = stream.readline() if line: print(line) # "1234,512,489,501,523" + # Stop streaming (device returns to CONNECTED state) stream.stop() + + # Disconnect (device returns to IDLE state) + stream.disconnect() @endcode +@section protocol Handshake Protocol & State Machine + + ESP32 States: + IDLE - Waiting for "connect" command + CONNECTED - Handshake complete, waiting for "start" command + STREAMING - Actively sending CSV data + + State Transitions: + 1. connect() : IDLE → CONNECTED + App sends: {"cmd": "connect"} + Device responds: {"status": "ack_connect", "device": "ESP32-EMG", "channels": 4} + + 2. start() : CONNECTED → STREAMING + App sends: {"cmd": "start"} + Device starts streaming CSV data + + 3. stop() : STREAMING → CONNECTED + App sends: {"cmd": "stop"} + Device stops streaming, ready for new start command + + 4. disconnect() : ANY → IDLE + App sends: {"cmd": "disconnect"} + Device returns to idle, waiting for new connection + @section format Data Format The ESP32 sends data as CSV lines: "timestamp_ms,ch0,ch1,ch2,ch3\\n" @@ -35,20 +68,32 @@ same interface as SimulatedEMGStream, making it a drop-in replacement. import serial import serial.tools.list_ports -from typing import Optional, List +import json +import time +from typing import Optional, List, Dict, Any +from enum import Enum + + +class ConnectionState(Enum): + """Connection states for the serial stream.""" + DISCONNECTED = 0 # No serial connection + CONNECTING = 1 # Serial open, waiting for handshake + CONNECTED = 2 # Handshake complete, ready to stream + STREAMING = 3 # Actively streaming data class RealSerialStream: """ - @brief Reads EMG data from ESP32 over USB serial. + @brief Reads EMG data from ESP32 over USB serial with handshake protocol. - This class provides the same interface as SimulatedEMGStream: - - start() : Open serial connection - - stop() : Close serial connection - - readline() : Read one line of data + This class implements a robust connection protocol: + - connect() : Open serial port and perform handshake + - start() : Begin streaming data + - stop() : Stop streaming (device stays connected) + - disconnect() : Close connection (device returns to idle) + - readline() : Read one line of data - This allows it to be used as a drop-in replacement for testing - with real hardware instead of simulated data. + State machine ensures reliable communication without timing dependencies. @note Requires pyserial: pip install pyserial """ @@ -60,24 +105,34 @@ class RealSerialStream: @param port Serial port name (e.g., 'COM3' on Windows, '/dev/ttyUSB0' on Linux). If None, will attempt to auto-detect the ESP32. @param baud_rate Communication speed in bits per second. Default 115200 matches ESP32. - @param timeout Read timeout in seconds. Returns None if no data within this time. + @param timeout Read timeout in seconds for readline(). """ self.port = port self.baud_rate = baud_rate self.timeout = timeout self.serial: Optional[serial.Serial] = None - self.running = False + self.state = ConnectionState.DISCONNECTED + self.device_info: Optional[Dict[str, Any]] = None - def start(self) -> None: + def connect(self, timeout: float = 5.0) -> Dict[str, Any]: """ - @brief Open the serial connection to the ESP32. + @brief Connect to the ESP32 and perform handshake. - If no port was specified in __init__, attempts to auto-detect - the ESP32 by looking for common USB-UART chip identifiers. + Opens the serial port, sends connect command, and waits for + acknowledgment from the device. + + @param timeout Maximum time to wait for handshake response (seconds). + + @return Device info dict from handshake response. @throws RuntimeError If no port specified and auto-detect fails. @throws RuntimeError If unable to open the serial port. + @throws TimeoutError If device doesn't respond within timeout. + @throws ValueError If device sends invalid handshake response. """ + if self.state != ConnectionState.DISCONNECTED: + raise RuntimeError(f"Already in state {self.state.name}, cannot connect") + # Auto-detect port if not specified if self.port is None: self.port = self._auto_detect_port() @@ -95,61 +150,185 @@ class RealSerialStream: baudrate=self.baud_rate, timeout=self.timeout ) - self.running = True + self.state = ConnectionState.CONNECTING # Clear any stale data in the buffer self.serial.reset_input_buffer() + time.sleep(0.1) # Let device settle after port open - print(f"[SERIAL] Connected to {self.port} at {self.baud_rate} baud") + print(f"[SERIAL] Port opened: {self.port}") except serial.SerialException as e: + self.state = ConnectionState.DISCONNECTED error_msg = f"Failed to open {self.port}: {e}" - # Add helpful context for common errors if "Permission denied" in str(e) or "Resource busy" in str(e): - error_msg += "\n\nThe port may still be in use from a previous connection. Wait a moment and try again." + error_msg += "\n\nThe port may still be in use. Wait a moment and try again." raise RuntimeError(error_msg) + # Perform handshake + try: + # Send connect command + connect_cmd = {"cmd": "connect"} + self._send_json(connect_cmd) + print(f"[SERIAL] Sent: {connect_cmd}") + + # Wait for acknowledgment + start_time = time.time() + while (time.time() - start_time) < timeout: + line = self._readline_raw() + if line: + try: + response = json.loads(line) + if response.get("status") == "ack_connect": + self.device_info = response + self.state = ConnectionState.CONNECTED + print(f"[SERIAL] Handshake complete: {response}") + return response + except json.JSONDecodeError: + # Ignore non-JSON lines (might be startup messages) + print(f"[SERIAL] Ignoring: {line.strip()}") + continue + + # Timeout reached + self.state = ConnectionState.DISCONNECTED + if self.serial: + self.serial.close() + self.serial = None + raise TimeoutError( + f"Device did not respond to connection request within {timeout}s.\n" + "Check that the correct firmware is flashed and device is powered on." + ) + + except Exception as e: + # Clean up on any error + self.state = ConnectionState.DISCONNECTED + if self.serial: + try: + self.serial.close() + except: + pass + self.serial = None + raise + + def start(self) -> None: + """ + @brief Start streaming EMG data. + + Device must be in CONNECTED state. Sends start command to ESP32, + which begins streaming CSV data. + + @throws RuntimeError If not connected. + """ + if self.state != ConnectionState.CONNECTED: + raise RuntimeError( + f"Cannot start streaming from state {self.state.name}. " + "Must call connect() first." + ) + + # Send start command + start_cmd = {"cmd": "start"} + self._send_json(start_cmd) + self.state = ConnectionState.STREAMING + print(f"[SERIAL] Started streaming") + def stop(self) -> None: """ - @brief Close the serial connection. + @brief Stop streaming EMG data. - Safe to call even if not connected. + Sends stop command to ESP32, which stops streaming and returns + to CONNECTED state. Connection remains open for restart. + + Safe to call even if not streaming. """ - self.running = False + if self.state == ConnectionState.STREAMING: + try: + stop_cmd = {"cmd": "stop"} + self._send_json(stop_cmd) + self.state = ConnectionState.CONNECTED + print(f"[SERIAL] Stopped streaming") + except Exception as e: + print(f"[SERIAL] Warning during stop: {e}") + def disconnect(self) -> None: + """ + @brief Disconnect from the ESP32. + + Sends disconnect command (device returns to IDLE state), + then closes the serial port. Safe to call from any state. + """ + # Send disconnect command if connected + if self.state in (ConnectionState.CONNECTED, ConnectionState.STREAMING): + try: + disconnect_cmd = {"cmd": "disconnect"} + self._send_json(disconnect_cmd) + time.sleep(0.1) # Give device time to process + print(f"[SERIAL] Sent disconnect command") + except Exception as e: + print(f"[SERIAL] Warning sending disconnect: {e}") + + # Close serial port if self.serial: try: if self.serial.is_open: self.serial.close() - print(f"[SERIAL] Disconnected from {self.port}") + print(f"[SERIAL] Port closed: {self.port}") except Exception as e: - print(f"[SERIAL] Warning during disconnect: {e}") + print(f"[SERIAL] Warning during port close: {e}") finally: self.serial = None + self.state = ConnectionState.DISCONNECTED + self.device_info = None + def readline(self) -> Optional[str]: """ - @brief Read one line of data from the ESP32. + @brief Read one line of CSV data from the ESP32. + Should only be called when in STREAMING state. Blocks until a complete line is received or timeout occurs. - This matches the interface of SimulatedEMGStream.readline(). @return Line string including newline, or None if timeout/error. @note Lines from ESP32 are in format: "timestamp_ms,ch0,ch1,ch2,ch3\\n" """ + if self.state != ConnectionState.STREAMING: + return None + + return self._readline_raw() + + def _readline_raw(self) -> Optional[str]: + """ + @brief Read one line from serial port (internal helper). + + @return Decoded line string, or None if timeout/error. + """ if not self.serial or not self.serial.is_open: return None try: line_bytes = self.serial.readline() if line_bytes: - return line_bytes.decode('utf-8', errors='ignore') + return line_bytes.decode('utf-8', errors='ignore').strip() return None except serial.SerialException: return None + def _send_json(self, data: Dict[str, Any]) -> None: + """ + @brief Send JSON command to the device (internal helper). + + @param data Dictionary to send as JSON. + + @throws RuntimeError If serial port is not open. + """ + if not self.serial or not self.serial.is_open: + raise RuntimeError("Serial port not open") + + json_str = json.dumps(data) + "\n" + self.serial.write(json_str.encode('utf-8')) + self.serial.flush() # Ensure data is sent immediately + def _auto_detect_port(self) -> Optional[str]: """ @brief Attempt to auto-detect the ESP32 serial port. @@ -244,11 +423,21 @@ if __name__ == "__main__": print("Starting stream... (Ctrl+C to stop)") print("-" * 50) - # Create and start stream + # Create stream stream = RealSerialStream(port=port) try: + # Connect with handshake + print("\nConnecting to device...") + device_info = stream.connect(timeout=5.0) + print(f"Connected! Device: {device_info.get('device', 'Unknown')}, " + f"Channels: {device_info.get('channels', '?')}") + print() + + # Start streaming + print("Starting data stream...") stream.start() + print() sample_count = 0 @@ -256,8 +445,6 @@ if __name__ == "__main__": line = stream.readline() if line: - line = line.strip() - # Check if this is a data line (starts with digit = timestamp) if line and line[0].isdigit(): sample_count += 1 @@ -267,7 +454,7 @@ if __name__ == "__main__": print(f" [{sample_count:6d} samples] Latest: {line}") else: - # Print startup/info messages from ESP32 + # Print non-data messages print(f" {line}") except KeyboardInterrupt: @@ -275,6 +462,14 @@ if __name__ == "__main__": print("-" * 50) print("Stopped by user (Ctrl+C)") + except TimeoutError as e: + print(f"\nConnection timeout: {e}") + + except Exception as e: + print(f"\nError: {e}") + finally: + print("\nCleaning up...") stream.stop() + stream.disconnect() print(f"Total samples received: {sample_count}")