Sử dụng synchronized để giải quyết xung đột truy cập tài nguyên chung
Khi nhiều luồng cùng thao tác trên một biến chia sẻ mà không có cơ chế đồng bộ, kết quả cuối cùng thường không chính xác do hiện tượng tranh chấp (race condition). Để khắc phục, từ khóa synchronized được sử dụng nhằm đảm bảo chỉ một luồng duy nhất thực thi đoạn mã quan trọng tại mỗi thời điểm.
public class ThreadSafetyDemo {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread inc1 = new Thread(new IncrementTask(counter), "Increment-1");
Thread inc2 = new Thread(new IncrementTask(counter), "Increment-2");
Thread dec1 = new Thread(new DecrementTask(counter), "Decrement-1");
inc1.start();
inc2.start();
dec1.start();
inc1.join();
inc2.join();
dec1.join();
System.out.println("Giá trị cuối cùng: " + counter.getValue());
}
}
class IncrementTask implements Runnable {
private Counter counter;
public IncrementTask(Counter counter) {
this.counter = counter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
counter.increment();
}
System.out.println(Thread.currentThread().getName() + " hoàn tất.");
}
}
class DecrementTask implements Runnable {
private Counter counter;
public DecrementTask(Counter counter) {
this.counter = counter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
counter.decrement();
}
System.out.println(Thread.currentThread().getName() + " hoàn tất.");
}
}
class Counter {
private int value = 0;
// Dùng khối synchronized để đồng bộ hóa phần tăng giá trị
public void increment() {
synchronized (this) {
value++;
}
}
// Dùng phương thức synchronized cho phương thức tĩnh hoặc thể hiện
public synchronized void decrement() {
value--;
}
public synchronized int getValue() {
return value;
}
}
Đồng bộ hóa hợp tác giữa các luồng bằng wait() và notifyAll()
Trong mô hình sản xuất – tiêu thụ, việc phối hợp giữa các luồng là cần thiết để tránh tình trạng truy cập vào trạng thái dữ liệu không hợp lệ (ví dụ: lấy hàng khi kho rỗng hoặc thêm hàng khi kho đầy). Cơ chế wait(), notify(), và notifyAll() giúp các luồng chờ đợi hoặc đánh thức nhau dựa trên điều kiện cụ thể.
public class ProducerConsumerDemo {
public static void main(String[] args) throws InterruptedException {
Warehouse warehouse = new Warehouse(5);
Thread producer = new Thread(new Producer(warehouse, 20), "Producer");
Thread consumer1 = new Thread(new Consumer(warehouse, 10), "Consumer-1");
Thread consumer2 = new Thread(new Consumer(warehouse, 10), "Consumer-2");
Thread consumer3 = new Thread(new Consumer(warehouse, 10), "Consumer-3");
producer.start();
consumer1.start();
consumer2.start();
consumer3.start();
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();
System.out.println("Chương trình chính kết thúc. Kho còn lại: " + warehouse.size() + " mặt hàng.");
}
}
Lớp đại diện cho người sản xuất
class Producer implements Runnable {
private Warehouse warehouse;
private int itemCount;
public Producer(Warehouse warehouse, int itemCount) {
this.warehouse = warehouse;
this.itemCount = itemCount;
}
@Override
public void run() {
for (int i = 0; i < itemCount; i++) {
try {
warehouse.store("Sản phẩm-" + i);
Thread.sleep(100); // Giả lập thời gian sản xuất
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println(Thread.currentThread().getName() + " đã hoàn thành việc sản xuất.");
}
}
Lớp đại diện cho người tiêu dùng
class Consumer implements Runnable {
private Warehouse warehouse;
private int consumeCount;
public Consumer(Warehouse warehouse, int consumeCount) {
this.warehouse = warehouse;
this.consumeCount = consumeCount;
}
@Override
public void run() {
for (int i = 0; i < consumeCount; i++) {
try {
warehouse.retrieve();
Thread.sleep(200); // Giả lập thời gian tiêu thụ
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println(Thread.currentThread().getName() + " đã hoàn thành việc tiêu thụ.");
}
}
Lớp quản lý kho – nơi xảy ra đồng bộ hóa
import java.util.LinkedList;
import java.util.Queue;
class Warehouse {
private final int capacity;
private final Queue<String> items;
public Warehouse(int capacity) {
this.capacity = capacity;
this.items = new LinkedList<>();
}
public synchronized void store(String item) {
while (items.size() == capacity) {
try {
System.out.println("Kho đầy! " + Thread.currentThread().getName() + " đang chờ...");
wait(); // Chờ đến khi có chỗ trống
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
items.offer(item);
System.out.println(item + " đã được lưu vào kho.");
notifyAll(); // Đánh thức tất cả các luồng đang chờ (ví dụ: người tiêu dùng)
}
public synchronized void retrieve() {
while (items.isEmpty()) {
try {
System.out.println("Kho trống! " + Thread.currentThread().getName() + " đang chờ...");
wait(); // Chờ đến khi có hàng để lấy
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
String item = items.poll();
System.out.println("Lấy ra: " + item + " khỏi kho.");
notifyAll(); // Đánh thức các luồng đang chờ (ví dụ: người sản xuất)
}
public synchronized int size() {
return items.size();
}
}
Việc sử dụng vòng lặp while thay vì if khi kiểm tra điều kiện trước khi gọi wait() là rất quan trọng. Điều này ngăn chặn hiện tượng "spurious wakeup" và đảm bảo rằng luồng chỉ tiếp tục khi điều kiện thực sự được đáp ứng.