American Sign Language Detection App¶
with Accelerometer, Gyroscope and PyTorch¶
Credit: AITS Cainvas Community
Photo by Sammi Schouten on Dribbble
American Sign Language (ASL) is a complete, natural language that has the same linguistic properties as spoken languages, with grammar that differs from English. ASL is expressed by movements of the hands and face.¶
To detect ASL hand gestures, we will build an American Sign Language detection app with deep learning, using accelerometer and gyroscope (IMU) readings as input.¶
Importing Libraries¶
In [1]:
import os, time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torchsummary import summary
Set a fixed random seed value for reproducibility; this allows us to get the same random numbers each time the notebook is run.¶
In [2]:
SEED = 1337
np.random.seed(SEED)
torch.manual_seed(SEED)
cuda_available = torch.cuda.is_available()
device = torch.device("cuda" if cuda_available else "cpu")
if cuda_available:
    torch.cuda.manual_seed(SEED)
In [3]:
# sudo AWS_ACCESS_KEY_ID={} AWS_SECRET_ACCESS_KEY={} aws s3 cp --recursive s3://cainvas-static/media/user_data/aitswarrior/ .
In [4]:
!wget -N https://cainvas-static.s3.amazonaws.com/media/user_data/cainvas-admin/asl_imu_dataset.zip
!unzip -qo asl_imu_dataset.zip
!rm asl_imu_dataset.zip
Visualize Dataset¶
In [5]:
def plot_gesture(filenames, type="accelerometer"):
    for filename in filenames:
        df = pd.read_csv(filename)
        index = range(1, len(df['ax']) + 1)
        fig = plt.figure(figsize=(12, 6))
        plt.title(type.title() + ' - Gesture File "' + os.path.splitext(filename)[0].upper() + '" plot')
        if type == "accelerometer":
            plt.plot(index, df['ax'], 'g.', label='x', linestyle='solid', marker=',')
            plt.plot(index, df['ay'], 'b.', label='y', linestyle='solid', marker=',')
            plt.plot(index, df['az'], 'r.', label='z', linestyle='solid', marker=',')
            plt.ylabel("Acceleration (G)")
        elif type == "gyroscope":
            plt.plot(index, df['gx'], 'g.', label='x', linestyle='solid', marker=',')
            plt.plot(index, df['gy'], 'b.', label='y', linestyle='solid', marker=',')
            plt.plot(index, df['gz'], 'r.', label='z', linestyle='solid', marker=',')
            plt.ylabel("Gyroscope (deg/sec)")
        plt.xlabel("Sample")
        plt.legend()
        plt.show()
        print()
In [6]:
gesture_files = ['asl_imu_dataset/thankyou/thankyou_dataset_1.csv', 'asl_imu_dataset/help/help_dataset_1.csv']
plot_gesture(gesture_files, type="accelerometer")
plot_gesture(gesture_files, type="gyroscope")
In [7]:
FREQUENCY = 6932/60  # around 6932 samples per 60 seconds captured through Cainvas Pailette
GESTURE_CYCLE_TIME = 4  # each fully captured gesture is 4 seconds long
SAMPLES_PER_GESTURE = int(FREQUENCY * GESTURE_CYCLE_TIME)  # number of samples in a gesture
data_dir = "asl_imu_dataset"
CLASSES = [gesture_class for gesture_class in os.listdir(data_dir) \
           if 'ipynb_checkpoints' not in gesture_class]
NUM_GESTURES = len(CLASSES)
inputs = []
# read each csv file and push an input and output
# (iterating over CLASSES keeps the order of `inputs` aligned with CLASSES)
for gesture_class in CLASSES:
    gesture_dataframes = []
    for gesture_file in os.listdir(os.path.join(data_dir, gesture_class)):
        gesture_file = os.path.join(data_dir, gesture_class, gesture_file)
        if not os.path.isfile(gesture_file) or os.path.splitext(gesture_file)[1] != ".csv":
            continue
        df = pd.read_csv(gesture_file)
        # get rid of pesky empty value lines of csv which cause NaN inputs
        df = df.dropna()
        df = df.reset_index(drop=True)
        num_recordings = int(df.shape[0] // SAMPLES_PER_GESTURE)
        print(f"\tThere are ({df.shape[0]}/{SAMPLES_PER_GESTURE}) = {num_recordings}",
              f"recordings of the '{os.path.basename(gesture_file)}' gesture.")
        df = df.loc[:(num_recordings * SAMPLES_PER_GESTURE) - 1]
        # normalize the input data, between 0 and 1:
        # - acceleration is between: -4 and +4
        # - gyroscope is between: -2000 and +2000
        df.loc[:, ['ax', 'ay', 'az']] = (df.loc[:, ['ax', 'ay', 'az']] + 4) / 8
        df.loc[:, ['gx', 'gy', 'gz']] = (df.loc[:, ['gx', 'gy', 'gz']] + 2000) / 4000
        gesture_dataframes.append(df)
    gesture = pd.concat(gesture_dataframes, ignore_index=True).to_numpy()
    print("There are {} samples in total for the '{}' gesture\n".format(gesture.shape[0], gesture_class))
    inputs.append(gesture)
print("Data set parsing and augmentation complete.")
In [8]:
PEAK_GESTURE_TIME = 2  # seconds
PEAK_SAMPLES = int(FREQUENCY * PEAK_GESTURE_TIME)  # samples per peak gesture
peak_sample_width = int(PEAK_SAMPLES // 2)

def plot_gesture(np_tensor_i, title):
    fig = plt.figure(figsize=(12, 6))
    plt.title(title)
    plt.plot(np_tensor_i)
    plt.show()
    print()

preprocessed_inputs = []
no_gestures = []

def get_peak_by_axis(np_tensor_i, peak_sample_width, axis):
    idx_max = np.argmax(np_tensor_i[:, axis])
    idx_min = np.argmin(np_tensor_i[:, axis])
    index = peak_gesture_index = 0
    if abs(idx_max - idx_min) < (peak_sample_width * 2):
        peak_gesture_index = int((idx_max + idx_min) // 2)
    else:
        # don't consider the gestures where the lowest and
        # highest points are not within a 2-second window
        return -1
        # if we wanted to consider those points, we would have to check
        # which 2-second window would be better suited:
        # val_max = np.max(np_tensor_i[:, axis])
        # val_min = np.min(np_tensor_i[:, axis])
        # peak_gesture_index = idx_max if (val_max >= val_min) else idx_min
    low = 0
    high = np_tensor_i.shape[0]
    # when the peak starts early
    if (peak_gesture_index - peak_sample_width) < low:
        index = peak_sample_width
    # when the peak starts late
    elif (peak_gesture_index + peak_sample_width) > high:
        index = high - peak_sample_width
    # when the peak is in between
    else:
        index = peak_gesture_index
    return index

for gesture_class_index, np_tensor in enumerate(inputs):
    weight_axis = relative_change = 0
    for axis in range(np_tensor.shape[1]):
        curr_relative_change = (np.max(np_tensor[:, axis]) - np.min(np_tensor[:, axis])) \
                               / (np.sum(np_tensor[:, axis]) / np_tensor.shape[0])
        if curr_relative_change > relative_change:
            relative_change = curr_relative_change
            weight_axis = axis
    # split the numpy tensor with each split containing a 4-second recording
    num_recordings = int(np_tensor.shape[0] // SAMPLES_PER_GESTURE)
    np_tensor = np.array(np.split(np_tensor, num_recordings))
    temp_tensors = []
    for np_tensor_i in np_tensor:
        index = get_peak_by_axis(np_tensor_i, peak_sample_width, weight_axis)
        if index < 0:
            continue
        start = 0
        end = np_tensor_i.shape[0]
        low = index - peak_sample_width
        high = index + peak_sample_width
        temp_tensors.append(np_tensor_i[low:high])
        # adding the leftover samples to no_gesture to reduce flickering
        if CLASSES[gesture_class_index] != "no_gesture":
            # if we can extract at least a 1-second sample from the left
            if (low - start) >= peak_sample_width:
                np_no_gesture_i = np.concatenate((
                    np_tensor_i[start:low],
                    np.flip(np_tensor_i[start:low], 0)))
                no_gestures.append(np_no_gesture_i[:2 * peak_sample_width])
            if (end - high) >= peak_sample_width:
                np_no_gesture_i = np.concatenate((
                    np_tensor_i[high:end],
                    np.flip(np_tensor_i[high:end], 0)))
                no_gestures.append(np_no_gesture_i[:2 * peak_sample_width])
    # converting the list of np arrays to an np array
    np_tensor = np.array(temp_tensors)
    preprocessed_inputs.append(np_tensor)
print("Dataset preprocessing complete")
Dataset distribution¶
In [9]:
dist = [0]*len(CLASSES)
for i, gesture_class in enumerate(CLASSES):
    if gesture_class == "no_gesture":
        dist[i] += len(no_gestures)
    dist[i] += preprocessed_inputs[i].shape[0]
fig = plt.figure()
plt.bar(CLASSES, dist)
plt.xlabel("\nASL Gestures")
plt.ylabel("Number of samples")
plt.title("ASL Gesture dataset distribution")
plt.show()
In [10]:
TOTAL_SAMPLES_REQUIRED = 60000  # for augmentation
samples_per_class = TOTAL_SAMPLES_REQUIRED // NUM_GESTURES  # total sample count per class

inputs = []
outputs = []
for gesture_class_index, np_tensor in enumerate(preprocessed_inputs):
    if CLASSES[gesture_class_index] == "no_gesture":
        np_tensor = np.concatenate((np.array(no_gestures), np_tensor))
    curr_total_samples = np_tensor.shape[0]
    # calculate the number of times to repeat the actual gesture class
    required_repeat = (samples_per_class // curr_total_samples) + 1
    # expand the original gesture to augment in the next step
    aug_np_tensor = np.tile(np_tensor, (required_repeat, 1, 1))
    aug_np_tensor = aug_np_tensor[:samples_per_class - curr_total_samples]
    # add up to 15% random multiplicative noise
    noise_threshold = 0.15
    random_noise = np.random.uniform(low=(1 - noise_threshold), high=(1 + noise_threshold), size=aug_np_tensor.shape)
    aug_np_tensor = np.multiply(aug_np_tensor, random_noise)
    # append the augmented gestures to the original ones
    np_tensor = np.concatenate((np_tensor, aug_np_tensor))
    # flatten the 6 axes to 1
    np_tensor = np_tensor.reshape(np_tensor.shape[0], -1)
    inputs += [np_tensor]
    outputs += [gesture_class_index] * samples_per_class
inputs = np.concatenate(inputs)
outputs = np.array(outputs)
# print(inputs.shape)
# print(outputs.shape)
print("Dataset augmentation complete")
Dataset distribution after augmentation¶
In [11]:
dist = [np.count_nonzero(outputs == i) for i in range(NUM_GESTURES)]
fig = plt.figure()
plt.bar(CLASSES, dist)
plt.xlabel("\nASL Gestures")
plt.ylabel("Number of samples")
plt.title("ASL Gesture dataset distribution after augmentation")
plt.show()
Randomize and split the input and output pairs for training¶
Randomly split the input and output pairs into sets of data: 80% for training and 20% for testing.¶
- the training set is used to train the model
- the validation set is used to measure how well the model is performing during training (with the 80/20 split below it comes out empty; a true three-way split is sketched after the next cell)
- the testing set is used to test the model after training
In [12]:
# Randomize the order of the inputs, so they can be evenly distributed for training, testing, and validation
# https://stackoverflow.com/a/37710486/2020087
num_inputs = len(inputs)
INPUT_LEN = len(inputs[0])
inputs = inputs.reshape((num_inputs, 1, INPUT_LEN))
randomize = np.arange(num_inputs)
np.random.shuffle(randomize)
# Swap the consecutive indexes (0, 1, 2, etc) with the randomized indexes
inputs = inputs[randomize]
outputs = outputs[randomize]
# Split the recordings (group of samples) into three sets: training, testing and validation
TRAIN_SPLIT = int(0.8 * num_inputs)
TEST_SPLIT = int(0.2 * num_inputs + TRAIN_SPLIT)
inputs_train, inputs_test, inputs_validate = np.split(inputs, [TRAIN_SPLIT, TEST_SPLIT])
outputs_train, outputs_test, outputs_validate = np.split(outputs, [TRAIN_SPLIT, TEST_SPLIT])
# we could train on the entire data if it's less than 1000 samples
# if inputs_train.shape[0] < 1000:
#     inputs_train = inputs
#     outputs_train = outputs
print("inputs train shape :", inputs_train.shape)
print("inputs test shape :", inputs_test.shape)
print("outputs train shape :", outputs_train.shape)
print("outputs test shape :", outputs_test.shape)
INPUT_LEN = inputs_train.shape[2]  # samples per window (axis 1 is the channel dimension)
print("\nData set randomization and splitting complete.")
Build the PyTorch Model¶
In [13]:
class aslSeqModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.network = nn.Sequential(
            # output length after a convolution: (L-K+2P)/S + 1
            nn.Conv1d(1, 16, kernel_size=30, stride=10),  # 1380 --> (1380-30)/10 + 1 --> 136
            nn.ReLU(),
            nn.MaxPool1d(2),                              # 136/2 --> 68
            nn.Conv1d(16, 32, kernel_size=15, stride=5),  # 68 --> (68-15)/5 + 1 --> 11
            nn.ReLU(),
            nn.MaxPool1d(2),                              # 11/2 --> 5
            nn.Flatten(),
            nn.Linear(32 * 5, NUM_GESTURES),
            # nn.Softmax(dim=0),  # not needed: CrossEntropyLoss applies log-softmax itself
        )

    def forward(self, x):
        return self.network(x)
model = aslSeqModel().to(device)
model_parameters = sum(p.numel() for p in model.parameters() if p.requires_grad)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
print(model)
print("Model Parameter Count :", model_parameters)
Train the Model¶
In [14]:
EPOCHS = 100
BATCHES = 100  # batch size
losses = []
accuracies = []
TOTAL_BATCHES = int(inputs_train.shape[0] // BATCHES)
torch_inputs_train = torch.from_numpy(np.array(inputs_train, dtype=np.float32)).to(device)
torch_outputs_train = torch.from_numpy(np.array(outputs_train, dtype=np.float32)).long().to(device)

for epoch in range(EPOCHS):
    start = time.time()
    model.train()
    running_loss = 0.0
    for i in range(TOTAL_BATCHES):
        # get samples
        data = torch_inputs_train[i*BATCHES:(i+1)*BATCHES, :, :]
        target = torch_outputs_train[i*BATCHES:(i+1)*BATCHES]
        # init
        optimizer.zero_grad()
        # predict
        y_pred = model(data)
        # calculate loss
        loss = criterion(y_pred, target)
        running_loss += loss.item()
        # backpropagation
        loss.backward()
        optimizer.step()
        # display
        if i == TOTAL_BATCHES - 1:
            model.eval()
            output = model(torch_inputs_train)
            pred = output.data.max(1)[1]
            d = pred.eq(torch_outputs_train.data)
            accuracy = d.sum().item() / d.size().numel()
            losses.append(running_loss / TOTAL_BATCHES)
            accuracies.append(accuracy)
            end = time.time()
            print('\rTrain Epoch: {}/{} [{}/{}] (took {}ms)\t\tLoss: {:.6f}\t\tAccuracy: {}/{}={:.1f}%'.format(
                epoch + 1,
                EPOCHS,
                i + 1,
                TOTAL_BATCHES,
                int((end - start) * 1000),
                running_loss / TOTAL_BATCHES,
                d.sum().item(), d.size().numel(), accuracy * 100),
                end='')
# for name, param in model.named_parameters():
#     if param.requires_grad and name == "fc3.weight":
#         print("\nparameter: ", name, model.fc3.weight.grad)
Plot¶
In [15]:
plt.plot(accuracies)
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.show()
plt.plot(losses)
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.show()
Evaluate¶
In [16]:
evaluate_x = torch.from_numpy(np.array(inputs_test, dtype=np.float32)).to(device)
evaluate_y = torch.from_numpy(outputs_test).to(device)
# print(evaluate_x.shape)
# print(evaluate_y.shape)
model.eval()
output = model(evaluate_x)
# print(output)
pred = output.data.max(1)[1]
# print(pred)
d = pred.eq(evaluate_y.data)
accuracy = d.sum().item()/d.size().numel()
print('Test Accuracy: {:.4f}%'.format(accuracy*100))
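A per-class breakdown often says more than a single accuracy number. Here is a minimal confusion-matrix sketch using only numpy and matplotlib (an optional extra, not part of the original notebook):

# rows: true class, columns: predicted class
cm = np.zeros((NUM_GESTURES, NUM_GESTURES), dtype=int)
for t, p in zip(evaluate_y.cpu().numpy(), pred.cpu().numpy()):
    cm[t, p] += 1
fig = plt.figure(figsize=(6, 6))
plt.imshow(cm)
plt.xticks(range(NUM_GESTURES), CLASSES, rotation=90)
plt.yticks(range(NUM_GESTURES), CLASSES)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.colorbar()
plt.show()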
Save as ONNX Model¶
In [17]:
dummy_input = torch.randn(np.expand_dims(inputs_train[0], axis=0).shape).to(device)
torch.onnx.export(model, (dummy_input), "./asl_imu.onnx", verbose=False)
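Assuming the onnx package is available in the environment, a quick structural check of the exported file can catch export problems early (optional):

import onnx

# load the exported graph and run ONNX's built-in consistency checks
onnx_model = onnx.load("./asl_imu.onnx")
onnx.checker.check_model(onnx_model)
print("ONNX model check passed")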
deepCC¶
In [18]:
!deepCC asl_imu.onnx
DeepSea vs PyTorch Model Prediction¶
In [19]:
def compare(df, true_gesture, no_of_comparison=10):
    df = df.dropna()
    df = df.reset_index(drop=True)
    FREQUENCY = 6932/60  # around 6932 samples per 60 seconds captured through Cainvas Pailette
    GESTURE_CYCLE_TIME = 2  # compare on 2-second windows, matching the model's input
    SAMPLES_PER_GESTURE = int(FREQUENCY * GESTURE_CYCLE_TIME)  # number of samples in a gesture
    random_indices = np.random.randint(0,
                                       high=len(df) // SAMPLES_PER_GESTURE,
                                       dtype=int,
                                       size=no_of_comparison)
    for i in random_indices:
        tensor = []
        for j in range(SAMPLES_PER_GESTURE):
            index = i * SAMPLES_PER_GESTURE + j
            # normalize the input data, between 0 and 1:
            # - acceleration is between: -4 and +4
            # - gyroscope is between: -2000 and +2000
            tensor += [
                (df['ax'][index] + 4) / 8,
                (df['ay'][index] + 4) / 8,
                (df['az'][index] + 4) / 8,
                (df['gx'][index] + 2000) / 4000,
                (df['gy'][index] + 2000) / 4000,
                (df['gz'][index] + 2000) / 4000
            ]
        np_tensor = np.expand_dims(np.array(tensor, dtype=np.float32), axis=(0, 1))  # (1386,) --> (1, 1, 1386)
        print("True: \t\t\t", true_gesture)
        # torch
        model.eval()
        t_tensor = torch.from_numpy(np_tensor).to(device)
        t_output = model(t_tensor)
        sm = torch.nn.Softmax(dim=1)
        probabilities = sm(t_output)
        t_pred = t_output.data.max(1)[1]
        print("Predict [PyTorch]: \t", CLASSES[t_pred], "({})".format(max(probabilities.tolist()[0])))
        # deepC
        np.savetxt('sample.data', np_tensor.flatten())
        !asl_imu_deepC/asl_imu.exe sample.data &> /dev/null
        dc_output = np.loadtxt('deepSea_result_1.out')
        probabilities = sm(torch.from_numpy(dc_output).unsqueeze(0).to(device))
        dc_pred = np.argmax(dc_output)
        print("Predict [DeepSea]: \t", CLASSES[dc_pred], "({})".format(max(probabilities.tolist()[0])))
        print()
In [20]:
gesture_files = []
# pick the first non-checkpoint file of each gesture class
for gesture_class in CLASSES:
    for gesture_file in os.listdir(os.path.join(data_dir, gesture_class)):
        if 'ipynb_checkpoints' in gesture_file:
            continue
        gesture_files.append(os.path.join(data_dir, gesture_class, gesture_file))
        break
for gesture_file in gesture_files:
    df = pd.read_csv(gesture_file)
    true_gesture = os.path.basename(os.path.dirname(gesture_file))
    compare(df, true_gesture, 3)
The above results were generated from random 2-second samples of the 4-second dataset, so a prediction may differ from the true value. What we are showing here is the PyTorch vs DeepSea prediction and probability.¶
deepCC for Arduino Nano 33 BLE Sense¶
In [21]:
!rm -rf asl_imu_deepC/
!deepCC asl_imu.onnx --board="Arduino Nano 33 BLE Sense" --debug --archive --bundle