Vấn đề
Khi sử dụng RedisTemplate với tính năng hỗ trợ giao dịch (transaction) được bật (enableTransactionSupport = true) và kết hợp với annotation @Transactional của Spring, bạn có thể gặp tình huống: thực hiện một thao tác đọc Redis trong luồng hiện tại (ví dụ: trong một request web) nhưng kết quả nhận được lại là null, mặc dù dữ liệu đã tồn tại.
Nguyên nhân chính nằm ở cách RedisTemplate quản lý kết nối và cơ chế giao dịch của Redis. Hãy cùng phân tích quy trình xử lý để hiểu rõ vấn đề.
Quy trình xử lý kết nối trong RedisTemplate
Khi bạn gọi một phương thức thao tác với Redis (ví dụ: redisTemplate.opsForValue().get(key)), RedisTemplate sẽ thực thi thông qua phương thức lõi execute:
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = getRequiredConnectionFactory();
RedisConnection conn = null;
try {
// Kiểm tra nếu enableTransactionSupport là true, liên kết kết nối với luồng hiện tại
if (enableTransactionSupport) {
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
// ... pipeline handling ...
T result = action.doInRedis(connToUse);
// ... pipeline cleanup ...
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
}
}
Trong đó, RedisConnectionUtils là lớp tiện ích dùng để lấy kết nối. Nếu enableTransactionSupport = true, phương thức bindConnection sẽ được gọi:
public static RedisConnection bindConnection(RedisConnectionFactory factory, boolean enableTransactionSupport) {
return doGetConnection(factory, true, true, enableTransactionSupport);
}
private static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean enableTransactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (connHolder != null) {
if (enableTransactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
if (!allowCreate) {
throw new IllegalArgumentException("No connection found and allowCreate = false");
}
RedisConnection conn = factory.getConnection();
if (bind) {
RedisConnection connectionToBind = conn;
// Nếu đang trong giao dịch @Transactional và không phải read-only, tạo proxy cho kết nối
if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
connectionToBind = createConnectionProxy(conn, factory);
}
connHolder = new RedisConnectionHolder(connectionToBind);
TransactionSynchronizationManager.bindResource(factory, connHolder);
if (enableTransactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
return conn;
}
Phương thức potentiallyRegisterTransactionSynchronisation kiểm tra xem có giao dịch Spring đang hoạt động hay không (thông qua @Transactional). Nếu có, nó sẽ thực hiện:
- Gọi
conn.multi()để bắt đầu giao dịch Redis. - Đăng ký một callback (
RedisTransactionSynchronizer) để commit hoặc rollback giao dịch Redis khi giao dịch Spring kết thúc.
Proxy kết nối và vấn đề đọc dữ liệu
Khi ở trong giao dịch, nếu isActualNonReadonlyTransactionActive() trả về true, một proxy kết nối sẽ được tạo ra thông qua createConnectionProxy:
private static RedisConnection createConnectionProxy(RedisConnection connection, RedisConnectionFactory factory) {
ProxyFactory proxyFactory = new ProxyFactory(connection);
proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory));
return RedisConnection.class.cast(proxyFactory.getProxy());
}
Proxy này sử dụng ConnectionSplittingInterceptor để chặn tất cả các lời gọi phương thức trên RedisConnection:
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName());
// Nếu là lệnh đọc, mở kết nối mới (không dùng giao dịch)
if (isPotentiallyThreadBoundCommand(commandToExecute)) {
return invoke(method, obj, args);
}
// Ngược lại, mở kết nối mới, thực thi và đóng ngay
RedisConnection connection = factory.getConnection();
try {
return invoke(method, connection, args);
} finally {
if (!connection.isClosed()) {
connection.close();
}
}
}
Như vậy, nếu kết nối là proxy, các lệnh đọc (read command) sẽ được thực thi trên một kết nối riêng, không nằm trong giao dịch, và kết quả được trả về ngay lập tức. Các lệnh ghi (write command) vẫn được đưa vào giao dịch.
Nguyên nhân lỗi
Vấn đề xảy ra khi bạn thực hiện các bước sau trong cùng một luồng:
- Gọi một thao tác Redis không có
@Transactional(ví dụ: gọi trực tiếpredisTemplate.opsForValue().get(key)). - Sau đó gọi một thao tác Redis có
@Transactional(ví dụ: gọi phương thức service được đánh dấu@Transactional).
Kết quả: Ở bước 1, một kết nối Redis được tạo và liên kết với luồng hiện tại (do bindConnection được gọi). Kết nối này không phải proxy. Ở bước 2, khi TransactionSynchronizationManager.getResource(factory) được gọi, nó thấy đã có kết nối trong luồng (kết nối từ bước 1) và trả về kết nối đó. Vì kết nối này không phải proxy, tất cả các lệnh, bao gồm cả lệnh đọc, đều được thực thi trong giao dịch Redis. Lệnh đọc sẽ không trả về kết quả ngay mà chờ đến khi giao dịch được commit (gọi exec), dẫn đến kết quả trả về là null.
Giải pháp
Để khắc phục, bạn có thể đảm bảo rằng trước khi bắt đầu giao dịch, kết nối cũ (không proxy) được giải phóng và kết nối mới (proxy) được tạo lại.
Giải pháp 1: Giải phóng kết nối thủ công
Sau khi thực hiện thao tác Redis không giao dịch, bạn có thể gọi TransactionSynchronizationManager.unbindResource() để gỡ kết nối khỏi luồng:
public void getRedis() {
Object value = redisTemplate.opsForValue().get("mykey");
TransactionSynchronizationManager.unbindResource(redisTemplate.getConnectionFactory());
System.out.println(value);
}
Giải pháp 2: Tạo RedisTemplate tùy chỉnh (CustomRedisTemplate)
Bạn có thể kế thừa RedisTemplate và ghi đè phương thức preProcessConnection. Trong phương thức này, kiểm tra nếu kết nối hiện tại không phải proxy nhưng luồng đang ở trong giao dịch, thì giải phóng nó và tạo lại kết nối proxy:
public class CustomRedisTemplate<K, V> extends RedisTemplate<K, V> {
private boolean enableTransactionSupport = false;
private static boolean isActualNonReadonlyTransactionActive() {
return TransactionSynchronizationManager.isActualTransactionActive()
&& !TransactionSynchronizationManager.isCurrentTransactionReadOnly();
}
@Override
protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) {
if (existingConnection && !Proxy.isProxyClass(connection.getClass()) && isActualNonReadonlyTransactionActive()) {
// Giải phóng kết nối cũ
RedisConnectionUtils.unbindConnection(getConnectionFactory());
// Giữ lại các synchronization hiện tại (trừ cái cuối cùng liên quan đến kết nối cũ)
List<TransactionSynchronization> list = new ArrayList<>(TransactionSynchronizationManager.getSynchronizations());
TransactionSynchronizationManager.clearSynchronization();
TransactionSynchronizationManager.initSynchronization();
list.remove(list.size() - 1);
list.forEach(TransactionSynchronizationManager::registerSynchronization);
// Tạo kết nối mới (proxy)
connection = RedisConnectionUtils.bindConnection(getConnectionFactory(), enableTransactionSupport);
}
return connection;
}
@Override
public void setEnableTransactionSupport(boolean enableTransactionSupport) {
super.setEnableTransactionSupport(enableTransactionSupport);
this.enableTransactionSupport = enableTransactionSupport;
}
}