Skip to content

Commit

Permalink
[Delta-Flink Sink] Compute txnVersion lazily to reduce CPU utilizat…
Browse files Browse the repository at this point in the history
…ion on commit (#532)
  • Loading branch information
scottsand-db authored Apr 10, 2023
1 parent 7edb182 commit 5759de8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
39 changes: 39 additions & 0 deletions flink/src/main/java/io/delta/flink/internal/lang/Lazy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.flink.internal.lang;

import java.util.Optional;
import java.util.function.Supplier;

public class Lazy<T> {
private final Supplier<T> supplier;
private Optional<T> instance = Optional.empty();

public Lazy(Supplier<T> supplier) {
this.supplier = supplier;
}

/** Not thread safe. */
public T get() {
if (!instance.isPresent()) {
instance = Optional.of(supplier.get());
}
return instance.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.delta.flink.internal.ConnectorUtils;
import io.delta.flink.internal.lang.Lazy;
import io.delta.flink.sink.internal.SchemaConverter;
import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittable;
Expand Down Expand Up @@ -271,16 +272,22 @@ private SortedMap<Long, List<CheckpointData>> getCommittablesPerCheckpoint(
List<DeltaGlobalCommittable> globalCommittables,
DeltaLog deltaLog) {

OptimisticTransaction transaction = deltaLog.startTransaction();
long tableVersion = transaction.txnVersion(appId);
// The last committed table version by THIS flink application.
//
// We can access this value using the thread-unsafe `Lazy::get` because Flink's threading
// model guarantees that GlobalCommitter::commit will be executed by a single thread.
Lazy<Long> lastCommittedTableVersion =
new Lazy<>(() -> deltaLog.startTransaction().txnVersion(appId));

if (!this.firstCommit || tableVersion < 0) {
// Keep `lastCommittedTableVersion.get() < 0` as the second predicate in the OR statement
// below since it is expensive and we should avoid computing it if possible.
if (!this.firstCommit || lastCommittedTableVersion.get() < 0) {
// normal run
return groupCommittablesByCheckpointInterval(globalCommittables);
} else {
// processing recovery, deduplication on recovered committables.
Collection<CheckpointData> deDuplicateData =
deduplicateFiles(globalCommittables, deltaLog, tableVersion);
deduplicateFiles(globalCommittables, deltaLog, lastCommittedTableVersion.get());

return groupCommittablesByCheckpointInterval(deDuplicateData);
}
Expand Down

0 comments on commit 5759de8

Please # to comment.