Hướng Dẫn Bắt Đầu Với PyFlink

I. Quy Trình Tổng Thể

  1. Thiết lập môi trường thực thi pyFlink2. Nạp dữ liệu đầu vào3. Thực hiện phân tích dữ liệu4. Xuất kết quả phân tích II. Thiết Lập Môi Trường Thực Thi2.1 Cấu Hình Cơ Bản

Dưới đây là mã tham khảofrom pyflink.table import EnvironmentSettings, StreamTableEnvironmentcau_hinh = EnvironmentSettings.new_instance().in_batch_mode().build()bang_env = StreamTableEnvironment.create(environment_settings=cau_hinh)2.2 Các Phương Thức Khác

Sẽ bổ sung thêm các phương thức khác như xử lý streamIII. Nạp Dữ Liệu**

3.1 Từ Biến Dữ Liệu

Mã tham khảo như saudu_lieu = [['A1', 28, 'HN'],['A2', 28, 'HCM'],['A3', 27, 'DN'],['A4', 27, 'CT'],['A5', 27, 'BD'],['A6', 27, 'HP'],['A7', 26, 'TH'],['A8', 26, 'HCM']]bang = bang_env.from_elements(du_lieu, ['ho_ten','tuoi','thanh_pho'],['STRING','INT','STRING'])bang_env.create_temporary_view('nguon_du_lieu', bang) # Đăng ký thành bảng để truy cập bằng flinksql3.2 Từ pandas.DataFrame

dataframe = pd.DataFrame(du_lieu, columns='ho_ten tuoi thanh_pho'.split())bang = bang_env.from_pandas(dataframe)3.3 Từ File CSV

csv_duong_dan = 'du_lieu_mau.csv'csv_schema = 'ho_ten string, tuoi int, thanh_pho string'sql_tao_bang = F"create table bangCSV({csv_schema}) with ('connector' = 'filesystem', 'path' = '{csv_duong_dan}', 'format' = 'csv'))"bang_env.execute_sql(sql_tao_bang)bang = bang_env.from_path('bangCSV')Lưu ý 1: File CSV chứa header sẽ gây lỗiLưu ý 2: Trong phần with của câu lệnh SQL phải dùng nháy đơn, nháy kép sẽ báo lỗiLưu ý 3: Không tạo trùng tên bảng, sẽ gây lỗi - cần xác nhận thêm 3.4 Kết Nối PostgreSQL

from pyflink.table import EnvironmentSettings, StreamTableEnvironmentcau_hinh = EnvironmentSettings.new_instance().in_batch_mode().build()bang_env = StreamTableEnvironment.create(environment_settings=cau_hinh) pg_schema = 'ho_ten STRING, tuoi INT, thanh_pho string'dsn = F'jdbc:postgresql://{host}:{port}/{database}'sql_pg = F"create table bang_pg ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"print(sql_pg)bang_env.execute_sql(sql_pg)bang = bang_env.from_path('bang_pg')bang.limit(5).execute().print()Lưu ý 4: Cần tải file flink-connector-jdbc-.jar và postgresql-.jar, đặt trong thư mục lib của thư mục cài đặt pyflinkLưu ý 5: Các file kết nối phụ thuộc vào phiên bản databaseIV. Xử Lý Dữ Liệu

4.1 Xử Lý Cơ Bản

1) Select

from pyflink.table.expressions import col, callket_qua = bang.select(col("thanh_pho"))ket_qua.limit(3).execute().print()### 2) Group By

bang.group_by(col('thanh_pho')).select(col('thanh_pho'),call("count", col('thanh_pho')).alias('so_luong')).execute().print()bang_env.register_table('nguon_du_lieu', bang)bang_env.sql_query('select thanh_pho, count(*) so_luong from nguon_du_lieu group by thanh_pho').execute().print()### 3) Order By

bang.order_by(col('tuoi').desc).execute().print()### 4) Hàm Tích Hợp

bang.select(call('avg',col('tuoi')).alias('tuoi_trung_binh')).execute().print()bang.select(call('sum',col('tuoi')).alias('tong_tuoi')).execute().print()### 5) Chuẩn Hóa Dữ Liệu

  1. Map & UDF

4.2 Các Xử Lý Khác

Đang cập nhật thêm

V. Xuất Kết Quả

5.1 In Ra Console

bang.map(lay_tinh).execute().print()

5.2 File CSV

_tuoi_chuan = bang.map(chuan_hoa) _tinh_tp = bang.map(lay_tinh) bang1 = bang.join(_tuoi_chuan).where(col('ho_ten')==col('_ho_ten')).select(col('ho_ten'), col('tuoi'), col('_tuoi'), col('thanh_pho')) bang2 = bang1.join(_tinh_tp).where(col('ho_ten')==col('_ho_ten')).select(col('ho_ten'), col('tuoi'), col('_tuoi'), col('thanh_pho'), col('tinh_thanh')) bang_env.create_temporary_view('bang_gop', bang2)

Thực hiện truy vấn và ghi kết quả vào bảng đầu ra

cau_lenh = 'INSERT INTO bang_xuat(ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh) SELECT ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh FROM bang_gop' bang_env.execute_sql(cau_lenh).wait()


Lưu ý 6: Đường dẫn xuất CSV chỉ được chỉ định thư mục, không được đặt tên file

Lưu ý 7: Xuất ra database cần tạo bảng để ghi trước

5.3 POSTGRESQL


<div>```
pg_schema = "ho_ten STRING, tuoi int, _tuoi float, thanh_pho string, tinh_thanh string"
dsn = F'jdbc:postgresql://{host}:{port}/{database}'
sql_pg = F"create table bang_pg ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"
bang_env.execute_sql(sql_pg)  

cau_lenh = 'INSERT INTO bang_pg(ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh) SELECT ho_ten, tuoi, _tuoi, thanh_pho, tinh_thanh FROM bang_gop'
bang_env.execute_sql(cau_lenh).wait()
<br></br><br></br><br></br><br></br>

Đăng vào ngày 10 tháng 6 lúc 16:50