diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java index 3693aeee65..d11b78b33a 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java @@ -477,6 +477,64 @@ private void resolveKeyChange(Vertex vertex, PessimisticLockUpdateParam param, C if (ByteArrayUtils.equal(key, oldKey)) { return; } + StoreInstance localStore = Services.LOCAL_STORE.getInstance(tableId, partId); + byte[] partIdByte = partId.encode(); + byte[] lockKeyBytes = encode( + CommonId.CommonType.TXN_CACHE_LOCK, + oldKey, + Op.LOCK.getCode(), + len, + txnIdByte, + tableIdByte, + partIdByte + ); + KeyValue keyValue = localStore.get(lockKeyBytes); + if (keyValue != null) { + LogUtils.debug(log, "{}, repeat resolveKeyChange key :{} keyValue is not null", + txnId, Arrays.toString(key)); + // data + byte[] dataKey = getKeyByOp(CommonId.CommonType.TXN_CACHE_DATA, Op.PUT, lockKeyBytes); + byte[] deleteKey = Arrays.copyOf(dataKey, dataKey.length); + deleteKey[deleteKey.length - 2] = (byte) Op.DELETE.getCode(); + byte[] insertKey = Arrays.copyOf(dataKey, dataKey.length); + insertKey[insertKey.length - 2] = (byte) Op.PUTIFABSENT.getCode(); + if (localStore.get(dataKey) != null) { + byte[] value = localStore.get(dataKey).getValue(); + localStore.put(new KeyValue(deleteKey, value)); + localStore.delete(dataKey); + // extraKeyValue + KeyValue extraKeyValue = new KeyValue( + ByteUtils.encode( + CommonId.CommonType.TXN_CACHE_EXTRA_DATA, + oldKey, + Op.PUT.getCode(), + len, + jobIdByte, + tableIdByte, + partIdByte), + value + ); + localStore.put(extraKeyValue); + } else if (localStore.get(insertKey) != null) { + byte[] value = localStore.get(insertKey).getValue(); + localStore.put(new KeyValue(deleteKey, value)); + localStore.delete(insertKey); + // extraKeyValue + KeyValue extraKeyValue = new KeyValue( + ByteUtils.encode( + CommonId.CommonType.TXN_CACHE_EXTRA_DATA, + oldKey, + Op.PUTIFABSENT.getCode(), + len, + jobIdByte, + tableIdByte, + partIdByte), + value + ); + localStore.put(extraKeyValue); + } + return; + } byte[] originalKey; if (isVector) { originalKey = codec.encodeKeyPrefix(newTuple, 1); @@ -487,9 +545,7 @@ private void resolveKeyChange(Vertex vertex, PessimisticLockUpdateParam param, C } else { originalKey = oldKey; } - StoreInstance localStore = Services.LOCAL_STORE.getInstance(tableId, partId); StoreInstance kvStore = Services.KV_STORE.getInstance(tableId, partId); - byte[] partIdByte = partId.encode(); // for check deadLock byte[] deadLockKeyBytes = encode( CommonId.CommonType.TXN_CACHE_BLOCK_LOCK,