Impala cho phép lập trình viên kết nối thông qua JDBC. Nhờ đó, có thể sử dụng Impala như một trung gian để thực hiện các thao tác với Kudu — hệ thống lưu trữ cột có hỗ trợ truy vấn thời gian thực.
1. Cấu hình phụ thuộc Maven
<!-- Impala JDBC 4.1 driver -->
<dependency>
<groupId>com.cloudera</groupId>
<artifactId>ImpalaJDBC41</artifactId>
<version>2.6.4</version>
</dependency>
2. Mô hình dữ liệu và đối tượng nghiệp vụ
package demo.kudu;
/**
* Modelo dữ liệu biểu diễn bản ghi nhân viên.
*/
public class EmployeeRecord {
private int companyId;
private int staffId;
private String fullName;
private String sex;
private String avatar;
public EmployeeRecord(int companyId, int staffId, String fullName, String sex, String avatar) {
this.companyId = companyId;
this.staffId = staffId;
this.fullName = fullName;
this.sex = sex;
this.avatar = avatar;
}
// Các getter và setter
public int getCompanyId() { return companyId; }
public int getStaffId() { return staffId; }
public String getFullName() { return fullName; }
public String getSex() { return sex; }
public String getAvatar() { return avatar; }
public void setCompanyId(int companyId) { this.companyId = companyId; }
public void setStaffId(int staffId) { this.staffId = staffId; }
public void setFullName(String fullName) { this.fullName = fullName; }
public void setSex(String sex) { this.sex = sex; }
public void setAvatar(String avatar) { this.avatar = avatar; }
}
3.库 lớp kết nối và thao tác cơ sở dữ liệu
package demo.kudu;
import java.sql.*;
public class KuduViaImpala {
private static final String DRIVER_CLASS = "com.cloudera.impala.jdbc41.Driver";
private static final String JDBC_URL = "jdbc:impala://node1.example.com:21050/production_db;auth=noSasl";
private static Connection connection;
static {
try {
Class.forName(DRIVER_CLASS);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to load JDBC driver", e);
}
}
public static Connection obtainConnection() {
try {
if (connection == null || connection.isClosed()) {
connection = DriverManager.getConnection(JDBC_URL);
}
} catch (SQLException e) {
throw new RuntimeException("Database connection failed", e);
}
return connection;
}
public static void closeQuietly(AutoCloseable... resources) {
for (AutoCloseable r : resources) {
if (r != null) {
try {
r.close();
} catch (Exception ignored) {
}
}
}
}
public static void insertEmployee(EmployeeRecord record) {
String sql = "INSERT INTO kudu_external_table (company_id, staff_id, full_name, sex, avatar) VALUES (?, ?, ?, ?, ?)";
try (Connection conn = obtainConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, record.getCompanyId());
stmt.setInt(2, record.getStaffId());
stmt.setString(3, record.getFullName());
stmt.setString(4, record.getSex());
stmt.setString(5, record.getAvatar());
stmt.execute();
} catch (SQLException e) {
throw new RuntimeException("Insert operation failed", e);
}
}
public static void updateEmployeeInfo(EmployeeRecord record) {
String sql = "UPDATE kudu_external_table SET full_name = ?, sex = ?, avatar = ? WHERE staff_id = ?";
try (Connection conn = obtainConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, record.getFullName());
stmt.setString(2, record.getSex());
stmt.setString(3, record.getAvatar());
stmt.setInt(4, record.getStaffId());
stmt.execute();
} catch (SQLException e) {
throw new RuntimeException("Update operation failed", e);
}
}
public static void deleteEmployee(int staffId) {
String sql = "DELETE FROM kudu_external_table WHERE staff_id = ?";
try (Connection conn = obtainConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, staffId);
stmt.execute();
} catch (SQLException e) {
throw new RuntimeException("Delete operation failed", e);
}
}
public static void fetchEmployeesByCompany(int companyId) {
String sql = "SELECT company_id, staff_id, full_name, sex, avatar FROM kudu_external_table WHERE company_id = ?";
try (Connection conn = obtainConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, companyId);
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
int cid = rs.getInt("company_id");
int sid = rs.getInt("staff_id");
String name = rs.getString("full_name");
String sex = rs.getString("sex");
String avatar = rs.getString("avatar");
System.out.printf("%d | %d | %s | %s | %s%n", cid, sid, name, sex, avatar);
}
}
} catch (SQLException e) {
throw new RuntimeException("SELECT query failed", e);
}
}
}
4. Kiểm thử chức năng
package demo.kudu;
public class KuduImpalaIntegrationTest {
public static void main(String[] args) {
// Thêm một nhân viên mới
KuduViaImpala.insertEmployee(new EmployeeRecord(1001, 20240101, "Le Van A", "M", "avatar_a.jpg"));
// Cập nhật thông tin
EmployeeRecord updated = new EmployeeRecord(1001, 20240101, "Le Van A Updated", "F", "new_pic.png");
KuduViaImpala.updateEmployeeInfo(updated);
// Truy vấn theo công ty
KuduViaImpala.fetchEmployeesByCompany(1001);
// Xóa một bản ghi
KuduViaImpala.deleteEmployee(20240101);
}
}