Pre-requisite: Dataset¶
Dataset Format¶
The dataset must be pre-uploaded.
The dataset folder, titled "WakeWordDataset" (stored in the top_dir variable), has two subdirectories:
WakeWordDataset/hotword/
WakeWordDataset/background/
Each directory contains .wav files. "background" holds speech that is not the wake word, while "hotword" holds recordings of the wake word, one utterance per 2-second segment.
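As an optional sanity check (not part of the original notebook), the expected layout can be verified by counting the .wav files in each class directory:

import os

top_dir = "WakeWordDataset"
for sub in ("hotword", "background"):
    sub_dir = os.path.join(top_dir, sub)
    wav_files = [f for f in os.listdir(sub_dir) if f.endswith(".wav")]
    print(sub, "->", len(wav_files), ".wav files")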
Note¶
- The quality of the resulting model depends on the quality of the dataset. With a small dataset, more false positives are expected (the model predicting background noise as the wake word).
- Dataset quality can be further improved by having a variety of speakers record in different environments with different levels of background noise (a small noise-mixing sketch is shown below).
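One optional way to simulate recordings with different levels of background noise is to mix noise into existing clips at a chosen signal-to-noise ratio. This is only a hedged sketch, not part of this notebook; mix_noise is a hypothetical helper and both arguments are assumed to be NumPy float arrays, with noise at least as long as clip:

import numpy as np

def mix_noise(clip, noise, snr_db):
    #Scale the noise so that it sits roughly snr_db decibels below the clip, then add it
    clip_power = np.mean(clip ** 2) + 1e-12
    noise_power = np.mean(noise ** 2) + 1e-12
    scale = np.sqrt(clip_power / (noise_power * 10 ** (snr_db / 10.0)))
    return clip + scale * noise[:len(clip)]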
In [1]:
!wget -N "https://cainvas-static.s3.amazonaws.com/media/user_data/cainvas-admin/WakeWordDataset.zip"
!unzip -o WakeWordDataset.zip
!rm WakeWordDataset.zip
In [2]:
import numpy as np
import tensorflow as tf
import os
from librosa.core import load as librosa_load
import IPython.display as ipd
from matplotlib import pyplot as plt
In [3]:
max_length = 2 #length (in seconds) of input
desired_sr = 8000 #sampling rate to use
desired_samples = max_length*desired_sr #total number of samples in input
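With these settings each model input spans max_length * desired_sr = 2 * 8000 = 16000 samples, so every example built below is a vector of 16000 values.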
In [4]:
#Processing the data
#Function to normalize input values
def normalize_sample(input_val):
    diff = np.max(input_val) - np.min(input_val)
    if diff != 0:
        input_val /= diff
    return input_val

#Dataset storing audio samples for wake word and background
cainvas_dataset = np.empty((0, desired_samples))
noncainvas_dataset = np.empty((0, desired_samples))

top_dir = "WakeWordDataset"
background_dir = os.path.join(top_dir, "background")
word_dir = os.path.join(top_dir, "hotword")

for ds_dir in [background_dir, word_dir]:
    for file in os.listdir(ds_dir):
        file_path = os.path.join(ds_dir, file)
        print("adding", file, "to audio dataset")
        X, sr = librosa_load(file_path, sr=desired_sr)
        X = normalize_sample(X)
        #Zero-pad so the clip length is a multiple of desired_samples, then split into 2-second windows
        X = np.pad(X, (0, desired_samples - (X.shape[0] % desired_samples)))
        X_sub = np.array(np.split(X, int(len(X) * 1.0 / desired_samples)))
        if ds_dir == background_dir:
            noncainvas_dataset = np.append(noncainvas_dataset, X_sub, axis=0)
        else:
            cainvas_dataset = np.append(cainvas_dataset, X_sub, axis=0)
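To illustrate the padding and splitting above (hypothetical numbers), a 35,000-sample clip would be zero-padded by 13,000 samples to 48,000 and then split into three 2-second windows:

X = np.zeros(35000)  #stand-in for a loaded clip
X = np.pad(X, (0, desired_samples - (X.shape[0] % desired_samples)))
print(X.shape)  #(48000,)
print(np.array(np.split(X, int(len(X) / desired_samples))).shape)  #(3, 16000)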
In [5]:
#Concatenating dataset into matrix of inputs and labels
total_len = cainvas_dataset.shape[0] + noncainvas_dataset.shape[0]
inputs = np.append(cainvas_dataset, noncainvas_dataset, axis=0)
labels = np.array([1. if i < cainvas_dataset.shape[0] else 0. for i in range(total_len)])
print(total_len)
In [6]:
#Adding random-noise "background" and silent clips as extra negative (label 0) examples
background = np.random.random((50, desired_samples))
silence = np.zeros((50,desired_samples))
inputs = np.append(inputs, background, axis=0)
inputs = np.append(inputs, silence, axis=0)
labels = np.append(labels, np.zeros(len(background) + len(silence)), axis=0)
total_len = len(labels)
print(total_len)
In [7]:
#Shuffling inputs and labels
shuffle_permutation = np.arange(total_len)
np.random.shuffle(shuffle_permutation)
inputs = inputs[shuffle_permutation]
labels = labels[shuffle_permutation]
#Splitting into train and test dataset
train_split = 0.9
cutoff = int(train_split*total_len)
inputs_train = inputs[:cutoff]
inputs_test = inputs[cutoff:]
labels_train = labels[:cutoff]
labels_test = labels[cutoff:]
In [8]:
#Selecting random index from test dataset
ind = int(np.random.uniform()*len(inputs_test))
#Displaying sample spectrogram and audio from test dataset
X = inputs_test[ind]
y = labels_test[ind]
print("Label is", "cainvas" if y==1 else "background")
spectrogram_out = tf.abs(tf.signal.stft(X, 200, 100, fft_length=128)).numpy()
spectrogram_out = np.swapaxes(spectrogram_out, 0, 1)
plt.imshow(spectrogram_out, cmap='hot', interpolation='nearest')
plt.show()
ipd.Audio(X, rate=desired_sr)
Out[8]:
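For reference, tf.signal.stft with frame_length=200, frame_step=100 and fft_length=128 on a 16,000-sample clip produces 1 + (16000 - 200) // 100 = 159 frames of 128 // 2 + 1 = 65 frequency bins, so the image above is a 65 x 159 magnitude spectrogram after the axes are swapped.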
Building and Training the Model¶
In [9]:
model = tf.keras.Sequential()
def spectrogramOp(X):
    spectrogram_out = tf.abs(tf.signal.stft(X, 200, 25, fft_length=256))
    return spectrogram_out
lambda1 = tf.keras.layers.Lambda(spectrogramOp, name="lambda_spectrogram")
lambda15 = tf.keras.layers.Lambda(lambda x: tf.transpose(x, perm=(0,2,1)), input_shape=(633, 129), name="switch_hw")
lambda2 = tf.keras.layers.Lambda(lambda x: tf.reshape(x, (-1, 129, 633, 1)), name="add_channels")
conv2d1 = tf.keras.layers.Conv2D(4, (8, 129), strides=2, activation='relu', name="conv1", input_shape=(129, 633, 1))
conv2d2 = tf.keras.layers.Conv2D(8, (4, 4), strides=2, activation='relu', name="conv2")
conv2d3 = tf.keras.layers.Conv2D(8, (8, 8), strides=2, activation='relu', name="conv3")
flatten1 = tf.keras.layers.Flatten()
dense1 = tf.keras.layers.Dense(1)
activation1 = tf.keras.layers.Activation('sigmoid')
model.add(lambda1)
model.add(lambda15)
model.add(lambda2)
model.add(conv2d1)
model.add(conv2d2)
model.add(conv2d3)
model.add(flatten1)
model.add(dense1)
model.add(activation1)
model.compile(optimizer='adam', loss=tf.keras.losses.binary_crossentropy, metrics=['accuracy'])
model.fit(inputs_train, labels_train, batch_size=32, epochs=10,
          validation_data=(inputs_test, labels_test))
Out[9]:
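As a hand-derived sanity check on the architecture (not copied from the training output above): the Lambda layers turn each 16,000-sample clip into a 1 + (16000 - 200) // 25 = 633 frame by 256 // 2 + 1 = 129 bin spectrogram, transposed and reshaped to (129, 633, 1); the three stride-2 valid convolutions then shrink it to about (61, 253, 4), (29, 125, 8) and (11, 59, 8), so the Flatten layer feeds 11 * 59 * 8 = 5192 features into the single sigmoid output.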
In [10]:
#Viewing a summary of the model
model.summary()
In [11]:
#Evaluating final model's performance on test dataset
score, acc = model.evaluate(inputs_test, labels_test, batch_size=32)
print('Test score:', score)
print('Test accuracy:', acc)
Testing Trained Model¶
Testing on random sample from test dataset¶
In [12]:
ind = int(np.random.uniform()*len(inputs_test))
X = inputs_test[ind]
ipd.Audio(X, rate=desired_sr)
y = labels_test[ind]
output = model.predict(np.array([inputs_test[ind]]))[0][0]
print("True label:", y)
print("Prediction:", output)
print("Label is", "cainvas" if y==1 else "background")
spectrogram_out = tf.abs(tf.signal.stft(X, 200, 100, fft_length=128)).numpy()
spectrogram_out = np.swapaxes(spectrogram_out, 0, 1)
plt.imshow(spectrogram_out, cmap='hot', interpolation='nearest')
plt.show()
ipd.Audio(X, rate=desired_sr)
Out[12]:
Testing on background and silence¶
In [13]:
background = np.random.random((desired_samples))
silence = np.zeros((desired_samples))
background_out, silence_out = model.predict(np.array([background, silence]))
print("Background prediction", background_out)
print("Silence prediction", silence_out)
Testing on longer clip not in test dataset¶
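The cell below scans the clip with a sliding window (win_len samples wide, advanced stride_len samples at a time), zero-pads each window to 2 seconds, and scores it with the model. A short memory window is then slid over the resulting prediction curve: the predictions are integrated with the trapezoid rule and an activation is flagged whenever the area exceeds 30% of the window's time span. With hypothetical predictions [0.1, 0.9, 0.95, 0.8] spaced 0.125 s apart, the area is 0.125 * ((0.1 + 0.9)/2 + (0.9 + 0.95)/2 + (0.95 + 0.8)/2) ≈ 0.29, which exceeds 0.3 * 0.375 ≈ 0.11, so that window would register an activation.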
In [14]:
path = "./WakeWordDataset/TestLongClip3.wav"
#loading the sample in
X, sr = librosa_load(path, sr=desired_sr, mono=True)
X = X.astype(np.float64)
assert sr == desired_sr #ensure sample rate is same as desired
assert len(X.shape) == 1 #ensure X is a mono signal
spectrogram_out = tf.abs(tf.signal.stft(X, 200, 100, fft_length=128)).numpy()
spectrogram_out = np.swapaxes(spectrogram_out, 0, 1)
plt.imshow(spectrogram_out, cmap='hot', interpolation='nearest')
plt.show()
win_len = 10000
stride_len = 1000
times = []
predictions = []
#Score each window of the clip with the model
for n in range(0, len(X) - win_len, stride_len):
    X_wind = X[n:n + win_len]
    X_wind = np.pad(X_wind, (0, desired_samples - len(X_wind)))  #zero-pad the window to 2 seconds
    test_pred = model.predict(np.array([X_wind]))
    times.append((n + win_len / 2) / float(desired_sr))  #timestamp at the centre of the window
    predictions.append(test_pred.flatten()[0])

memory_stride = int(0.2 * float(desired_sr) / stride_len)  #shift the memory window by ~0.2 seconds
memory_len = int(0.5 * float(desired_sr) / stride_len)     #each memory window covers ~0.5 seconds of strides
time_diff = np.diff(times)
activating_times = []
#slide the memory window through the predictions
for n in range(0, len(predictions) - memory_len, memory_stride):
    prediction_window = predictions[n:n + memory_len]
    window_time = times[n + memory_len - 1] - times[n]
    #for the current memory window, integrate the predictions with the trapezoid rule
    area = 0.
    for i in range(0, memory_len - 1):
        area += time_diff[n + i] * (prediction_window[i] + prediction_window[i + 1]) / 2.
    #flag an activation when the area exceeds 30% of the window's time span
    if area > window_time * 0.30:
        activating_times.append(times[n])
#Visualizing with matplotlib
fig, ax1 = plt.subplots()
white_color = "#fff"
red_color = "#f00"
ax1.set_xlabel("time (s)", color=white_color)
ax1.set_ylabel("wake word detection", color=white_color)
#Optional zoom range; leaving both at 0 falls back to plotting the full prediction curve
start_time = 0
end_time = 0
start_index = times.index(start_time) if start_time in times else 0
end_index = times.index(end_time) if end_time in times else len(times)
ax1.plot(times[start_index:end_index],
         predictions[start_index:end_index], color=red_color)
ax1.tick_params(axis='x', labelcolor=white_color)
ax1.tick_params(axis='y', labelcolor=white_color)
#Mark each detected activation time with a vertical line
for t in activating_times:
    ax1.axvline(t, color="blue", alpha=0.2)
ipd.Audio(path)
Out[14]: