Xây Dựng Hệ Thống Kích Hoạt Giọng Nói Sử Dụng PyTorch

1. Ghi âm và Thu thập Mẫu âm thanh

Bước đầu tiên trong quy trình là thu thập dữ liệu âm thanh thô. Chúng ta cần hai loại dữ liệu chính: mẫu chứa từ khóa kích hoạt (wake word) và mẫu âm thanh nền (background noise) để mô hình học cách phân biệt. Đoạn mã dưới đây định nghĩa một lớp utility để thực hiện việc ghi âm từ micro và lưu trữ dưới định dạng WAV.

import sounddevice as sd
import numpy as np
from pathlib import Path
import time
from datetime import datetime
from scipy.io.wavfile import write


class VoiceSampleRecorder:
    def __init__(self, storage_path="audio_data"):
        self.base_path = Path(storage_path)
        self.sampling_freq = 16000
        self.record_duration = 3  # Thời lượng mỗi file ghi âm

        # Thiết lập cấu trúc thư mục
        (self.base_path / "raw/positive").mkdir(parents=True, exist_ok=True)
        (self.base_path / "raw/negative").mkdir(parents=True, exist_ok=True)

    def _capture_audio(self, output_file):
        """Thực hiện ghi âm một lần"""
        print("Đang ghi âm... (Hãy nói ngay bây giờ)")
        raw_signal = sd.rec(int(self.record_duration * self.sampling_freq),
                            samplerate=self.sampling_freq,
                            channels=1,
                            blocking=True)
        sd.wait()
        # Lưu dưới dạng PCM 16-bit
        write(output_file, self.sampling_freq, (raw_signal * 32767).astype(np.int16))
        print(f"Đã lưu vào {output_file}")

    def record_keyword_samples(self, total=100):
        """Thu thập các mẫu chứa từ khóa"""
        for i in range(total):
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            file_path = self.base_path / f"raw/positive/{ts}_{i}.wav"
            self._capture_audio(file_path)
            time.sleep(1)  # Nghỉ giữa các lần ghi

    def record_noise_samples(self, total=300):
        """Thu thập các mẫu âm thanh nền"""
        input("Hãy đảm bảo môi trường yên tĩnh, chuẩn bị ghi âm nền... (Nhấn Enter)")
        for i in range(total):
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            file_path = self.base_path / f"raw/negative/{ts}_{i}.wav"
            self._capture_audio(file_path)
            time.sleep(0.3)


if __name__ == "__main__":
    recorder = VoiceSampleRecorder()

    print("1. Ghi âm từ khóa kích hoạt")
    recorder.record_keyword_samples(total=50)

    print("\n2. Ghi âm môi trường nền")
    recorder.record_noise_samples(total=150)

2. Phân chia Tập dữ liệu Huấn luyện

Sau khi thu thập, dữ liệu cần được chia ngẫu nhiên thành tập huấn luyện (train) và tập kiểm tra (validation). Việc này giúp đánh giá khả năng tổng quát hóa của mô hình. Tỷ lệ chia thường gặp là 80/20.

import os
import shutil
from sklearn.model_selection import train_test_split

source_root = 'audio_data/raw'
dest_root = 'audio_data/split'

train_split = 0.8

# Tạo cấu trúc thư mục đích
os.makedirs(os.path.join(dest_root, 'train', 'positive'), exist_ok=True)
os.makedirs(os.path.join(dest_root, 'train', 'negative'), exist_ok=True)
os.makedirs(os.path.join(dest_root, 'val', 'positive'), exist_ok=True)
os.makedirs(os.path.join(dest_root, 'val', 'negative'), exist_ok=True)


def partition_data(category_folder):
    # Lấy danh sách file wav
    file_paths = [os.path.join(source_root, category_folder, f) 
                  for f in os.listdir(os.path.join(source_root, category_folder)) 
                  if f.endswith('.wav')]

    # Chia tập dữ liệu
    train_set, val_set = train_test_split(file_paths, train_size=train_split, random_state=42)

    # Sao chép file vào thư mục tương ứng
    for src_file in train_set:
        shutil.copy(src_file, os.path.join(dest_root, 'train', category_folder))

    for src_file in val_set:
        shutil.copy(src_file, os.path.join(dest_root, 'val', category_folder))


# Thực hiện chia cho cả 2 lớp
partition_data('positive')
partition_data('negative')

print("Hoàn tất việc phân chia dữ liệu!")

3. Trích xuất Đặc trưng và Tăng cường dữ liệu

Âm thanh thô không thể đưa trực tiếp vào mạng neural. Chúng ta cần chuyển đổi sang dạng đặc trưng tần số như MFCC. Đồng thời, kỹ thuật tăng cường dữ liệu (data augmentation) được áp dụng để mở rộng tập huấn luyện và giảm overfitting.

import librosa
import numpy as np
import soundfile as sf


class SoundFeatureExtractor:
    @staticmethod
    def load_waveform(file_path, target_sr=16000):
        """Đọc file âm thanh và chuẩn hóa采样率"""
        try:
            waveform, original_sr = sf.read(file_path)
            if waveform.ndim > 1:  # Chuyển về mono
                waveform = waveform.mean(axis=1)
            waveform = librosa.resample(waveform, orig_sr=original_sr, target_sr=target_sr)
            return waveform, target_sr
        except Exception as e:
            print(f"Lỗi khi đọc {file_path}: {str(e)}")
            return np.zeros(target_sr * 1), target_sr

    @staticmethod
    def compute_mfcc(waveform, sr=16000, n_mfcc=13, time_steps=100):
        """Quy trình trích xuất đặc trưng hoàn chỉnh"""
        # Pre-emphasis
        waveform = librosa.effects.preemphasis(waveform)

        # Cắt bỏ khoảng lặng (VAD)
        trimmed, _ = librosa.effects.trim(waveform, top_db=20)
        if len(trimmed) < int(0.3 * sr):
            trimmed = waveform  # Giữ nguyên nếu quá ngắn

        # Trích xuất MFCC
        mfcc_vals = librosa.feature.mfcc(
            y=trimmed,
            sr=sr,
            n_mfcc=n_mfcc,
            n_fft=int(0.025 * sr),
            hop_length=int(0.01 * sr)
        )

        # Tính đạo hàm bậc 1 và 2
        delta_1 = librosa.feature.delta(mfcc_vals)
        delta_2 = librosa.feature.delta(mfcc_vals, order=2)
        combined_features = np.vstack([mfcc_vals, delta_1, delta_2])  # (39, T)

        # Chuẩn hóa
        combined_features = (combined_features - np.mean(combined_features)) / (np.std(combined_features) + 1e-8)

        # Chuẩn hóa độ dài thời gian
        if combined_features.shape[1] < time_steps:
            pad_len = time_steps - combined_features.shape[1]
            combined_features = np.pad(combined_features, ((0, 0), (0, pad_len)), mode='edge')
        elif combined_features.shape[1] > time_steps:
            start_idx = np.random.randint(0, combined_features.shape[1] - time_steps)
            combined_features = combined_features[:, start_idx:start_idx + time_steps]

        return combined_features

    @staticmethod
    def apply_augmentation(waveform, sr):
        """Biến đổi âm thanh để tăng cường dữ liệu"""
        # Thay đổi âm lượng nhẹ
        waveform = waveform * np.random.uniform(0.9, 1.1)

        # Thêm nhiễu trắng
        if np.random.rand() < 0.5:
            noise = np.random.normal(0, 0.002, len(waveform))
            waveform += noise

        # Dịch chuyển cao độ
        if np.random.rand() < 0.3:
            waveform = librosa.effects.pitch_shift(waveform, sr=sr, n_steps=np.random.randint(-1, 2))

        return waveform

4. Xây dựng Lớp Dataset Tùy chỉnh

Để tương thích với PyTorch DataLoader, chúng ta cần đóng gói dữ liệu vào một class kế thừa `torch.utils.data.Dataset`. Class này sẽ chịu trách nhiệm load file, xử lý đặc trưng và trả về tensor.

import os
import torch
from torch.utils.data import Dataset
from sound_feature_extractor import SoundFeatureExtractor


class KeywordDataset(Dataset):
    def __init__(self, root_dir, sr=16000, n_mfcc=13, max_len=100, use_augment=True):
        self.sr = sr
        self.n_mfcc = n_mfcc
        self.max_len = max_len
        self.use_augment = use_augment
        self.file_list = []

        # Quét thư mục dữ liệu
        for label_idx, class_name in enumerate(['negative', 'positive']):
            class_path = os.path.join(root_dir, class_name)
            if not os.path.exists(class_path):
                continue

            for filename in os.listdir(class_path):
                full_path = os.path.join(class_path, filename)
                self.file_list.append((full_path, label_idx))

        # Tính toán trọng số để cân bằng lớp
        pos_count = sum(1 for _, lbl in self.file_list if lbl == 1)
        neg_count = len(self.file_list) - pos_count
        self.sample_weights = [
            1 / pos_count if lbl == 1 else 1 / neg_count
            for _, lbl in self.file_list
        ]

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, index):
        """Lấy mẫu dữ liệu kèm xử lý lỗi"""
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                path, label = self.file_list[index]

                # Đọc âm thanh
                audio_data, sr = SoundFeatureExtractor.load_waveform(path, self.sr)

                # Kiểm tra độ dài tối thiểu
                if len(audio_data) < 0.3 * sr:
                    raise ValueError(f"Audio quá ngắn: {len(audio_data) / sr:.2f}s")

                # Tăng cường dữ liệu cho mẫu dương
                if self.use_augment and label == 1:
                    audio_data = SoundFeatureExtractor.apply_augmentation(audio_data, sr)

                # Trích xuất đặc trưng
                features = SoundFeatureExtractor.compute_mfcc(
                    audio_data, sr, self.n_mfcc, self.max_len
                )

                # Kiểm tra kích thước cuối cùng
                if features.shape[1] != self.max_len:
                    raise ValueError(f"Lỗi kích thước đặc trưng: {features.shape[1]}")

                return (
                    torch.FloatTensor(features),
                    torch.tensor(label, dtype=torch.long),
                    path
                )

            except Exception as e:
                print(f"Lỗi xử lý {path} (lần {attempt + 1}): {str(e)}")
                index = (index + 1) % len(self)

        # Trả về dữ liệu rỗng nếu thất bại
        return (
            torch.zeros((self.n_mfcc * 3, self.max_len)),
            torch.tensor(-1, dtype=torch.long),
            "error_sample"
        )

    def get_dataloader(self, batch_size=32, shuffle=True, num_workers=4):
        """Tạo DataLoader"""
        sampler = torch.utils.data.WeightedRandomSampler(self.sample_weights, len(self.sample_weights)) if shuffle else None
        return torch.utils.data.DataLoader(
            self,
            batch_size=batch_size,
            sampler=sampler,
            pin_memory=True,
            num_workers=num_workers
        )

5. Thiết kế Mạng Neural CRNN

Mô hình sử dụng kiến trúc CRNN (Convolutional Recurrent Neural Network). Phần CNN trích xuất đặc trưng cục bộ từ spectrogram, trong phần RNN (GRU) học các phụ thuộc thời gian để nhận diện từ khóa.

import torch.nn as nn

class SpotterNetwork(nn.Module):

    def __init__(self, input_features=39):
        super().__init__()
        # Khối Convolutional
        self.cnn_block = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=(3, 5), padding=(1, 2)),
            nn.BatchNorm2d(32),
            nn.GELU(),
            nn.MaxPool2d((2, 2)),

            nn.Conv2d(32, 64, kernel_size=(3, 5), padding=(1, 2)),
            nn.BatchNorm2d(64),
            nn.GELU(),
            nn.MaxPool2d((2, 2)),

            nn.AdaptiveAvgPool2d((None, 25))
        )

        # Khối Recurrent
        self.gru_layer = nn.GRU(
            input_size=64 * (input_features // 4),
            hidden_size=128,
            bidirectional=True,
            num_layers=2,
            dropout=0.3
        )

        # Lớp phân loại
        self.head = nn.Sequential(
            nn.Linear(256, 128),
            nn.GELU(),
            nn.Dropout(0.4),
            nn.Linear(128, 2)
        )

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.cnn_block(x)  # (B, C, F, T)

        # Biến đổi kích thước cho GRU
        B, C, F, T = x.size()
        x = x.permute(0, 3, 1, 2)  # (B, T, C, F)
        x = x.reshape(B, T, -1)  # (B, T, C*F)

        # Đầu ra GRU
        x, _ = self.gru_layer(x)  # (B, T, 256)
        x = x.mean(dim=1)  # Global Average Pooling theo thời gian

        return self.head(x)

6. Huấn luyện Mô hình

Quy trình huấn luyện bao gồm việc lặp qua các epoch, tính toán loss, backpropagation và cập nhật trọng số. Chúng ta sử dụng AdamW optimizer và OneCycleLR scheduler để tối ưu hóa quá trình hội tụ.

import argparse
import torch
from torch import optim
from pathlib import Path
from tqdm import tqdm
from spotter_network import SpotterNetwork
from keyword_dataset import KeywordDataset
from torch import nn


def run_training(args):
    device = torch.device(args.device)
    print(f"Thiết bị huấn luyện: {device}")

    # Chuẩn bị dữ liệu
    train_data = KeywordDataset(args.train_path, use_augment=True)
    val_data = KeywordDataset(args.val_path, use_augment=False)

    train_loader = train_data.get_dataloader(args.batch_size, num_workers=4)
    val_loader = val_data.get_dataloader(args.batch_size, shuffle=False)

    # Khởi tạo mô hình
    network = SpotterNetwork(input_features=39).to(device)
    optimiser = optim.AdamW(network.parameters(), lr=args.lr, weight_decay=1e-5)
    
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimiser,
        max_lr=1e-3,
        steps_per_epoch=len(train_loader),
        epochs=args.epochs,
        pct_start=0.3
    )
    
    # Loss function với trọng số cho lớp thiểu số
    criterion = nn.CrossEntropyLoss(weight=torch.tensor([1.0, args.pos_weight]).to(device))

    # Kiểm tra dữ liệu đầu vào
    print("Đang kiểm tra tính nhất quán của dữ liệu...")
    for feat, lbl, p in tqdm(train_loader):
        if feat.shape[-1] != 100:
            print(f"Lỗi đặc trưng tại: {p}, hình dạng: {feat.shape}")
            raise ValueError("Kích thước đặc trưng không đồng nhất")

    # Vòng lặp huấn luyện
    best_accuracy = 0.0
    for epoch in range(args.epochs):
        network.train()
        running_loss = 0.0
        correct_count = 0
        total_count = 0

        # Giai đoạn Train
        for features, labels, _ in tqdm(train_loader, desc=f"Epoch {epoch + 1}"):
            features = features.to(device)
            labels = labels.to(device)

            optimiser.zero_grad()
            outputs = network(features)
            error_value = criterion(outputs, labels)
            error_value.backward()
            torch.nn.utils.clip_grad_norm_(network.parameters(), 5.0)
            optimiser.step()

            running_loss += error_value.item()
            _, predicted = outputs.max(1)
            correct_count += predicted.eq(labels).sum().item()
            total_count += labels.size(0)

        # Giai đoạn Validation
        network.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for features, labels, _ in val_loader:
                features = features.to(device)
                labels = labels.to(device)

                outputs = network(features)
                error_value = criterion(outputs, labels)

                val_loss += error_value.item()
                _, predicted = outputs.max(1)
                val_correct += predicted.eq(labels).sum().item()
                val_total += labels.size(0)

        # Tính toán chỉ số
        train_acc = 100. * correct_count / total_count
        val_acc = 100. * val_correct / val_total
        scheduler.step(val_acc)

        # Lưu mô hình tốt nhất
        if val_acc > best_accuracy:
            best_accuracy = val_acc
            torch.save(network.state_dict(), Path(args.save_path) / "best_model.pth")

        # Log thông tin
        print(f"Epoch {epoch + 1}/{args.epochs}")
        print(f"Train Loss: {running_loss / len(train_loader):.4f} | Acc: {train_acc:.2f}%")
        print(f"Val Loss: {val_loss / len(val_loader):.4f} | Acc: {val_acc:.2f}%")
        print(f"LR: {optimiser.param_groups[0]['lr']:.2e}\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_path", default="audio_data/split/train")
    parser.add_argument("--val_path", default="audio_data/split/val")
    parser.add_argument("--save_path", default="./checkpoints")
    parser.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--batch_size", type=int, default=40)
    parser.add_argument("--epochs", type=int, default=5000)
    parser.add_argument("--lr", type=float, default=1e-4)
    parser.add_argument("--pos_weight", type=float, default=3.0)
    args = parser.parse_args()

    Path(args.save_path).mkdir(parents=True, exist_ok=True)

    run_training(args)

7. Kiểm tra và Đánh giá Kết quả

Sau khi huấn luyện, mô hình cần được đánh giá trên tập dữ liệu kiểm tra độc lập. Các chỉ số như Precision, Recall và Confusion Matrix giúp hiểu rõ hiệu suất thực tế của hệ thống.

import argparse
import numpy as np
from pathlib import Path
import torch
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from spotter_network import SpotterNetwork
from keyword_dataset import KeywordDataset


def perform_evaluation(args):
    device = torch.device(args.device)

    # Nạp mô hình
    network = SpotterNetwork().to(device)
    network.load_state_dict(torch.load(args.model_path, map_location=device))
    network.eval()

    # Nạp dữ liệu test
    test_data = KeywordDataset(args.test_path, use_augment=False)
    test_loader = test_data.get_dataloader(args.batch_size, shuffle=False)

    # Thực hiện suy luận
    true_labels = []
    pred_labels = []
    file_paths = []

    with torch.no_grad():
        for features, labels, paths in test_loader:
            features = features.to(device)
            outputs = network(features)
            _, preds = torch.max(outputs, 1)

            true_labels.extend(labels.cpu().numpy())
            pred_labels.extend(preds.cpu().numpy())
            file_paths.extend(paths)

    # In báo cáo phân loại
    print(classification_report(true_labels, pred_labels, target_names=['negative', 'positive']))

    # Vẽ ma trận nhầm lẫn
    fig, ax = plt.subplots(figsize=(6, 6))
    ConfusionMatrixDisplay.from_predictions(
        true_labels, pred_labels,
        display_labels=['Not Wake', 'Wake'],
        cmap='Blues', ax=ax
    )
    plt.savefig(Path(args.save_path) / "confusion_matrix.png")

    # Ghi lại các mẫu bị phân loại sai
    with open(Path(args.save_path) / "errors.txt", 'w') as f:
        for path, true_val, pred_val in zip(file_paths, true_labels, pred_labels):
            if true_val != pred_val:
                f.write(f"{path}\tTrue: {true_val}\tPred: {pred_val}\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--test_path", default="audio_data/raw", required=True)
    parser.add_argument("--model_path", default="checkpoints/best_model.pth", required=True)
    parser.add_argument("--batch_size", type=int, default=50)
    parser.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--save_path", default="./results")
    args = parser.parse_args()

    Path(args.save_path).mkdir(exist_ok=True)
    perform_evaluation(args)

Thẻ: PyTorch voice-activation crnn mfcc audio-processing

Đăng vào ngày 30 tháng 5 lúc 07:57