# Bucky Arm — EMG Gesture Control: Master Implementation Reference
> Version: 2026-03-01 | Target: ESP32-S3 N32R16V (Xtensa LX7 @ 240 MHz, 512 KB SRAM, 16 MB OPI PSRAM)
> Supersedes: META_EMG_RESEARCH_NOTES.md + BUCKY_ARM_IMPROVEMENT_PLAN.md
> Source paper: doi:10.1038/s41586-025-09255-w (PDF: C:/VSCode/Marvel_Projects/s41586-025-09255-w.pdf)
---
## TABLE OF CONTENTS
- [PART 0 — SYSTEM ARCHITECTURE & RESPONSIBILITY ASSIGNMENT](#part-0--system-architecture--responsibility-assignment)
- [0.1 Who Does What](#01-who-does-what)
- [0.2 Operating Modes](#02-operating-modes)
- [0.3 FSM Reference (EMG_MAIN mode)](#03-fsm-reference-emg_main-mode)
- [0.4 EMG_STANDALONE Boot Sequence](#04-emg_standalone-boot-sequence)
- [0.5 New Firmware Changes for Architecture](#05-new-firmware-changes-for-architecture)
- [0.6 New Python Script: live_predict.py](#06-new-python-script-live_predictpy)
- [0.7 Firmware Cleanup: system_mode_t Removal](#07-firmware-cleanup-system_mode_t-removal)
- [PART I — SYSTEM FOUNDATIONS](#part-i--system-foundations)
- [1. Hardware Specification](#1-hardware-specification)
- [2. Current System Snapshot](#2-current-system-snapshot)
- [2.1 Confirmed Firmware Architecture](#21--confirmed-firmware-architecture-from-codebase-exploration)
- [2.2 Bicep Channel Subsystem](#22--bicep-channel-subsystem-ch3--adc_channel_9--gpio-10)
- [3. What Meta Built — Filtered for ESP32](#3-what-meta-built--filtered-for-esp32)
- [4. Current Code State + Known Bugs](#4-current-code-state--known-bugs)
- [PART II — TARGET ARCHITECTURE](#part-ii--target-architecture)
- [5. Full Recommended Multi-Model Stack](#5-full-recommended-multi-model-stack)
- [6. Compute Budget for Full Stack](#6-compute-budget-for-full-stack)
- [7. Why This Architecture Works for 3-Channel EMG](#7-why-this-architecture-works-for-3-channel-emg)
- [PART III — GESTURE EXTENSIBILITY](#part-iii--gesture-extensibility)
- [8. What Changes When Adding or Removing a Gesture](#8-what-changes-when-adding-or-removing-a-gesture)
- [9. Practical Limits of 3-Channel EMG](#9-practical-limits-of-3-channel-emg)
- [10. Specific Gesture Considerations](#10-specific-gesture-considerations)
- [PART IV — CHANGE REFERENCE](#part-iv--change-reference)
- [11. Change Classification Matrix](#11-change-classification-matrix)
- [PART V — FIRMWARE CHANGES](#part-v--firmware-changes)
- [Change A — DMA-Driven ADC Sampling](#change-a--dma-driven-adc-sampling)
- [Change B — IIR Biquad Bandpass Filter](#change-b--iir-biquad-bandpass-filter)
- [Change C — Confidence Rejection](#change-c--confidence-rejection)
- [Change D — On-Device NVS Calibration](#change-d--on-device-nvs-calibration)
- [Change E — int8 MLP via TFLM](#change-e--int8-mlp-via-tflm)
- [Change F — Ensemble Inference Pipeline](#change-f--ensemble-inference-pipeline)
- [PART VI — PYTHON/TRAINING CHANGES](#part-vi--pythontraining-changes)
- [Change 0 — Forward Label Shift](#change-0--forward-label-shift)
- [Change 1 — Expanded Feature Set](#change-1--expanded-feature-set)
- [Change 2 — Electrode Repositioning](#change-2--electrode-repositioning)
- [Change 3 — Data Augmentation](#change-3--data-augmentation)
- [Change 4 — Reinhard Compression](#change-4--reinhard-compression)
- [Change 5 — Classifier Benchmark](#change-5--classifier-benchmark)
- [Change 6 — Simplified MPF Features](#change-6--simplified-mpf-features)
- [Change 7 — Ensemble Training](#change-7--ensemble-training)
- [PART VII — FEATURE SELECTION FOR ESP32 PORTING](#part-vii--feature-selection-for-esp32-porting)
- [PART VIII — MEASUREMENT AND VALIDATION](#part-viii--measurement-and-validation)
- [PART IX — EXPORT WORKFLOW](#part-ix--export-workflow)
- [PART X — REFERENCES](#part-x--references)
---
# PART 0 — SYSTEM ARCHITECTURE & RESPONSIBILITY ASSIGNMENT
> This section is the authoritative reference for what runs where. All implementation
> decisions in later parts should be consistent with this partition.
## 0.1 Who Does What
| Responsibility | Laptop (Python) | ESP32 |
|----------------|-----------------|-------|
| EMG sensor reading | — | ✓ `emg_sensor_read()` always |
| Raw data streaming (for collection) | Receives CSV, saves to HDF5 | Streams CSV over UART |
| Model training | ✓ `learning_data_collection.py` | — |
| Model export | ✓ `export_to_header()` → `model_weights.h` | Compiled into firmware |
| On-device inference | — | ✓ `inference_predict()` |
| Laptop-side live inference | ✓ `live_predict.py` (new script) | Streams ADC + executes received cmd |
| Arm actuation | — (sends gesture string back to ESP32) | ✓ `gestures_execute()` |
| Autonomous operation (no laptop) | Not needed | ✓ `EMG_STANDALONE` mode |
| Bicep flex detection | — | ✓ `bicep_detect()` (new, Section 2.2) |
| NVS calibration | — | ✓ `calibration.c` (Change D) |
**Key rule**: The laptop is never required for real-time arm control in production.
The laptop's role is: collect data → train model → export → flash firmware → done.
After that, the ESP32 operates completely independently.
---
## 0.2 Operating Modes
Controlled by `#define MAIN_MODE` in `config/config.h`.
The enum currently reads `enum {EMG_MAIN, SERVO_CALIBRATOR, GESTURE_TESTER}`.
A new value `EMG_STANDALONE` must be added.
| `MAIN_MODE` | When to use | Laptop required? | Entry point |
|-------------|-------------|-----------------|-------------|
| `EMG_MAIN` | Development sessions, data collection, monitored operation | Yes — UART handshake to start any mode | `appConnector()` in `main.c` |
| `EMG_STANDALONE` | **Fully autonomous deployment** — no laptop | **No** — boots directly into predict+control | `run_standalone_loop()` (new function in `main.c`) |
| `SERVO_CALIBRATOR` | Hardware setup, testing servo range of motion | Yes (serial input) | Inline in `app_main()` |
| `GESTURE_TESTER` | Testing gesture→servo mapping via keyboard | Yes (serial input) | Inline in `app_main()` |
**How to switch mode**: change `#define MAIN_MODE` in `config.h` and reflash.
**To add `EMG_STANDALONE` to `config.h`** (1-line change):
```c
// config.h line 19 — current:
enum {EMG_MAIN, SERVO_CALIBRATOR, GESTURE_TESTER};
// Update to:
enum {EMG_MAIN, SERVO_CALIBRATOR, GESTURE_TESTER, EMG_STANDALONE};
```
---
## 0.3 FSM Reference (EMG_MAIN mode)
The `device_state_t` enum in `main.c` and the `command_t` enum control all transitions.
Currently: `{STATE_IDLE, STATE_CONNECTED, STATE_STREAMING, STATE_PREDICTING}`.
A new state `STATE_LAPTOP_PREDICT` must be added (see Section 0.5).
```
STATE_IDLE
 └─ {"cmd":"connect"} ──► STATE_CONNECTED
      │
      ├─ {"cmd":"start"} ──► STATE_STREAMING
      │      ESP32 sends raw ADC CSV at 1kHz
      │      Laptop: saves to HDF5 (data collection)
      │      Laptop: trains model → exports model_weights.h
      │      {"cmd":"stop"} ──► back to STATE_CONNECTED
      │
      ├─ {"cmd":"start_predict"} ──► STATE_PREDICTING
      │      ESP32: inference_predict() on-device
      │      ESP32: gestures_execute()
      │      Laptop: optional UART monitor only
      │      {"cmd":"stop"} ──► back to STATE_CONNECTED
      │
      └─ {"cmd":"start_laptop_predict"} ──► STATE_LAPTOP_PREDICT [NEW]
             ESP32: streams raw ADC CSV (same as STREAMING)
             Laptop: runs live_predict.py inference
             Laptop: sends {"gesture":"fist"} back
             ESP32: executes received gesture command
             {"cmd":"stop"} ──► back to STATE_CONNECTED

All active states:
  {"cmd":"stop"}       → STATE_CONNECTED
  {"cmd":"disconnect"} → STATE_IDLE
  {"cmd":"connect"}    → STATE_CONNECTED (from any state: reconnect)
```
**Convenience table of commands and their effects:**
| JSON command | Valid from state | Result |
|---|---|---|
| `{"cmd":"connect"}` | Any | → `STATE_CONNECTED` |
| `{"cmd":"start"}` | `STATE_CONNECTED` | → `STATE_STREAMING` |
| `{"cmd":"start_predict"}` | `STATE_CONNECTED` | → `STATE_PREDICTING` |
| `{"cmd":"start_laptop_predict"}` | `STATE_CONNECTED` | → `STATE_LAPTOP_PREDICT` (new) |
| `{"cmd":"stop"}` | `STREAMING/PREDICTING/LAPTOP_PREDICT` | → `STATE_CONNECTED` |
| `{"cmd":"disconnect"}` | Any active state | → `STATE_IDLE` |
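
For laptop-side tests it can help to mirror the command table in a small lookup. The sketch below is an illustrative Python model of the transitions listed above, not the firmware's `serial_input_task()`. The names `START_COMMANDS` and `next_state()` are hypothetical, and it assumes that a command which is not valid in the current state is simply ignored and that "active" means any state other than `STATE_IDLE`.

```python
# Illustrative model of the EMG_MAIN command table (assumed names, not firmware code).
IDLE, CONNECTED = "STATE_IDLE", "STATE_CONNECTED"
STREAMING, PREDICTING = "STATE_STREAMING", "STATE_PREDICTING"
LAPTOP_PREDICT = "STATE_LAPTOP_PREDICT"

RUNNING = {STREAMING, PREDICTING, LAPTOP_PREDICT}

# Commands that are only valid from STATE_CONNECTED.
START_COMMANDS = {
    "start": STREAMING,
    "start_predict": PREDICTING,
    "start_laptop_predict": LAPTOP_PREDICT,
}


def next_state(state, cmd):
    """Return the state after `cmd`; unknown or invalid commands leave it unchanged."""
    if cmd == "connect":                       # valid from any state (reconnect)
        return CONNECTED
    if cmd == "disconnect" and state != IDLE:  # assumption: "active" means not idle
        return IDLE
    if cmd == "stop" and state in RUNNING:
        return CONNECTED
    if state == CONNECTED and cmd in START_COMMANDS:
        return START_COMMANDS[cmd]
    return state


if __name__ == "__main__":
    state = IDLE
    for cmd in ["connect", "start_laptop_predict", "stop", "disconnect"]:
        state = next_state(state, cmd)
        print(f"{cmd:22s} -> {state}")
```

A table-driven model like this makes it easy to check that a new command such as `start_laptop_predict` cannot be entered from `STATE_IDLE`.
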
---
## 0.4 EMG_STANDALONE Boot Sequence
No UART handshake. No laptop required. Powers on → predicts → controls arm.
```
app_main() switch MAIN_MODE == EMG_STANDALONE:
├── hand_init()              // servos
├── emg_sensor_init()        // ADC setup
├── inference_init()         // clear window buffer, reset smoothing state
├── calibration_init()       // load NVS z-score params (Change D)
│   └── if not found in NVS:
│         collect 120 REST windows (~3s at 25ms hop)
│         call calibration_update() to compute and store stats
├── bicep_load_threshold()   // load NVS bicep threshold (Section 2.2)
│   └── if not found:
│         collect 3s of still bicep data
│         call bicep_calibrate() and bicep_save_threshold()
└── run_standalone_loop()    ← NEW function (added to main.c)
      while (1):
        emg_sensor_read(&sample)
        inference_add_sample(sample.channels)
        if ++stride_counter >= INFERENCE_HOP_SIZE:   // pre-increment: fires every INFERENCE_HOP_SIZE samples
          stride_counter = 0
          gesture_t g = inference_get_gesture_enum(inference_predict(&conf))
          gestures_execute(g)
          bicep_state_t b = bicep_detect()
          // (future: bicep_actuate(b))
        vTaskDelay(1)
```
`run_standalone_loop()` is structurally identical to `run_inference_loop()` in `EMG_MAIN`,
minus all UART state-change checking and telemetry prints. It runs forever until power-off.
**Where to add**: New function `run_standalone_loop()` in `app/main.c`, plus a new case
in the `app_main()` switch block:
```c
case EMG_STANDALONE:
    run_standalone_loop();
    break;
```
---
## 0.5 New Firmware Changes for Architecture
These changes are needed to implement the architecture above. They are **structural**
(not accuracy improvements) and should be done before any other changes.
### S1 — Add `EMG_STANDALONE` to `config.h`
**File**: `EMG_Arm/src/config/config.h`, line 19
```c
// Change:
enum {EMG_MAIN, SERVO_CALIBRATOR, GESTURE_TESTER};
// To:
enum {EMG_MAIN, SERVO_CALIBRATOR, GESTURE_TESTER, EMG_STANDALONE};
```
### S2 — Add `STATE_LAPTOP_PREDICT` to FSM (`main.c`)
**File**: `EMG_Arm/src/app/main.c`
```c
// In device_state_t enum — add new state:
typedef enum {
    STATE_IDLE = 0,
    STATE_CONNECTED,
    STATE_STREAMING,
    STATE_PREDICTING,
    STATE_LAPTOP_PREDICT,  // ← ADD: streams ADC to laptop, executes laptop's gesture commands
} device_state_t;

// In command_t enum: add new command
typedef enum {
    CMD_NONE = 0,
    CMD_CONNECT,
    CMD_START,
    CMD_START_PREDICT,
    CMD_START_LAPTOP_PREDICT,  // ← ADD
    CMD_STOP,
    CMD_DISCONNECT,
} command_t;
```
**In `parse_command()`** — add detection (place BEFORE the `"start"` check to avoid prefix collision):
```c
} else if (strncmp(value_start, "start_laptop_predict", 20) == 0) {
    return CMD_START_LAPTOP_PREDICT;
} else if (strncmp(value_start, "start_predict", 13) == 0) {
    return CMD_START_PREDICT;
} else if (strncmp(value_start, "start", 5) == 0) {
    return CMD_START;
```
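
The ordering matters because `strncmp(value_start, "start", 5)` compares only the first five characters, so it also matches `start_predict` and `start_laptop_predict`. The Python sketch below imitates that prefix test to show what each ordering returns. `match_first()` and the two lists are illustrative names and are not part of the firmware.

```python
# Imitates the strncmp() prefix checks in parse_command(); names are illustrative only.
def match_first(value, checks):
    """Return the command of the first (prefix, command) pair whose prefix matches."""
    for prefix, command in checks:
        # strncmp(value, prefix, len(prefix)) == 0  <=>  value starts with prefix
        if value.startswith(prefix):
            return command
    return "CMD_NONE"


LONGEST_FIRST = [
    ("start_laptop_predict", "CMD_START_LAPTOP_PREDICT"),
    ("start_predict", "CMD_START_PREDICT"),
    ("start", "CMD_START"),
]
SHORTEST_FIRST = list(reversed(LONGEST_FIRST))

if __name__ == "__main__":
    for value in ("start", "start_predict", "start_laptop_predict"):
        print(value, match_first(value, LONGEST_FIRST), match_first(value, SHORTEST_FIRST))
```

With the shortest prefix checked first, every `start*` command collapses to `CMD_START`, which is why the longest literal has to come first in the chain.
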
**In `serial_input_task()` FSM switch** — add to `STATE_CONNECTED` block:
```c
} else if (cmd == CMD_START_LAPTOP_PREDICT) {
    g_device_state = STATE_LAPTOP_PREDICT;
    printf("[STATE] CONNECTED -> LAPTOP_PREDICT\n");
    xQueueSend(g_cmd_queue, &cmd, 0);
}
```
**Add to the active-state check** in `serial_input_task()`:
```c
case STATE_STREAMING:
case STATE_PREDICTING:
case STATE_LAPTOP_PREDICT:  // ← ADD to the case list
    if (cmd == CMD_STOP) { ... }
```
**New function `run_laptop_predict_loop()`** (add alongside `stream_emg_data()` and `run_inference_loop()`):
```c
/**
 * @brief Laptop-mediated prediction loop (STATE_LAPTOP_PREDICT).
 *
 * Streams raw ADC CSV to laptop for inference.
 * Simultaneously reads gesture commands sent back by laptop.
 * Executes received gesture immediately.
 *
 * Laptop sends: {"gesture":"fist"}\n OR {"gesture":"rest"}\n etc.
 * ESP32 parses the "gesture" field and calls
 * inference_get_gesture_enum_by_name() + gestures_execute().
 */
static void run_laptop_predict_loop(void) {
    emg_sample_t sample;
    char cmd_buf[64];
    int cmd_idx = 0;

    printf("{\"status\":\"info\",\"msg\":\"Laptop-predict mode started\"}\n");

    while (g_device_state == STATE_LAPTOP_PREDICT) {
        // 1. Send raw ADC sample (same format as STATE_STREAMING)
        emg_sensor_read(&sample);
        printf("%u,%u,%u,%u\n", sample.channels[0], sample.channels[1],
               sample.channels[2], sample.channels[3]);

        // 2. Non-blocking read of any incoming gesture command from laptop
        //    (serial_input_task already handles FSM commands; this handles gesture commands).
        //    Assumes stdin is non-blocking, so getchar() returns EOF when no data is pending.
        //    Caveat: if serial_input_task reads the same stdin, route gesture lines through
        //    one reader only, otherwise the two tasks will split the incoming bytes.
        //    Gesture messages from laptop look like: {"gesture":"fist"}\n
        int c = getchar();
        if (c != EOF && c != 0xFF) {
            if (c == '\n' || c == '\r') {
                if (cmd_idx > 0) {
                    cmd_buf[cmd_idx] = '\0';
                    // Parse {"gesture":"<name>"}: look for the "gesture" field
                    const char *g = strstr(cmd_buf, "\"gesture\"");
                    if (g) {
                        const char *v = strchr(g, ':');
                        if (v) {
                            v++;
                            while (*v == ' ' || *v == '"') v++;
                            // Extract gesture name up to closing quote
                            char name[32] = {0};
                            int ni = 0;
                            while (*v && *v != '"' && ni < 31) name[ni++] = *v++;
                            name[ni] = '\0';
                            // Map name to enum and execute (reuse inference mapping)
                            gesture_t gesture = (gesture_t)inference_get_gesture_enum_by_name(name);
                            if (gesture != GESTURE_NONE) {
                                gestures_execute(gesture);
                            }
                        }
                    }
                    cmd_idx = 0;
                }
            } else if (cmd_idx < (int)sizeof(cmd_buf) - 1) {
                cmd_buf[cmd_idx++] = (char)c;
            } else {
                cmd_idx = 0;  // line too long: discard and start over
            }
        }
        vTaskDelay(1);
    }
}
```
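
On the laptop side, the gesture message can be built with the standard `json` module. The sketch below is only an illustration of the wire format described in the comment block above: `encode_gesture()` and `MAX_NAME` are hypothetical names, and the 31-character limit is an assumption derived from the 32-byte `name` buffer in the loop (one byte is reserved for the terminator).

```python
import json

MAX_NAME = 31  # assumption: 32-byte name buffer on the ESP32, minus the terminator


def encode_gesture(name):
    """Build one newline-terminated gesture message for the ESP32."""
    if not name or len(name) > MAX_NAME or '"' in name:
        raise ValueError(f"unsupported gesture name: {name!r}")
    # Compact separators reproduce the {"gesture":"fist"} form with no extra spaces.
    payload = json.dumps({"gesture": name}, separators=(",", ":"))
    return (payload + "\n").encode("ascii")


if __name__ == "__main__":
    print(encode_gesture("fist"))
```

Rejecting over-long names on the laptop avoids silent truncation on the device, where the parser stops copying after 31 characters.
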
**Note**: `inference_get_gesture_enum_by_name(const char *name)` is just the existing
`inference_get_gesture_enum(int class_idx)` refactored to accept a string directly
(bypassing the class_idx lookup). Alternatively, keep the existing function and add a
simple wrapper — the string matching logic already exists in `inference.c`:
```c
// Simpler: reuse the existing strcmp chain in inference_get_gesture_enum()
// by passing the name through a helper that returns the gesture_t directly.
// Add to inference.c / inference.h:
gesture_t inference_get_gesture_by_name(const char *name);
// (same strcmp logic as inference_get_gesture_enum, but returns gesture_t directly)
```
**In `state_machine_loop()`** — add the new state:
```c
static void state_machine_loop(void) {
    command_t cmd;
    const TickType_t poll_interval = pdMS_TO_TICKS(50);
    while (1) {
        if (g_device_state == STATE_STREAMING) stream_emg_data();
        else if (g_device_state == STATE_PREDICTING) run_inference_loop();
        else if (g_device_state == STATE_LAPTOP_PREDICT) run_laptop_predict_loop();  // ← ADD
        xQueueReceive(g_cmd_queue, &cmd, poll_interval);
    }
}
```
**In `app_main()` switch** — add the standalone case:
```c
case EMG_STANDALONE:
    run_standalone_loop();  // new function, see Section 0.4
    break;
```
---
## 0.6 New Python Script: `live_predict.py`
**Location**: `C:/VSCode/Marvel_Projects/Bucky_Arm/live_predict.py` (new file)
**Purpose**: Laptop-side live inference. Reads raw ADC stream from ESP32, runs the Python
classifier, sends gesture commands back to ESP32 for arm control.
**When to use**: `EMG_MAIN` + `STATE_LAPTOP_PREDICT` — useful for debugging and comparing
laptop accuracy vs on-device accuracy before flashing a new model.
```python
"""
live_predict.py: Laptop-side live EMG inference for Bucky Arm.

Connects to ESP32, requests STATE_LAPTOP_PREDICT, reads raw ADC CSV,
runs the trained Python classifier, sends gesture commands back to ESP32.

Usage:
    python live_predict.py --port COM3 --model path/to/saved_model/
"""
import argparse
import time
import numpy as np
import serial
from pathlib import Path
import sys

sys.path.insert(0, str(Path(__file__).parent))
from learning_data_collection import (
    EMGClassifier, EMGFeatureExtractor, SessionStorage, HAND_CHANNELS,
    WINDOW_SIZE_SAMPLES, HOP_SIZE_SAMPLES, NUM_CHANNELS,
)

BAUD_RATE = 921600
CALIB_SEC = 3.0       # seconds of REST to collect for normalization at startup
CALIB_LABEL = "rest"  # label used during calibration window


def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument("--port", required=True, help="Serial port, e.g. COM3 or /dev/ttyUSB0")
    p.add_argument("--model", required=True, help="Path to saved EMGClassifier model directory")
    return p.parse_args()


def handshake(ser):
    """Send connect command, wait for ack."""
    ser.write(b'{"cmd":"connect"}\n')
    deadline = time.time() + 5.0
    while time.time() < deadline:
        line = ser.readline().decode("utf-8", errors="ignore").strip()
        if "ack_connect" in line:
            print(f"[Handshake] Connected: {line}")
            return True
    raise RuntimeError("No ack_connect received within 5s")


def collect_calibration_windows(ser, n_windows, window_size, hop_size, n_channels):
    """Collect n_windows worth of REST data for normalization calibration."""
    print(f"[Calib] Collecting {n_windows} REST windows. Hold arm still...")
    raw_buffer = np.zeros((window_size, n_channels), dtype=np.float32)
    windows = []
    sample_count = 0
    while len(windows) < n_windows:
        line = ser.readline().decode("utf-8", errors="ignore").strip()
        try:
            vals = [float(v) for v in line.split(",")]
            if len(vals) != n_channels:
                continue
        except ValueError:
            continue  # also skips JSON telemetry lines
        raw_buffer = np.roll(raw_buffer, -1, axis=0)
        raw_buffer[-1] = vals
        sample_count += 1
        if sample_count >= window_size and sample_count % hop_size == 0:
            windows.append(raw_buffer.copy())
    print(f"[Calib] Collected {len(windows)} windows. Computing normalization stats...")
    return np.array(windows)  # (n_windows, window_size, n_channels)


def main():
    args = parse_args()

    # Load trained classifier
    print(f"[Init] Loading classifier from {args.model}...")
    classifier = EMGClassifier()
    classifier.load(Path(args.model))
    extractor = classifier.feature_extractor

    ser = serial.Serial(args.port, BAUD_RATE, timeout=1.0)
    time.sleep(0.5)
    ser.reset_input_buffer()
    handshake(ser)

    # Request laptop-predict mode
    ser.write(b'{"cmd":"start_laptop_predict"}\n')
    print("[Control] Entered STATE_LAPTOP_PREDICT")

    # Calibration: collect 3s of REST for session normalization
    n_calib_windows = max(10, int(CALIB_SEC * 1000 / (HOP_SIZE_SAMPLES)))
    calib_raw = collect_calibration_windows(
        ser, n_calib_windows, WINDOW_SIZE_SAMPLES, HOP_SIZE_SAMPLES, NUM_CHANNELS
    )
    calib_features = extractor.extract_features_batch(calib_raw)
    calib_mean = calib_features.mean(axis=0)
    calib_std = np.where(calib_features.std(axis=0) > 1e-6,
                         calib_features.std(axis=0), 1e-6)
    print("[Calib] Done. Starting live prediction...")

    # Live prediction loop
    raw_buffer = np.zeros((WINDOW_SIZE_SAMPLES, NUM_CHANNELS), dtype=np.float32)
    sample_count = 0
    last_gesture = None
    try:
        while True:
            line = ser.readline().decode("utf-8", errors="ignore").strip()
            # Skip JSON telemetry lines from ESP32
            if line.startswith("{"):
                continue
            try:
                vals = [float(v) for v in line.split(",")]
                if len(vals) != NUM_CHANNELS:
                    continue
            except ValueError:
                continue

            # Slide window
            raw_buffer = np.roll(raw_buffer, -1, axis=0)
            raw_buffer[-1] = vals
            sample_count += 1

            if sample_count >= WINDOW_SIZE_SAMPLES and sample_count % HOP_SIZE_SAMPLES == 0:
                # Extract features and normalize with session stats
                feat = extractor.extract_features_window(raw_buffer)
                feat = (feat - calib_mean) / calib_std
                proba = classifier.model.predict_proba([feat])[0]
                class_idx = int(np.argmax(proba))
                gesture_name = classifier.label_names[class_idx]
                confidence = float(proba[class_idx])

                # Send gesture command to ESP32
                cmd = f'{{"gesture":"{gesture_name}"}}\n'
                ser.write(cmd.encode("utf-8"))

                if gesture_name != last_gesture:
                    print(f"[Predict] {gesture_name:12s} conf={confidence:.2f}")
                    last_gesture = gesture_name
    except KeyboardInterrupt:
        print("\n[Stop] Sending stop command...")
        ser.write(b'{"cmd":"stop"}\n')
    ser.close()


if __name__ == "__main__":
    main()
```
**Dependencies** (add to a `requirements.txt` in `Bucky_Arm/` if not already there):
```
pyserial
numpy
scikit-learn
```
---
## 0.7 Firmware Cleanup: `system_mode_t` Removal
`config.h` lines 94-100 define a `system_mode_t` typedef that is **not referenced anywhere**
in the firmware. It predates the current `device_state_t` FSM in `main.c` and conflicts
conceptually with it. Remove before starting implementation work.
**File**: `EMG_Arm/src/config/config.h`
**Remove** (lines 93-100):
```c
/**
* @brief System operating modes.
*/
typedef enum {
    MODE_IDLE = 0,     /**< Waiting for commands */
    MODE_DATA_STREAM,  /**< Streaming EMG data to laptop */
    MODE_COMMAND,      /**< Executing gesture commands from laptop */
    MODE_DEMO,         /**< Running demo sequence */
    MODE_COUNT
} system_mode_t;
```
No other file references `system_mode_t` — the deletion is safe and requires no other changes.
---
# PART I — SYSTEM FOUNDATIONS
## 1. Hardware Specification
### ESP32-S3 N32R16V — Confirmed Hardware
| Resource | Spec | Implication |
|----------|------|-------------|
| CPU | Dual-core Xtensa LX7 @ 240 MHz | Pin inference to Core 1, sampling to Core 0 |
| SIMD | PIE 128-bit vector extension | esp-dsp exploits this for FFT, biquad, dot-product |
| Internal SRAM | ~512 KB | All hot-path buffers, model weights, inference state |
| OPI PSRAM | 16 MB (~80 MB/s) | ADC ring buffer, raw window storage — not hot path |
| Flash | 32 MB | Code + read-only model flatbuffers (TFLM path) |
| ADC | 2× SAR ADC, 12-bit, continuous DMA mode | Change A: use `adc_continuous` driver |
**Memory rules**:
- Tag inference code: `IRAM_ATTR` — prevents cache miss stalls
- Tag large ring buffers: `EXT_RAM_BSS_ATTR` — pushes to PSRAM automatically
- Never run hot-path loops from PSRAM (latency varies; ~10× slower than SRAM)
### Espressif Acceleration Libraries
| Library | Accelerates | Key Functions |
|---------|-------------|---------------|
| **esp-dsp** | IIR biquad, FFT (up to 4096-pt), vector dot-product, matrix ops — PIE SIMD | `dsps_biquad_f32`, `dsps_fft2r_fc32`, `dsps_dotprod_f32` |
| **esp-nn** | int8 FC, depthwise/pointwise Conv, activations — SIMD optimized | Used internally by esp-dl |
| **esp-dl** | High-level int8 inference: MLP, Conv1D, LSTM; activation buffer management | Small MLP / tiny CNN deployment |
| **TFLite Micro** | Standard int8 flatbuffer inference, tensor arena (static alloc) | Keras → TFLite → int8 workflow |
### Real-Time Budget (1000 Hz, 25ms hop)
| Stage | Cost | Notes |
|-------|------|-------|
| ADC DMA sampling | ~0 µs | Hardware; CPU-free |
| IIR biquad (3 ch, 2 stages) | <100 µs | `dsps_biquad_f32` |
| Feature extraction (69 feat) | ~1,200 µs | FFT-based features dominate |
| 3 specialist LDAs | ~150 µs | `dsps_dotprod_f32` per class |
| Meta-LDA (15 inputs) | ~10 µs | 75 MACs total |
| int8 MLP fallback [69→32→16→5] | ~250 µs | esp-nn FC kernels |
| Post-processing | <50 µs | EMA, vote, debounce |
| **Total (full ensemble)** | **~1,760 µs** | **14× margin within 25ms** |
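
The total and margin in the table follow from simple addition. As a quick check, the snippet below sums the stage costs listed above and divides the 25 ms hop by the result. The dictionary keys are shorthand labels chosen for this example, and the figures are the table's estimates (upper bounds where the table says "<"), not measurements.

```python
# Stage costs in microseconds, copied from the budget table (estimates, not measurements).
STAGE_COST_US = {
    "adc_dma": 0,
    "iir_biquad": 100,          # table gives "<100", so this is an upper bound
    "feature_extraction": 1200,
    "specialist_ldas": 150,
    "meta_lda": 10,
    "int8_mlp": 250,
    "post_processing": 50,      # table gives "<50"
}
HOP_US = 25_000  # 25 ms hop

total_us = sum(STAGE_COST_US.values())
margin = HOP_US / total_us

print(f"total = {total_us} us, margin = {margin:.1f}x")
```
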
### Hard No-Gos
| Technique | Why |
|-----------|-----|
| Full MPF with matrix logarithm | Eigendecomposition per window; fragile float32; no SIMD path |
| Conv1D(16→512) + 3×LSTM(512) | ~4 MB weights; LSTM sequential dependency — impossible |
| Any transformer / attention | O(n²); no int8 transformer kernels for MCU |
| On-device gradient updates | Inference only — no training infrastructure |
| Heap allocations on hot path | FreeRTOS heap fragmentation kills determinism |
---
## 2. Current System Snapshot
| Aspect | Current State |
|--------|--------------|
| Channels | 4 total; ch0-ch2 forearm (FCR, FCU, extensor), ch3 bicep (excluded from hand classifier) |
| Sampling | 1000 Hz, timer/polling (jitter — fix with Change A) |
| Window | 150 samples (150ms), 25-sample hop (25ms) |
| Features | 12: RMS, WL, ZC, SSC × 3 channels |
| Classifier | Single LDA, float32 weights in C header |
| Label alignment | RMS onset detection — missing +100ms forward shift (Change 0) |
| Normalization | Per-session z-score in Python; no on-device equivalent (Change D) |
| Smoothing | EMA (α=0.7) + majority vote (5) + debounce (3 counts) |
| Confidence rejection | None — always outputs a class (Change C) |
| Signal filtering | Analogue only via MyoWare (Change B adds software IIR) |
| Gestures | 5: fist, hook\_em, open, rest, thumbs\_up |
| Training data | 15 HDF5 sessions, 1 user |
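
To make the window and hop figures concrete, the sketch below slices a recording into 150-sample windows with a 25-sample hop using NumPy. It is an illustration only: `sliding_windows()` is a hypothetical helper, not a function from `learning_data_collection.py`, and the random array stands in for real EMG data.

```python
import numpy as np

WINDOW = 150  # samples (150 ms at 1000 Hz)
HOP = 25      # samples (25 ms at 1000 Hz)


def sliding_windows(samples, window=WINDOW, hop=HOP):
    """Return an array of shape (n_windows, window, n_channels)."""
    n = samples.shape[0]
    if n < window:
        # Not enough data for a single full window.
        return np.empty((0, window, samples.shape[1]), dtype=samples.dtype)
    starts = range(0, n - window + 1, hop)
    return np.stack([samples[s:s + window] for s in starts])


if __name__ == "__main__":
    one_second = np.random.default_rng(0).normal(size=(1000, 3))  # placeholder data
    windows = sliding_windows(one_second)
    print(windows.shape)
```

The number of full windows in `n` samples is `(n - 150) // 25 + 1`, so consecutive windows share 125 of their 150 samples.
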
---
## 2.1 — Confirmed Firmware Architecture (From Codebase Exploration)
> Confirmed by direct codebase inspection 2026-02-24. All file paths relative to
> `C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/`
### ADC Pin Mapping (`drivers/emg_sensor.c`)
| Channel | ADC Channel | GPIO | Muscle Location | Role in Classifier |
|---------|-------------|------|-----------------|-------------------|
| ch0 | `ADC_CHANNEL_1` | GPIO 2 | Forearm Belly (FCR) | Primary flexion signal |
| ch1 | `ADC_CHANNEL_2` | GPIO 3 | Forearm Extensors | Extension signal |
| ch2 | `ADC_CHANNEL_8` | GPIO 9 | Forearm Contractors (FCU) | Ulnar flexion signal |
| ch3 | `ADC_CHANNEL_9` | GPIO 10 | Bicep | Independent — see Section 2.2 |
**Current ADC driver**: `adc_oneshot` (polling — **NOT DMA continuous yet**; Change A migrates this)
- Attenuation: `ADC_ATTEN_DB_12` (0-3.9V full-scale range)
- Calibration: `adc_cali_curve_fitting` scheme
- Output: calibrated millivolts as `uint16_t` packed into `emg_sample_t.channels[4]`
- Timing: `vTaskDelay(1)` in `run_inference_loop()` provides the ~1ms sample interval
### Current Task Structure (`app/main.c`)
| Task | Priority | Stack | Core Pinning | Role |
|------|----------|-------|--------------|------|
| `app_main` (implicit) | Default | Default | None | Runs inference loop + state machine |
| `serial_input_task` | 5 | 4096 B | **None** | Parses UART JSON commands |
**No other tasks exist.** Change A will add `adc_sampling_task` pinned to Core 0.
The inference loop runs on `app_main`'s default task — no explicit core affinity.
### State Machine (`app/main.c`)
```
STATE_IDLE ─(BLE/UART connect)─► STATE_CONNECTED
                                      │
              {"cmd": "start_stream"} ▼
                                 STATE_STREAMING   (sends raw ADC over UART for Python)
             {"cmd": "start_predict"} ▼
                                 STATE_PREDICTING  (runs run_inference_loop())
```
Communication: UART at 921600 baud, JSON framing.
### Complete Data Flow (Exact Function Names)
```
emg_sensor_read(&sample)
│   drivers/emg_sensor.c
│   adc_oneshot_read() × 4 channels → adc_cali_raw_to_voltage() → uint16_t mV
│   Result: sample.channels[4] = {ch0_mV, ch1_mV, ch2_mV, ch3_mV}
▼   Called every ~1ms (vTaskDelay(1) in run_inference_loop)
inference_add_sample(sample.channels)
│   core/inference.c
│   Writes to circular window_buffer[150][4]
│   Returns true when buffer is full (after first 150 samples)
▼   Called every 25 samples (stride_counter % INFERENCE_HOP_SIZE == 0)
inference_predict(&confidence)
│   core/inference.c
│   compute_features() → LDA scores → softmax → EMA → majority vote → debounce
│   Returns: gesture class index (int), fills confidence (float)
▼
inference_get_gesture_enum(class_idx)
│   core/inference.c
│   String match on MODEL_CLASS_NAMES[] → gesture_t enum value
▼
gestures_execute(gesture)
    core/gestures.c
    switch(gesture) → servo PWM via LEDC driver
    Servo pins: GPIO 1,4,5,6,7 (Thumb, Index, Middle, Ring, Pinky)
```
### Current Buffer State
```c
// core/inference.c line 19:
static uint16_t window_buffer[INFERENCE_WINDOW_SIZE][NUM_CHANNELS];
// ^^^^^^^^ MUST change to float when adding IIR filter (Change B)
//
// uint16_t: 150 × 4 × 2 = 1,200 bytes in internal SRAM
// float: 150 × 4 × 4 = 2,400 bytes in internal SRAM (still trivially small)
//
// Reason for change: IIR filter outputs float; casting back to uint16_t loses
// sub-mV precision and re-introduces the quantization noise we just filtered out.
```
### `platformio.ini` Current State (`EMG_Arm/platformio.ini`)
**Current `lib_deps`**: **None** — completely empty, no external library dependencies.
Required additions per change tier:
| Change | Library | `platformio.ini` `lib_deps` entry |
|--------|---------|----------------------------------|
| B (IIR biquad) | esp-dsp | `espressif/esp-dsp @ ^2.0.0` |
| 1 (FFT features) | esp-dsp | (same — add once for both B and 1) |
| E (int8 MLP) | TFLite Micro | `tensorflow/tflite-micro` |
| F (ensemble) | esp-dsp | (same as B) |
Add to `platformio.ini` under `[env:esp32-s3-devkitc1-n16r16]`:
```ini
lib_deps =
espressif/esp-dsp @ ^2.0.0
; tensorflow/tflite-micro ← add this only when implementing Change E
```
---
## 2.2 — Bicep Channel Subsystem (ch3 / ADC_CHANNEL_9 / GPIO 10)
### Current Status
The bicep channel is:
- **Sampled**: `emg_sensor_read()` reads all 4 channels; `sample.channels[3]` holds bicep data
- **Excluded from hand classifier**: `HAND_NUM_CHANNELS = 3`; `compute_features()` explicitly
loops `ch = 0` to `ch < HAND_NUM_CHANNELS` (i.e., ch0, ch1, ch2 only)
- **Not yet independently processed**: the comment in `inference.c` line 68
(`"ch3 (bicep) is excluded — it will be processed independently"`) is aspirational —
the independent processing is not yet implemented
### Phase 1 — Binary Flex/Unflex (Current Target)
Implement a simple RMS threshold detector as a new subsystem:
**New files:**
```
EMG_Arm/src/core/bicep.h
EMG_Arm/src/core/bicep.c
```
**bicep.h:**
```c
#pragma once
#include <stdint.h>
#include <stdbool.h>
typedef enum {
    BICEP_STATE_REST = 0,
    BICEP_STATE_FLEX = 1,
} bicep_state_t;
// Call once at session start with ~3s of relaxed bicep data.
// Returns the computed threshold (also stored internally).
float bicep_calibrate(const uint16_t *ch3_samples, int n_samples);
// Call every 25ms (same hop as hand gesture inference).
// Computes RMS on the last BICEP_WINDOW_SAMPLES from the ch3 circular buffer.
bicep_state_t bicep_detect(void);
// Load/save threshold to NVS (reuse calibration.c infrastructure from Change D)
bool bicep_save_threshold(float threshold_mv);
bool bicep_load_threshold(float *threshold_mv_out);
```
**Core logic (`bicep.c`):**
```c
#include <math.h>    // sqrtf
#include <stdio.h>   // printf
#include "bicep.h"

#define BICEP_WINDOW_SAMPLES  50    // 50ms window at 1000Hz
#define BICEP_FLEX_MULTIPLIER 2.5f  // threshold = rest_rms × 2.5
#define BICEP_HYSTERESIS      1.3f  // prevents rapid toggling at threshold boundary

static float s_threshold_mv = 0.0f;
static bicep_state_t s_state = BICEP_STATE_REST;

float bicep_calibrate(const uint16_t *ch3_samples, int n_samples) {
    if (n_samples <= 0) return s_threshold_mv;  // nothing to calibrate on
    float rms_sq = 0.0f;
    for (int i = 0; i < n_samples; i++)
        rms_sq += (float)ch3_samples[i] * ch3_samples[i];
    float rest_rms = sqrtf(rms_sq / n_samples);
    s_threshold_mv = rest_rms * BICEP_FLEX_MULTIPLIER;
    printf("[Bicep] Calibrated: rest_rms=%.1f mV, threshold=%.1f mV\n",
           rest_rms, s_threshold_mv);
    return s_threshold_mv;
}

bicep_state_t bicep_detect(void) {
    // Compute RMS on last BICEP_WINDOW_SAMPLES from ch3 circular buffer
    // (ch3 values are stored in window_buffer[][3] alongside hand channels).
    // window_buffer and buffer_head are static in inference.c and must be exposed to bicep.c.
    // Assumption: buffer_head is the next write slot (the oldest sample), so the newest
    // BICEP_WINDOW_SAMPLES samples start that many slots behind it.
    float rms_sq = 0.0f;
    int idx = (buffer_head + INFERENCE_WINDOW_SIZE - BICEP_WINDOW_SAMPLES) % INFERENCE_WINDOW_SIZE;
    for (int i = 0; i < BICEP_WINDOW_SAMPLES; i++) {
        float v = (float)window_buffer[idx][3];  // ch3 = bicep
        rms_sq += v * v;
        idx = (idx + 1) % INFERENCE_WINDOW_SIZE;
    }
    float rms = sqrtf(rms_sq / BICEP_WINDOW_SAMPLES);

    // Hysteresis: RMS must exceed threshold × BICEP_HYSTERESIS to enter flex,
    // and must fall below the plain threshold (1.0×) to exit.
    if (s_state == BICEP_STATE_REST && rms > s_threshold_mv * BICEP_HYSTERESIS)
        s_state = BICEP_STATE_FLEX;
    else if (s_state == BICEP_STATE_FLEX && rms < s_threshold_mv)
        s_state = BICEP_STATE_REST;
    return s_state;
}
```
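
The thresholds can be explored offline before flashing. The following Python model mirrors the enter/exit logic of `bicep_detect()` using the constants shown above. `BicepDetector` is a hypothetical class for experimentation on the laptop, and the RMS values in the example are made up.

```python
FLEX_MULTIPLIER = 2.5  # threshold = rest_rms * 2.5 (same constant as bicep.c)
HYSTERESIS = 1.3       # extra factor required to enter FLEX

REST, FLEX = 0, 1


class BicepDetector:
    """Offline model of the two-threshold (hysteresis) flex detector."""

    def __init__(self, rest_rms_mv):
        self.threshold_mv = rest_rms_mv * FLEX_MULTIPLIER
        self.state = REST

    def update(self, rms_mv):
        """Feed one window RMS (mV) and return the new state."""
        if self.state == REST and rms_mv > self.threshold_mv * HYSTERESIS:
            self.state = FLEX  # enter only above 1.3 x threshold
        elif self.state == FLEX and rms_mv < self.threshold_mv:
            self.state = REST  # exit only below 1.0 x threshold
        return self.state


if __name__ == "__main__":
    det = BicepDetector(rest_rms_mv=100.0)       # threshold 250 mV, enter above 325 mV
    trace = [120, 300, 330, 280, 260, 240, 300]  # made-up RMS values in mV
    print([det.update(v) for v in trace])
```

Values between the two levels (250 to 325 mV in this example) leave the state unchanged, which is what suppresses chatter near the boundary.
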
**Integration in `main.c` `run_inference_loop()`:**
```c
// Call alongside inference_predict() every 25ms:
if (stride_counter % INFERENCE_HOP_SIZE == 0) {
    float confidence;
    int class_idx = inference_predict(&confidence);
    gesture_t gesture = inference_get_gesture_enum(class_idx);
    bicep_state_t bicep = bicep_detect();
    // Combined actuation: hand gesture + bicep state
    // Example: bicep flex can enable/disable certain gestures,
    // or control a separate elbow/wrist joint.
    gestures_execute(gesture);
    // bicep_actuate(bicep); ← add when elbow motor is wired
}
```
**Calibration trigger (add to serial_input_task command parsing):**
```c
// {"cmd": "calibrate_bicep"} → collect 3s of rest data, call bicep_calibrate()
```
### Phase 2 — Continuous Angle/Velocity Prediction (Future)
When ready to move beyond binary flex/unflex:
1. **Collect angle-labeled data**: hold arm at 0°, 15°, 30°, 45°, 60°, 75°, 90°;
log RMS at each; collect 5+ reps per angle.
2. **Fit polynomial**: `angle = a0 + a1*rms + a2*rms²` (degree-2 usually sufficient);
use `numpy.polyfit(rms_values, angles, deg=2)`.
3. **Store coefficients in NVS**: 3 floats via `nvs_set_blob()`.
4. **On-device evaluation**: `angle = a0 + rms*(a1 + rms*a2)` — 2 MACs per inference.
5. **Velocity**: `velocity = (angle_now - angle_prev) / HOP_MS` with low-pass smoothing.
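
Steps 2, 4 and 5 can be sketched in a few lines of NumPy. Note that `numpy.polyfit` returns coefficients highest degree first, so they are unpacked in reverse to match the `a0, a1, a2` naming above. The calibration pairs are synthetic placeholders generated from a known quadratic, and `HOP_MS = 25` assumes the same hop as the hand classifier.

```python
import numpy as np

HOP_MS = 25.0  # assumption: bicep angle is evaluated once per 25 ms hop

# Synthetic placeholder calibration data generated from a known quadratic; not measurements.
rms_values = np.array([50.0, 100.0, 150.0, 200.0, 250.0, 300.0, 350.0])
angles = -10.0 + 0.2 * rms_values + 0.0002 * rms_values**2

# polyfit returns [a2, a1, a0] (highest degree first).
a2, a1, a0 = np.polyfit(rms_values, angles, deg=2)


def angle_from_rms(rms):
    """Horner evaluation, matching the on-device form a0 + rms*(a1 + rms*a2)."""
    return a0 + rms * (a1 + rms * a2)


def velocity_deg_per_ms(angle_now, angle_prev):
    """Finite-difference velocity over one hop (no smoothing applied here)."""
    return (angle_now - angle_prev) / HOP_MS


if __name__ == "__main__":
    prev, now = angle_from_rms(150.0), angle_from_rms(200.0)
    print(f"a0={a0:.3f} a1={a1:.3f} a2={a2:.6f}")
    print(f"angle={now:.1f} deg, velocity={velocity_deg_per_ms(now, prev):.3f} deg/ms")
```

Only the three fitted coefficients need to reach the device; the fit itself stays on the laptop.
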
### Including ch3 in Hand Gesture Classifier (for Wrist Rotation)
If/when wrist rotation or supination gestures are added:
```python
# learning_data_collection.py — change this constant:
HAND_CHANNELS = [0, 1, 2, 3] # was [0, 1, 2]; include bicep for rotation gestures
```
Feature count becomes: 4 channels × 20 per-ch + 10 cross-ch covariances + 6 correlations = **96 total**.
The bicep subsystem is then retired and ch3 becomes part of the main gesture classifier.
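The feature totals quoted in this document follow from a simple count. A minimal sketch, assuming 20 per-channel features, one covariance per unordered channel pair including the diagonal, and one correlation per off-diagonal pair (the helper name `feature_count` is illustrative):

```python
def feature_count(n_channels: int, per_channel: int = 20) -> int:
    """Total feature-vector length for a given channel count."""
    covariances = n_channels * (n_channels + 1) // 2   # upper triangle incl. diagonal
    correlations = n_channels * (n_channels - 1) // 2  # off-diagonal pairs only
    return n_channels * per_channel + covariances + correlations

print(feature_count(3))  # 3-channel layout
print(feature_count(4))  # 4-channel layout with ch3 included
```

For 3 channels this gives 60 + 6 + 3 = 69, and for 4 channels 80 + 10 + 6 = 96.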
---
## 3. What Meta Built — Filtered for ESP32
Meta's Nature 2025 paper (doi:10.1038/s41586-025-09255-w) describes a 16-channel wristband
running Conv1D(16→512)+3×LSTM(512). **That exact model is not portable to ESP32-S3** (~4 MB
weights). What IS transferable:
| Meta Technique | Transferability | Where Used |
|----------------|-----------------|-----------|
| +100ms forward label shift after onset detection | ✓ Direct copy | Change 0 |
| Frequency features > amplitude features (Extended Data Fig. 6) | ✓ Core insight | Change 1, Change 6 |
| Deliberate electrode repositioning between sessions | ✓ Protocol | Change 2 |
| Window jitter + amplitude augmentation | ✓ Training | Change 3 |
| Reinhard compression `64x/(32+|x|)` | ✓ Optional flag | Change 4 |
| EMA α=0.7, threshold=0.35, debounce=50ms | ✓ Already implemented | Change C |
| Specialist features → meta-learner stacking | ✓ Adapted | Change 7 + F |
| Conv1D+LSTM architecture | ✗ Too large | Not implementable |
| Full MPF with matrix logarithm | ✗ Eigendecomp too costly | Not implementable |
---
## 4. Current Code State + Known Bugs
**All Python changes**: `C:/VSCode/Marvel_Projects/Bucky_Arm/learning_data_collection.py`
**Firmware**: `C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/inference.c`
**Config**: `C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/config/config.h`
**Weights**: `C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/model_weights.h`
### Key Symbol Locations
| Symbol | Line | Notes |
|--------|------|-------|
| Constants block | 49–94 | `NUM_CHANNELS`, `SAMPLING_RATE_HZ`, `WINDOW_SIZE_MS`, etc. |
| `align_labels_with_onset()` | 442 | RMS onset detection |
| `filter_transition_windows()` | 529 | Removes onset/offset ambiguity windows |
| `SessionStorage.save_session()` | 643 | Calls onset alignment, saves HDF5 |
| `SessionStorage.load_all_for_training()` | 871 | Returns 6 values (see bug below) |
| `EMGFeatureExtractor` class | 1404 | Current: RMS, WL, ZC, SSC only |
| `extract_features_single_channel()` | 1448 | Per-channel feature dict |
| `extract_features_window()` | 1482 | Flat array + cross-channel |
| `extract_features_batch()` | 1520 | Batch wrapper |
| `get_feature_names()` | 1545 | String names for features |
| `CalibrationTransform` class | 1562 | z-score at Python-side inference |
| `EMGClassifier` class | 1713 | LDA/QDA wrapper |
| `EMGClassifier.__init__()` | 1722 | Creates `EMGFeatureExtractor` |
| `EMGClassifier.train()` | 1735 | Feature extraction + model fit |
| `EMGClassifier._apply_session_normalization()` | 1774 | Per-session z-score |
| `EMGClassifier.cross_validate()` | 1822 | GroupKFold, trial-level |
| `EMGClassifier.export_to_header()` | 1956 | Writes `model_weights.h` |
| `EMGClassifier.save()` | 1910 | Persists model params |
| `EMGClassifier.load()` | 2089 | Reconstructs from saved params |
| `run_training_demo()` | 2333 | Main training entry point |
| `inference.c` `compute_features()` | 68 | C feature extraction |
| `inference.c` `inference_predict()` | 158 | C LDA + smoothing pipeline |
### Pending Cleanups (Do Before Any Other Code Changes)
| Item | File | Action |
|------|------|--------|
| Remove `system_mode_t` | `config/config.h` lines 93–100 | Delete the unused typedef (see Part 0, Section 0.7) |
| Add `EMG_STANDALONE` to enum | `config/config.h` line 19 | Add value to the existing MAIN_MODE enum |
| Add `STATE_LAPTOP_PREDICT` + `CMD_START_LAPTOP_PREDICT` | `app/main.c` | See Part 0, Section 0.5 for exact diffs |
| Add `run_standalone_loop()` | `app/main.c` | New function — see Part 0, Section 0.4 |
| Add `run_laptop_predict_loop()` | `app/main.c` | New function — see Part 0, Section 0.5 |
| Add `inference_get_gesture_by_name()` | `core/inference.c` + `core/inference.h` | Small helper — extracts existing strcmp logic |
### Known Bug — Line 2382
```python
# BUG: load_all_for_training() returns 6 values; this call unpacks only 5.
# As written, Python raises ValueError (too many values to unpack), and without
# session_indices per-session normalization cannot run.
X, y, trial_ids, label_names, loaded_sessions = storage.load_all_for_training()
# FIX (apply with Change 1):
X, y, trial_ids, session_indices, label_names, loaded_sessions = storage.load_all_for_training()
```
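The failure mode can be reproduced without the real loader. The sketch below uses a hypothetical stand-in, `load_all_for_training_stub`, that returns six values; it is not the project's function.

```python
def load_all_for_training_stub():
    """Stand-in (hypothetical) that returns six values, like the documented loader."""
    return "X", "y", "trial_ids", "session_indices", "label_names", "loaded_sessions"

try:
    # Five names on the left, six values on the right.
    X, y, trial_ids, label_names, loaded_sessions = load_all_for_training_stub()
    unpack_error = None
except ValueError as exc:
    unpack_error = str(exc)

print(unpack_error)
```

A mismatch like this fails at the call site rather than dropping a value, so it should show up the first time the training entry point runs.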
### Current `model_weights.h` State (as of 2026-02-14 training run)
| Constant | Value | Note |
|----------|-------|------|
| `MODEL_NUM_CLASSES` | 5 | fist, hook_em, open, rest, thumbs_up |
| `MODEL_NUM_FEATURES` | 12 | RMS, WL, ZC, SSC × 3 forearm channels |
| `MODEL_CLASS_NAMES` | `{"fist","hook_em","open","rest","thumbs_up"}` | Alphabetical order |
| `MODEL_NORMALIZE_FEATURES` | *not defined yet* | Add when enabling cross-ch norm (Change B) |
| `MODEL_USE_REINHARD` | *not defined yet* | Add when enabling Reinhard compression (Change 4) |
| `FEAT_ZC_THRESH` | `0.1f` | Fraction of RMS for zero-crossing threshold |
| `FEAT_SSC_THRESH` | `0.1f` | Fraction of RMS for slope sign change threshold |
The LDA_WEIGHTS and LDA_INTERCEPTS arrays are current trained values — do not modify manually.
They are regenerated by `EMGClassifier.export_to_header()` after each training run.
### Current Feature Vector (12 features — firmware contract)
```
ch0: [0]=rms [1]=wl [2]=zc [3]=ssc
ch1: [4]=rms [5]=wl [6]=zc [7]=ssc
ch2: [8]=rms [9]=wl [10]=zc [11]=ssc
```
### Target Feature Vector (69 features after Change 1)
```
Per channel (×3 channels, 20 features each):
[0] rms [1] wl [2] zc [3] ssc [4] mav [5] var
[6] iemg [7] wamp [8] ar1 [9] ar2 [10] ar3 [11] ar4
[12] mnf [13] mdf [14] pkf [15] mnp [16] bp0 [17] bp1
[18] bp2 [19] bp3
ch0: indices 0–19
ch1: indices 20–39
ch2: indices 40–59
Cross-channel (9 features):
[60] cov_ch0_ch0 [61] cov_ch0_ch1 [62] cov_ch0_ch2
[63] cov_ch1_ch1 [64] cov_ch1_ch2 [65] cov_ch2_ch2
[66] cor_ch0_ch1 [67] cor_ch0_ch2 [68] cor_ch1_ch2
```
### Specialist Feature Subset Indices (for Change F + Change 7)
```
TD (time-domain, 36 feat): indices [0–11, 20–31, 40–51]
FD (frequency-domain, 24 feat): indices [12–19, 32–39, 52–59]
CC (cross-channel, 9 feat): indices [60–68]
```
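The subset indices can be generated rather than typed by hand, which helps keep the Python exporter and firmware in agreement. A minimal sketch, assuming the 69-feature layout above; the names `TD_IDX`, `FD_IDX`, `CC_IDX` and `channel_indices` are illustrative:

```python
PER_CHANNEL = 20
NUM_CHANNELS = 3
TD_OFFSETS = range(0, 12)   # rms .. ar4 within each channel block
FD_OFFSETS = range(12, 20)  # mnf .. bp3 within each channel block

def channel_indices(offsets):
    """Expand per-channel offsets into flat feature-vector indices."""
    return [ch * PER_CHANNEL + o for ch in range(NUM_CHANNELS) for o in offsets]

TD_IDX = channel_indices(TD_OFFSETS)
FD_IDX = channel_indices(FD_OFFSETS)
cc_start = NUM_CHANNELS * PER_CHANNEL
CC_IDX = list(range(cc_start, cc_start + 9))  # 6 covariances + 3 correlations

print(len(TD_IDX), len(FD_IDX), len(CC_IDX))
```

Together the three lists should cover every index from 0 to 68 exactly once, which is a useful sanity check before exporting specialist weights.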
---
# PART II — TARGET ARCHITECTURE
## 5. Full Recommended Multi-Model Stack
```
ADC (DMA, Change A)
└── IIR Biquad filter per channel (Change B)
└── 150-sample circular window buffer
▼ [every 25ms]
compute_features() → 69-feature vector
calibration_apply() (Change D — NVS z-score)
├─── Stage 1: Activity Gate ──────────────────────────────────┐
│ total_rms < REST_THRESHOLD? → return GESTURE_REST │
│ (skips all inference during obvious idle) │
│ │
▼ (only reached when gesture is active) │
Stage 2: Parallel Specialist LDAs (Change F) │
├── LDA_TD [TD features, 36-dim] → prob_td[5] │
├── LDA_FD [FD features, 24-dim] → prob_fd[5] │
└── LDA_CC [CC features, 9-dim] → prob_cc[5] │
▼ │
Stage 3: Meta-LDA stacker (Change F) │
input: [prob_td | prob_fd | prob_cc] (15-dim) │
output: meta_probs[5] │
▼ │
EMA smoothing (α=0.7) on meta_probs │
│ │
├── max smoothed prob ≥ 0.50? ────── Yes ──────────────────┐ │
│ │ │
└── No: Stage 4 Confidence Cascade (Change E) │ │
run int8 MLP on full 69-feat vector │ │
use higher-confidence winner │ │
│ │ │
└────────────────────────────────────────────►│ │
│ │
◄────────────────────────────────────────────────────────── │ │
│ ◄─┘
Stage 5: Confidence rejection (Change C)
max_prob < 0.40? → return current_output (hold / GESTURE_NONE)
Majority vote (window=5) + Debounce (count=3)
final gesture → actuation
```
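The final post-processing stage (majority vote over 5 hops, then a debounce of 3) can be prototyped on the laptop before porting. The sketch below is one possible reading of that stage, not the firmware's logic: it assumes the debounce counts consecutive hops on which the voted class differs from the current output, and the class name `OutputStabilizer` is hypothetical.

```python
from collections import Counter, deque

class OutputStabilizer:
    """Majority vote over recent predictions, then a debounce before switching."""

    def __init__(self, vote_window=5, debounce_count=3, initial=-1):
        self.history = deque(maxlen=vote_window)
        self.debounce_count = debounce_count
        self.current = initial    # last confirmed output (-1 = none yet)
        self.candidate = initial  # class currently trying to take over
        self.streak = 0           # consecutive hops the candidate has won the vote

    def update(self, predicted):
        self.history.append(predicted)
        voted = Counter(self.history).most_common(1)[0][0]
        if voted == self.current:
            self.candidate, self.streak = voted, 0
        elif voted == self.candidate:
            self.streak += 1
        else:
            self.candidate, self.streak = voted, 1
        if voted != self.current and self.streak >= self.debounce_count:
            self.current, self.streak = voted, 0
        return self.current

stabilizer = OutputStabilizer()
print([stabilizer.update(g) for g in [2, 2, 2, 3, 2]])
```

With these settings a new gesture needs three agreeing hops (about 75 ms at a 25 ms hop) before it reaches actuation, and a single stray prediction does not change the output.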
### Model Weight Footprint
| Model | Input Dim | Weights | Memory (float32) |
|-------|-----------|---------|-----------------|
| LDA_TD | 36 | 5×36 = 180 | 720 B |
| LDA_FD | 24 | 5×24 = 120 | 480 B |
| LDA_CC | 9 | 5×9 = 45 | 180 B |
| Meta-LDA | 15 | 5×15 = 75 | 300 B |
| int8 MLP [69→32→16→5] | 69 | ~2,900 | ~2.9 KB int8 |
| **Total** | | | **~4.6 KB** |
All model weights fit comfortably in internal SRAM.
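The table entries can be re-derived from the layer sizes. A rough sketch, assuming 4 bytes per LDA weight and 1 byte per MLP parameter; intercepts, quantization metadata and TFLite flatbuffer overhead are ignored, so the real numbers will be somewhat higher:

```python
NUM_CLASSES = 5
lda_inputs = {"LDA_TD": 36, "LDA_FD": 24, "LDA_CC": 9, "Meta-LDA": 15}
lda_bytes = {name: NUM_CLASSES * dim * 4 for name, dim in lda_inputs.items()}  # float32

mlp_layers = [69, 32, 16, 5]
mlp_weights = sum(a * b for a, b in zip(mlp_layers, mlp_layers[1:]))
mlp_biases = sum(mlp_layers[1:])
mlp_bytes = mlp_weights + mlp_biases  # assumption: 1 byte per parameter

total_bytes = sum(lda_bytes.values()) + mlp_bytes
print(lda_bytes, mlp_bytes, total_bytes)
```

This gives 1,680 B for the four LDAs and about 2.9 KB for the MLP, in line with the roughly 4.6 KB total quoted above.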
---
## 6. Compute Budget for Full Stack
| Stage | Cost | Cumulative |
|-------|------|-----------|
| Feature extraction (69 feat, 128-pt FFT ×3) | 1,200 µs | 1,200 µs |
| NVS calibration apply | 10 µs | 1,210 µs |
| Activity gate (RMS check) | 5 µs | 1,215 µs |
| LDA_TD (36 feat × 5 classes) | 50 µs | 1,265 µs |
| LDA_FD (24 feat × 5 classes) | 35 µs | 1,300 µs |
| LDA_CC (9 feat × 5 classes) | 15 µs | 1,315 µs |
| Meta-LDA (15 feat × 5 classes) | 10 µs | 1,325 µs |
| EMA + confidence check | 10 µs | 1,335 µs |
| int8 MLP (worst case, ~30% of hops) | 250 µs | 1,585 µs |
| Vote + debounce | 20 µs | 1,605 µs |
| **Worst-case total** | **1,605 µs** | **≈6.4% of 25 ms budget** |
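Summing the per-stage costs is a quick way to keep this table honest as stages are added or re-measured. A small sketch using the figures listed in the rows above (these are the document's estimates, not measurements):

```python
HOP_BUDGET_US = 25_000  # one inference hop = 25 ms

stage_costs_us = [
    ("Feature extraction", 1200),
    ("NVS calibration apply", 10),
    ("Activity gate", 5),
    ("LDA_TD", 50),
    ("LDA_FD", 35),
    ("LDA_CC", 15),
    ("Meta-LDA", 10),
    ("EMA + confidence check", 10),
    ("int8 MLP (worst case)", 250),
    ("Vote + debounce", 20),
]

total_us = sum(cost for _, cost in stage_costs_us)
budget_pct = 100.0 * total_us / HOP_BUDGET_US
print(f"worst case: {total_us} us = {budget_pct:.1f}% of the hop budget")
```

The listed stages add up to 1,605 µs, roughly 6.4% of the 25 ms hop.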
---
## 7. Why This Architecture Works for 3-Channel EMG
Three channels means limited spatial information. The ensemble compensates by extracting
**maximum diversity from the temporal and spectral dimensions**:
- **LDA_TD** specializes in muscle activation *intensity and dynamics* (how hard and fast is each muscle firing)
- **LDA_FD** specializes in muscle activation *frequency content* (motor unit recruitment patterns — slow vs. fast twitch fibres fire at different frequencies)
- **LDA_CC** specializes in *inter-muscle coordination* (which muscles co-activate — the spatial "fingerprint" of each gesture)
These three signal aspects are partially uncorrelated. A gesture that confuses LDA_TD (similar amplitude patterns) may be distinguishable by LDA_FD (different frequency recruitment) or LDA_CC (different co-activation pattern). The meta-LDA learns which specialist to trust for each gesture boundary.
The int8 MLP fallback handles the residual nonlinear cases: gesture pairs where the decision boundary is curved in feature space, which LDA (linear boundary only) cannot resolve.
---
# PART III — GESTURE EXTENSIBILITY
## 8. What Changes When Adding or Removing a Gesture
The system is designed for extensibility. Adding a gesture requires **a retrain plus a handful of firmware lines across three files** (`config.h`, `inference.c`, `gestures.c`).
### What Changes Automatically (No Manual Code Edits)
| Component | How it adapts |
|-----------|--------------|
| `MODEL_NUM_CLASSES` in `model_weights.h` | Auto-computed from training data label count |
| LDA weight array dimensions | `[MODEL_NUM_CLASSES][MODEL_NUM_FEATURES]` — regenerated by `export_to_header()` |
| `MODEL_CLASS_NAMES` array | Regenerated by `export_to_header()` |
| All ensemble LDA weight arrays | Regenerated by `export_ensemble_header()` (Change 7) |
| int8 MLP output layer | Retrained with new class count; re-exported to TFLite |
| Meta-LDA input/output dims | `META_NUM_INPUTS = 3 × MODEL_NUM_CLASSES` — auto from Python |
### What Requires Manual Code Changes
**Python side** (`learning_data_collection.py`):
```python
# 1. Add gesture name to the gesture list (1 line)
# Find where GESTURES or similar list is defined (near constants block ~line 49)
GESTURES = ['fist', 'hook_em', 'open', 'rest', 'thumbs_up', 'wrist_flex'] # example
```
**Firmware — `config.h`** (1 line per gesture):
```c
// Add enum value
typedef enum {
GESTURE_NONE = 0,
GESTURE_REST = 1,
GESTURE_FIST = 2,
GESTURE_OPEN = 3,
GESTURE_HOOK_EM = 4,
GESTURE_THUMBS_UP = 5,
GESTURE_WRIST_FLEX = 6, // ← add this line
GESTURE_COUNT // stays last; equals the number of enum values above
} gesture_t;
```
**Firmware — `inference.c`** `inference_get_gesture_enum()` (2–3 lines per gesture):
```c
if (strcmp(name, "wrist_flex") == 0 || strcmp(name, "WRIST_FLEX") == 0)
return GESTURE_WRIST_FLEX;
```
**Firmware — `gestures.c`** (2 changes — these are easy to miss):
```c
// 1. Add to gesture_names[] static array — index MUST match gesture_t enum value:
static const char *gesture_names[GESTURE_COUNT] = {
"NONE", // GESTURE_NONE = 0
"REST", // GESTURE_REST = 1
"FIST", // GESTURE_FIST = 2
"OPEN", // GESTURE_OPEN = 3
"HOOK_EM", // GESTURE_HOOK_EM = 4
"THUMBS_UP", // GESTURE_THUMBS_UP = 5
"WRIST_FLEX", // GESTURE_WRIST_FLEX = 6 ← add here
};
// 2. Add case to gestures_execute() switch statement:
case GESTURE_WRIST_FLEX:
gesture_wrist_flex(); // implement the actuation function
break;
```
**Critical**: `GESTURE_COUNT`, the last entry of the `gesture_t` enum in `config.h`, is used as the
array size for `gesture_names[]` and updates automatically when new enum values are added before
it. The initializers in `gesture_names[]` and the cases in the `gestures_execute()` switch do not
update automatically and must be kept in sync by hand. A missing or misplaced name leaves a `NULL` or
wrong entry in the array, and a missing switch case means the gesture is recognized but never actuated.
### Complete Workflow for Adding a Gesture
```
1. Python: add gesture string to GESTURES list in learning_data_collection.py (1 line)
2. Data: collect ≥10 sessions × ≥30 reps of new gesture
(follow Change 2 protocol: vary electrode placement between sessions)
3. Train: python learning_data_collection.py → option 3
OR: python train_ensemble.py (after Change 7 is implemented)
4. Export: export_to_header() OR export_ensemble_header()
→ overwrites model_weights.h / model_weights_ensemble.h with new class count
5. config.h: add enum value before GESTURE_COUNT (1 line):
GESTURE_WRIST_FLEX = 6, // ← insert before GESTURE_COUNT
GESTURE_COUNT // stays last — auto-counts
6. inference.c: add string mapping in inference_get_gesture_enum() (2 lines)
7. gestures.c: add name to gesture_names[] array at correct index (1 line)
8. gestures.c: add case to gestures_execute() switch statement (3 lines)
9. Implement actuation function for new gesture (servo angles)
10. Reflash and validate: pio run -t upload
```
**Exact files touched per new gesture (summary):**
| File | What to change |
|------|---------------|
| `learning_data_collection.py` | Add string to GESTURES list |
| `config/config.h` | Add enum value before `GESTURE_COUNT` |
| `core/inference.c` | Add `strcmp` case in `inference_get_gesture_enum()` |
| `core/gestures.c` | Add to `gesture_names[]` array + add switch case |
| `core/gestures.c` | Implement `gesture_<name>()` function with servo angles |
| `core/model_weights.h` | Auto-generated — do not edit manually |
### Removing a Gesture
Removing is the same process in reverse, with one additional step: filter the training data so
that windows labelled with the removed gesture are excluded (the HDF5 session files themselves stay
untouched). The simplest approach is to pass a label whitelist to `load_all_for_training()`:
```python
# Proposed addition to load_all_for_training() — add include_labels parameter
X, y, trial_ids, session_indices, label_names, sessions = \
storage.load_all_for_training(include_labels=['fist', 'open', 'rest', 'thumbs_up'])
# hook_em removed — existing session files are not modified
```
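The whitelist itself could be implemented as a post-filter on the loaded arrays. The sketch below is an assumption about how that might look, not the existing loader: it assumes `y` holds integer indices into `label_names`, and the helper name `filter_by_labels` is hypothetical.

```python
import numpy as np

def filter_by_labels(X, y, trial_ids, session_indices, label_names, include_labels):
    """Keep only windows whose label is whitelisted and renumber the classes 0..k-1."""
    keep_ids = [i for i, name in enumerate(label_names) if name in include_labels]
    mask = np.isin(y, keep_ids)
    remap = {old: new for new, old in enumerate(keep_ids)}
    y_new = np.array([remap[int(v)] for v in y[mask]], dtype=int)
    names_new = [label_names[i] for i in keep_ids]
    return X[mask], y_new, trial_ids[mask], session_indices[mask], names_new

# Tiny demonstration with made-up data: six windows, two features each.
labels = ["fist", "hook_em", "open", "rest", "thumbs_up"]
X_demo = np.arange(12).reshape(6, 2)
y_demo = np.array([0, 1, 2, 3, 4, 1])
ids_demo = np.arange(6)
Xf, yf, _, _, names = filter_by_labels(
    X_demo, y_demo, ids_demo, ids_demo, labels, ["fist", "open", "rest", "thumbs_up"])
print(Xf.shape, yf.tolist(), names)
```

Renumbering the classes matters because the exported `MODEL_NUM_CLASSES` and `MODEL_CLASS_NAMES` are derived from the labels that remain after filtering.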
---
## 9. Practical Limits of 3-Channel EMG
This is the most important constraint for gesture count:
| Gesture Count | Expected Accuracy | Notes |
|--------------|-------------------|-------|
| 3–5 gestures | >90% achievable | Current baseline target |
| 6–8 gestures | 80–90% achievable | Requires richer features + ensemble |
| 9–12 gestures | 65–80% achievable | Diminishing returns; some pairs will be confused |
| 13+ gestures | <65% | Surface EMG with 3 channels cannot reliably separate this many |
**Why 3 channels limits gesture count**: Surface EMG captures the summed electrical activity of
many motor units under each electrode. With only 3 spatial locations, gestures that recruit
overlapping muscle groups (e.g., all finger-flexion gestures recruit FCR) produce similar
signals. The frequency and coordination features from Change 1 help, but there's a hard
information-theoretic limit imposed by channel count.
**Rule of thumb**: aim for ≤8 gestures with the current 3-channel setup. For more, add the
bicep channel (ch3, currently excluded) to get 4 channels — see Section 10.
---
## 10. Specific Gesture Considerations
### Wrist Flexion / Extension
- **Feasibility**: High — FCR (ch0) activates strongly for flexion; extensor group (ch2) for extension
- **Differentiation from finger gestures**: frequency content differs (wrist involves slower motor units)
- **Recommendation**: Add these before wrist rotation — more reliable with surface EMG
### Wrist Rotation (Supination / Pronation)
- **Feasibility**: Medium — the primary supinator is a deep muscle; surface electrodes capture it weakly
- **Key helper**: the bicep activates strongly during supination → **include ch3** (`HAND_CHANNELS = [0, 1, 2, 3]`)
- **Code change for 4 channels**: Python: `HAND_CHANNELS = [0, 1, 2, 3]`; firmware: `HAND_NUM_CHANNELS` auto-updates from the exported header since `MODEL_NUM_FEATURES` is recalculated
- **Caveat**: pronation vs. rest may be harder to distinguish than supination vs. rest
### Pinch / Precision Grasp
- **Feasibility**: Medium — involves intrinsic hand muscles poorly captured by forearm electrodes
- Likely confused with open hand depending on electrode placement
- Collect with careful placement; validate cross-session accuracy before relying on it
### Including ch3 (Bicep) for Wrist Gestures
To include the bicep channel in the hand gesture classifier:
```python
# learning_data_collection.py — change this constant
HAND_CHANNELS = [0, 1, 2, 3] # was [0, 1, 2] — add bicep channel
```
Feature count: 4 channels × 20 per-channel features + 10 cross-channel covariances + 6 correlations = **96 total features**.
The ensemble architecture handles this automatically — specialist LDA weight dimensions
recalculate at training time.
---
# PART IV — CHANGE REFERENCE
## 11. Change Classification Matrix
| Change | Category | Priority | Files | ESP32 Reflash? | Retrain? | Risk |
|--------|----------|----------|-------|----------------|----------|------|
| **C** | Firmware | **Tier 1** | inference.c | ✓ | No | **Very Low** |
| **B** | Firmware | **Tier 1** | inference.c / filter.c | ✓ | No | Low |
| **A** | Firmware | **Tier 1** | adc_sampling.c | ✓ | No | Medium |
| **0** | Python | **Tier 1** | learning_data_collection.py | No | ✓ | Low |
| **1** | Python+C | **Tier 2** | learning_data_collection.py + inference.c | ✓ after | ✓ | Medium |
| **D** | Firmware | **Tier 2** | calibration.c/.h | ✓ | No | Medium |
| **2** | Protocol | **Tier 2** | None | No | ✓ new data | None |
| **3** | Python | **Tier 2** | learning_data_collection.py | No | ✓ | Low |
| **E** | Python+FW | **Tier 3** | train_mlp_tflite.py + firmware | ✓ | ✓ | High |
| **4** | Python+C | **Tier 3** | learning_data_collection.py + inference.c | ✓ if enabled | ✓ | Low |
| **5** | Python | **Tier 3** | learning_data_collection.py | No | No | None |
| **6** | Python | **Tier 3** | learning_data_collection.py | No | ✓ | Low |
| **7** | Python | **Tier 3** | new: train_ensemble.py | No | ✓ | Medium |
| **F** | Firmware | **Tier 3** | new: inference_ensemble.c | ✓ | No (needs 7 first) | Medium |
**Recommended implementation order**: C → B → A → 0 → 1 → D → 2 → 3 → 5 (benchmark) → 7+F → E
---
# PART V — FIRMWARE CHANGES
## Change A — DMA-Driven ADC Sampling (Migration from `adc_oneshot` to `adc_continuous`)
**Priority**: Tier 1
**Current driver**: `adc_oneshot_read()` polling in `drivers/emg_sensor.c`. Timing is
controlled by `vTaskDelay(1)` in `run_inference_loop()` — subject to FreeRTOS scheduler
jitter of ±0.5–1 ms, which corrupts frequency-domain features and ADC burst grouping.
**Why**: `adc_continuous` runs entirely in hardware DMA. Sample-to-sample jitter drops from
±1ms to <10µs. CPU overhead between samples is zero. Required for frequency features (Change 1).
**Effort**: 2–4 hours (replace `emg_sensor_read()` internals; keep public API the same)
### ESP-IDF ADC Continuous API
```c
// --- Initialize (call once at startup) ---
adc_continuous_handle_t adc_handle = NULL;
adc_continuous_handle_cfg_t adc_cfg = {
.max_store_buf_size = 4096, // driver result pool (ring buffer) size in bytes
.conv_frame_size = 256, // bytes per conversion frame
};
adc_continuous_new_handle(&adc_cfg, &adc_handle);
// Actual hardware channel mapping (from emg_sensor.c):
// ch0 = ADC_CHANNEL_1 / GPIO 2 (Forearm Belly / FCR)
// ch1 = ADC_CHANNEL_2 / GPIO 3 (Forearm Extensors)
// ch2 = ADC_CHANNEL_8 / GPIO 9 (Forearm Contractors / FCU)
// ch3 = ADC_CHANNEL_9 / GPIO 10 (Bicep — independent subsystem)
adc_digi_pattern_config_t chan_cfg[4] = {
{.atten = ADC_ATTEN_DB_12, .channel = ADC_CHANNEL_1, .unit = ADC_UNIT_1, .bit_width = ADC_BITWIDTH_12},
{.atten = ADC_ATTEN_DB_12, .channel = ADC_CHANNEL_2, .unit = ADC_UNIT_1, .bit_width = ADC_BITWIDTH_12},
{.atten = ADC_ATTEN_DB_12, .channel = ADC_CHANNEL_8, .unit = ADC_UNIT_1, .bit_width = ADC_BITWIDTH_12},
{.atten = ADC_ATTEN_DB_12, .channel = ADC_CHANNEL_9, .unit = ADC_UNIT_1, .bit_width = ADC_BITWIDTH_12},
};
adc_continuous_config_t cont_cfg = {
.sample_freq_hz = 4000, // 4 channels × 1000 Hz = 4000 total samples/sec
.conv_mode = ADC_CONV_SINGLE_UNIT_1,
.format = ADC_DIGI_OUTPUT_FORMAT_TYPE2,
.pattern_num = 4,
.adc_pattern = chan_cfg,
};
adc_continuous_config(adc_handle, &cont_cfg);
// --- ISR callback (fires each frame) ---
static SemaphoreHandle_t s_adc_sem;
static bool IRAM_ATTR adc_conv_done_cb(
        adc_continuous_handle_t handle,
        const adc_continuous_evt_data_t *edata, void *user_data) {
    BaseType_t hp_woken = pdFALSE;
    xSemaphoreGiveFromISR(s_adc_sem, &hp_woken);
    return hp_woken == pdTRUE;
}
s_adc_sem = xSemaphoreCreateBinary(); // must exist before the first ISR can fire
adc_continuous_evt_cbs_t cbs = { .on_conv_done = adc_conv_done_cb };
adc_continuous_register_event_callbacks(adc_handle, &cbs, NULL);
adc_continuous_start(adc_handle);
// --- ADC calibration (apply per sample) ---
adc_cali_handle_t cali_handle;
adc_cali_curve_fitting_config_t cali_cfg = {
.unit_id = ADC_UNIT_1,
.atten = ADC_ATTEN_DB_12, // matches ADC_ATTEN_DB_12 used in current emg_sensor.c
.bitwidth = ADC_BITWIDTH_12,
};
adc_cali_create_scheme_curve_fitting(&cali_cfg, &cali_handle);
// --- Sampling task (pin to Core 0) ---
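void adc_sampling_task(void *arg) {
    uint8_t result_buf[256];
    uint32_t out_len = 0;
    while (1) {
        xSemaphoreTake(s_adc_sem, portMAX_DELAY);
        if (adc_continuous_read(adc_handle, result_buf, sizeof(result_buf), &out_len, 0) != ESP_OK)
            continue;
        // Each entry is one adc_digi_output_data_t (TYPE2 layout on ESP32-S3);
        // SOC_ADC_DIGI_RESULT_BYTES comes from soc/soc_caps.h.
        for (uint32_t i = 0; i < out_len; i += SOC_ADC_DIGI_RESULT_BYTES) {
            const adc_digi_output_data_t *p = (const adc_digi_output_data_t *)&result_buf[i];
            int mv = 0;
            adc_cali_raw_to_voltage(cali_handle, p->type2.data, &mv);
            // p->type2.channel tells which ADC channel (1, 2, 8 or 9) produced mv.
            // Apply IIR filter (Change B) → post to inference ring buffer
        }
    }
}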
```
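**Verify**: log frame-completion timestamps via `esp_timer_get_time()` in the sampling task. With 256-byte frames (64 conversion results at 4 bytes each, i.e. 16 samples per channel), frames should arrive every 16.0 ms, which corresponds to a per-channel sample spacing of 1.0 ms ± 0.05 ms.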
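Logged timestamps can be checked offline with a few lines of Python. This is a sketch under assumptions: timestamps are in microseconds, one per DMA frame, with 16 samples per channel per frame (pass `samples_per_timestamp=1` if timestamps are logged per sample). The helper name `interval_stats` is hypothetical.

```python
import statistics

def interval_stats(timestamps_us, samples_per_timestamp=16):
    """Return (mean per-sample spacing in ms, worst deviation from 1.0 ms)."""
    gaps_us = [b - a for a, b in zip(timestamps_us, timestamps_us[1:])]
    per_sample_ms = [g / samples_per_timestamp / 1000.0 for g in gaps_us]
    mean_ms = statistics.mean(per_sample_ms)
    worst_dev_ms = max(abs(v - 1.0) for v in per_sample_ms)
    return mean_ms, worst_dev_ms

# Made-up example log: four frame timestamps roughly 16 ms apart.
mean_ms, worst_dev_ms = interval_stats([0, 16000, 32010, 47990])
print(f"mean spacing {mean_ms:.5f} ms, worst deviation {worst_dev_ms:.5f} ms")
```

A worst deviation below 0.05 ms would satisfy the verification target stated above.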
---
## Change B — IIR Biquad Bandpass Filter
**Priority**: Tier 1
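**Why**: MyoWare analogue filters are not tunable. A software band-pass removes DC offset and
sub-20 Hz motion artifact and attenuates noise near the Nyquist limit, all of which inflate ZC, WL,
and other features computed at rest. Powerline interference (50/60 Hz) lies inside the pass band,
so it is not removed by this filter and would need a separate notch stage if it proves to be a problem.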
**Effort**: 2 hours
### Step 1 — Compute Coefficients in Python (one-time, offline)
```python
from scipy.signal import butter
import numpy as np
fs = 1000.0
# The upper edge must stay below Nyquist (fs/2 = 500 Hz), otherwise butter() raises ValueError.
# 450 Hz is an assumed choice; adjust as needed.
sos = butter(N=2, Wn=[20.0, 450.0], btype='bandpass', fs=fs, output='sos')
# sos[i] = [b0, b1, b2, a0, a1, a2] with a0 == 1
# esp-dsp dsps_biquad_f32 expects coeffs = [b0, b1, b2, a1, a2] (no sign flip on a1, a2)
for i, s in enumerate(sos):
    b0, b1, b2, a0, a1, a2 = s
    print(f"Section {i}: {b0:.8f}f, {b1:.8f}f, {b2:.8f}f, {a1:.8f}f, {a2:.8f}f")
# Run this and paste the printed values into the C constants below
```
### Step 2 — Add to inference.c (after includes, before `// --- State ---`)
```c
#include "dsps_biquad.h"
// Butterworth bandpass 20–450 Hz @ 1000 Hz (N=2 prototype, realized as two cascaded biquads)
// Coefficients: [b0, b1, b2, a1, a2], Direct Form II, esp-dsp convention (a0 normalized to 1)
// Regenerate with: scipy.signal.butter(N=2, Wn=[20,450], btype='bandpass', fs=1000, output='sos')
static const float BIQUAD_HP_COEFFS[5] = { /* paste section 0 output here */ };
static const float BIQUAD_LP_COEFFS[5] = { /* paste section 1 output here */ };
// Filter delay state: 3 channels × 2 stages × 2 delay elements = 12 floats (48 bytes)
static float biquad_hp_w[HAND_NUM_CHANNELS][2];
static float biquad_lp_w[HAND_NUM_CHANNELS][2];
```
Add to `inference_init()`:
```c
memset(biquad_hp_w, 0, sizeof(biquad_hp_w));
memset(biquad_lp_w, 0, sizeof(biquad_lp_w));
```
### Step 3 — Apply Per Sample (called before writing to window_buffer)
```c
// Apply to each channel before posting to the window buffer.
// Must be called IN ORDER for each sample (IIR has memory across calls).
static float IRAM_ATTR apply_bandpass(int ch, float raw) {
float hp_out, lp_out;
dsps_biquad_f32(&raw, &hp_out, 1, (float *)BIQUAD_HP_COEFFS, biquad_hp_w[ch]);
dsps_biquad_f32(&hp_out, &lp_out, 1, (float *)BIQUAD_LP_COEFFS, biquad_lp_w[ch]);
return lp_out;
}
```
**Note**: `window_buffer` stores `uint16_t` — change to `float` when adding this filter, so
filtered values are stored directly without lossy integer round-trip.
**Verify**: log ZC count at rest before and after — filtered ZC should be substantially lower
(less spurious noise crossings).
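Before flashing, the coefficient ordering and sign convention can be checked against a pure-Python model of the Direct Form II recurrence. The sketch below mirrors what the esp-dsp ANSI reference implementation of `dsps_biquad_f32` is understood to compute, with `coef = [b0, b1, b2, a1, a2]` and `a0` normalized to 1; treat that as an assumption and confirm it against the esp-dsp version in use. The helper name `biquad_df2` is illustrative.

```python
def biquad_df2(samples, coef, w=None):
    """Direct Form II biquad. coef = [b0, b1, b2, a1, a2]; w = [w0, w1] delay state."""
    w = [0.0, 0.0] if w is None else w
    out = []
    for x in samples:
        d0 = x - coef[3] * w[0] - coef[4] * w[1]                    # feedback path
        out.append(coef[0] * d0 + coef[1] * w[0] + coef[2] * w[1])  # feedforward path
        w[1], w[0] = w[0], d0
    return out

# One-pole test filter H(z) = 1 / (1 - 0.5 z^-1), i.e. a1 = -0.5 in the standard form.
impulse = [1.0, 0.0, 0.0, 0.0]
print(biquad_df2(impulse, [1.0, 0.0, 0.0, -0.5, 0.0]))  # [1.0, 0.5, 0.25, 0.125]
```

Feeding the same impulse through the firmware filter and comparing the first few outputs is a cheap way to catch a flipped sign on `a1`/`a2`, which would otherwise produce an unstable or wrong response.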
---
## Change C — Confidence Rejection
**Priority**: Tier 1 — **implement this first, lowest risk of all changes**
**Why**: Without a rejection threshold, ambiguous EMG (rest-to-gesture transition,
mid-gesture fatigue, electrode lift) always produces a false actuation.
**Effort**: 15 minutes
### Step 1 — Add Constant (top of inference.c with other constants)
```c
#define CONFIDENCE_THRESHOLD 0.40f // Reject when max smoothed prob < this.
// Meta paper uses 0.35; 0.40 adds prosthetic safety margin.
// Tune: lower to 0.35 if real gestures are being rejected.
```
### Step 2 — Insert After EMA Block in `inference_predict()` (after line 214)
```c
// Confidence rejection: if the peak smoothed probability is below threshold,
// hold the last confirmed output rather than outputting an uncertain prediction.
// Prevents false actuations during gesture transitions and electrode artifacts.
if (max_smoothed_prob < CONFIDENCE_THRESHOLD) {
*confidence = max_smoothed_prob;
return current_output; // -1 (GESTURE_NONE) until first confident prediction
}
```
**Verify**: arm at complete rest → confirm output stays at GESTURE_NONE and confidence logs
below 0.40. Deliberate fist → confidence rises above 0.40 within 1–3 inference cycles.
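The interaction between EMA smoothing and the rejection threshold can be explored offline. This is a sketch only: it assumes the EMA has the form `smoothed = α·smoothed + (1−α)·probs` with α = 0.7, which may differ from the firmware's convention, and the class name `ConfidenceGate` is hypothetical.

```python
EMA_ALPHA = 0.7
CONFIDENCE_THRESHOLD = 0.40

class ConfidenceGate:
    """EMA-smoothed class probabilities with hold-last-output rejection."""

    def __init__(self, n_classes):
        self.smoothed = [1.0 / n_classes] * n_classes
        self.current_output = -1  # no confident prediction yet

    def update(self, probs):
        self.smoothed = [EMA_ALPHA * s + (1.0 - EMA_ALPHA) * p
                         for s, p in zip(self.smoothed, probs)]
        best = max(range(len(self.smoothed)), key=self.smoothed.__getitem__)
        confidence = self.smoothed[best]
        if confidence >= CONFIDENCE_THRESHOLD:
            self.current_output = best  # accept the new class
        return self.current_output, confidence  # otherwise hold the last output

gate = ConfidenceGate(n_classes=5)
for probs in ([0.2] * 5, [0.9, 0.025, 0.025, 0.025, 0.025], [0.2] * 5):
    print(gate.update(probs))
```

Under these assumptions an ambiguous hop leaves the output at -1, one strongly confident hop is enough to cross 0.40, and a following ambiguous hop holds the accepted class instead of dropping back.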
---
## Change D — On-Device NVS Calibration
**Priority**: Tier 2
**Why**: Python `CalibrationTransform` only runs during training. On-device NVS calibration
lets the ESP32 recalibrate z-score normalization at startup (3 seconds of REST) without
retraining — solving placement drift and day-to-day impedance variation.
**Effort**: 3–4 hours
### New Files
```
EMG_Arm/src/core/calibration.h
EMG_Arm/src/core/calibration.c
```
### calibration.h
```c
#pragma once
#include <stdbool.h>
#include "config/config.h"
#define CALIB_MAX_FEATURES 96 // supports up to 4-channel expansion
bool calibration_init(void); // load from NVS at startup
void calibration_apply(float *feat); // z-score in-place; no-op if not calibrated
bool calibration_update(const float X[][CALIB_MAX_FEATURES], int n_windows, int n_feat);
void calibration_reset(void);
bool calibration_is_valid(void);
```
### calibration.c
```c
#include "calibration.h"
#include "nvs_flash.h"
#include "nvs.h"
#include <math.h>
#include <string.h>
#include <stdio.h>
#define NVS_NAMESPACE "emg_calib"
#define NVS_KEY_MEAN "feat_mean"
#define NVS_KEY_STD "feat_std"
#define NVS_KEY_NFEAT "n_feat"
#define NVS_KEY_VALID "calib_ok"
static float s_mean[CALIB_MAX_FEATURES];
static float s_std[CALIB_MAX_FEATURES];
static int s_n_feat = 0;
static bool s_valid = false;
bool calibration_init(void) {
    esp_err_t err = nvs_flash_init();
    if (err == ESP_ERR_NVS_NO_FREE_PAGES || err == ESP_ERR_NVS_NEW_VERSION_FOUND) {
        nvs_flash_erase();
        nvs_flash_init();
    }
    nvs_handle_t h;
    if (nvs_open(NVS_NAMESPACE, NVS_READONLY, &h) != ESP_OK) return false;
    uint8_t valid = 0;
    int32_t n_feat = 0;
    size_t mean_sz = sizeof(s_mean), std_sz = sizeof(s_std);
    bool ok = (nvs_get_u8(h, NVS_KEY_VALID, &valid) == ESP_OK) && (valid == 1) &&
              (nvs_get_i32(h, NVS_KEY_NFEAT, &n_feat) == ESP_OK) &&
              (n_feat > 0) && (n_feat <= CALIB_MAX_FEATURES) && // guard against corrupt NVS
              (nvs_get_blob(h, NVS_KEY_MEAN, s_mean, &mean_sz) == ESP_OK) &&
              (nvs_get_blob(h, NVS_KEY_STD, s_std, &std_sz) == ESP_OK);
    nvs_close(h);
    s_n_feat = ok ? (int)n_feat : 0;
    s_valid = ok;
    printf("[Calib] %s (%d features)\n", ok ? "Loaded from NVS" : "Not found, using identity", s_n_feat);
    return ok;
}

void calibration_apply(float *feat) {
    if (!s_valid) return;
    for (int i = 0; i < s_n_feat; i++)
        feat[i] = (feat[i] - s_mean[i]) / s_std[i];
}

bool calibration_update(const float X[][CALIB_MAX_FEATURES], int n_windows, int n_feat) {
    if (n_windows < 10 || n_feat <= 0 || n_feat > CALIB_MAX_FEATURES) return false;
    s_n_feat = n_feat;
    // Per-feature mean over all REST windows
    memset(s_mean, 0, sizeof(s_mean));
    for (int w = 0; w < n_windows; w++)
        for (int f = 0; f < n_feat; f++)
            s_mean[f] += X[w][f];
    for (int f = 0; f < n_feat; f++) s_mean[f] /= n_windows;
    // Per-feature population standard deviation, floored to avoid division by zero
    memset(s_std, 0, sizeof(s_std));
    for (int w = 0; w < n_windows; w++)
        for (int f = 0; f < n_feat; f++) {
            float d = X[w][f] - s_mean[f];
            s_std[f] += d * d;
        }
    for (int f = 0; f < n_feat; f++) {
        s_std[f] = sqrtf(s_std[f] / n_windows);
        if (s_std[f] < 1e-6f) s_std[f] = 1e-6f;
    }
    s_valid = true; // RAM copy is usable even if the NVS write below fails
    nvs_handle_t h;
    if (nvs_open(NVS_NAMESPACE, NVS_READWRITE, &h) != ESP_OK) return false;
    bool ok = (nvs_set_blob(h, NVS_KEY_MEAN, s_mean, sizeof(s_mean)) == ESP_OK) &&
              (nvs_set_blob(h, NVS_KEY_STD, s_std, sizeof(s_std)) == ESP_OK) &&
              (nvs_set_i32(h, NVS_KEY_NFEAT, n_feat) == ESP_OK) &&
              (nvs_set_u8(h, NVS_KEY_VALID, 1) == ESP_OK) &&
              (nvs_commit(h) == ESP_OK);
    nvs_close(h);
    printf("[Calib] Updated from %d REST windows, %d features\n", n_windows, n_feat);
    return ok;
}

// Declared in calibration.h. Assumed behaviour: drop the in-RAM statistics and clear
// the NVS valid flag so calibration_apply() is a no-op until the next update.
void calibration_reset(void) {
    s_valid = false;
    s_n_feat = 0;
    nvs_handle_t h;
    if (nvs_open(NVS_NAMESPACE, NVS_READWRITE, &h) != ESP_OK) return;
    nvs_set_u8(h, NVS_KEY_VALID, 0);
    nvs_commit(h);
    nvs_close(h);
}

bool calibration_is_valid(void) {
    return s_valid;
}
```
### Integration in inference.c
In `inference_predict()`, after `compute_features(features)`, before LDA:
```c
calibration_apply(features); // z-score using NVS-stored mean/std
```
### Startup Flow
```c
// In main application startup sequence:
calibration_init(); // load from NVS; no-op if not present yet
// When user triggers recalibration (button press or serial command):
// Collect ~120 REST windows (~3 seconds at 25ms hop)
// Call calibration_update(rest_feature_buffer, 120, MODEL_NUM_FEATURES)
```
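A NumPy reference for the same statistics is handy for cross-checking the firmware: feed both the same REST windows and compare mean and std. The sketch below mirrors the C logic (population standard deviation, floor of 1e-6, minimum of 10 windows); the function names are illustrative and not part of `learning_data_collection.py`.

```python
import numpy as np

STD_FLOOR = 1e-6

def fit_rest_calibration(rest_features):
    """Per-feature mean and population std (ddof=0) from REST windows."""
    X = np.asarray(rest_features, dtype=np.float32)
    if X.shape[0] < 10:
        raise ValueError("need at least 10 REST windows")
    mean = X.mean(axis=0)
    std = np.maximum(X.std(axis=0), STD_FLOOR)  # floor avoids division by zero
    return mean, std

def apply_calibration(feat, mean, std):
    """z-score a feature vector (or a batch of them) with the stored statistics."""
    return (np.asarray(feat, dtype=np.float32) - mean) / std

# Made-up REST data: 120 windows (about 3 s at a 25 ms hop), 12 features.
rng = np.random.default_rng(0)
rest = rng.normal(loc=5.0, scale=2.0, size=(120, 12))
mean, std = fit_rest_calibration(rest)
z = apply_calibration(rest, mean, std)
print(z.mean(axis=0).round(3), z.std(axis=0).round(3))
```

After calibration the REST windows should have approximately zero mean and unit variance per feature; a large mismatch against the firmware's values usually points at a feature-ordering problem.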
---
## Change E — int8 MLP via TFLite Micro
**Priority**: Tier 3 — implement after Tier 1+2 changes and benchmark (Change 5) shows LDA plateauing
**Why**: LDA finds only linear decision boundaries. A 2-layer int8 MLP adds nonlinear
boundaries for gesture pairs that overlap in feature space.
**Effort**: 4–6 hours
### Python Training (new file: `train_mlp_tflite.py`)
```python
"""
Train int8 MLP for ESP32-S3 deployment via TFLite Micro.
Run AFTER Change 0 (label shift) + Change 1 (expanded features).
"""
import numpy as np
import tensorflow as tf
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from learning_data_collection import SessionStorage, EMGFeatureExtractor, HAND_CHANNELS
storage = SessionStorage()
X_raw, y, trial_ids, session_indices, label_names, _ = storage.load_all_for_training()
extractor = EMGFeatureExtractor(channels=HAND_CHANNELS, cross_channel=True)
X = extractor.extract_features_batch(X_raw).astype(np.float32)
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X = scaler.fit_transform(X)
n_feat, n_cls = X.shape[1], len(np.unique(y))
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(n_feat,)),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(16, activation='relu'),
tf.keras.layers.Dense(n_cls, activation='softmax'),
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.fit(X, y, epochs=150, batch_size=64, validation_split=0.1, verbose=1)
def representative_dataset():
    for i in range(0, len(X), 10):
        yield [X[i:i+1]]
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()
out = Path('EMG_Arm/src/core/emg_model_data.cc')
with open(out, 'w') as f:
    f.write('#include "emg_model_data.h"\n')
    f.write(f'const int g_model_len = {len(tflite_model)};\n')
    # TFLite Micro expects the model flatbuffer to be suitably aligned in memory
    f.write('alignas(16) const unsigned char g_model[] = {\n ')
    f.write(', '.join(f'0x{b:02x}' for b in tflite_model))
    f.write('\n};\n')
print(f"Wrote {out} ({len(tflite_model)} bytes)")
```
### Firmware (inference_mlp.cc)
```cpp
#include "inference_mlp.h"
#include "emg_model_data.h"
#include "model_weights.h" // assumed include path; provides MODEL_NUM_CLASSES
#include <math.h>          // roundf
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/micro/micro_mutable_op_resolver.h"
#include "tensorflow/lite/schema/schema_generated.h"
alignas(16) static uint8_t tensor_arena[48 * 1024]; // 48 KB; tune down if memory is tight
static tflite::MicroInterpreter *interpreter = nullptr;
static TfLiteTensor *input = nullptr, *output = nullptr;
void inference_mlp_init(void) {
    const tflite::Model *model = tflite::GetModel(g_model);
    static tflite::MicroMutableOpResolver<4> resolver;
    resolver.AddFullyConnected();
    resolver.AddRelu();
    resolver.AddSoftmax();
    resolver.AddDequantize();
    static tflite::MicroInterpreter interp(model, resolver, tensor_arena, sizeof(tensor_arena));
    if (interp.AllocateTensors() != kTfLiteOk) return; // leave interpreter null on failure
    interpreter = &interp;
    input = interpreter->input(0);
    output = interpreter->output(0);
}
int inference_mlp_predict(const float *features, int n_feat, float *conf_out) {
    if (interpreter == nullptr) { *conf_out = 0.0f; return -1; } // init failed or not called
    // Quantize float features to int8 using the input tensor's scale and zero point
    float iscale = input->params.scale;
    int izp = input->params.zero_point;
    for (int i = 0; i < n_feat; i++) {
        int q = (int)roundf(features[i] / iscale) + izp;
        input->data.int8[i] = (int8_t)(q < -128 ? -128 : q > 127 ? 127 : q);
    }
    if (interpreter->Invoke() != kTfLiteOk) { *conf_out = 0.0f; return -1; }
    // Dequantize the int8 softmax outputs and pick the most probable class
    float oscale = output->params.scale;
    int ozp = output->params.zero_point;
    float max_p = -1e9f;
    int max_c = 0;
    for (int c = 0; c < MODEL_NUM_CLASSES; c++) {
        float p = (output->data.int8[c] - ozp) * oscale;
        if (p > max_p) { max_p = p; max_c = c; }
    }
    *conf_out = max_p;
    return max_c;
}
```
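The quantize and dequantize arithmetic in `inference_mlp_predict()` can be checked on the host before flashing. The following is a minimal NumPy sketch under stated assumptions: the scale and zero point are illustrative values (the real ones come from the converted model's input and output tensors), and `quantize_int8` / `dequantize_int8` are hypothetical helper names, not part of the project.

```python
import numpy as np

def quantize_int8(x, scale, zero_point):
    """Affine quantization: q = round(x / scale) + zero_point, saturated to int8.

    np.round rounds ties to even while C roundf() rounds ties away from zero,
    so the two can differ only when x / scale lands exactly on a .5 value.
    """
    q = np.round(np.asarray(x, dtype=np.float32) / scale) + zero_point
    return np.clip(q, -128, 127).astype(np.int8)

def dequantize_int8(q, scale, zero_point):
    """Inverse mapping applied to the output tensor: x = (q - zero_point) * scale."""
    return (np.asarray(q, dtype=np.float32) - zero_point) * scale

# Illustrative parameters only (assumption, not read from a real model)
scale, zero_point = 0.05, -3
x = np.array([-10.0, -0.2, 0.0, 0.3, 10.0], dtype=np.float32)
q = quantize_int8(x, scale, zero_point)
x_hat = dequantize_int8(q, scale, zero_point)
```

Values outside the representable range saturate at -128 or 127, which is why features should be standardized before quantization rather than fed in raw.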
**platformio.ini addition**:
```ini
lib_deps =
tensorflow/tflite-micro
```
---
## Change F — Ensemble Inference Pipeline
**Priority**: Tier 3 (requires Change 1 features + Change 7 training + Change E MLP)
**Why**: This is the full recommended architecture from Part II.
**Effort**: 3-4 hours firmware (after Python ensemble is trained and exported)
### New Files
```
EMG_Arm/src/core/inference_ensemble.c
EMG_Arm/src/core/inference_ensemble.h
EMG_Arm/src/core/model_weights_ensemble.h (generated by Change 7 Python script)
```
### inference_ensemble.h
```c
#pragma once
#include <stdbool.h>
void inference_ensemble_init(void);
int inference_ensemble_predict(float *confidence);
```
### inference_ensemble.c
```c
#include "inference_ensemble.h"
#include "inference.h" // for compute_features(), calibration_apply()
#include "inference_mlp.h" // for inference_mlp_predict()
#include "model_weights_ensemble.h"
#include "config/config.h"
#include "dsps_dotprod.h"
#include <math.h>
#include <string.h>
#include <stdio.h>
#define ENSEMBLE_EMA_ALPHA 0.70f
#define ENSEMBLE_CONF_THRESHOLD 0.50f // below this: escalate to MLP fallback
#define REJECT_THRESHOLD 0.40f // below this even after MLP: hold output
#define REST_ACTIVITY_THRESHOLD 0.05f // total_rms below this → skip inference, return REST
// EMA state
static float s_smoothed[MODEL_NUM_CLASSES];
// Vote + debounce (reuse existing pattern from inference.c)
static int s_vote_history[5];
static int s_vote_head = 0;
static int s_current_output = -1;
static int s_pending_output = -1;
static int s_pending_count = 0;
// --- Generic LDA softmax predict ---
// weights: [n_classes][n_feat], intercepts: [n_classes]
// proba_out: [n_classes] — caller-provided output
static void lda_softmax(const float *feat, int n_feat,
                        const float *weights_flat, const float *intercepts,
                        int n_classes, float *proba_out) {
    float raw[MODEL_NUM_CLASSES];
    float max_raw = -1e9f, sum_exp = 0.0f;
    for (int c = 0; c < n_classes; c++) {
        raw[c] = intercepts[c];
        // dsps_dotprod_f32 has alignment and length constraints on its inputs;
        // for safety use a plain loop here
        const float *w = weights_flat + c * n_feat;
        for (int f = 0; f < n_feat; f++) raw[c] += feat[f] * w[f];
        if (raw[c] > max_raw) max_raw = raw[c];
    }
    // Subtract the max score before expf() for numerical stability
    for (int c = 0; c < n_classes; c++) {
        proba_out[c] = expf(raw[c] - max_raw);
        sum_exp += proba_out[c];
    }
    for (int c = 0; c < n_classes; c++) proba_out[c] /= sum_exp;
}
void inference_ensemble_init(void) {
    for (int c = 0; c < MODEL_NUM_CLASSES; c++)
        s_smoothed[c] = 1.0f / MODEL_NUM_CLASSES;
    for (int i = 0; i < 5; i++) s_vote_history[i] = -1;
    s_vote_head = 0;
    s_current_output = -1;
    s_pending_output = -1;
    s_pending_count = 0;
}
int inference_ensemble_predict(float *confidence) {
    // 1. Extract features (shared with single-model path)
    float features[MODEL_NUM_FEATURES];
    compute_features(features);
    calibration_apply(features);

    // 2. Activity gate: skip inference during obvious REST
    float total_rms_sq = 0.0f;
    for (int ch = 0; ch < HAND_NUM_CHANNELS; ch++) {
        float r = features[ch * ENSEMBLE_PER_CH_FEATURES];  // RMS is index 0 per channel
        total_rms_sq += r * r;
    }
    if (sqrtf(total_rms_sq) < REST_ACTIVITY_THRESHOLD) {
        *confidence = 1.0f;
        return GESTURE_REST;
    }

    // 3. Specialist LDAs
    // TD and FD features are interleaved per channel (TD first, then FD, inside each
    // per-channel block), so gather each subset into a contiguous buffer first.
    // The cross-channel block is already contiguous at CC_FEAT_OFFSET.
    float td_buf[TD_NUM_FEATURES];
    float fd_buf[FD_NUM_FEATURES];
    const int td_per_ch = TD_NUM_FEATURES / HAND_NUM_CHANNELS;
    const int fd_per_ch = FD_NUM_FEATURES / HAND_NUM_CHANNELS;
    for (int ch = 0; ch < HAND_NUM_CHANNELS; ch++) {
        const float *src = features + ch * ENSEMBLE_PER_CH_FEATURES;
        memcpy(td_buf + ch * td_per_ch, src, td_per_ch * sizeof(float));
        memcpy(fd_buf + ch * fd_per_ch, src + td_per_ch, fd_per_ch * sizeof(float));
    }
    float prob_td[MODEL_NUM_CLASSES];
    float prob_fd[MODEL_NUM_CLASSES];
    float prob_cc[MODEL_NUM_CLASSES];
    lda_softmax(td_buf, TD_NUM_FEATURES,
                (const float *)LDA_TD_WEIGHTS, LDA_TD_INTERCEPTS,
                MODEL_NUM_CLASSES, prob_td);
    lda_softmax(fd_buf, FD_NUM_FEATURES,
                (const float *)LDA_FD_WEIGHTS, LDA_FD_INTERCEPTS,
                MODEL_NUM_CLASSES, prob_fd);
    lda_softmax(features + CC_FEAT_OFFSET, CC_NUM_FEATURES,
                (const float *)LDA_CC_WEIGHTS, LDA_CC_INTERCEPTS,
                MODEL_NUM_CLASSES, prob_cc);

    // 4. Meta-LDA stacker
    float meta_in[META_NUM_INPUTS];  // = 3 * MODEL_NUM_CLASSES
    memcpy(meta_in, prob_td, MODEL_NUM_CLASSES * sizeof(float));
    memcpy(meta_in + MODEL_NUM_CLASSES, prob_fd, MODEL_NUM_CLASSES * sizeof(float));
    memcpy(meta_in + 2*MODEL_NUM_CLASSES, prob_cc, MODEL_NUM_CLASSES * sizeof(float));
    float meta_probs[MODEL_NUM_CLASSES];
    lda_softmax(meta_in, META_NUM_INPUTS,
                (const float *)META_LDA_WEIGHTS, META_LDA_INTERCEPTS,
                MODEL_NUM_CLASSES, meta_probs);

    // 5. EMA smoothing on meta output
    float max_smooth = 0.0f;
    int winner = 0;
    for (int c = 0; c < MODEL_NUM_CLASSES; c++) {
        s_smoothed[c] = ENSEMBLE_EMA_ALPHA * s_smoothed[c] +
                        (1.0f - ENSEMBLE_EMA_ALPHA) * meta_probs[c];
        if (s_smoothed[c] > max_smooth) { max_smooth = s_smoothed[c]; winner = c; }
    }

    // 6. Confidence cascade: escalate to MLP if meta-LDA is uncertain
    if (max_smooth < ENSEMBLE_CONF_THRESHOLD) {
        float mlp_conf = 0.0f;
        int mlp_winner = inference_mlp_predict(features, MODEL_NUM_FEATURES, &mlp_conf);
        if (mlp_conf > max_smooth) { winner = mlp_winner; max_smooth = mlp_conf; }
    }

    // 7. Reject if still uncertain: hold the previous output
    //    (this is -1 until the first prediction has been accepted)
    if (max_smooth < REJECT_THRESHOLD) {
        *confidence = max_smooth;
        return s_current_output;
    }
    *confidence = max_smooth;

    // 8. Majority vote (window = 5)
    s_vote_history[s_vote_head] = winner;
    s_vote_head = (s_vote_head + 1) % 5;
    int counts[MODEL_NUM_CLASSES] = {0};
    for (int i = 0; i < 5; i++)
        if (s_vote_history[i] >= 0) counts[s_vote_history[i]]++;
    int majority = 0, majority_cnt = 0;
    for (int c = 0; c < MODEL_NUM_CLASSES; c++)
        if (counts[c] > majority_cnt) { majority_cnt = counts[c]; majority = c; }

    // 9. Debounce (3 consecutive predictions to change output)
    int final = s_current_output;
    if (s_current_output == -1) {
        s_current_output = majority; final = majority;
    } else if (majority == s_current_output) {
        s_pending_output = majority; s_pending_count = 1;
    } else if (majority == s_pending_output) {
        if (++s_pending_count >= 3) { s_current_output = majority; final = majority; }
    } else {
        s_pending_output = majority; s_pending_count = 1;
    }
    return final;
}
```
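`ENSEMBLE_EMA_ALPHA` trades stability against response latency, and the effect is easy to quantify offline. The sketch below is a simplified model, not the firmware: it assumes the smoothed state starts at the uniform prior set by `inference_ensemble_init()` and that the meta-LDA then reports a constant probability for one class. `ema_updates_to_cross` is a hypothetical helper name.

```python
def ema_updates_to_cross(alpha, threshold, n_classes, target_prob=1.0):
    """Count EMA updates until one class's smoothed probability reaches `threshold`.

    Starts from the uniform prior 1/n_classes and applies
    s = alpha * s + (1 - alpha) * target_prob on every inference window.
    """
    s = 1.0 / n_classes
    updates = 0
    while s < threshold:
        s = alpha * s + (1.0 - alpha) * target_prob
        updates += 1
        if updates > 10_000:  # target_prob below threshold can never cross
            raise ValueError("threshold is not reachable")
    return updates

# Values taken from the defines above: alpha 0.70, confidence threshold 0.50, 5 classes
steps = ema_updates_to_cross(alpha=0.70, threshold=0.50, n_classes=5)
```

Under these assumptions the smoothed probability goes 0.20, 0.44, 0.608, so the confidence threshold is reached on the second window. The majority vote and the 3-window debounce add further windows on top of this before the output changes.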
### model_weights_ensemble.h Layout (generated by Change 7)
```c
// Auto-generated by train_ensemble.py — do not edit manually
#pragma once
#define MODEL_NUM_CLASSES 5 // auto-computed from training data
#define MODEL_NUM_FEATURES 69 // total feature count (after Change 1)
#define ENSEMBLE_PER_CH_FEATURES 20 // features per channel
// Specialist feature subset offsets and sizes
#define TD_FEAT_OFFSET 0
#define TD_NUM_FEATURES 36 // time-domain: indices 0-11, 20-31, 40-51
#define FD_FEAT_OFFSET 12 // NOTE: FD features are interleaved per-channel
#define FD_NUM_FEATURES 24 // freq-domain: indices 12-19, 32-39, 52-59
#define CC_FEAT_OFFSET 60
#define CC_NUM_FEATURES 9 // cross-channel: indices 60-68
#define META_NUM_INPUTS (3 * MODEL_NUM_CLASSES) // = 15
// Specialist LDA weights (flat row-major: [n_classes][n_feat])
extern const float LDA_TD_WEIGHTS[MODEL_NUM_CLASSES][TD_NUM_FEATURES];
extern const float LDA_TD_INTERCEPTS[MODEL_NUM_CLASSES];
extern const float LDA_FD_WEIGHTS[MODEL_NUM_CLASSES][FD_NUM_FEATURES];
extern const float LDA_FD_INTERCEPTS[MODEL_NUM_CLASSES];
extern const float LDA_CC_WEIGHTS[MODEL_NUM_CLASSES][CC_NUM_FEATURES];
extern const float LDA_CC_INTERCEPTS[MODEL_NUM_CLASSES];
// Meta-LDA weights
extern const float META_LDA_WEIGHTS[MODEL_NUM_CLASSES][META_NUM_INPUTS];
extern const float META_LDA_INTERCEPTS[MODEL_NUM_CLASSES];
// Class names (for inference_get_gesture_enum)
extern const char *MODEL_CLASS_NAMES[MODEL_NUM_CLASSES];
```
**Important note on FD features**: the frequency-domain features are interleaved at indices
[12-19] for ch0, [32-39] for ch1, [52-59] for ch2. The `lda_softmax` call for LDA_FD must
pass a **gathered** (non-contiguous) sub-vector. The cleanest approach is to gather them into
a contiguous buffer before calling lda_softmax:
```c
// Gather FD features into contiguous buffer before LDA_FD
float fd_buf[FD_NUM_FEATURES];
for (int ch = 0; ch < HAND_NUM_CHANNELS; ch++)
    memcpy(fd_buf + ch*8, features + ch*20 + 12, 8 * sizeof(float));
lda_softmax(fd_buf, FD_NUM_FEATURES, ...);
```
Similarly for TD features. This gather costs <5 µs — negligible.
---
# PART VI — PYTHON/TRAINING CHANGES
## Change 0 — Forward Label Shift
**Priority**: Tier 1
**Source**: Meta Nature 2025, Methods: "Discrete-gesture time alignment"
**Why**: +100ms shift after onset detection gives the classifier 100ms of pre-event "building"
signal, dramatically cleaning the decision boundary near gesture onset.
**ESP32 impact**: None.
### Step 1 — Add Constant After Line 94
```python
# After: TRANSITION_END_MS = 150
LABEL_FORWARD_SHIFT_MS = 100 # shift label boundaries +100ms after onset alignment
# Source: Kaifosh et al. Nature 2025. doi:10.1038/s41586-025-09255-w
```
### Step 2 — Apply Shift in `SessionStorage.save_session()` (after line ~704)
Find and insert after:
```python
print(f"[Storage] Labels aligned: {changed}/{len(labels)} windows shifted")
```
Insert:
```python
if LABEL_FORWARD_SHIFT_MS > 0:
    shift_windows = max(1, round(LABEL_FORWARD_SHIFT_MS / HOP_SIZE_MS))
    shifted = list(aligned_labels)
    for i in range(1, len(aligned_labels)):
        if aligned_labels[i] != aligned_labels[i - 1]:
            # Keep the previous label for the first shift_windows windows after each transition
            for j in range(i, min(i + shift_windows, len(aligned_labels))):
                if shifted[j] == aligned_labels[i]:
                    shifted[j] = aligned_labels[i - 1]
    n_shifted = sum(1 for a, b in zip(aligned_labels, shifted) if a != b)
    aligned_labels = shifted
    print(f"[Storage] Forward label shift (+{LABEL_FORWARD_SHIFT_MS}ms): {n_shifted} windows adjusted")
```
### Step 3 — Reduce TRANSITION_START_MS
```python
TRANSITION_START_MS = 200 # was 300 — reduce because 100ms shift already adds pre-event context
```
**Verify**: printout shows `N windows adjusted` where N is 5-20% of total windows per session.
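To see what the shift does before touching `save_session()`, the same loop can be run on a toy label sequence. This is a standalone sketch: `forward_shift_labels` is a hypothetical wrapper around the Step 2 logic, and `shift_windows=2` is an assumed value (for example, a 100 ms shift with a 50 ms hop), not the project's actual `HOP_SIZE_MS`.

```python
def forward_shift_labels(labels, shift_windows):
    """Delay every label transition by `shift_windows` windows (Step 2 logic)."""
    shifted = list(labels)
    for i in range(1, len(labels)):
        if labels[i] != labels[i - 1]:
            # The first windows after a transition keep the previous label
            for j in range(i, min(i + shift_windows, len(labels))):
                if shifted[j] == labels[i]:
                    shifted[j] = labels[i - 1]
    return shifted

# Toy session: 4 rest windows, 6 gesture windows, 4 rest windows
labels = ['rest'] * 4 + ['fist'] * 6 + ['rest'] * 4
shifted = forward_shift_labels(labels, shift_windows=2)
n_shifted = sum(a != b for a, b in zip(labels, shifted))
```

Both the onset and the offset move two windows later, so four windows change label in this toy case. On real sessions the same count is what the `N windows adjusted` printout reports.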
---
## Change 1 — Expanded Feature Set
**Priority**: Tier 2
**Why**: 12 → 69 features; adds frequency-domain and cross-channel information that is
structurally more informative than amplitude alone (Meta Extended Data Fig. 6).
**ESP32 impact**: retrain → export new `model_weights.h`; port selected features to C.
### Sub-change 1A — Expand `extract_features_single_channel()` (line 1448)
Replace the entire function body:
```python
def extract_features_single_channel(self, signal: np.ndarray) -> dict:
    if getattr(self, 'reinhard', False):
        signal = 64.0 * signal / (32.0 + np.abs(signal))
    signal = signal - np.mean(signal)
    N = len(signal)
    # --- Time domain ---
    rms = np.sqrt(np.mean(signal ** 2))
    diff = np.diff(signal)
    wl = np.sum(np.abs(diff))
    zc_thresh = self.zc_threshold_percent * rms
    ssc_thresh = (self.ssc_threshold_percent * rms) ** 2
    sign_ch = signal[:-1] * signal[1:] < 0
    zc = int(np.sum(sign_ch & (np.abs(diff) > zc_thresh)))
    d_l = signal[1:-1] - signal[:-2]
    d_r = signal[1:-1] - signal[2:]
    ssc = int(np.sum((d_l * d_r) > ssc_thresh))
    mav = np.mean(np.abs(signal))
    var = np.mean(signal ** 2)
    iemg = np.sum(np.abs(signal))
    wamp = int(np.sum(np.abs(diff) > 0.15 * rms))
    # AR(4) via Yule-Walker
    ar = np.zeros(4)
    if rms > 1e-6:
        try:
            from scipy.linalg import solve_toeplitz
            r = np.array([np.dot(signal[i:], signal[:N-i]) / N for i in range(5)])
            if r[0] > 1e-10:
                ar = solve_toeplitz(r[:4], -r[1:5])
        except Exception:
            pass
    # --- Frequency domain (20-500 Hz) ---
    freqs = np.fft.rfftfreq(N, d=1.0 / SAMPLING_RATE_HZ)
    psd = np.abs(np.fft.rfft(signal)) ** 2 / N
    m = (freqs >= 20) & (freqs <= 500)
    f_m, p_m = freqs[m], psd[m]
    tp = np.sum(p_m) + 1e-10
    mnf = float(np.sum(f_m * p_m) / tp)
    cum = np.cumsum(p_m)
    mdf = float(f_m[min(np.searchsorted(cum, tp / 2), len(f_m) - 1)])
    pkf = float(f_m[np.argmax(p_m)]) if len(p_m) > 0 else 0.0
    mnp = float(tp / max(len(p_m), 1))
    # Bandpower in 4 physiological bands (mirrors firmware esp-dsp FFT bands)
    bands = [(20, 80), (80, 150), (150, 300), (300, 500)]
    bp = [float(np.sum(psd[(freqs >= lo) & (freqs < hi)])) for lo, hi in bands]
    return {
        'rms': rms, 'wl': wl, 'zc': zc, 'ssc': ssc,
        'mav': mav, 'var': var, 'iemg': iemg, 'wamp': wamp,
        'ar1': float(ar[0]), 'ar2': float(ar[1]),
        'ar3': float(ar[2]), 'ar4': float(ar[3]),
        'mnf': mnf, 'mdf': mdf, 'pkf': pkf, 'mnp': mnp,
        'bp0': bp[0], 'bp1': bp[1], 'bp2': bp[2], 'bp3': bp[3],
    }
```
### Sub-change 1B — Update `extract_features_window()` Return Block (line 1482)
Replace the return section:
```python
FEATURE_ORDER = ['rms', 'wl', 'zc', 'ssc', 'mav', 'var', 'iemg', 'wamp',
                 'ar1', 'ar2', 'ar3', 'ar4', 'mnf', 'mdf', 'pkf', 'mnp',
                 'bp0', 'bp1', 'bp2', 'bp3']
NORMALIZE_KEYS = {'rms', 'wl', 'mav', 'iemg'}
features = []
for ch_features in all_ch_features:
    for key in FEATURE_ORDER:
        val = ch_features.get(key, 0.0)
        if self.normalize and key in NORMALIZE_KEYS:
            val = val / norm_factor
        features.append(float(val))
if self.cross_channel and window.shape[1] >= 2:
    sel = window[:, channel_indices].astype(np.float32)
    wc = sel - sel.mean(axis=0)
    cov = (wc.T @ wc) / len(wc)
    # Covariance: upper triangle including the diagonal
    ri, ci = np.triu_indices(len(channel_indices))
    features.extend(cov[ri, ci].tolist())
    stds = np.sqrt(np.diag(cov)) + 1e-10
    cor = cov / np.outer(stds, stds)
    # Correlation: upper triangle excluding the diagonal
    ro, co = np.triu_indices(len(channel_indices), k=1)
    features.extend(cor[ro, co].tolist())
return np.array(features, dtype=np.float32)
```
### Sub-change 1C — Update `EMGFeatureExtractor.__init__()` (line 1430)
```python
def __init__(self, zc_threshold_percent=0.1, ssc_threshold_percent=0.1,
             channels=None, normalize=True, cross_channel=True, reinhard=False):
    self.zc_threshold_percent = zc_threshold_percent
    self.ssc_threshold_percent = ssc_threshold_percent
    self.channels = channels
    self.normalize = normalize
    self.cross_channel = cross_channel
    self.reinhard = reinhard
```
### Sub-change 1D — Update Feature Count in `extract_features_batch()` (line 1520)
Replace `n_features = n_channels * 4`:
```python
per_ch = 20
if self.cross_channel and n_channels >= 2:
    n_features = n_channels * per_ch + \
        n_channels*(n_channels+1)//2 + n_channels*(n_channels-1)//2
else:
    n_features = n_channels * per_ch
```
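The 69-feature total follows directly from this formula and is worth asserting against `MODEL_NUM_FEATURES` whenever the feature list changes. A small standalone check, where `expected_feature_count` is a hypothetical helper that restates the arithmetic above:

```python
def expected_feature_count(n_channels, per_channel=20, cross_channel=True):
    """Total feature-vector length produced by the expanded extractor."""
    n = n_channels * per_channel
    if cross_channel and n_channels >= 2:
        n += n_channels * (n_channels + 1) // 2  # covariance: upper triangle incl. diagonal
        n += n_channels * (n_channels - 1) // 2  # correlation: upper triangle excl. diagonal
    return n

# 3 hand channels: 3*20 per-channel + 6 covariance + 3 correlation
total = expected_feature_count(3)
```

For three channels this gives 60 + 6 + 3 = 69, matching the 12 → 69 expansion described for Change 1.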
### Sub-change 1E — Update `get_feature_names()` (line 1545)
```python
def get_feature_names(self, n_channels=0):
    ch_idx = self.channels if self.channels is not None else list(range(n_channels))
    ORDER = ['rms','wl','zc','ssc','mav','var','iemg','wamp',
             'ar1','ar2','ar3','ar4','mnf','mdf','pkf','mnp','bp0','bp1','bp2','bp3']
    names = [f'ch{ch}_{f}' for ch in ch_idx for f in ORDER]
    if self.cross_channel and len(ch_idx) >= 2:
        n = len(ch_idx)
        names += [f'cov_ch{ch_idx[i]}_ch{ch_idx[j]}' for i in range(n) for j in range(i, n)]
        names += [f'cor_ch{ch_idx[i]}_ch{ch_idx[j]}' for i in range(n) for j in range(i+1, n)]
    return names
```
### Sub-change 1F — Update `EMGClassifier.__init__()` (line 1722)
```python
self.feature_extractor = EMGFeatureExtractor(
    channels=HAND_CHANNELS, cross_channel=True, reinhard=False)
```
### Sub-change 1G — Update `save()` (line 1910) and `load()` (line 2089)
In `save()`, add to `feature_extractor_params` dict:
```python
'cross_channel': getattr(self.feature_extractor, 'cross_channel', True),
'reinhard': getattr(self.feature_extractor, 'reinhard', False),
```
In `load()`, update `EMGFeatureExtractor(...)` constructor:
```python
classifier.feature_extractor = EMGFeatureExtractor(
    zc_threshold_percent  = params.get('zc_threshold_percent', 0.1),
    ssc_threshold_percent = params.get('ssc_threshold_percent', 0.1),
    channels              = params.get('channels', HAND_CHANNELS),
    normalize             = params.get('normalize', False),
    cross_channel         = params.get('cross_channel', True),
    reinhard              = params.get('reinhard', False),
)
```
### Also Fix Bug at Line 2382
```python
X, y, trial_ids, session_indices, label_names, loaded_sessions = storage.load_all_for_training()
```
---
## Change 2 — Electrode Repositioning Protocol
**Protocol**: no code changes.
> *"Between sessions within a single day, the participants remove and slightly reposition the
> sEMG wristband to enable generalization across different recording positions."*
> — Meta Nature 2025 Methods
- Session 1: standard placement
- Session 2: band 1-2 cm up the forearm
- Session 3: band 1-2 cm down the forearm
- Session 4+: slight axial rotation or return to any above position
The per-session z-score normalization in `_apply_session_normalization()` handles the
resulting amplitude shifts. Perform **fast, natural** gestures — not slow/deliberate.
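Why per-session z-scoring absorbs a repositioned band can be shown with synthetic data. The sketch below is an illustration only and is not the project's `_apply_session_normalization()`: `zscore_per_session` is a hypothetical stand-in, and the "repositioned" session is modelled as the same features with an assumed gain and offset.

```python
import numpy as np

def zscore_per_session(X, session_indices, eps=1e-8):
    """Standardize every feature column separately within each session."""
    X = np.asarray(X, dtype=np.float64)
    session_indices = np.asarray(session_indices)
    out = np.empty_like(X)
    for sid in np.unique(session_indices):
        mask = session_indices == sid
        mu = X[mask].mean(axis=0)
        sd = X[mask].std(axis=0)
        out[mask] = (X[mask] - mu) / (sd + eps)
    return out

rng = np.random.default_rng(0)
base = rng.standard_normal((200, 3))
# Session 1: same underlying features with 3x gain and an offset (assumed model
# of an amplitude shift caused by moving the band)
X = np.vstack([base, 3.0 * base + 5.0])
sessions = np.repeat([0, 1], 200)
Xn = zscore_per_session(X, sessions)
```

After normalization the two sessions are numerically almost identical, so a pure gain or offset change between placements does not reach the classifier. Changes in the shape of the signal, such as different crosstalk between channels, are not removed this way, which is why the repositioned sessions still add useful variety.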
---
## Change 3 — Data Augmentation
**Priority**: Tier 2. Apply to **raw windows BEFORE feature extraction**.
Insert before the `# === LDA CLASSIFIER ===` comment (~line 1709):
```python
def augment_emg_batch(X, y, multiplier=3, seed=42):
    """
    Augment raw EMG windows for training robustness.
    Must be called on raw windows (n_windows, n_samples, n_channels),
    not on pre-computed features.
    Source (window jitter): Kaifosh et al. Nature 2025. doi:10.1038/s41586-025-09255-w
    """
    rng = np.random.default_rng(seed)
    aug_X, aug_y = [X], [y]
    for _ in range(multiplier - 1):
        Xc = X.copy().astype(np.float32)
        Xc *= rng.uniform(0.80, 1.20, (len(X), 1, 1)).astype(np.float32)  # amplitude
        rms = np.sqrt(np.mean(Xc**2, axis=(1,2), keepdims=True)) + 1e-8
        Xc += rng.standard_normal(Xc.shape).astype(np.float32) * (0.05 * rms)  # noise
        Xc += rng.uniform(-20., 20., (len(X), 1, X.shape[2])).astype(np.float32)  # DC jitter
        shifts = rng.integers(-5, 6, size=len(X))
        for i in range(len(Xc)):
            if shifts[i]: Xc[i] = np.roll(Xc[i], shifts[i], axis=0)  # circular time jitter
        aug_X.append(Xc); aug_y.append(y)
    return np.concatenate(aug_X), np.concatenate(aug_y)
```
In `EMGClassifier.train()`, replace the start of the function's feature extraction block:
```python
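if getattr(self, 'use_augmentation', True):
    X_aug, y_aug = augment_emg_batch(X, y, multiplier=3)
    print(f"[Classifier] Augmented: {len(X)} → {len(X_aug)} windows")
else:
    X_aug, y_aug = X, y
X_features = self.feature_extractor.extract_features_batch(X_aug)
# ... then use y_aug instead of y for model.fit().
# Any other per-window arrays (trial_ids, session_indices) must be repeated to
# len(X_aug) as well, otherwise grouping and session normalization misalign.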
```
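Because `augment_emg_batch()` returns only `X` and `y`, the per-window metadata has to be extended separately before it is used for grouped cross-validation or session normalization. A minimal sketch of one way to do that, assuming the copies are concatenated in order as in the function above; `tile_groups_for_augmentation` is a hypothetical helper name:

```python
import numpy as np

def tile_groups_for_augmentation(trial_ids, session_indices, multiplier):
    """Repeat per-window metadata to match `multiplier` concatenated copies of X.

    Every augmented copy keeps the trial and session of the window it came
    from, so GroupKFold never places an original window and its augmented
    twin in different folds.
    """
    return np.tile(trial_ids, multiplier), np.tile(session_indices, multiplier)

trial_ids = np.array([0, 0, 1, 1, 2])
session_indices = np.array([0, 0, 0, 1, 1])
trial_aug, session_aug = tile_groups_for_augmentation(
    trial_ids, session_indices, multiplier=3)
```

`np.tile` is used rather than `np.repeat` because the augmented batch is laid out as whole copies one after another, not as interleaved duplicates.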
---
## Change 4 — Reinhard Compression (Optional)
**Formula**: `output = 64 × x / (32 + |x|)`
**Enable in Python**: set `reinhard=True` in `EMGFeatureExtractor` constructor (Change 1F).
**Enable in firmware** (`inference.c` `compute_features()`, after signal copy loop, before mean calc):
```c
#if MODEL_USE_REINHARD
    for (int i = 0; i < INFERENCE_WINDOW_SIZE; i++) {
        float x = signal[i];
        signal[i] = 64.0f * x / (32.0f + fabsf(x));
    }
#endif
```
Add `#define MODEL_USE_REINHARD 0` to `model_weights.h` (set to `1` when Python uses `reinhard=True`).
**Python and firmware MUST match.** Mismatch silently corrupts all predictions.
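A quick host-side reference for the compression curve helps when checking that both sides agree. This sketch restates the formula above in NumPy; the function and parameter names (`reinhard_compress`, `gain`, `knee`) are illustrative choices, not identifiers from the codebase.

```python
import numpy as np

def reinhard_compress(x, gain=64.0, knee=32.0):
    """Reinhard-style soft compression: gain * x / (knee + |x|)."""
    x = np.asarray(x, dtype=np.float32)
    return gain * x / (knee + np.abs(x))

x = np.array([-1e6, -32.0, 0.0, 32.0, 1e6], dtype=np.float32)
y = reinhard_compress(x)
```

The curve is odd-symmetric, maps an input of ±32 to ±32, and saturates just below ±64 for very large inputs. Feeding the same test vector through the firmware loop and comparing the results is a cheap way to catch a Python/firmware mismatch.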
---
## Change 5 — Classifier Benchmark
**Purpose**: tells you whether LDA accuracy plateau is a features problem (all classifiers similar → add features) or a model complexity problem (SVM/MLP >> LDA → implement Change E/F).
Add after `run_training_demo()`:
```python
def run_classifier_benchmark():
    from sklearn.svm import SVC
    from sklearn.neural_network import MLPClassifier
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.model_selection import cross_val_score, GroupKFold
    from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
    storage = SessionStorage()
    X_raw, y, trial_ids, session_indices, label_names, _ = storage.load_all_for_training()
    extractor = EMGFeatureExtractor(channels=HAND_CHANNELS, cross_channel=True)
    X = extractor.extract_features_batch(X_raw)
    X = EMGClassifier()._apply_session_normalization(X, session_indices, y=y)
    clfs = {
        'LDA (ESP32 model)': LinearDiscriminantAnalysis(),
        'QDA': QuadraticDiscriminantAnalysis(reg_param=0.1),
        'SVM-RBF': Pipeline([('s', StandardScaler()), ('m', SVC(kernel='rbf', C=10))]),
        'MLP-128-64': Pipeline([('s', StandardScaler()),
                                ('m', MLPClassifier(hidden_layer_sizes=(128,64),
                                                    max_iter=1000, early_stopping=True))]),
    }
    # Group folds by trial so windows from one trial never straddle train and test
    gkf = GroupKFold(n_splits=5)
    print(f"\n{'Classifier':<22} {'Mean CV':>8} {'Std':>6}")
    print("-" * 40)
    for name, clf in clfs.items():
        sc = cross_val_score(clf, X, y, cv=gkf, groups=trial_ids, scoring='accuracy')
        print(f" {name:<20} {sc.mean()*100:>7.1f}% ±{sc.std()*100:.1f}%")
    print("\n → If LDA ≈ SVM: features are the bottleneck (add Change 1 features)")
    print(" → If SVM >> LDA: model complexity bottleneck (implement Change F ensemble)")
```
---
## Change 6 — Simplified MPF Features
**Python training only** — not worth porting to ESP32 directly (use bandpower bp0-bp3 from Change 1 as the firmware-side approximation).
Add after `EMGFeatureExtractor` class:
```python
class MPFFeatureExtractor:
    """
    Simplified 3-channel MPF: CSD upper triangle per 6 frequency bands = 36 features.
    Python training only. Omits matrix logarithm (not needed for 3 channels).
    Source: Kaifosh et al. Nature 2025. doi:10.1038/s41586-025-09255-w
    ESP32 approximation: use bp0-bp3 from EMGFeatureExtractor (Change 1).
    """
    BANDS = [(0,62),(62,125),(125,187),(187,250),(250,375),(375,500)]

    def __init__(self, channels=None, log_diagonal=True):
        self.channels = channels or HAND_CHANNELS
        self.log_diag = log_diagonal
        self.n_ch = len(self.channels)
        self._r, self._c = np.triu_indices(self.n_ch)
        self.n_features = len(self.BANDS) * len(self._r)

    def extract_window(self, window):
        sig = window[:, self.channels].astype(np.float64)
        N = len(sig)
        freqs = np.fft.rfftfreq(N, d=1.0/SAMPLING_RATE_HZ)
        Xf = np.fft.rfft(sig, axis=0)
        feats = []
        for lo, hi in self.BANDS:
            mask = (freqs >= lo) & (freqs < hi)
            if not mask.any():
                feats.extend([0.0] * len(self._r)); continue
            # Real part of the cross-spectral density summed over the band
            CSD = (Xf[mask].conj().T @ Xf[mask]).real / N
            if self.log_diag:
                for k in range(self.n_ch): CSD[k,k] = np.log(max(CSD[k,k], 1e-10))
            feats.extend(CSD[self._r, self._c].tolist())
        return np.array(feats, dtype=np.float32)

    def extract_batch(self, X):
        out = np.zeros((len(X), self.n_features), dtype=np.float32)
        for i in range(len(X)): out[i] = self.extract_window(X[i])
        return out
```
In `EMGClassifier.train()`, after standard feature extraction:
```python
if getattr(self, 'use_mpf', False):
    mpf = MPFFeatureExtractor(channels=HAND_CHANNELS)
    X_features = np.hstack([X_features, mpf.extract_batch(X_aug)])
```
---
## Change 7 — Ensemble Training
**Priority**: Tier 3 (implements Change F's training side)
**New file**: `C:/VSCode/Marvel_Projects/Bucky_Arm/train_ensemble.py`
```python
"""
Train the full 3-specialist-LDA + meta-LDA ensemble.
Requires Change 1 (expanded features) to be implemented first.
Exports model_weights_ensemble.h for firmware Change F.
Architecture:
    LDA_TD (36 time-domain feat) ─┐
    LDA_FD (24 freq-domain feat)  ├─ 15 probs ─► Meta-LDA ─► final class
    LDA_CC (9 cross-ch feat)     ─┘
"""
import numpy as np
from pathlib import Path
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import cross_val_predict, GroupKFold, cross_val_score
import sys
sys.path.insert(0, str(Path(__file__).parent))
from learning_data_collection import (
SessionStorage, EMGFeatureExtractor, HAND_CHANNELS
)
# ─── Load and extract features ───────────────────────────────────────────────
storage = SessionStorage()
X_raw, y, trial_ids, session_indices, label_names, _ = storage.load_all_for_training()
extractor = EMGFeatureExtractor(channels=HAND_CHANNELS, cross_channel=True)
X = extractor.extract_features_batch(X_raw).astype(np.float64)
# Per-session normalization (same as EMGClassifier._apply_session_normalization)
from sklearn.preprocessing import StandardScaler
for sid in np.unique(session_indices):
    mask = session_indices == sid
    sc = StandardScaler()
    X[mask] = sc.fit_transform(X[mask])
feat_names = extractor.get_feature_names(n_channels=len(HAND_CHANNELS))
n_cls = len(np.unique(y))
# ─── Feature subset indices ───────────────────────────────────────────────────
TD_FEAT = ['rms','wl','zc','ssc','mav','var','iemg','wamp','ar1','ar2','ar3','ar4']
FD_FEAT = ['mnf','mdf','pkf','mnp','bp0','bp1','bp2','bp3']
td_idx = [i for i,n in enumerate(feat_names) if any(n.endswith(f'_{f}') for f in TD_FEAT)]
fd_idx = [i for i,n in enumerate(feat_names) if any(n.endswith(f'_{f}') for f in FD_FEAT)]
cc_idx = [i for i,n in enumerate(feat_names) if n.startswith('cov_') or n.startswith('cor_')]
print(f"Feature subsets — TD: {len(td_idx)}, FD: {len(fd_idx)}, CC: {len(cc_idx)}")
X_td = X[:, td_idx]
X_fd = X[:, fd_idx]
X_cc = X[:, cc_idx]
# ─── Train specialist LDAs with out-of-fold stacking ─────────────────────────
gkf = GroupKFold(n_splits=5)
print("Training specialist LDAs (out-of-fold for stacking)...")
lda_td = LinearDiscriminantAnalysis()
lda_fd = LinearDiscriminantAnalysis()
lda_cc = LinearDiscriminantAnalysis()
oof_td = cross_val_predict(lda_td, X_td, y, cv=gkf, groups=trial_ids, method='predict_proba')
oof_fd = cross_val_predict(lda_fd, X_fd, y, cv=gkf, groups=trial_ids, method='predict_proba')
oof_cc = cross_val_predict(lda_cc, X_cc, y, cv=gkf, groups=trial_ids, method='predict_proba')
# Specialist CV accuracy (for diagnostics)
for name, mdl, Xs in [('LDA_TD', lda_td, X_td), ('LDA_FD', lda_fd, X_fd), ('LDA_CC', lda_cc, X_cc)]:
    sc = cross_val_score(mdl, Xs, y, cv=gkf, groups=trial_ids)
    print(f" {name}: {sc.mean()*100:.1f}% ± {sc.std()*100:.1f}%")
# ─── Train meta-LDA on out-of-fold outputs ───────────────────────────────────
X_meta = np.hstack([oof_td, oof_fd, oof_cc]) # (n_samples, 3*n_cls = 15)
meta_lda = LinearDiscriminantAnalysis()
meta_sc = cross_val_score(meta_lda, X_meta, y, cv=gkf, groups=trial_ids)
print(f" Meta-LDA: {meta_sc.mean()*100:.1f}% ± {meta_sc.std()*100:.1f}%")
# Fit all models on full dataset for deployment
lda_td.fit(X_td, y); lda_fd.fit(X_fd, y); lda_cc.fit(X_cc, y)
meta_lda.fit(X_meta, y)
# ─── Export all weights to C header ──────────────────────────────────────────
def lda_to_c_arrays(lda, name, feat_dim, n_cls, label_names, class_order):
    """Generate C array strings for LDA weights and intercepts."""
    # Reorder classes to match label_names order
    coef = lda.coef_  # shape (n_cls, feat_dim) for LinearDiscriminantAnalysis
    intercept = lda.intercept_
    lines = []
    lines.append(f"const float {name}_WEIGHTS[{n_cls}][{feat_dim}] = {{")
    for c in class_order:
        row = ', '.join(f'{v:.8f}f' for v in coef[c])
        lines.append(f" {{{row}}}, // {label_names[c]}")
    lines.append("};")
    lines.append(f"const float {name}_INTERCEPTS[{n_cls}] = {{")
    intercept_str = ', '.join(f'{intercept[c]:.8f}f' for c in class_order)
    lines.append(f" {intercept_str}")
    lines.append("};")
    return '\n'.join(lines)
class_order = list(range(n_cls))
out_path = Path('EMG_Arm/src/core/model_weights_ensemble.h')
with open(out_path, 'w') as f:
    f.write("// Auto-generated by train_ensemble.py — do not edit\n")
    f.write("#pragma once\n\n")
    f.write(f"#define MODEL_NUM_CLASSES {n_cls}\n")
    f.write(f"#define MODEL_NUM_FEATURES {X.shape[1]}\n")
    f.write(f"#define ENSEMBLE_PER_CH_FEATURES 20\n\n")
    f.write(f"#define TD_FEAT_OFFSET {min(td_idx)}\n")
    f.write(f"#define TD_NUM_FEATURES {len(td_idx)}\n")
    f.write(f"#define FD_FEAT_OFFSET {min(fd_idx)}\n")
    f.write(f"#define FD_NUM_FEATURES {len(fd_idx)}\n")
    f.write(f"#define CC_FEAT_OFFSET {min(cc_idx)}\n")
    f.write(f"#define CC_NUM_FEATURES {len(cc_idx)}\n")
    f.write(f"#define META_NUM_INPUTS ({3} * MODEL_NUM_CLASSES)\n\n")
    f.write(lda_to_c_arrays(lda_td, 'LDA_TD', len(td_idx), n_cls, label_names, class_order))
    f.write('\n\n')
    f.write(lda_to_c_arrays(lda_fd, 'LDA_FD', len(fd_idx), n_cls, label_names, class_order))
    f.write('\n\n')
    f.write(lda_to_c_arrays(lda_cc, 'LDA_CC', len(cc_idx), n_cls, label_names, class_order))
    f.write('\n\n')
    f.write(lda_to_c_arrays(meta_lda, 'META_LDA', 3*n_cls, n_cls, label_names, class_order))
    f.write('\n\n')
    names_str = ', '.join(f'"{label_names[c]}"' for c in class_order)
    f.write(f"const char *MODEL_CLASS_NAMES[MODEL_NUM_CLASSES] = {{{names_str}}};\n")
print(f"Exported ensemble weights to {out_path}")
print(f"Total weight storage: {(len(td_idx)+len(fd_idx)+len(cc_idx)+3*n_cls)*n_cls*4} bytes float32")
```
**Note on `LinearDiscriminantAnalysis.coef_` shape**: for three or more classes scikit-learn
stores `coef_` as `(n_classes, n_features)`, which is what the export above assumes. With
exactly two classes it collapses to a single row, `(1, n_features)`. Verify `lda.coef_.shape`
after fitting; if it is not `(n_cls, n_feat)`, adjust the export to match the
`decision_function()` output structure.
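Before flashing, the exported weights can be checked on the host with a NumPy reference of the firmware's `lda_softmax()`. The sketch below uses toy weights as an assumption; in practice `weights` and `intercepts` would be `lda.coef_` and `lda.intercept_`. `lda_softmax_reference` is a hypothetical helper name. Recent scikit-learn versions appear to compute `predict_proba` as a softmax over `decision_function()` for three or more classes, so the output should agree with `lda.predict_proba()`, but that is worth confirming on the installed version.

```python
import numpy as np

def lda_softmax_reference(feat, weights, intercepts):
    """Host-side mirror of the firmware lda_softmax(): linear scores, then softmax."""
    scores = weights @ feat + intercepts
    scores = scores - scores.max()  # subtract the max for numerical stability
    e = np.exp(scores)
    return e / e.sum()

# Toy 3-class, 2-feature model (illustrative values, not trained weights)
weights = np.array([[1.0, 0.0],
                    [0.0, 1.0],
                    [-1.0, -1.0]])
intercepts = np.array([0.0, 0.5, 0.0])
proba = lda_softmax_reference(np.array([2.0, 1.0]), weights, intercepts)
```

The same function applied to one row of real features gives a per-class vector that can be compared element by element against the values logged by the firmware.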
---
# PART VII — FEATURE SELECTION FOR ESP32 PORTING
After Change 1 is trained, use this to decide what to port to C firmware.
### Step 1 — Get Feature Importance
```python
importance = np.abs(classifier.model.coef_).mean(axis=0)
feat_names = classifier.feature_extractor.get_feature_names(n_channels=len(HAND_CHANNELS))
ranked = sorted(zip(feat_names, importance), key=lambda x: -x[1])
print("Top 20 features by LDA discriminative weight:")
for name, score in ranked[:20]:
    print(f" {name:<35} {score:.4f}")
```
### Step 2 — Port Decision Matrix
| Feature | C Complexity | Prereq | Port? |
|---------|-------------|--------|-------|
| RMS, WL, ZC, SSC | ✓ Already in C | — | Keep |
| MAV, VAR, IEMG | Very easy (1 loop) | None | ✓ Yes |
| WAMP | Very easy (threshold on diff) | None | ✓ Yes |
| Cross-ch covariance | Easy (3×3 outer product) | None | ✓ Yes |
| Cross-ch correlation | Easy (normalize covariance) | Covariance | ✓ Yes |
| Bandpower bp0-bp3 | Medium (128-pt FFT via esp-dsp) | Add FFT call | ✓ Yes — highest ROI |
| MNF, MDF, PKF, MNP | Easy after FFT | Bandpower FFT | ✓ Free once FFT added |
| AR(4) | Medium (Levinson-Durbin in C) | None | Only if top-8 importance |
Once `dsps_fft2r_fc32()` is added for bandpower, MNF/MDF/PKF/MNP come free.
### Step 3 — Adding FFT-Based Features to inference.c
Add inside `compute_features()` loop, after time-domain features per channel:
```c
// 128-pt FFT for frequency-domain features per channel.
// INFERENCE_WINDOW_SIZE (150) is not a power of two, so only the first 128
// samples are used (this is truncation, not zero-padding).
// dsps_fft2r_init_fc32() must have been called once at startup.
float fft_buf[256] = {0};  // 128 complex floats, interleaved re/im
for (int i = 0; i < 128 && i < INFERENCE_WINDOW_SIZE; i++) {
    fft_buf[2*i]   = signal[i];  // real
    fft_buf[2*i+1] = 0.0f;       // imag
}
dsps_fft2r_fc32(fft_buf, 128);
dsps_bit_rev_fc32(fft_buf, 128);
// Bandpower: bin k → freq = k * 1000/128 = k * 7.8125 Hz
// The upper bin of each band is exclusive in the loop below.
// Band 0: 20-80 Hz   → bins 3-10
// Band 1: 80-150 Hz  → bins 10-19
// Band 2: 150-300 Hz → bins 19-38
// Band 3: 300-500 Hz → bins 38-64
int band_bins[5] = {3, 10, 19, 38, 64};
float bp[4] = {0,0,0,0};
for (int b = 0; b < 4; b++)
    for (int k = band_bins[b]; k < band_bins[b+1]; k++) {
        float re = fft_buf[2*k], im = fft_buf[2*k+1];
        bp[b] += re*re + im*im;
    }
// NOTE: Python (Change 1A) computes |X|^2 / N over the full window; make sure the
// firmware scaling and window length agree with what the model was trained on.
// Store at correct indices (base = ch * 20)
int base = ch * 20;
features_out[base+16] = bp[0]; features_out[base+17] = bp[1];
features_out[base+18] = bp[2]; features_out[base+19] = bp[3];
```
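The `band_bins` table can be derived rather than typed by hand, which helps if the FFT size or sampling rate changes. A small sketch, assuming nearest-bin rounding (which reproduces the table above for a 128-point FFT at 1000 Hz); `band_edges_to_bins` is a hypothetical helper name:

```python
def band_edges_to_bins(edges_hz, n_fft, fs_hz):
    """Map band edges in Hz to FFT bin indices, rounding to the nearest bin."""
    return [int(round(f * n_fft / fs_hz)) for f in edges_hz]

bins = band_edges_to_bins([20, 80, 150, 300, 500], n_fft=128, fs_hz=1000)
# Half-open [lo, hi) bin ranges, matching the `k < band_bins[b+1]` loop in C
ranges = list(zip(bins[:-1], bins[1:]))
```

With a bin width of 7.8125 Hz the band edges do not fall exactly on bin boundaries, so the firmware bands are slightly different from the exact-Hz masks used in Python.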
---
# PART VIII — MEASUREMENT AND VALIDATION
## Baseline Protocol
**Run this BEFORE any change and after EACH change.**
```
1. python learning_data_collection.py → option 3 (Train Classifier)
2. Record:
   - "Mean CV accuracy: XX.X% ± Y.Y%" (cross-validation)
   - Confusion matrix (which gesture pairs are most confused)
   - Per-gesture accuracy breakdown
3. On-device test:
   - Put on sensors, perform 10 reps of each gesture
   - Log classification output (UART or Python serial monitor)
   - Compute per-gesture accuracy manually
4. Record REST false-trigger rate: hold arm at rest for 30 seconds,
   count number of non-REST outputs
```
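
Steps 3 and 4 can be tallied in code rather than by hand. The sketch below is a minimal version, assuming the logged outputs are available as integer class indices alongside the gesture that was actually performed. The function names and the choice of `-1` for "no trials" are hypothetical, and the index used for REST depends on the project's gesture table.

```c
#include <stddef.h>

/* Fills acc_out[c] with the fraction of trials of gesture c that were
 * classified correctly; gestures with no trials get -1. */
static void per_gesture_accuracy(const int *expected, const int *predicted,
                                 size_t n, int n_classes, float *acc_out)
{
    for (int c = 0; c < n_classes; c++) {
        size_t trials = 0, correct = 0;
        for (size_t i = 0; i < n; i++) {
            if (expected[i] != c) {
                continue;
            }
            trials++;
            if (predicted[i] == c) {
                correct++;
            }
        }
        acc_out[c] = trials ? (float)correct / (float)trials : -1.0f;
    }
}

/* Counts outputs other than rest_label in a log captured while the arm
 * is held at rest (step 4 of the protocol). */
static size_t count_false_triggers(const int *predicted, size_t n,
                                   int rest_label)
{
    size_t count = 0;
    for (size_t i = 0; i < n; i++) {
        if (predicted[i] != rest_label) {
            count++;
        }
    }
    return count;
}
```

This counts every non-REST output, as the protocol states. Counting only transitions out of REST would give a lower number, so whichever convention is chosen should stay fixed across all rows of the results log.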
## Results Log
| Change | CV Acc Before | CV Acc After | Delta | On-Device Acc | False Triggers/30s | Keep? |
|--------|--------------|-------------|-------|---------------|-------------------|-------|
| Baseline | — | — | — | — | — | — |
| Change C (reject) | — | — | — | — | — | — |
| Change B (filter) | — | — | — | — | — | — |
| Change 0 (label shift) | — | — | — | — | — | — |
| Change 1 (features) | — | — | — | — | — | — |
| Change D (NVS calib) | — | — | — | — | — | — |
| Change 3 (augment) | — | — | — | — | — | — |
| Change 5 (benchmark) | — | — | — | — | — | — |
| Change 7+F (ensemble) | — | — | — | — | — | — |
| Change E (MLP) | — | — | — | — | — | — |
## When to Add More Gestures
| CV Accuracy | Recommendation |
|-------------|----------------|
| <80% | Do NOT add gestures — fix the existing 5 first |
| 80–90% | Adding 1–2 gestures is reasonable; expect 5–8% drop per new gesture |
| 90–95% | Good baseline; can add gestures; target staying above 85% |
| >95% | Excellent; can be ambitious with gesture count |
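
As a worked example of the table, assume each new gesture costs 5 to 8 percentage points (reading the "drop" as absolute points is an assumption). A 92% baseline then supports one extra gesture in the best case (92 − 5 = 87) and none in the worst case (92 − 8 = 84) before falling below the 85% target. The helper below, with a hypothetical name, runs the same arithmetic:

```c
/* Largest number of gestures that can be added while projected CV accuracy
 * stays at or above floor_pct, assuming a fixed drop per gesture measured
 * in percentage points. Returns 0 for a non-positive drop (invalid input). */
static int max_new_gestures(float cv_acc_pct, float drop_per_gesture_pct,
                            float floor_pct)
{
    int n = 0;
    if (drop_per_gesture_pct <= 0.0f) {
        return 0;
    }
    while (cv_acc_pct - drop_per_gesture_pct * (float)(n + 1) >= floor_pct) {
        n++;
    }
    return n;
}
```

Running it with both ends of the drop range gives a bracket rather than a single answer. The measured CV accuracy after each added gesture is still the deciding number.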
---
# PART IX — EXPORT WORKFLOW
## Path 1 — LDA / Ensemble (Changes 0–4, 7+F)
```
1. Train: python learning_data_collection.py → option 3 (single LDA)
   OR:    python train_ensemble.py (full ensemble)
2. Export:
   Single LDA: classifier.export_to_header(Path('EMG_Arm/src/core/model_weights.h'))
   Ensemble:   export_ensemble_header() in train_ensemble.py
               → writes model_weights_ensemble.h
3. Port new features to inference.c (if Change 1 features added):
   - Follow feature selection decision matrix (Part VII)
   - CRITICAL: C feature index order MUST match Python FEATURE_ORDER exactly
4. Build + flash: pio run -t upload
```
## Path 2 — int8 MLP via TFLM (Change E)
```
1. python train_mlp_tflite.py → emg_model_data.cc
2. Add TFLM to platformio.ini lib_deps
3. Replace LDA inference call with inference_mlp_predict() in inference.c
   OR use inference_ensemble_predict() which calls MLP as fallback (Change F)
4. pio run -t upload
```
## Feature Index Contract (Critical)
The order of values written to `features_out[]` in `compute_features()` in C **must exactly
match** `FEATURE_ORDER` in `extract_features_window()` in Python, index for index.
To verify before flashing: print both the C feature names (from `MODEL_FEATURE_NAMES` if
added to header) and Python `extractor.get_feature_names()` and diff them.
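
The same comparison can also run on the firmware side as a boot-time self-test, provided the Python order is exported as a string table next to the weights. The sketch below assumes two such name arrays exist. `MODEL_FEATURE_NAMES` is only mentioned above as a possible header addition, and the function name is hypothetical.

```c
#include <string.h>

/* Returns the index of the first position where the two name lists differ,
 * or -1 when all n names match in order. */
static int first_feature_mismatch(const char *const *c_names,
                                  const char *const *py_names, int n)
{
    for (int i = 0; i < n; i++) {
        if (strcmp(c_names[i], py_names[i]) != 0) {
            return i;
        }
    }
    return -1;
}
```

Returning the index rather than a boolean points straight at the first feature that is out of order, which is usually the one that was inserted or swapped.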
---
# PART X — REFERENCES
**Primary paper**: Kaifosh, P., Reardon, T., et al. "A generic non-invasive neuromotor
interface for human–computer interaction." *Nature* (2025).
doi:10.1038/s41586-025-09255-w
**Meta codebase** (label alignment, CLER metric, model architectures):
`C:/VSCode/Marvel_Projects/Meta_Emg_Stuff/generic-neuromotor-interface/`
- `data.py`: onset detection, `searchsorted` alignment, window jitter
- `cler.py`: threshold=0.35, debounce=50ms, tolerance=±50/250ms
- `networks.py`: model architectures, left_context=20, stride=10
- `lightning.py`: `targets[..., left_context::stride]` label shift
**Barachant et al. 2012**: "Multiclass brain–computer interface classification by
Riemannian geometry." — matrix logarithm reference (MPF features).
**Espressif libraries**:
- esp-dsp: `github.com/espressif/esp-dsp` — biquad, FFT, dot-product
- esp-dl: `github.com/espressif/esp-dl` — quantized MLP/CNN inference
- TFLite Micro: `github.com/tensorflow/tflite-micro`
**All project files** (existing + planned):
```
── Laptop / Python ─────────────────────────────────────────────────────────────────────────
C:/VSCode/Marvel_Projects/Bucky_Arm/learning_data_collection.py ← main: data collection + training
C:/VSCode/Marvel_Projects/Bucky_Arm/live_predict.py ← NEW (Part 0.6): laptop-side live inference
C:/VSCode/Marvel_Projects/Bucky_Arm/train_ensemble.py ← NEW (Change 7): ensemble training
C:/VSCode/Marvel_Projects/Bucky_Arm/train_mlp_tflite.py ← NEW (Change E): int8 MLP export
── ESP32 Firmware — Existing ───────────────────────────────────────────────────────────────
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/platformio.ini
└─ ADD lib_deps: espressif/esp-dsp (Changes B,1,F), tensorflow/tflite-micro (Change E)
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/config/config.h
└─ MODIFY: remove system_mode_t; add EMG_STANDALONE to MAIN_MODE enum (Part 0.7, S1)
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/app/main.c
└─ MODIFY: add STATE_LAPTOP_PREDICT, CMD_START_LAPTOP_PREDICT, run_laptop_predict_loop(),
run_standalone_loop() (Part 0.5)
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/drivers/emg_sensor.c
└─ MODIFY (Change A): migrate from adc_oneshot to adc_continuous driver
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/inference.c
└─ MODIFY: add inference_get_gesture_by_name(), IIR filter (B), features (1), confidence rejection (C)
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/inference.h
└─ MODIFY: add inference_get_gesture_by_name() declaration
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/gestures.c
└─ MODIFY: update gesture_names[] and gestures_execute() when adding gestures
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/model_weights.h
└─ AUTO-GENERATED by export_to_header() — do not edit manually
── ESP32 Firmware — New Files ──────────────────────────────────────────────────────────────
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/bicep.h/.c ← Part 0 / Section 2.2
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/calibration.h/.c ← Change D (NVS z-score)
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/inference_ensemble.h/.c ← Change F
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/inference_mlp.h/.cc ← Change E
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/model_weights_ensemble.h ← AUTO-GENERATED (Change 7)
C:/VSCode/Marvel_Projects/Bucky_Arm/EMG_Arm/src/core/emg_model_data.h/.cc ← AUTO-GENERATED (Change E)
```