Skip to content

Commit

Permalink
[dingo-raft-store] async execute read index operation
Browse files Browse the repository at this point in the history
  • Loading branch information
JYcz authored and astor-oss committed Jul 2, 2022
1 parent a4a1f9c commit 0390165
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ private class ReadIndexClosure<T> extends io.dingodb.raft.closure.ReadIndexClosu
@Override
public void run(Status status, long index, byte[] reqCtx) {
if (status.isOk()) {
future.complete((T) executeFunc.apply(operation));
Executors.execute("read-index-exec", () -> future.complete((T) executeFunc.apply(operation)));
return;
}
executor.execute(() -> {
if (node.isLeader()) {
log.warn("Fail to [get] with 'ReadIndex': {}, try to applying to the state machine.", status);
// If 'read index' read fails, try to applying to the state machine at the leader node
RaftRawKVOperation.sync().applyOnNode(node).whenComplete((r, e) -> {
RaftRawKVOperation.sync().applyOnNode(node).whenCompleteAsync((r, e) -> {
if (e == null) {
future.complete((T) executeFunc.apply(operation));
} else {
future.completeExceptionally(e);
}
});
}, Executors.executor("read-index-exec"));
} else {
log.warn("Fail to [get] with 'ReadIndex': {}.", status);
// Client will retry to leader node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -170,14 +171,16 @@ public boolean delete(List<byte[]> keys) {
public boolean delete(byte[] startKey, byte[] endKey) {
try {
if (endKey == null) {
try (RocksIterator iterator = this.db.newIterator()) {
try (RocksIterator iterator = this.db.newIterator();) {
iterator.seekToLast();
if (iterator.isValid()) {
try (final WriteBatch batch = new WriteBatch()) {
endKey = iterator.key();
batch.delete(endKey);
batch.deleteRange(startKey, endKey);
this.db.write(this.writeOptions, batch);
this.db.deleteFilesInRanges(this.db.getDefaultColumnFamily(), Arrays.asList(startKey, endKey), true);
this.db.compactRange(startKey, endKey);
return true;
}
} else {
Expand All @@ -186,7 +189,9 @@ public boolean delete(byte[] startKey, byte[] endKey) {
}
}
} else {
this.db.deleteFilesInRanges(this.db.getDefaultColumnFamily(), Arrays.asList(startKey, endKey), true);
this.db.deleteRange(this.writeOptions, startKey, endKey);
this.db.compactRange(startKey, endKey);
return true;
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ private void sendStats() {
log.warn("Report stats but current node not leader.");
execute("cancel-report-stats", () -> scheduledFuture.cancel(true));
}
SeekableIterator<byte[], ByteArrayEntry> iterator = store.scan(part.getStart(), part.getEnd()).join();
List<ApproximateStats> approximateStats = new ArrayList<>();
if (collectStats) {
SeekableIterator<byte[], ByteArrayEntry> iterator = store.scan(part.getStart(), part.getEnd()).join();
long count = 0;
long size = 0;
byte[] startKey = null;
Expand Down

0 comments on commit 0390165

Please # to comment.