Việc duy trì khả năng quan sát (observability) là yếu tố sống còn khi vận hành các hệ thống phân tán dựa trên Temporal. Nếu không có cơ chế giám sát hiệu quả, các lỗi tiềm ẩn như nghẽn Task Queue, Activity thất bại liên tục hoặc Workflow bị treo sẽ rất khó bị phát hiện sớm. Temporal Python SDK cung cấp một framework mạnh mẽ để tích hợp với các công cụ thu thập metrics phổ biến, giúp kỹ sư chuyển đổi từ thế bị động sang chủ động trong việc quản lý vận hành.
Kiến trúc giám sát trong Temporal Python SDK
Hệ thống giám sát của SDK được xây dựng xoay quanh lớp TelemetryConfig. Lớp này chịu trách nhiệm thu thập các chỉ số (metrics) từ quá trình thực thi Workflow và Activity, sau đó xuất dữ liệu ra các kênh bên ngoài. Có ba phương pháp chính để cấu hình:
- Prometheus: Phù hợp cho môi trường Kubernetes hoặc các hệ thống sử dụng Grafana để hiển thị.
- OpenTelemetry (OTel): Chuẩn chung để gửi dữ liệu đến các nền tảng như Datadog, New Relic hoặc Jaeger.
- MetricBuffer: Cho phép can thiệp trực tiếp vào dữ liệu thô để xử lý logic cảnh báo tùy chỉnh trong Python.
Tích hợp Prometheus và Grafana
Prometheus là lựa chọn phổ biến nhất nhờ tính đơn giản và hiệu quả. Dưới đây là cách thiết lập một endpoint để Prometheus có thể thu thập (scrape) dữ liệu từ ứng dụng Python của bạn:
from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig
# Thiết lập runtime với cấu hình Prometheus
prometheus_setup = PrometheusConfig(
bind_address="0.0.0.0:8000", # Cổng để Prometheus scrape dữ liệu
counters_total_suffix=True, # Thêm hậu tố _total cho counter
unit_suffix=True, # Thêm đơn vị vào tên chỉ số
histogram_bucket_overrides={ # Tùy chỉnh phân bổ histogram
"temporal_workflow_execution_duration": [0.1, 1.0, 5.0, 15.0, 60.0]
}
)
temporal_runtime = Runtime(
telemetry=TelemetryConfig(
metrics=prometheus_setup,
metric_prefix="worker_node_" # Tiền tố tùy chỉnh cho metrics
)
)
# Gán runtime làm mặc định cho Worker
Runtime.set_default(temporal_runtime)
Sau khi cấu hình, bạn cần thêm job vào file prometheus.yml để bắt đầu thu thập dữ liệu:
scrape_configs:
- job_name: 'temporal-python-worker'
static_configs:
- targets: ['localhost:8000']
scrape_interval: 10s
Cấu hình qua OpenTelemetry (OTel)
Nếu hệ thống của bạn sử dụng OTLP (OpenTelemetry Protocol), bạn có thể gửi dữ liệu trực tiếp đến một Collector hoặc backend hỗ trợ:
from datetime import timedelta
from temporalio.runtime import TelemetryConfig, OpenTelemetryConfig, OpenTelemetryMetricTemporality
otel_telemetry = TelemetryConfig(
metrics=OpenTelemetryConfig(
url="http://otel-collector-service:4317",
headers={"x-api-key": "your-secret-token"},
metric_periodicity=timedelta(seconds=15),
metric_temporality=OpenTelemetryMetricTemporality.DELTA,
durations_as_seconds=True,
http=False # Sử dụng gRPC mặc định
),
global_tags={"environment": "production", "region": "asia-east"},
attach_service_name=True
)
Xây dựng hệ thống cảnh báo tùy chỉnh với MetricBuffer
Trong một số trường hợp, bạn muốn gửi cảnh báo trực tiếp qua Telegram, Slack hoặc Webhook mà không muốn thông qua hệ thống trung gian. MetricBuffer cho phép bạn lấy dữ liệu chỉ số ngay trong mã nguồn Python.
import time
import requests
from temporalio.runtime import Runtime, TelemetryConfig, MetricBuffer
# 1. Khởi tạo bộ đệm chỉ số
metrics_storage = MetricBuffer(buffer_size=50000)
custom_runtime = Runtime(telemetry=TelemetryConfig(metrics=metrics_storage))
Runtime.set_default(custom_runtime)
def check_and_notify():
while True:
# Lấy dữ liệu mới nhất từ bộ đệm
snapshots = metrics_storage.retrieve_updates()
for entry in snapshots:
metric_info = entry.metric
# Ví dụ: Cảnh báo nếu có Workflow thất bại
if metric_info.name == "temporal_workflow_execution_total":
if entry.attributes.get("status") == "failed" and entry.value > 0:
send_to_webhook(f"Phát hiện {entry.value} Workflow bị lỗi!")
time.sleep(30)
def send_to_webhook(msg):
# Logic gửi thông báo qua API của bạn (Slack/Telegram)
print(f"[ALERT]: {msg}")
Các chỉ số quan trọng cần theo dõi
Để đảm bảo hệ thống vận hành ổn định, bạn cần thiết lập ngưỡng cảnh báo (Threshold) cho các chỉ số cốt lõi sau:
| Nhóm chỉ số | Tên Metrics | Mô tả | Ngưỡng khuyến nghị |
|---|---|---|---|
| Workflow | temporal_workflow_execution_total |
Tổng số Workflow theo trạng thái | Cảnh báo nếu trạng thái "failed" > 2% |
| Activity | temporal_activity_schedule_to_start_latency |
Độ trễ từ khi lập lịch đến khi Worker nhận việc | P99 > 2 giây (Dấu hiệu thiếu Worker) |
| Task Queue | temporal_task_queue_latency_seconds |
Thời gian chờ trong hàng đợi | P95 > 1 giây |
| Worker | temporal_worker_task_slots_available |
Số lượng slot xử lý còn trống | Cảnh báo nếu < 10% trong thời gian dài |
Kỹ thuật tối ưu hóa giám sát
Việc thu thập quá nhiều chỉ số có thể gây áp lực lên tài nguyên hệ thống (CPU/Memory). Hãy áp dụng các chiến lược sau để tối ưu:
- Lọc chỉ số (Filtering): Chỉ thu thập những chỉ số thực sự cần thiết. Sử dụng
TelemetryFilterđể loại bỏ các log hoặc metrics ở mức độDEBUG. - Lấy mẫu (Sampling): Đối với các Activity có tần suất cực cao (vài nghìn lần mỗi giây), hãy sử dụng cơ chế lấy mẫu để giảm khối lượng dữ liệu gửi đi.
- Điều chỉnh chu kỳ (Interval): Thay vì thu thập mỗi giây, hãy kéo dài chu kỳ lên 10-15 giây để giảm tải cho Prometheus hoặc OTel Collector.
Bằng cách kết hợp giữa việc quan sát trực quan qua Grafana và thiết lập cảnh báo tự động qua Webhook, đội ngũ vận hành có thể đảm bảo các quy trình nghiệp vụ trên Temporal luôn chạy đúng tiến độ và xử lý kịp thời các sự cố phát sinh.