I. Quy Trình Tổng Thể
- Thiết lập môi trường thực thi pyFlink2. Nạp dữ liệu đầu vào3. Thực hiện phân tích dữ liệu4. Xuất kết quả phân tích II. Thiết Lập Môi Trường Thực Thi2.1 Cấu Hình Cơ Bản
Dưới đây là mã tham khảofrom pyflink.table import EnvironmentSettings, StreamTableEnvironmentcau_hinh = EnvironmentSettings.new_instance().in_batch_mode().build()bang_env = StreamTableEnvironment.create(environment_settings=cau_hinh)2.2 Các Phương Thức Khác
Sẽ bổ sung thêm các phương thức khác như xử lý streamIII. Nạp Dữ Liệu**
3.1 Từ Biến Dữ Liệu
Mã tham khảo như saudu_lieu = [['A1', 28, 'HN'],['A2', 28, 'HCM'],['A3', 27, 'DN'],['A4', 27, 'CT'],['A5', 27, 'BD'],['A6', 27, 'HP'],['A7', 26, 'TH'],['A8', 26, 'HCM']]bang = bang_env.from_elements(du_lieu, ['ho_ten','tuoi','thanh_pho'],['STRING','INT','STRING'])bang_env.create_temporary_view('nguon_du_lieu', bang) # Đăng ký thành bảng để truy cập bằng flinksql3.2 Từ pandas.DataFrame
dataframe = pd.DataFrame(du_lieu, columns='ho_ten tuoi thanh_pho'.split())bang = bang_env.from_pandas(dataframe)3.3 Từ File CSV
csv_duong_dan = 'du_lieu_mau.csv'csv_schema = 'ho_ten string, tuoi int, thanh_pho string'sql_tao_bang = F"create table bangCSV({csv_schema}) with ('connector' = 'filesystem', 'path' = '{csv_duong_dan}', 'format' = 'csv'))"bang_env.execute_sql(sql_tao_bang)bang = bang_env.from_path('bangCSV')Lưu ý 1: File CSV chứa header sẽ gây lỗiLưu ý 2: Trong phần with của câu lệnh SQL phải dùng nháy đơn, nháy kép sẽ báo lỗiLưu ý 3: Không tạo trùng tên bảng, sẽ gây lỗi - cần xác nhận thêm 3.4 Kết Nối PostgreSQL
from pyflink.table import EnvironmentSettings, StreamTableEnvironmentcau_hinh = EnvironmentSettings.new_instance().in_batch_mode().build()bang_env = StreamTableEnvironment.create(environment_settings=cau_hinh) pg_schema = 'ho_ten STRING, tuoi INT, thanh_pho string'dsn = F'jdbc:postgresql://{host}:{port}/{database}'sql_pg = F"create table bang_pg ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"print(sql_pg)bang_env.execute_sql(sql_pg)bang = bang_env.from_path('bang_pg')bang.limit(5).execute().print()Lưu ý 4: Cần tải file flink-connector-jdbc-.jar và postgresql-.jar, đặt trong thư mục lib của thư mục cài đặt pyflinkLưu ý 5: Các file kết nối phụ thuộc vào phiên bản databaseIV. Xử Lý Dữ Liệu
4.1 Xử Lý Cơ Bản
1) Select
from pyflink.table.expressions import col, callket_qua = bang.select(col("thanh_pho"))ket_qua.limit(3).execute().print()### 2) Group By
bang.group_by(col('thanh_pho')).select(col('thanh_pho'),call("count", col('thanh_pho')).alias('so_luong')).execute().print()bang_env.register_table('nguon_du_lieu', bang)bang_env.sql_query('select thanh_pho, count(*) so_luong from nguon_du_lieu group by thanh_pho').execute().print()### 3) Order By
bang.order_by(col('tuoi').desc).execute().print()### 4) Hàm Tích Hợp
bang.select(call('avg',col('tuoi')).alias('tuoi_trung_binh')).execute().print()bang.select(call('sum',col('tuoi')).alias('tong_tuoi')).execute().print()### 5) Chuẩn Hóa Dữ Liệu
- Map & UDF
4.2 Các Xử Lý Khác
Đang cập nhật thêm
V. Xuất Kết Quả
5.1 In Ra Console
bang.map(lay_tinh).execute().print()
5.2 File CSV
_tuoi_chuan = bang.map(chuan_hoa) _tinh_tp = bang.map(lay_tinh) bang1 = bang.join(_tuoi_chuan).where(col('ho_ten')==col('_ho_ten')).select(col('ho_ten'), col('tuoi'), col('_tuoi'), col('thanh_pho')) bang2 = bang1.join(_tinh_tp).where(col('ho_ten')==col('_ho_ten')).select(col('ho_ten'), col('tuoi'), col('_tuoi'), col('thanh_pho'), col('tinh_thanh')) bang_env.create_temporary_view('bang_gop', bang2)
Thực hiện truy vấn và ghi kết quả vào bảng đầu ra
cau_lenh = 'INSERT INTO bang_xuat(ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh) SELECT ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh FROM bang_gop' bang_env.execute_sql(cau_lenh).wait()
Lưu ý 6: Đường dẫn xuất CSV chỉ được chỉ định thư mục, không được đặt tên file
Lưu ý 7: Xuất ra database cần tạo bảng để ghi trước
5.3 POSTGRESQL
<div>```
pg_schema = "ho_ten STRING, tuoi int, _tuoi float, thanh_pho string, tinh_thanh string"
dsn = F'jdbc:postgresql://{host}:{port}/{database}'
sql_pg = F"create table bang_pg ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"
bang_env.execute_sql(sql_pg)
cau_lenh = 'INSERT INTO bang_pg(ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh) SELECT ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh FROM bang_gop'
bang_env.execute_sql(cau_lenh).wait()
<br></br><br></br><br></br><br></br>