1. Ghi âm và Thu thập Mẫu âm thanh
Bước đầu tiên trong quy trình là thu thập dữ liệu âm thanh thô. Chúng ta cần hai loại dữ liệu chính: mẫu chứa từ khóa kích hoạt (wake word) và mẫu âm thanh nền (background noise) để mô hình học cách phân biệt. Đoạn mã dưới đây định nghĩa một lớp utility để thực hiện việc ghi âm từ micro và lưu trữ dưới định dạng WAV.
import sounddevice as sd
import numpy as np
from pathlib import Path
import time
from datetime import datetime
from scipy.io.wavfile import write
class VoiceSampleRecorder:
def __init__(self, storage_path="audio_data"):
self.base_path = Path(storage_path)
self.sampling_freq = 16000
self.record_duration = 3 # Thời lượng mỗi file ghi âm
# Thiết lập cấu trúc thư mục
(self.base_path / "raw/positive").mkdir(parents=True, exist_ok=True)
(self.base_path / "raw/negative").mkdir(parents=True, exist_ok=True)
def _capture_audio(self, output_file):
"""Thực hiện ghi âm một lần"""
print("Đang ghi âm... (Hãy nói ngay bây giờ)")
raw_signal = sd.rec(int(self.record_duration * self.sampling_freq),
samplerate=self.sampling_freq,
channels=1,
blocking=True)
sd.wait()
# Lưu dưới dạng PCM 16-bit
write(output_file, self.sampling_freq, (raw_signal * 32767).astype(np.int16))
print(f"Đã lưu vào {output_file}")
def record_keyword_samples(self, total=100):
"""Thu thập các mẫu chứa từ khóa"""
for i in range(total):
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
file_path = self.base_path / f"raw/positive/{ts}_{i}.wav"
self._capture_audio(file_path)
time.sleep(1) # Nghỉ giữa các lần ghi
def record_noise_samples(self, total=300):
"""Thu thập các mẫu âm thanh nền"""
input("Hãy đảm bảo môi trường yên tĩnh, chuẩn bị ghi âm nền... (Nhấn Enter)")
for i in range(total):
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
file_path = self.base_path / f"raw/negative/{ts}_{i}.wav"
self._capture_audio(file_path)
time.sleep(0.3)
if __name__ == "__main__":
recorder = VoiceSampleRecorder()
print("1. Ghi âm từ khóa kích hoạt")
recorder.record_keyword_samples(total=50)
print("\n2. Ghi âm môi trường nền")
recorder.record_noise_samples(total=150)
2. Phân chia Tập dữ liệu Huấn luyện
Sau khi thu thập, dữ liệu cần được chia ngẫu nhiên thành tập huấn luyện (train) và tập kiểm tra (validation). Việc này giúp đánh giá khả năng tổng quát hóa của mô hình. Tỷ lệ chia thường gặp là 80/20.
import os
import shutil
from sklearn.model_selection import train_test_split
source_root = 'audio_data/raw'
dest_root = 'audio_data/split'
train_split = 0.8
# Tạo cấu trúc thư mục đích
os.makedirs(os.path.join(dest_root, 'train', 'positive'), exist_ok=True)
os.makedirs(os.path.join(dest_root, 'train', 'negative'), exist_ok=True)
os.makedirs(os.path.join(dest_root, 'val', 'positive'), exist_ok=True)
os.makedirs(os.path.join(dest_root, 'val', 'negative'), exist_ok=True)
def partition_data(category_folder):
# Lấy danh sách file wav
file_paths = [os.path.join(source_root, category_folder, f)
for f in os.listdir(os.path.join(source_root, category_folder))
if f.endswith('.wav')]
# Chia tập dữ liệu
train_set, val_set = train_test_split(file_paths, train_size=train_split, random_state=42)
# Sao chép file vào thư mục tương ứng
for src_file in train_set:
shutil.copy(src_file, os.path.join(dest_root, 'train', category_folder))
for src_file in val_set:
shutil.copy(src_file, os.path.join(dest_root, 'val', category_folder))
# Thực hiện chia cho cả 2 lớp
partition_data('positive')
partition_data('negative')
print("Hoàn tất việc phân chia dữ liệu!")
3. Trích xuất Đặc trưng và Tăng cường dữ liệu
Âm thanh thô không thể đưa trực tiếp vào mạng neural. Chúng ta cần chuyển đổi sang dạng đặc trưng tần số như MFCC. Đồng thời, kỹ thuật tăng cường dữ liệu (data augmentation) được áp dụng để mở rộng tập huấn luyện và giảm overfitting.
import librosa
import numpy as np
import soundfile as sf
class SoundFeatureExtractor:
@staticmethod
def load_waveform(file_path, target_sr=16000):
"""Đọc file âm thanh và chuẩn hóa采样率"""
try:
waveform, original_sr = sf.read(file_path)
if waveform.ndim > 1: # Chuyển về mono
waveform = waveform.mean(axis=1)
waveform = librosa.resample(waveform, orig_sr=original_sr, target_sr=target_sr)
return waveform, target_sr
except Exception as e:
print(f"Lỗi khi đọc {file_path}: {str(e)}")
return np.zeros(target_sr * 1), target_sr
@staticmethod
def compute_mfcc(waveform, sr=16000, n_mfcc=13, time_steps=100):
"""Quy trình trích xuất đặc trưng hoàn chỉnh"""
# Pre-emphasis
waveform = librosa.effects.preemphasis(waveform)
# Cắt bỏ khoảng lặng (VAD)
trimmed, _ = librosa.effects.trim(waveform, top_db=20)
if len(trimmed) < int(0.3 * sr):
trimmed = waveform # Giữ nguyên nếu quá ngắn
# Trích xuất MFCC
mfcc_vals = librosa.feature.mfcc(
y=trimmed,
sr=sr,
n_mfcc=n_mfcc,
n_fft=int(0.025 * sr),
hop_length=int(0.01 * sr)
)
# Tính đạo hàm bậc 1 và 2
delta_1 = librosa.feature.delta(mfcc_vals)
delta_2 = librosa.feature.delta(mfcc_vals, order=2)
combined_features = np.vstack([mfcc_vals, delta_1, delta_2]) # (39, T)
# Chuẩn hóa
combined_features = (combined_features - np.mean(combined_features)) / (np.std(combined_features) + 1e-8)
# Chuẩn hóa độ dài thời gian
if combined_features.shape[1] < time_steps:
pad_len = time_steps - combined_features.shape[1]
combined_features = np.pad(combined_features, ((0, 0), (0, pad_len)), mode='edge')
elif combined_features.shape[1] > time_steps:
start_idx = np.random.randint(0, combined_features.shape[1] - time_steps)
combined_features = combined_features[:, start_idx:start_idx + time_steps]
return combined_features
@staticmethod
def apply_augmentation(waveform, sr):
"""Biến đổi âm thanh để tăng cường dữ liệu"""
# Thay đổi âm lượng nhẹ
waveform = waveform * np.random.uniform(0.9, 1.1)
# Thêm nhiễu trắng
if np.random.rand() < 0.5:
noise = np.random.normal(0, 0.002, len(waveform))
waveform += noise
# Dịch chuyển cao độ
if np.random.rand() < 0.3:
waveform = librosa.effects.pitch_shift(waveform, sr=sr, n_steps=np.random.randint(-1, 2))
return waveform
4. Xây dựng Lớp Dataset Tùy chỉnh
Để tương thích với PyTorch DataLoader, chúng ta cần đóng gói dữ liệu vào một class kế thừa `torch.utils.data.Dataset`. Class này sẽ chịu trách nhiệm load file, xử lý đặc trưng và trả về tensor.
import os
import torch
from torch.utils.data import Dataset
from sound_feature_extractor import SoundFeatureExtractor
class KeywordDataset(Dataset):
def __init__(self, root_dir, sr=16000, n_mfcc=13, max_len=100, use_augment=True):
self.sr = sr
self.n_mfcc = n_mfcc
self.max_len = max_len
self.use_augment = use_augment
self.file_list = []
# Quét thư mục dữ liệu
for label_idx, class_name in enumerate(['negative', 'positive']):
class_path = os.path.join(root_dir, class_name)
if not os.path.exists(class_path):
continue
for filename in os.listdir(class_path):
full_path = os.path.join(class_path, filename)
self.file_list.append((full_path, label_idx))
# Tính toán trọng số để cân bằng lớp
pos_count = sum(1 for _, lbl in self.file_list if lbl == 1)
neg_count = len(self.file_list) - pos_count
self.sample_weights = [
1 / pos_count if lbl == 1 else 1 / neg_count
for _, lbl in self.file_list
]
def __len__(self):
return len(self.file_list)
def __getitem__(self, index):
"""Lấy mẫu dữ liệu kèm xử lý lỗi"""
max_attempts = 3
for attempt in range(max_attempts):
try:
path, label = self.file_list[index]
# Đọc âm thanh
audio_data, sr = SoundFeatureExtractor.load_waveform(path, self.sr)
# Kiểm tra độ dài tối thiểu
if len(audio_data) < 0.3 * sr:
raise ValueError(f"Audio quá ngắn: {len(audio_data) / sr:.2f}s")
# Tăng cường dữ liệu cho mẫu dương
if self.use_augment and label == 1:
audio_data = SoundFeatureExtractor.apply_augmentation(audio_data, sr)
# Trích xuất đặc trưng
features = SoundFeatureExtractor.compute_mfcc(
audio_data, sr, self.n_mfcc, self.max_len
)
# Kiểm tra kích thước cuối cùng
if features.shape[1] != self.max_len:
raise ValueError(f"Lỗi kích thước đặc trưng: {features.shape[1]}")
return (
torch.FloatTensor(features),
torch.tensor(label, dtype=torch.long),
path
)
except Exception as e:
print(f"Lỗi xử lý {path} (lần {attempt + 1}): {str(e)}")
index = (index + 1) % len(self)
# Trả về dữ liệu rỗng nếu thất bại
return (
torch.zeros((self.n_mfcc * 3, self.max_len)),
torch.tensor(-1, dtype=torch.long),
"error_sample"
)
def get_dataloader(self, batch_size=32, shuffle=True, num_workers=4):
"""Tạo DataLoader"""
sampler = torch.utils.data.WeightedRandomSampler(self.sample_weights, len(self.sample_weights)) if shuffle else None
return torch.utils.data.DataLoader(
self,
batch_size=batch_size,
sampler=sampler,
pin_memory=True,
num_workers=num_workers
)
5. Thiết kế Mạng Neural CRNN
Mô hình sử dụng kiến trúc CRNN (Convolutional Recurrent Neural Network). Phần CNN trích xuất đặc trưng cục bộ từ spectrogram, trong phần RNN (GRU) học các phụ thuộc thời gian để nhận diện từ khóa.
import torch.nn as nn
class SpotterNetwork(nn.Module):
def __init__(self, input_features=39):
super().__init__()
# Khối Convolutional
self.cnn_block = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(3, 5), padding=(1, 2)),
nn.BatchNorm2d(32),
nn.GELU(),
nn.MaxPool2d((2, 2)),
nn.Conv2d(32, 64, kernel_size=(3, 5), padding=(1, 2)),
nn.BatchNorm2d(64),
nn.GELU(),
nn.MaxPool2d((2, 2)),
nn.AdaptiveAvgPool2d((None, 25))
)
# Khối Recurrent
self.gru_layer = nn.GRU(
input_size=64 * (input_features // 4),
hidden_size=128,
bidirectional=True,
num_layers=2,
dropout=0.3
)
# Lớp phân loại
self.head = nn.Sequential(
nn.Linear(256, 128),
nn.GELU(),
nn.Dropout(0.4),
nn.Linear(128, 2)
)
def forward(self, x):
x = x.unsqueeze(1)
x = self.cnn_block(x) # (B, C, F, T)
# Biến đổi kích thước cho GRU
B, C, F, T = x.size()
x = x.permute(0, 3, 1, 2) # (B, T, C, F)
x = x.reshape(B, T, -1) # (B, T, C*F)
# Đầu ra GRU
x, _ = self.gru_layer(x) # (B, T, 256)
x = x.mean(dim=1) # Global Average Pooling theo thời gian
return self.head(x)
6. Huấn luyện Mô hình
Quy trình huấn luyện bao gồm việc lặp qua các epoch, tính toán loss, backpropagation và cập nhật trọng số. Chúng ta sử dụng AdamW optimizer và OneCycleLR scheduler để tối ưu hóa quá trình hội tụ.
import argparse
import torch
from torch import optim
from pathlib import Path
from tqdm import tqdm
from spotter_network import SpotterNetwork
from keyword_dataset import KeywordDataset
from torch import nn
def run_training(args):
device = torch.device(args.device)
print(f"Thiết bị huấn luyện: {device}")
# Chuẩn bị dữ liệu
train_data = KeywordDataset(args.train_path, use_augment=True)
val_data = KeywordDataset(args.val_path, use_augment=False)
train_loader = train_data.get_dataloader(args.batch_size, num_workers=4)
val_loader = val_data.get_dataloader(args.batch_size, shuffle=False)
# Khởi tạo mô hình
network = SpotterNetwork(input_features=39).to(device)
optimiser = optim.AdamW(network.parameters(), lr=args.lr, weight_decay=1e-5)
scheduler = optim.lr_scheduler.OneCycleLR(
optimiser,
max_lr=1e-3,
steps_per_epoch=len(train_loader),
epochs=args.epochs,
pct_start=0.3
)
# Loss function với trọng số cho lớp thiểu số
criterion = nn.CrossEntropyLoss(weight=torch.tensor([1.0, args.pos_weight]).to(device))
# Kiểm tra dữ liệu đầu vào
print("Đang kiểm tra tính nhất quán của dữ liệu...")
for feat, lbl, p in tqdm(train_loader):
if feat.shape[-1] != 100:
print(f"Lỗi đặc trưng tại: {p}, hình dạng: {feat.shape}")
raise ValueError("Kích thước đặc trưng không đồng nhất")
# Vòng lặp huấn luyện
best_accuracy = 0.0
for epoch in range(args.epochs):
network.train()
running_loss = 0.0
correct_count = 0
total_count = 0
# Giai đoạn Train
for features, labels, _ in tqdm(train_loader, desc=f"Epoch {epoch + 1}"):
features = features.to(device)
labels = labels.to(device)
optimiser.zero_grad()
outputs = network(features)
error_value = criterion(outputs, labels)
error_value.backward()
torch.nn.utils.clip_grad_norm_(network.parameters(), 5.0)
optimiser.step()
running_loss += error_value.item()
_, predicted = outputs.max(1)
correct_count += predicted.eq(labels).sum().item()
total_count += labels.size(0)
# Giai đoạn Validation
network.eval()
val_loss = 0.0
val_correct = 0
val_total = 0
with torch.no_grad():
for features, labels, _ in val_loader:
features = features.to(device)
labels = labels.to(device)
outputs = network(features)
error_value = criterion(outputs, labels)
val_loss += error_value.item()
_, predicted = outputs.max(1)
val_correct += predicted.eq(labels).sum().item()
val_total += labels.size(0)
# Tính toán chỉ số
train_acc = 100. * correct_count / total_count
val_acc = 100. * val_correct / val_total
scheduler.step(val_acc)
# Lưu mô hình tốt nhất
if val_acc > best_accuracy:
best_accuracy = val_acc
torch.save(network.state_dict(), Path(args.save_path) / "best_model.pth")
# Log thông tin
print(f"Epoch {epoch + 1}/{args.epochs}")
print(f"Train Loss: {running_loss / len(train_loader):.4f} | Acc: {train_acc:.2f}%")
print(f"Val Loss: {val_loss / len(val_loader):.4f} | Acc: {val_acc:.2f}%")
print(f"LR: {optimiser.param_groups[0]['lr']:.2e}\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--train_path", default="audio_data/split/train")
parser.add_argument("--val_path", default="audio_data/split/val")
parser.add_argument("--save_path", default="./checkpoints")
parser.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
parser.add_argument("--batch_size", type=int, default=40)
parser.add_argument("--epochs", type=int, default=5000)
parser.add_argument("--lr", type=float, default=1e-4)
parser.add_argument("--pos_weight", type=float, default=3.0)
args = parser.parse_args()
Path(args.save_path).mkdir(parents=True, exist_ok=True)
run_training(args)
7. Kiểm tra và Đánh giá Kết quả
Sau khi huấn luyện, mô hình cần được đánh giá trên tập dữ liệu kiểm tra độc lập. Các chỉ số như Precision, Recall và Confusion Matrix giúp hiểu rõ hiệu suất thực tế của hệ thống.
import argparse
import numpy as np
from pathlib import Path
import torch
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from spotter_network import SpotterNetwork
from keyword_dataset import KeywordDataset
def perform_evaluation(args):
device = torch.device(args.device)
# Nạp mô hình
network = SpotterNetwork().to(device)
network.load_state_dict(torch.load(args.model_path, map_location=device))
network.eval()
# Nạp dữ liệu test
test_data = KeywordDataset(args.test_path, use_augment=False)
test_loader = test_data.get_dataloader(args.batch_size, shuffle=False)
# Thực hiện suy luận
true_labels = []
pred_labels = []
file_paths = []
with torch.no_grad():
for features, labels, paths in test_loader:
features = features.to(device)
outputs = network(features)
_, preds = torch.max(outputs, 1)
true_labels.extend(labels.cpu().numpy())
pred_labels.extend(preds.cpu().numpy())
file_paths.extend(paths)
# In báo cáo phân loại
print(classification_report(true_labels, pred_labels, target_names=['negative', 'positive']))
# Vẽ ma trận nhầm lẫn
fig, ax = plt.subplots(figsize=(6, 6))
ConfusionMatrixDisplay.from_predictions(
true_labels, pred_labels,
display_labels=['Not Wake', 'Wake'],
cmap='Blues', ax=ax
)
plt.savefig(Path(args.save_path) / "confusion_matrix.png")
# Ghi lại các mẫu bị phân loại sai
with open(Path(args.save_path) / "errors.txt", 'w') as f:
for path, true_val, pred_val in zip(file_paths, true_labels, pred_labels):
if true_val != pred_val:
f.write(f"{path}\tTrue: {true_val}\tPred: {pred_val}\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--test_path", default="audio_data/raw", required=True)
parser.add_argument("--model_path", default="checkpoints/best_model.pth", required=True)
parser.add_argument("--batch_size", type=int, default=50)
parser.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
parser.add_argument("--save_path", default="./results")
args = parser.parse_args()
Path(args.save_path).mkdir(exist_ok=True)
perform_evaluation(args)