Sử dụng SSE trong lập trình phản ứng: Hướng dẫn gọi API từ trình duyệt và Java

Để tương tác với API Server-Sent Events (SSE), bạn có thể triển khai client trên cả môi trường trình duyệt lẫn backend Java. Dưới đây là hướng dẫn chi tiết cách kết nối, xử lý sự kiện, xác thực và quản lý lỗi.

Giả định về endpoint SSE

Giả sử server cung cấp endpoint SSE tại:

  • URL: http://localhost:8080/events
  • Phương thức: GET
  • Content-Type: text/event-stream
  • Định dạng dữ liệu mẫu:
    event: message
    data: {"id":1,"content":"Xin chào"}
    id: 1001
    retry: 3000
    
    event: update
    data: {"status":"hoàn thành"}
        

Gọi SSE từ trình duyệt (JavaScript)

Trình duyệt hiện đại hỗ trợ sẵn EventSource — không cần thư viện bên ngoài.

1. Kết nối cơ bản và lắng nghe sự kiện

<script>
const stream = new EventSource('http://localhost:8080/events');

// Xử lý tin nhắn mặc định
stream.onmessage = (evt) => {
  console.log('Dữ liệu nhận:', evt.data);
  document.getElementById('output').innerHTML += `<p>${evt.data}</p>`;
};

// Lắng nghe sự kiện tùy chỉnh
stream.addEventListener('update', (evt) => {
  const payload = JSON.parse(evt.data);
  console.log('Cập nhật trạng thái:', payload.status);
});

// Khi kết nối mở
stream.onopen = () => console.log('Kết nối SSE đã sẵn sàng');

// Xử lý lỗi (trình duyệt tự động reconnect)
stream.onerror = (err) => {
  console.error('Lỗi kết nối:', err);
};

// Đóng kết nối khi rời trang
window.addEventListener('beforeunload', () => stream.close());
</script>

2. Xác thực người dùng

EventSource không cho phép thiết lập header tùy ý, bạn có thể dùng:

  • Cookie: kèm { withCredentials: true }
  • Token qua query string: new EventSource('/events?auth=token_xyz') (không an toàn cho token nhạy cảm)

3. Kiểm soát số lần thử lại

let attempts = 0;
const maxAttempts = 5;

stream.onerror = () => {
  attempts++;
  if (attempts > maxAttempts) {
    stream.close();
    console.warn('Ngừng kết nối sau nhiều lần thất bại');
  }
};

Gọi SSE từ ứng dụng Java

Java không có sẵn client SSE, nên cần tự phân tích luồng dữ liệu.

1. Sử dụng HttpClient (Java 11+)

import java.net.http.*;
import java.util.concurrent.Flow;

public class SseReader {
    public static void main(String[] args) throws Exception {
        var client = HttpClient.newHttpClient();
        var req = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8080/events"))
            .header("Accept", "text/event-stream")
            .build();

        var handler = HttpResponse.BodyHandlers.fromLineSubscriber(
            new Flow.Subscriber<>() {
                private Flow.Subscription sub;

                public void onSubscribe(Flow.Subscription s) {
                    this.sub = s;
                    s.request(1);
                }

                public void onNext(String line) {
                    if (line.startsWith("data: ")) {
                        System.out.println("Dữ liệu: " + line.substring(6));
                    }
                    sub.request(1); // Yêu cầu dòng tiếp theo
                }

                public void onError(Throwable t) {
                    System.err.println("Lỗi luồng: " + t.getMessage());
                }

                public void onComplete() {
                    System.out.println("Kết thúc luồng");
                }
            },
            "\n"
        );

        client.sendAsync(req, handler).join();
        Thread.sleep(60_000); // Giữ tiến trình chạy
    }
}

2. Sử dụng OkHttp (phiên bản cũ hoặc muốn code gọn hơn)

import okhttp3.*;
import okio.BufferedSource;

public class SseWithOkHttp {
    public static void main(String[] args) throws Exception {
        var client = new OkHttpClient();
        var request = new Request.Builder()
            .url("http://localhost:8080/events")
            .build();

        try (var response = client.newCall(request).execute()) {
            var source = response.body().source();
            var buffer = new StringBuilder();

            while (!Thread.currentThread().isInterrupted()) {
                String line = source.readUtf8Line();
                if (line == null) break;

                if (line.isEmpty()) {
                    handleEvent(buffer.toString());
                    buffer.setLength(0);
                } else {
                    buffer.append(line).append("\n");
                }
            }
        }
    }

    static void handleEvent(String raw) {
        if (raw.contains("data:")) {
            String content = raw.split("data:", 2)[1].trim();
            System.out.println("Nội dung: " + content);
        }
    }
}

Lưu ý quan trọng

  • Giao tiếp một chiều: SSE chỉ cho phép server gửi dữ liệu đến client. Nếu cần hai chiều, hãy dùng WebSocket.
  • Quản lý tài nguyên: Luôn đóng kết nối khi không còn cần thiết để tránh rò rỉ bộ nhớ.
  • CORS: Server phải cấu hình CORS nếu client gọi từ domain khác:
    @CrossOrigin(origins = "http://localhost:3000")
  • Cấu hình proxy: Với Nginx, cần tắt buffering và tăng timeout:
    proxy_buffering off;
    proxy_read_timeout 86400s;

So sánh nhanh

Loại client Cách triển khai Xác thực Tự động reconnect Phù hợp
Trình duyệt EventSource Cookie / Query param Thông báo thời gian thực
Java Phân tích luồng thủ công Bất kỳ header nào Không (phải tự code) Microservice, testing

Gợi ý kiến trúc:
- Trong trình duyệt: Ưu tiên EventSource vì đơn giản và ổn định.
- Trong hệ thống backend: Cân nhắc thay thế SSE bằng Kafka, RabbitMQ hoặc gRPC streaming — phù hợp hơn cho giao tiếp service-to-service.

Thẻ: SSE eventsource Java reactive-programming JavaScript

Đăng vào ngày 28 tháng 6 lúc 05:21