From 5759de83f4ba135cd9b25328e8cac6aad02e8f47 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 10 Apr 2023 13:45:21 -0700 Subject: [PATCH] [Delta-Flink Sink] Compute `txnVersion` lazily to reduce CPU utilization on commit (#532) --- .../io/delta/flink/internal/lang/Lazy.java | 39 +++++++++++++++++++ .../committer/DeltaGlobalCommitter.java | 15 +++++-- 2 files changed, 50 insertions(+), 4 deletions(-) create mode 100644 flink/src/main/java/io/delta/flink/internal/lang/Lazy.java diff --git a/flink/src/main/java/io/delta/flink/internal/lang/Lazy.java b/flink/src/main/java/io/delta/flink/internal/lang/Lazy.java new file mode 100644 index 00000000000..91622bd25b7 --- /dev/null +++ b/flink/src/main/java/io/delta/flink/internal/lang/Lazy.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.flink.internal.lang; + +import java.util.Optional; +import java.util.function.Supplier; + +public class Lazy { + private final Supplier supplier; + private Optional instance = Optional.empty(); + + public Lazy(Supplier supplier) { + this.supplier = supplier; + } + + /** Not thread safe. */ + public T get() { + if (!instance.isPresent()) { + instance = Optional.of(supplier.get()); + } + return instance.get(); + } +} diff --git a/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java b/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java index 05b59239e96..3dc8387cd55 100644 --- a/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java +++ b/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java @@ -38,6 +38,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.delta.flink.internal.ConnectorUtils; +import io.delta.flink.internal.lang.Lazy; import io.delta.flink.sink.internal.SchemaConverter; import io.delta.flink.sink.internal.committables.DeltaCommittable; import io.delta.flink.sink.internal.committables.DeltaGlobalCommittable; @@ -271,16 +272,22 @@ private SortedMap> getCommittablesPerCheckpoint( List globalCommittables, DeltaLog deltaLog) { - OptimisticTransaction transaction = deltaLog.startTransaction(); - long tableVersion = transaction.txnVersion(appId); + // The last committed table version by THIS flink application. + // + // We can access this value using the thread-unsafe `Lazy::get` because Flink's threading + // model guarantees that GlobalCommitter::commit will be executed by a single thread. + Lazy lastCommittedTableVersion = + new Lazy<>(() -> deltaLog.startTransaction().txnVersion(appId)); - if (!this.firstCommit || tableVersion < 0) { + // Keep `lastCommittedTableVersion.get() < 0` as the second predicate in the OR statement + // below since it is expensive and we should avoid computing it if possible. + if (!this.firstCommit || lastCommittedTableVersion.get() < 0) { // normal run return groupCommittablesByCheckpointInterval(globalCommittables); } else { // processing recovery, deduplication on recovered committables. Collection deDuplicateData = - deduplicateFiles(globalCommittables, deltaLog, tableVersion); + deduplicateFiles(globalCommittables, deltaLog, lastCommittedTableVersion.get()); return groupCommittablesByCheckpointInterval(deDuplicateData); }