Restructure repo: add Python EMG scripts and move ESP32 code to EMG_Arm/

- Add emg_gui.py, learning_data_collection.py, learning_emg_filtering.py
- Add collected EMG data and trained models
- Move ESP32/PlatformIO code into EMG_Arm/ subdirectory
- Add servo control functions for robotic hand (flex/unflex fingers)
- Update .gitignore for Python and PlatformIO
- Exclude large asset files from repo (>100MB)
This commit is contained in:
Surya Balaji
2026-01-17 23:31:15 -06:00
parent bcfcd8f8c6
commit 7af06f115a
29 changed files with 4140 additions and 59 deletions

23
.gitignore vendored
View File

@@ -1,5 +1,18 @@
.pio
.vscode/.browse.c_cpp.db*
.vscode/c_cpp_properties.json
.vscode/launch.json
.vscode/ipch
# Python
__pycache__/
*.py[cod]
*.egg-info/
.venv/
venv/
# PlatformIO
EMG_Arm/.pio/
EMG_Arm/.vscode/
# Data (uncomment if you want to exclude)
# collected_data/
# models/
# OS
.DS_Store
Thumbs.db

View File

@@ -1,8 +0,0 @@
{
"recommendations": [
"pioarduino.pioarduino-ide"
],
"unwantedRecommendations": [
"ms-vscode.cpptools-extension-pack"
]
}

5
EMG_Arm/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
.pio
.vscode/.browse.c_cpp.db*
.vscode/c_cpp_properties.json
.vscode/launch.json
.vscode/ipch

180
EMG_Arm/src/main.c Normal file
View File

@@ -0,0 +1,180 @@
#include <freertos/FreeRTOS.h>
#include "driver/gpio.h"
#include "driver/ledc.h"
// Finger servo pin mappings
#define thumbServoPin GPIO_NUM_1
#define indexServoPin GPIO_NUM_4
#define middleServoPin GPIO_NUM_5
#define ringServoPin GPIO_NUM_6
#define pinkyServoPin GPIO_NUM_7
// LEDC channels (one per servo)
#define thumbChannel LEDC_CHANNEL_0
#define indexChannel LEDC_CHANNEL_1
#define middleChannel LEDC_CHANNEL_2
#define ringChannel LEDC_CHANNEL_3
#define pinkyChannel LEDC_CHANNEL_4
#define deg180 2048
#define deg0 430
// Arrays for cleaner initialization
const int servoPins[] = {thumbServoPin, indexServoPin, middleServoPin, ringServoPin, pinkyServoPin};
const int servoChannels[] = {thumbChannel, indexChannel, middleChannel, ringChannel, pinkyChannel};
const int numServos = 5;
void servoInit() {
// LEDC timer configuration (shared by all servos)
ledc_timer_config_t ledc_timer = {};
ledc_timer.speed_mode = LEDC_LOW_SPEED_MODE;
ledc_timer.timer_num = LEDC_TIMER_0;
ledc_timer.duty_resolution = LEDC_TIMER_14_BIT;
ledc_timer.freq_hz = 50;
ledc_timer.clk_cfg = LEDC_AUTO_CLK;
ESP_ERROR_CHECK(ledc_timer_config(&ledc_timer));
// Initialize each finger servo channel
for (int i = 0; i < numServos; i++) {
ledc_channel_config_t ledc_channel = {};
ledc_channel.speed_mode = LEDC_LOW_SPEED_MODE;
ledc_channel.channel = servoChannels[i];
ledc_channel.timer_sel = LEDC_TIMER_0;
ledc_channel.intr_type = LEDC_INTR_DISABLE;
ledc_channel.gpio_num = servoPins[i];
ledc_channel.duty = deg0; // Start with fingers open
ledc_channel.hpoint = 0;
ESP_ERROR_CHECK(ledc_channel_config(&ledc_channel));
}
}
// Flex functions (move to 180 degrees - finger closed)
void flexThumb() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, thumbChannel, deg180);
ledc_update_duty(LEDC_LOW_SPEED_MODE, thumbChannel);
}
void flexIndex() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, indexChannel, deg180);
ledc_update_duty(LEDC_LOW_SPEED_MODE, indexChannel);
}
void flexMiddle() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, middleChannel, deg180);
ledc_update_duty(LEDC_LOW_SPEED_MODE, middleChannel);
}
void flexRing() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, ringChannel, deg180);
ledc_update_duty(LEDC_LOW_SPEED_MODE, ringChannel);
}
void flexPinky() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, pinkyChannel, deg180);
ledc_update_duty(LEDC_LOW_SPEED_MODE, pinkyChannel);
}
// Unflex functions (move to 0 degrees - finger open)
void unflexThumb() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, thumbChannel, deg0);
ledc_update_duty(LEDC_LOW_SPEED_MODE, thumbChannel);
}
void unflexIndex() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, indexChannel, deg0);
ledc_update_duty(LEDC_LOW_SPEED_MODE, indexChannel);
}
void unflexMiddle() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, middleChannel, deg0);
ledc_update_duty(LEDC_LOW_SPEED_MODE, middleChannel);
}
void unflexRing() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, ringChannel, deg0);
ledc_update_duty(LEDC_LOW_SPEED_MODE, ringChannel);
}
void unflexPinky() {
ledc_set_duty(LEDC_LOW_SPEED_MODE, pinkyChannel, deg0);
ledc_update_duty(LEDC_LOW_SPEED_MODE, pinkyChannel);
}
// Combo functions
void makeFist() {
// Set all duties first
ledc_set_duty(LEDC_LOW_SPEED_MODE, thumbChannel, deg180);
ledc_set_duty(LEDC_LOW_SPEED_MODE, indexChannel, deg180);
ledc_set_duty(LEDC_LOW_SPEED_MODE, middleChannel, deg180);
ledc_set_duty(LEDC_LOW_SPEED_MODE, ringChannel, deg180);
ledc_set_duty(LEDC_LOW_SPEED_MODE, pinkyChannel, deg180);
// Update all at once
ledc_update_duty(LEDC_LOW_SPEED_MODE, thumbChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, indexChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, middleChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, ringChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, pinkyChannel);
}
void openHand() {
// Set all duties first
ledc_set_duty(LEDC_LOW_SPEED_MODE, thumbChannel, deg0);
ledc_set_duty(LEDC_LOW_SPEED_MODE, indexChannel, deg0);
ledc_set_duty(LEDC_LOW_SPEED_MODE, middleChannel, deg0);
ledc_set_duty(LEDC_LOW_SPEED_MODE, ringChannel, deg0);
ledc_set_duty(LEDC_LOW_SPEED_MODE, pinkyChannel, deg0);
// Update all at once
ledc_update_duty(LEDC_LOW_SPEED_MODE, thumbChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, indexChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, middleChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, ringChannel);
ledc_update_duty(LEDC_LOW_SPEED_MODE, pinkyChannel);
}
void individualFingerDemo(int delay_ms){
flexThumb();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
unflexThumb();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
flexIndex();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
unflexIndex();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
flexMiddle();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
unflexMiddle();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
flexRing();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
unflexRing();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
flexPinky();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
unflexPinky();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
}
void closeOpenDemo(int delay_ms){
makeFist();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
openHand();
vTaskDelay(pdMS_TO_TICKS(delay_ms));
}
void app_main() {
servoInit();
// Demo: flex and unflex each finger in sequence
// while(1) {
// individualFingerDemo(1000);
// }
// Demo: close and open hand
while(1) {
closeOpenDemo(1000);
}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

1459
emg_gui.py Normal file

File diff suppressed because it is too large Load Diff

2235
learning_data_collection.py Normal file

File diff suppressed because it is too large Load Diff

243
learning_emg_filtering.py Normal file
View File

@@ -0,0 +1,243 @@
import numpy as np
import matplotlib.pyplot as plt
import h5py
import pandas as pd
from scipy.signal import butter, sosfiltfilt
# =============================================================================
# CONFIGURABLE PARAMETERS
# =============================================================================
ZC_THRESHOLD_PERCENT = 0.6 # Zero Crossing threshold as fraction of RMS
SSC_THRESHOLD_PERCENT = 0.3 # Slope Sign Change threshold as fraction of RMS
file = h5py.File("assets/discrete_gestures_user_000_dataset_000.hdf5", "r")
def print_tree(name, obj):
print(name)
file.visititems(print_tree)
print(list(file.keys()))
data = file["data"]
print(type(data))
print(data)
print(data.dtype)
raw = data[:]
print(raw.shape)
print(raw.dtype)
emg = raw['emg']
time = raw['time']
print(emg.shape)
dt = np.diff(time)
fs = 1.0 / np.median(dt)
print("fs =", fs)
print("dt min/median/max =", dt.min(), np.median(dt), dt.max())
# 1) pick one channel
emg_ch = emg[:, 0].astype(np.float32)
# 2) drop the initial transient (recommended)
drop_s = 0.5
drop = int(drop_s * fs)
emg_ch = emg_ch[drop:]
time_ch = time[drop:]
# 3) design + apply bandpass
low, high = 20.0, 450.0
sos = butter(4, [low/(0.5*fs), high/(0.5*fs)], btype="bandpass", output="sos")
emg_bp = sosfiltfilt(sos, emg_ch)
# 4) RMS envelope extraction
# Step A: Define window size (150ms)
window_ms = 150
window_samples = int(window_ms / 1000 * fs) # Convert ms to samples
print(f"Window: {window_ms}ms = {window_samples} samples")
# Step B: Square the bandpassed signal
emg_squared = emg_bp ** 2
# Step C: Moving average via convolution (rectangular kernel, normalized)
kernel = np.ones(window_samples) / window_samples
emg_mean_squared = np.convolve(emg_squared, kernel, mode='same')
# Step D: Square root to complete RMS
emg_envelope = np.sqrt(emg_mean_squared)
# =============================================================================
# TIME-DOMAIN FEATURE FUNCTIONS (RMS, WL, ZC, SSC)
# Computed on band-passed EMG, not RMS envelope.
# Designed for easy porting to embedded C.
# =============================================================================
def compute_rms(x):
"""Root Mean Square: sqrt(mean(x^2))"""
return np.sqrt(np.mean(x ** 2))
def compute_wl(x):
"""Waveform Length: sum of absolute differences between consecutive samples."""
return np.sum(np.abs(np.diff(x)))
def compute_zc(x, threshold):
"""Zero Crossings: count of sign changes where amplitude change exceeds threshold."""
sign_change = x[:-1] * x[1:] < 0
amp_diff = np.abs(np.diff(x)) > threshold
return np.sum(sign_change & amp_diff)
def compute_ssc(x, threshold):
"""Slope Sign Changes: count of slope direction reversals exceeding threshold."""
diff_left = x[1:-1] - x[:-2]
diff_right = x[1:-1] - x[2:]
return np.sum(diff_left * diff_right > threshold)
def compute_all_features_windowed(x, window_len, threshold_zc, threshold_ssc):
"""
Compute RMS, WL, ZC, SSC using non-overlapping windows.
Returns arrays of feature values, one per window (for ML).
"""
n_samples = len(x)
n_windows = n_samples // window_len
x_trim = x[:n_windows * window_len]
x_win = x_trim.reshape(n_windows, window_len)
rms = np.sqrt(np.mean(x_win ** 2, axis=1))
wl = np.sum(np.abs(np.diff(x_win, axis=1)), axis=1)
sign_change = x_win[:, :-1] * x_win[:, 1:] < 0
amp_diff = np.abs(np.diff(x_win, axis=1)) > threshold_zc
zc = np.sum(sign_change & amp_diff, axis=1)
diff_left = x_win[:, 1:-1] - x_win[:, :-2]
diff_right = x_win[:, 1:-1] - x_win[:, 2:]
ssc = np.sum(diff_left * diff_right > threshold_ssc, axis=1)
return rms, wl, zc, ssc
# =============================================================================
# COMPUTE FEATURES FOR ALL 16 CHANNELS
# =============================================================================
n_channels = 16
all_features = {} # Non-overlapping windows - for ML
for ch in range(n_channels):
emg_ch_i = emg[drop:, ch].astype(np.float32)
emg_bp_i = sosfiltfilt(sos, emg_ch_i)
signal_rms_i = np.sqrt(np.mean(emg_bp_i ** 2))
threshold_zc_i = ZC_THRESHOLD_PERCENT * signal_rms_i
threshold_ssc_i = (SSC_THRESHOLD_PERCENT * signal_rms_i) ** 2
# Non-overlapping windowed features (for ML)
rms_i, wl_i, zc_i, ssc_i = compute_all_features_windowed(
emg_bp_i, window_samples, threshold_zc_i, threshold_ssc_i
)
all_features[ch] = {'rms': rms_i, 'wl': wl_i, 'zc': zc_i, 'ssc': ssc_i}
# Time vector for windowed features
n_windows = len(all_features[0]['rms'])
window_centers = np.arange(n_windows) * window_samples + window_samples // 2
time_windows = time_ch[window_centers]
print(f"\nComputed features for {n_channels} channels")
print(f"Windows per channel (non-overlapping): {n_windows}")
print(f"Window size: {window_samples} samples ({window_ms} ms)")
# 5) Load gesture labels (prompts)
prompts = pd.read_hdf("assets/discrete_gestures_user_000_dataset_000.hdf5", key="prompts")
print("\nUnique gestures:", prompts['name'].unique())
t_abs = time_ch # absolute timestamps
# Find first occurrence of each gesture type
index_gestures = prompts[prompts['name'].str.contains('index')]
middle_gestures = prompts[prompts['name'].str.contains('middle')]
thumb_gestures = prompts[prompts['name'].str.contains('thumb')]
# Define plot configurations: 1) Index+Middle combined, 2) Thumb separate
plot_configs = [
{'name': 'Index & Middle Finger', 'start_time': index_gestures['time'].iloc[0], 'filter': 'index|middle'},
{'name': 'Thumb', 'start_time': thumb_gestures['time'].iloc[0], 'filter': 'thumb'},
]
# Color function for markers
def get_gesture_color(name):
if 'index' in name:
return 'green'
elif 'middle' in name:
return 'blue'
elif 'thumb' in name:
return 'orange'
return 'gray'
# =============================================================================
# 6) PLOT ALL FEATURES (RMS, WL, ZC, SSC) FOR ALL 16 CHANNELS
# =============================================================================
feature_names = ['rms', 'wl', 'zc', 'ssc']
feature_titles = ['RMS Envelope', 'Waveform Length (WL)', 'Zero Crossings (ZC)', 'Slope Sign Changes (SSC)']
feature_colors = ['red', 'blue', 'green', 'purple']
feature_ylabels = ['Amplitude', 'WL (a.u.)', 'Count', 'Count']
for config in plot_configs:
t_start = config['start_time'] - 0.5
t_end = t_start + 10.0
# Mask for windowed time vector
mask_win = (time_windows >= t_start) & (time_windows <= t_end)
t_win_rel = time_windows[mask_win] - t_start
# Get gestures in window
gesture_mask = (prompts['time'] >= t_start) & (prompts['time'] <= t_end) & \
(prompts['name'].str.contains(config['filter']))
gestures_in_window = prompts[gesture_mask]
# Create one figure per feature
for feat_idx, feat_name in enumerate(feature_names):
fig, axes = plt.subplots(4, 4, figsize=(10, 8), sharex=True, sharey=True)
axes = axes.flatten()
for ch in range(n_channels):
ax = axes[ch]
# Plot windowed feature
feat_data = all_features[ch][feat_name][mask_win]
ax.plot(t_win_rel, feat_data, linewidth=1, color=feature_colors[feat_idx])
ax.set_title(f"Ch {ch}", fontsize=9)
# Add gesture markers
for _, row in gestures_in_window.iterrows():
t_g = row['time'] - t_start
color = get_gesture_color(row['name'])
ax.axvline(t_g, color=color, linestyle='--', alpha=0.5, linewidth=0.5)
# Set subtitle based on gesture type
if 'Index' in config['name']:
subtitle = "(Green=index, Blue=middle)"
else:
subtitle = "(Orange=thumb)"
fig.suptitle(f"{feature_titles[feat_idx]} - {config['name']} Gestures\n{subtitle}", fontsize=12)
fig.supxlabel("Time (s)")
fig.supylabel(feature_ylabels[feat_idx])
plt.tight_layout()
plt.show()
# =============================================================================
# SUMMARY: Feature statistics across all channels
# =============================================================================
print("\n" + "=" * 60)
print("FEATURE SUMMARY (all channels, all windows)")
print("=" * 60)
for feat_name in ['rms', 'wl', 'zc', 'ssc']:
all_vals = np.concatenate([all_features[ch][feat_name] for ch in range(n_channels)])
print(f"{feat_name.upper():4s} | min: {all_vals.min():10.4f} | max: {all_vals.max():10.4f} | "
f"mean: {all_vals.mean():10.4f} | std: {all_vals.std():10.4f}")

Binary file not shown.

View File

@@ -1,46 +0,0 @@
#include <freertos/FreeRTOS.h>
#include "driver/gpio.h"
#include "driver/ledc.h"
#define servoPin GPIO_NUM_4
#define ledcChannel LEDC_CHANNEL_0
#define deg180 2048
#define deg0 430
void servoInit() {
// LEDC timer configuration (C++ aggregate initialization)
ledc_timer_config_t ledc_timer = {};
ledc_timer.speed_mode = LEDC_LOW_SPEED_MODE;
ledc_timer.timer_num = LEDC_TIMER_0;
ledc_timer.duty_resolution = LEDC_TIMER_14_BIT;
ledc_timer.freq_hz = 50;
ledc_timer.clk_cfg = LEDC_AUTO_CLK;
ESP_ERROR_CHECK(ledc_timer_config(&ledc_timer));
// LEDC channel configuration
ledc_channel_config_t ledc_channel = {};
ledc_channel.speed_mode = LEDC_LOW_SPEED_MODE;
ledc_channel.channel = ledcChannel;
ledc_channel.timer_sel = LEDC_TIMER_0;
ledc_channel.intr_type = LEDC_INTR_DISABLE;
ledc_channel.gpio_num = servoPin;
ledc_channel.duty = deg180; // Start off
ledc_channel.hpoint = 0;
ESP_ERROR_CHECK(ledc_channel_config(&ledc_channel));
}
// alternates between 0 and 180 - 2048 is 180 degrees (counterclockwise max)
void app_main() {
servoInit();
while(1) {
ledc_set_duty(LEDC_LOW_SPEED_MODE, ledcChannel, deg180);
ledc_update_duty(LEDC_LOW_SPEED_MODE, ledcChannel);
printf("ccwMax\n");
vTaskDelay(pdMS_TO_TICKS(1000));
ledc_set_duty(LEDC_LOW_SPEED_MODE, ledcChannel, deg0);
ledc_update_duty(LEDC_LOW_SPEED_MODE, ledcChannel);
vTaskDelay(pdMS_TO_TICKS(1000));
printf("cwMax\n");
}
}