Kết nối Impala với Kudu qua JDBC trong Java

Impala cho phép lập trình viên kết nối thông qua JDBC. Nhờ đó, có thể sử dụng Impala như một trung gian để thực hiện các thao tác với Kudu — hệ thống lưu trữ cột có hỗ trợ truy vấn thời gian thực.

1. Cấu hình phụ thuộc Maven

<!-- Impala JDBC 4.1 driver -->
<dependency>
    <groupId>com.cloudera</groupId>
    <artifactId>ImpalaJDBC41</artifactId>
    <version>2.6.4</version>
</dependency>

2. Mô hình dữ liệu và đối tượng nghiệp vụ

package demo.kudu;

/**
 * Modelo dữ liệu biểu diễn bản ghi nhân viên.
 */
public class EmployeeRecord {
    private int companyId;
    private int staffId;
    private String fullName;
    private String sex;
    private String avatar;

    public EmployeeRecord(int companyId, int staffId, String fullName, String sex, String avatar) {
        this.companyId = companyId;
        this.staffId = staffId;
        this.fullName = fullName;
        this.sex = sex;
        this.avatar = avatar;
    }

    // Các getter và setter
    public int getCompanyId() { return companyId; }
    public int getStaffId() { return staffId; }
    public String getFullName() { return fullName; }
    public String getSex() { return sex; }
    public String getAvatar() { return avatar; }

    public void setCompanyId(int companyId) { this.companyId = companyId; }
    public void setStaffId(int staffId) { this.staffId = staffId; }
    public void setFullName(String fullName) { this.fullName = fullName; }
    public void setSex(String sex) { this.sex = sex; }
    public void setAvatar(String avatar) { this.avatar = avatar; }
}

3.库 lớp kết nối và thao tác cơ sở dữ liệu

package demo.kudu;

import java.sql.*;

public class KuduViaImpala {

    private static final String DRIVER_CLASS = "com.cloudera.impala.jdbc41.Driver";
    private static final String JDBC_URL = "jdbc:impala://node1.example.com:21050/production_db;auth=noSasl";
    private static Connection connection;

    static {
        try {
            Class.forName(DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Failed to load JDBC driver", e);
        }
    }

    public static Connection obtainConnection() {
        try {
            if (connection == null || connection.isClosed()) {
                connection = DriverManager.getConnection(JDBC_URL);
            }
        } catch (SQLException e) {
            throw new RuntimeException("Database connection failed", e);
        }
        return connection;
    }

    public static void closeQuietly(AutoCloseable... resources) {
        for (AutoCloseable r : resources) {
            if (r != null) {
                try {
                    r.close();
                } catch (Exception ignored) {
                }
            }
        }
    }

    public static void insertEmployee(EmployeeRecord record) {
        String sql = "INSERT INTO kudu_external_table (company_id, staff_id, full_name, sex, avatar) VALUES (?, ?, ?, ?, ?)";
        try (Connection conn = obtainConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setInt(1, record.getCompanyId());
            stmt.setInt(2, record.getStaffId());
            stmt.setString(3, record.getFullName());
            stmt.setString(4, record.getSex());
            stmt.setString(5, record.getAvatar());
            stmt.execute();
        } catch (SQLException e) {
            throw new RuntimeException("Insert operation failed", e);
        }
    }

    public static void updateEmployeeInfo(EmployeeRecord record) {
        String sql = "UPDATE kudu_external_table SET full_name = ?, sex = ?, avatar = ? WHERE staff_id = ?";
        try (Connection conn = obtainConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, record.getFullName());
            stmt.setString(2, record.getSex());
            stmt.setString(3, record.getAvatar());
            stmt.setInt(4, record.getStaffId());
            stmt.execute();
        } catch (SQLException e) {
            throw new RuntimeException("Update operation failed", e);
        }
    }

    public static void deleteEmployee(int staffId) {
        String sql = "DELETE FROM kudu_external_table WHERE staff_id = ?";
        try (Connection conn = obtainConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setInt(1, staffId);
            stmt.execute();
        } catch (SQLException e) {
            throw new RuntimeException("Delete operation failed", e);
        }
    }

    public static void fetchEmployeesByCompany(int companyId) {
        String sql = "SELECT company_id, staff_id, full_name, sex, avatar FROM kudu_external_table WHERE company_id = ?";
        try (Connection conn = obtainConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setInt(1, companyId);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    int cid = rs.getInt("company_id");
                    int sid = rs.getInt("staff_id");
                    String name = rs.getString("full_name");
                    String sex = rs.getString("sex");
                    String avatar = rs.getString("avatar");
                    System.out.printf("%d | %d | %s | %s | %s%n", cid, sid, name, sex, avatar);
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException("SELECT query failed", e);
        }
    }
}

4. Kiểm thử chức năng

package demo.kudu;

public class KuduImpalaIntegrationTest {
    public static void main(String[] args) {
        // Thêm một nhân viên mới
        KuduViaImpala.insertEmployee(new EmployeeRecord(1001, 20240101, "Le Van A", "M", "avatar_a.jpg"));

        // Cập nhật thông tin
        EmployeeRecord updated = new EmployeeRecord(1001, 20240101, "Le Van A Updated", "F", "new_pic.png");
        KuduViaImpala.updateEmployeeInfo(updated);

        // Truy vấn theo công ty
        KuduViaImpala.fetchEmployeesByCompany(1001);

        // Xóa một bản ghi
        KuduViaImpala.deleteEmployee(20240101);
    }
}

Thẻ: Impala jdbc Kudu apache Kerberos-Authentication

Đăng vào ngày 7 tháng 6 lúc 04:18