- Hàng đợi chặn BlockingQueue
BlockingQueue là một giao kế thừa từ giao diện Queue, hỗ trợ các thao tác chờ khi lấy phần tử nếu hàng đợi rỗng và chờ khi thêm phần tử nếu hàng đợi đầy. (Có thể áp dụng trong mô hình người sản xuất - người tiêu dùng)
Các lớp thực hiện bao gồm: ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue
Cung cấp bốn phương thức xử lý chính:
| Phương thức | Ném ngoại lệ | Không ném ngoại lệ, có giá trị trả về | Chặn | Chặn với thời gian chờ |
|---|---|---|---|---|
| Thêm | **boolean add(E e)** | **boolean offer(E e)** | **void put(E e)** | **boolean offer(E e, long timeout, TimeUnit unit)** |
| Loại bỏ | **E remove()** | **E poll()** | **E take()** | **E poll(long timeout, TimeUnit unit)** |
| Kiểm tra đầu hàng đợi | **E element()** | **E peek()** | **/** | **/** |
ArrayBlockingQueue:
public class BlockingQueueDemo {
public static void main(String[] args) {
QueueProcessor processor = new QueueProcessor();
try {
processor.testWithTimeout();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class QueueProcessor {
public void testWithException(){
ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
System.out.println(boundedQueue.add("Mục 1"));
// Thêm thành công trả về true
System.out.println(boundedQueue.add("Mục 2"));
System.out.println(boundedQueue.add("Mục 3"));
// System.out.println(boundedQueue.add("Mục 4"));
// Hàng đợi đầy sẽ ném ngoại lệ IllegalStateException
System.out.println("---");
System.out.println(boundedQueue.remove());
System.out.println(boundedQueue.remove());
System.out.println(boundedQueue.remove());
// System.out.println(boundedQueue.remove());
// Hàng đợi rỗng sẽ ném ngoại lệ NoSuchElementException
}
public void testWithoutException(){
ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
System.out.println(boundedQueue.offer("Mục 1"));
// Thêm thành công trả về true
System.out.println(boundedQueue.offer("Mục 2"));
System.out.println(boundedQueue.offer("Mục 3"));
System.out.println(boundedQueue.offer("Mục 4"));
// Hàng đợi đầy sẽ trả về false, không ném ngoại lệ
System.out.println("---");
System.out.println(boundedQueue.poll());
System.out.println(boundedQueue.poll());
System.out.println(boundedQueue.poll());
System.out.println(boundedQueue.poll());
// Hàng đợi rỗng sẽ trả về null, không ném ngoại lệ
}
public void testBlockingOperations() throws InterruptedException {
ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
boundedQueue.put("Phần tử A");
// Không có giá trị trả về
boundedQueue.put("Phần tử B");
boundedQueue.put("Phần tử C");
// boundedQueue.put("Phần tử D");
// Khi hàng đợi đầy, thao tác put sẽ chờ cho đến khi có chỗ trống
System.out.println(boundedQueue.take());
System.out.println(boundedQueue.take());
System.out.println(boundedQueue.take());
// System.out.println(boundedQueue.take());
// Khi hàng đợi rỗng, thao tác take sẽ chờ cho đến khi có phần tử
}
public void testWithTimeout() throws InterruptedException {
ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
System.out.println(boundedQueue.offer("Mục 1"));
// Thêm thành công trả về true
System.out.println(boundedQueue.offer("Mục 2"));
System.out.println(boundedQueue.offer("Mục 3"));
// System.out.println(boundedQueue.offer("Mục 4"));
// Thêm thất bại trả về false
System.out.println(boundedQueue.offer("Mục 5", 2, TimeUnit.SECONDS));
// Có thể đặt thời gian chờ
System.out.println("---");
System.out.println(boundedQueue.poll());
System.out.println(boundedQueue.poll());
System.out.println(boundedQueue.poll());
System.out.println(boundedQueue.poll(2, TimeUnit.SECONDS));
// Đặt thời gian chờ
}
}
SynchronousQueue:
Hàng đợi đồng bộ không có dung lượng nội bộ. Khi thực hiện put() một phần tử, phải đợi đến khi có take() thì mới có thể put() phần tử tiếp theo.
public class SynchronousQueueExample {
public static void main(String[] args) {
SynchronousQueue<String> syncQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + " gửi: Phần tử 1");
syncQueue.put("Phần tử 1");
System.out.println(Thread.currentThread().getName() + " gửi: Phần tử 2");
syncQueue.put("Phần tử 2");
System.out.println(Thread.currentThread().getName() + " gửi: Phần tử 3");
syncQueue.put("Phần tử 3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " nhận: " + syncQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " nhận: " + syncQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " nhận: " + syncQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
Kết quả chạy``` Thread-0 gửi: Phần tử 1 Thread-1 nhận: Phần tử 1 Thread-0 gửi: Phần tử 2 Thread-1 nhận: Phần tử 2 Thread-0 gửi: Phần tử 3 Thread-1 nhận: Phần tử 3