Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[fix][meta] Check if metadata store is closed in RocksdbMetadataStore #22852

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCac
@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
Expand Down Expand Up @@ -286,8 +285,7 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> exp
@Override
public final CompletableFuture<List<String>> getChildren(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
Expand All @@ -298,8 +296,7 @@ public final CompletableFuture<List<String>> getChildren(String path) {
@Override
public final CompletableFuture<Boolean> exists(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
Expand Down Expand Up @@ -362,8 +359,7 @@ public void accept(Notification n) {
public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
log.info("Deleting path: {} (v. {})", path, expectedVersion);
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
Expand Down Expand Up @@ -414,8 +410,7 @@ private CompletableFuture<Void> deleteInternal(String path, Optional<Long> expec
public CompletableFuture<Void> deleteRecursive(String path) {
log.info("Deleting recursively path: {}", path);
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
return getChildren(path)
.thenCompose(children -> FutureUtil.waitForAll(
Expand All @@ -435,8 +430,7 @@ protected abstract CompletableFuture<Stat> storePut(String path, byte[] data, Op
public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
Expand Down Expand Up @@ -516,10 +510,15 @@ protected void receivedSessionEvent(SessionEvent event) {
}
}

private boolean isClosed() {
protected boolean isClosed() {
return isClosed.get();
}

protected static <T> CompletableFuture<T> alreadyClosedFailedFuture() {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}

@Override
public void close() throws Exception {
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
byte[] value = db.get(optionCache, toBytes(path));
if (value == null) {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down Expand Up @@ -407,6 +410,9 @@ protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
try (RocksIterator iterator = db.newIterator(optionDontCache)) {
Set<String> result = new HashSet<>();
String firstKey = path.equals("/") ? path : path + "/";
Expand Down Expand Up @@ -449,6 +455,9 @@ protected CompletableFuture<Boolean> existsFromStore(String path) {
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
byte[] value = db.get(optionDontCache, toBytes(path));
if (log.isDebugEnabled()) {
if (value != null) {
Expand All @@ -471,6 +480,9 @@ protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expect
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
Expand Down Expand Up @@ -507,6 +519,9 @@ protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Lo
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
Expand Down
Loading