Để tương tác với API Server-Sent Events (SSE), bạn có thể triển khai client trên cả môi trường trình duyệt lẫn backend Java. Dưới đây là hướng dẫn chi tiết cách kết nối, xử lý sự kiện, xác thực và quản lý lỗi.
Giả định về endpoint SSE
Giả sử server cung cấp endpoint SSE tại:
- URL:
http://localhost:8080/events - Phương thức: GET
- Content-Type:
text/event-stream - Định dạng dữ liệu mẫu:
event: message data: {"id":1,"content":"Xin chào"} id: 1001 retry: 3000 event: update data: {"status":"hoàn thành"}
Gọi SSE từ trình duyệt (JavaScript)
Trình duyệt hiện đại hỗ trợ sẵn EventSource — không cần thư viện bên ngoài.
1. Kết nối cơ bản và lắng nghe sự kiện
<script>
const stream = new EventSource('http://localhost:8080/events');
// Xử lý tin nhắn mặc định
stream.onmessage = (evt) => {
console.log('Dữ liệu nhận:', evt.data);
document.getElementById('output').innerHTML += `<p>${evt.data}</p>`;
};
// Lắng nghe sự kiện tùy chỉnh
stream.addEventListener('update', (evt) => {
const payload = JSON.parse(evt.data);
console.log('Cập nhật trạng thái:', payload.status);
});
// Khi kết nối mở
stream.onopen = () => console.log('Kết nối SSE đã sẵn sàng');
// Xử lý lỗi (trình duyệt tự động reconnect)
stream.onerror = (err) => {
console.error('Lỗi kết nối:', err);
};
// Đóng kết nối khi rời trang
window.addEventListener('beforeunload', () => stream.close());
</script>
2. Xác thực người dùng
Vì EventSource không cho phép thiết lập header tùy ý, bạn có thể dùng:
- Cookie: kèm
{ withCredentials: true } - Token qua query string:
new EventSource('/events?auth=token_xyz')(không an toàn cho token nhạy cảm)
3. Kiểm soát số lần thử lại
let attempts = 0;
const maxAttempts = 5;
stream.onerror = () => {
attempts++;
if (attempts > maxAttempts) {
stream.close();
console.warn('Ngừng kết nối sau nhiều lần thất bại');
}
};
Gọi SSE từ ứng dụng Java
Java không có sẵn client SSE, nên cần tự phân tích luồng dữ liệu.
1. Sử dụng HttpClient (Java 11+)
import java.net.http.*;
import java.util.concurrent.Flow;
public class SseReader {
public static void main(String[] args) throws Exception {
var client = HttpClient.newHttpClient();
var req = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8080/events"))
.header("Accept", "text/event-stream")
.build();
var handler = HttpResponse.BodyHandlers.fromLineSubscriber(
new Flow.Subscriber<>() {
private Flow.Subscription sub;
public void onSubscribe(Flow.Subscription s) {
this.sub = s;
s.request(1);
}
public void onNext(String line) {
if (line.startsWith("data: ")) {
System.out.println("Dữ liệu: " + line.substring(6));
}
sub.request(1); // Yêu cầu dòng tiếp theo
}
public void onError(Throwable t) {
System.err.println("Lỗi luồng: " + t.getMessage());
}
public void onComplete() {
System.out.println("Kết thúc luồng");
}
},
"\n"
);
client.sendAsync(req, handler).join();
Thread.sleep(60_000); // Giữ tiến trình chạy
}
}
2. Sử dụng OkHttp (phiên bản cũ hoặc muốn code gọn hơn)
import okhttp3.*;
import okio.BufferedSource;
public class SseWithOkHttp {
public static void main(String[] args) throws Exception {
var client = new OkHttpClient();
var request = new Request.Builder()
.url("http://localhost:8080/events")
.build();
try (var response = client.newCall(request).execute()) {
var source = response.body().source();
var buffer = new StringBuilder();
while (!Thread.currentThread().isInterrupted()) {
String line = source.readUtf8Line();
if (line == null) break;
if (line.isEmpty()) {
handleEvent(buffer.toString());
buffer.setLength(0);
} else {
buffer.append(line).append("\n");
}
}
}
}
static void handleEvent(String raw) {
if (raw.contains("data:")) {
String content = raw.split("data:", 2)[1].trim();
System.out.println("Nội dung: " + content);
}
}
}
Lưu ý quan trọng
- Giao tiếp một chiều: SSE chỉ cho phép server gửi dữ liệu đến client. Nếu cần hai chiều, hãy dùng WebSocket.
- Quản lý tài nguyên: Luôn đóng kết nối khi không còn cần thiết để tránh rò rỉ bộ nhớ.
- CORS: Server phải cấu hình CORS nếu client gọi từ domain khác:
@CrossOrigin(origins = "http://localhost:3000")
- Cấu hình proxy: Với Nginx, cần tắt buffering và tăng timeout:
proxy_buffering off; proxy_read_timeout 86400s;
So sánh nhanh
| Loại client | Cách triển khai | Xác thực | Tự động reconnect | Phù hợp |
|---|---|---|---|---|
| Trình duyệt | EventSource |
Cookie / Query param | Có | Thông báo thời gian thực |
| Java | Phân tích luồng thủ công | Bất kỳ header nào | Không (phải tự code) | Microservice, testing |
Gợi ý kiến trúc:
- Trong trình duyệt: Ưu tiên EventSource vì đơn giản và ổn định.
- Trong hệ thống backend: Cân nhắc thay thế SSE bằng Kafka, RabbitMQ hoặc gRPC streaming — phù hợp hơn cho giao tiếp service-to-service.