Hướng dẫn lập trình JUC (Phần 6): Hàng đợi chặn BlockingQueue

  1. Hàng đợi chặn BlockingQueue

BlockingQueue là một giao kế thừa từ giao diện Queue, hỗ trợ các thao tác chờ khi lấy phần tử nếu hàng đợi rỗng và chờ khi thêm phần tử nếu hàng đợi đầy. (Có thể áp dụng trong mô hình người sản xuất - người tiêu dùng)

Các lớp thực hiện bao gồm: ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue

Cung cấp bốn phương thức xử lý chính:

Phương thức Ném ngoại lệ Không ném ngoại lệ, có giá trị trả về Chặn Chặn với thời gian chờ
Thêm **boolean add(E e)** **boolean offer(E e)** **void put(E e)** **boolean offer(E e, long timeout, TimeUnit unit)**
Loại bỏ **E remove()** **E poll()** **E take()** **E poll(long timeout, TimeUnit unit)**
Kiểm tra đầu hàng đợi **E element()** **E peek()** **/** **/**

ArrayBlockingQueue:

public class BlockingQueueDemo {
    public static void main(String[] args) {
        QueueProcessor processor = new QueueProcessor();
        try {
            processor.testWithTimeout();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class QueueProcessor {

    public void testWithException(){
        ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
        System.out.println(boundedQueue.add("Mục 1"));
        // Thêm thành công trả về true
        System.out.println(boundedQueue.add("Mục 2"));
        System.out.println(boundedQueue.add("Mục 3"));
        // System.out.println(boundedQueue.add("Mục 4"));
        // Hàng đợi đầy sẽ ném ngoại lệ IllegalStateException
        System.out.println("---");
        System.out.println(boundedQueue.remove());
        System.out.println(boundedQueue.remove());
        System.out.println(boundedQueue.remove());
        // System.out.println(boundedQueue.remove());
        // Hàng đợi rỗng sẽ ném ngoại lệ NoSuchElementException
    }

    public void testWithoutException(){
        ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
        System.out.println(boundedQueue.offer("Mục 1"));
        // Thêm thành công trả về true
        System.out.println(boundedQueue.offer("Mục 2"));
        System.out.println(boundedQueue.offer("Mục 3"));
        System.out.println(boundedQueue.offer("Mục 4"));
        // Hàng đợi đầy sẽ trả về false, không ném ngoại lệ
        System.out.println("---");
        System.out.println(boundedQueue.poll());
        System.out.println(boundedQueue.poll());
        System.out.println(boundedQueue.poll());
        System.out.println(boundedQueue.poll());
        // Hàng đợi rỗng sẽ trả về null, không ném ngoại lệ
    }

    public void testBlockingOperations() throws InterruptedException {
        ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
        boundedQueue.put("Phần tử A");
        // Không có giá trị trả về
        boundedQueue.put("Phần tử B");
        boundedQueue.put("Phần tử C");
        // boundedQueue.put("Phần tử D");
        // Khi hàng đợi đầy, thao tác put sẽ chờ cho đến khi có chỗ trống
        System.out.println(boundedQueue.take());
        System.out.println(boundedQueue.take());
        System.out.println(boundedQueue.take());
        // System.out.println(boundedQueue.take());
        // Khi hàng đợi rỗng, thao tác take sẽ chờ cho đến khi có phần tử
    }

    public void testWithTimeout() throws InterruptedException {
        ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
        System.out.println(boundedQueue.offer("Mục 1"));
        // Thêm thành công trả về true
        System.out.println(boundedQueue.offer("Mục 2"));
        System.out.println(boundedQueue.offer("Mục 3"));
        // System.out.println(boundedQueue.offer("Mục 4"));
        // Thêm thất bại trả về false
        System.out.println(boundedQueue.offer("Mục 5", 2, TimeUnit.SECONDS));
        // Có thể đặt thời gian chờ
        System.out.println("---");
        System.out.println(boundedQueue.poll());
        System.out.println(boundedQueue.poll());
        System.out.println(boundedQueue.poll());
        System.out.println(boundedQueue.poll(2, TimeUnit.SECONDS));
        // Đặt thời gian chờ
    }
}

SynchronousQueue:

Hàng đợi đồng bộ không có dung lượng nội bộ. Khi thực hiện put() một phần tử, phải đợi đến khi có take() thì mới có thể put() phần tử tiếp theo.

public class SynchronousQueueExample {
    public static void main(String[] args) {
        SynchronousQueue<String> syncQueue = new SynchronousQueue<>();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + " gửi: Phần tử 1");
                syncQueue.put("Phần tử 1");
                System.out.println(Thread.currentThread().getName() + " gửi: Phần tử 2");
                syncQueue.put("Phần tử 2");
                System.out.println(Thread.currentThread().getName() + " gửi: Phần tử 3");
                syncQueue.put("Phần tử 3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " nhận: " + syncQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " nhận: " + syncQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " nhận: " + syncQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Kết quả chạy``` Thread-0 gửi: Phần tử 1 Thread-1 nhận: Phần tử 1 Thread-0 gửi: Phần tử 2 Thread-1 nhận: Phần tử 2 Thread-0 gửi: Phần tử 3 Thread-1 nhận: Phần tử 3

Thẻ: Java JUC BlockingQueue

Đăng vào ngày 21 tháng 6 lúc 02:12