From 88a2a0ef18d05fd0059b638cc6556396568aecbb Mon Sep 17 00:00:00 2001 From: pyalex Date: Mon, 13 Jul 2020 17:23:39 +0300 Subject: [PATCH 1/2] fix race condition --- .../io/gcp/bigquery/BatchLoadsWithResult.java | 90 +++++++++++-------- 1 file changed, 54 insertions(+), 36 deletions(-) diff --git a/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java b/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java index 69dcc999e7b..16edfedff60 100644 --- a/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java +++ b/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java @@ -5,12 +5,10 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.auto.value.AutoValue; - import java.util.Collections; import java.util.List; import java.util.Set; import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.*; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.state.*; @@ -26,7 +24,7 @@ @AutoValue public abstract class BatchLoadsWithResult extends PTransform< - PCollection>, PCollection>> { + PCollection>, PCollection>> { static final Logger LOG = LoggerFactory.getLogger(BatchLoadsWithResult.class); @VisibleForTesting @@ -140,7 +138,8 @@ public PCollection> expand( PCollection> results = input - .apply("WindowWithTrigger", + .apply( + "WindowWithTrigger", Window.>configure() .triggering( Repeatedly.forever( @@ -161,8 +160,8 @@ public void process(ProcessContext c) { .apply( "WriteGroupedRecords", ParDo.of( - new WriteGroupedRecordsToFiles<>( - tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) + new WriteGroupedRecordsToFiles<>( + tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) .withSideInputs(tempFilePrefixView)) .setCoder(WriteBundlesToFiles.ResultCoder.of(getDestinationCoder())); @@ -176,8 +175,7 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) results .apply( Window.>configure() - .triggering(DefaultTrigger.of()) - ) + .triggering(DefaultTrigger.of())) .apply("AttachSingletonKey", WithKeys.of((Void) null)) .setCoder( KvCoder.of( @@ -187,15 +185,15 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) .apply( "WritePartitionTriggered", ParDo.of( - new WritePartition<>( - false, - getDynamicDestinations(), - tempFilePrefixView, - DEFAULT_MAX_FILES_PER_PARTITION, - DEFAULT_MAX_BYTES_PER_PARTITION, - multiPartitionsTag, - singlePartitionTag, - getRowWriterFactory())) + new WritePartition<>( + false, + getDynamicDestinations(), + tempFilePrefixView, + DEFAULT_MAX_FILES_PER_PARTITION, + DEFAULT_MAX_BYTES_PER_PARTITION, + multiPartitionsTag, + singlePartitionTag, + getRowWriterFactory())) .withSideInputs(tempFilePrefixView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); @@ -210,6 +208,21 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) partitions.get(singlePartitionTag), loadJobIdPrefixView.apply(View.asSingleton())); } + /** + * Generates one jobId per window only if any feature row was submitted in this window. We need to + * generate exactly one id per window, otherwise SingletonView will fail. To achieve this we have + * to use stateful DoFn that remembers that in current window id was generated by using {@link + * ValueState}. To prevent race condition in accessing {@link ValueState} we group all elements + * into single batch and ensure that processing will occur on single worker. Regular + * AfterWatermark trigger is used with early triggering on first element in pane, since we only + * interested in one first element and want to minimize other triggers. To keep memory footprint + * as low as possible we map all {@link feast.proto.types.FeatureRowProto.FeatureRow} to {@link + * Void} since we interested only in events (fact that there is some feature rows) and not the + * data. + * + * @param input feature Rows + * @return job id generated once per input's window + */ private PCollection createLoadJobIdPrefixView( PCollection> input) { // We generate new JobId per each (input) window @@ -218,40 +231,42 @@ private PCollection createLoadJobIdPrefixView( // So generated ids can be applied as side input return input .apply( - "EraseKey", + "EraseKeyAndValue", ParDo.of( - new DoFn, KV>() { + new DoFn, KV>() { @ProcessElement public void process(ProcessContext c) { - c.output(KV.of(null, c.element().getValue())); + // we don't need data, only fact of data existing + c.output(KV.of(null, null)); } })) + .apply( + "TriggerOnFirstElement", + Window.>configure() + .triggering( + AfterWatermark.pastEndOfWindow() + // interested only in first element + .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))) + .apply("CollectToOneWorker", GroupByKey.create()) .apply( "CreateJobId", ParDo.of( - new DoFn, String>() { - @StateId("oncePerWindow") - private final StateSpec> oncePerWindow = StateSpecs.set(BooleanCoder.of()); + new DoFn>, String>() { + @StateId("generatedForWindow") + private final StateSpec> generatedForWindow = + StateSpecs.value(BooleanCoder.of()); @ProcessElement public void process( ProcessContext c, BoundedWindow w, - @StateId("oncePerWindow") SetState oncePerWindow) { + @StateId("generatedForWindow") ValueState generatedForWindow) { - // if set already contains something - // it means we already generated Id for this window - Boolean empty = oncePerWindow.isEmpty().read(); - if (empty != null && !empty) { + if (generatedForWindow.read() != null) { return; } - // trying to add to Set and check if it was added - // if true - we won and Id will be generated in current Process - Boolean insertResult = oncePerWindow.addIfAbsent(true).read(); - if (insertResult != null && !insertResult) { - return; - } + generatedForWindow.write(true); c.output( String.format( @@ -259,7 +274,11 @@ public void process( c.getPipelineOptions().getJobName().replaceAll("-", ""), BigQueryHelpers.randomUUIDString())); - LOG.info("Pane {}, start: {}, last: {}", c.pane().getIndex(), c.pane().isFirst(), c.pane().isLast()); + LOG.info( + "Pane {}, start: {}, last: {}", + c.pane().getIndex(), + c.pane().isFirst(), + c.pane().isLast()); LOG.info("[BQ] New window {}, {}", c.timestamp(), w.maxTimestamp()); } })); @@ -321,5 +340,4 @@ PCollection> writeSinglePartitionWithResult( true, getSchemaUpdateOptions())); } - } From e7c4c0b1d187e48a732a0027c9edd24c66ee8881 Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 16 Jul 2020 08:09:03 +0300 Subject: [PATCH 2/2] [bq] temp file prefix in global window --- .../io/gcp/bigquery/BatchLoadsWithResult.java | 95 +++++++------------ 1 file changed, 32 insertions(+), 63 deletions(-) diff --git a/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java b/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java index 16edfedff60..879d518e6a6 100644 --- a/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java +++ b/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java @@ -9,9 +9,9 @@ import java.util.List; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.*; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.state.*; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.transforms.windowing.*; import org.apache.beam.sdk.values.*; @@ -132,9 +132,9 @@ public PCollection> expand( input.getWindowingStrategy().getWindowFn() instanceof FixedWindows, "Input to BQ writer must be windowed in advance"); - final PCollection loadJobIdPrefixView = createLoadJobIdPrefixView(input); + final PCollectionView loadJobIdPrefixView = createLoadJobIdPrefixView(input); final PCollectionView tempFilePrefixView = - createTempFilePrefixView(loadJobIdPrefixView); + createTempFilePrefixView(input.getPipeline()); PCollection> results = input @@ -204,94 +204,60 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) ShardedKeyCoder.of(NullableCoder.of(getDestinationCoder())), ListCoder.of(StringUtf8Coder.of()))); - return writeSinglePartitionWithResult( - partitions.get(singlePartitionTag), loadJobIdPrefixView.apply(View.asSingleton())); + return writeSinglePartitionWithResult(partitions.get(singlePartitionTag), loadJobIdPrefixView); } /** * Generates one jobId per window only if any feature row was submitted in this window. We need to - * generate exactly one id per window, otherwise SingletonView will fail. To achieve this we have - * to use stateful DoFn that remembers that in current window id was generated by using {@link - * ValueState}. To prevent race condition in accessing {@link ValueState} we group all elements - * into single batch and ensure that processing will occur on single worker. Regular - * AfterWatermark trigger is used with early triggering on first element in pane, since we only - * interested in one first element and want to minimize other triggers. To keep memory footprint - * as low as possible we map all {@link feast.proto.types.FeatureRowProto.FeatureRow} to {@link - * Void} since we interested only in events (fact that there is some feature rows) and not the - * data. + * generate exactly one id per window, otherwise SingletonView will fail. * * @param input feature Rows * @return job id generated once per input's window */ - private PCollection createLoadJobIdPrefixView( + private PCollectionView createLoadJobIdPrefixView( PCollection> input) { // We generate new JobId per each (input) window // To keep BQ job's name unique // Windowing of this generator is expected to be synchronized with input window // So generated ids can be applied as side input + + String baseName = input.getPipeline().getOptions().getJobName().replaceAll("-", ""); + return input .apply( "EraseKeyAndValue", ParDo.of( - new DoFn, KV>() { + new DoFn, String>() { @ProcessElement public void process(ProcessContext c) { // we don't need data, only fact of data existing - c.output(KV.of(null, null)); + c.output(""); } })) .apply( - "TriggerOnFirstElement", - Window.>configure() - .triggering( - AfterWatermark.pastEndOfWindow() - // interested only in first element - .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))) - .apply("CollectToOneWorker", GroupByKey.create()) - .apply( - "CreateJobId", - ParDo.of( - new DoFn>, String>() { - @StateId("generatedForWindow") - private final StateSpec> generatedForWindow = - StateSpecs.value(BooleanCoder.of()); - - @ProcessElement - public void process( - ProcessContext c, - BoundedWindow w, - @StateId("generatedForWindow") ValueState generatedForWindow) { - - if (generatedForWindow.read() != null) { - return; - } - - generatedForWindow.write(true); - - c.output( - String.format( - "beam_load_%s_%s", - c.getPipelineOptions().getJobName().replaceAll("-", ""), - BigQueryHelpers.randomUUIDString())); - - LOG.info( - "Pane {}, start: {}, last: {}", - c.pane().getIndex(), - c.pane().isFirst(), - c.pane().isLast()); - LOG.info("[BQ] New window {}, {}", c.timestamp(), w.maxTimestamp()); - } - })); + Combine.globally( + (SerializableFunction, String>) + g -> + String.format( + "beam_load_%s_%s", baseName, BigQueryHelpers.randomUUIDString())) + .withoutDefaults()) + .apply("JobIdView", View.asSingleton()); } - private PCollectionView createTempFilePrefixView(final PCollection jobId) { - return jobId + /** + * Generates one global (per all windows) prefix path to store files before load to BQ + * + * @param p Pipeline + * @return view in global window + */ + private PCollectionView createTempFilePrefixView(final Pipeline p) { + return p.apply("CreateGlobalTempPrefix", Create.of("")) .apply( "GetTempFilePrefix", ParDo.of( new DoFn() { @ProcessElement - public void getTempFilePrefix(ProcessContext c, BoundedWindow w) { + public void getTempFilePrefix(ProcessContext c) { String tempLocationRoot; if (getCustomGcsTempLocation() != null) { tempLocationRoot = getCustomGcsTempLocation().get(); @@ -299,8 +265,11 @@ public void getTempFilePrefix(ProcessContext c, BoundedWindow w) { tempLocationRoot = c.getPipelineOptions().getTempLocation(); } String tempLocation = - resolveTempLocation(tempLocationRoot, "BigQueryWriteTemp", c.element()); - LOG.info("[BQ] temp location generated {}, {}", tempLocation, w.maxTimestamp()); + resolveTempLocation( + tempLocationRoot, + "BigQueryWriteTemp", + c.getPipelineOptions().getJobName()); + c.output(tempLocation); } }))