diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 76fc1a70b78c..0bc984877217 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -61,6 +61,8 @@
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -236,6 +238,27 @@
* destination-dependent: every window/pane for every destination will use the same number of shards
* specified via {@link Write#withNumShards} or {@link Write#withSharding}.
*
+ *
Handling Errors
+ *
+ * When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner causing the bundle to fail. These are
+ * usually retried, though this depends on the runner. Alternately, these errors can be routed to
+ * another {@link PTransform} by using {@link Write#withBadRecordErrorHandler(ErrorHandler)}. The
+ * ErrorHandler is registered with the pipeline (see below). See {@link ErrorHandler} for more
+ * documentation. Of note, this error handling only handles errors related to specific records. It
+ * does not handle errors related to connectivity, authorization, etc. as those should be retried by
+ * the runner.
+ *
+ *
{@code
+ * PCollection<> records = ...;
+ * PTransform,?> alternateSink = ...;
+ * try (BadRecordErrorHandler> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
+ * records.apply("Write", FileIO.writeDynamic().otherConfigs()
+ * .withBadRecordErrorHandler(handler));
+ * }
+ * }
+ *
* Writing custom types to sinks
*
* Normally, when writing a collection of a custom type using a {@link Sink} that takes a
@@ -1016,6 +1039,8 @@ public static FileNaming relativeFileNaming(
abstract boolean getNoSpilling();
+ abstract @Nullable ErrorHandler getBadRecordErrorHandler();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -1062,6 +1087,9 @@ abstract Builder setSharding(
abstract Builder setNoSpilling(boolean noSpilling);
+ abstract Builder setBadRecordErrorHandler(
+ @Nullable ErrorHandler badRecordErrorHandler);
+
abstract Write build();
}
@@ -1288,6 +1316,18 @@ public Write withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}
+ /**
+ * Configures a new {@link Write} with an ErrorHandler. For configuring an ErrorHandler, see
+ * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination
+ * is performed, and that operation fails, the exception is passed to the error handler. This is
+ * intended to handle any errors related to the data of a record, but not any connectivity or IO
+ * errors related to the literal writing of a record.
+ */
+ public Write withBadRecordErrorHandler(
+ ErrorHandler errorHandler) {
+ return toBuilder().setBadRecordErrorHandler(errorHandler).build();
+ }
+
@VisibleForTesting
Contextful> resolveFileNamingFn() {
if (getDynamic()) {
@@ -1391,6 +1431,9 @@ public WriteFilesResult expand(PCollection input) {
if (getNoSpilling()) {
writeFiles = writeFiles.withNoSpilling();
}
+ if (getBadRecordErrorHandler() != null) {
+ writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
+ }
return input.apply(writeFiles);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 2c7a4fc5d4f5..96635a37fac1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -51,6 +51,8 @@
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -176,6 +178,10 @@
*
* For backwards compatibility, {@link TextIO} also supports the legacy {@link
* DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
+ *
+ *
Error handling for records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in {@link FileIO} for
+ * details on usage
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -708,6 +714,8 @@ public abstract static class TypedWrite
*/
abstract WritableByteChannelFactory getWritableByteChannelFactory();
+ abstract @Nullable ErrorHandler getBadRecordErrorHandler();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -754,6 +762,9 @@ abstract Builder setNumShards(
abstract Builder setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);
+ abstract Builder setBadRecordErrorHandler(
+ @Nullable ErrorHandler badRecordErrorHandler);
+
abstract TypedWrite build();
}
@@ -993,6 +1004,12 @@ public TypedWrite withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}
+ /** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */
+ public TypedWrite withBadRecordErrorHandler(
+ ErrorHandler errorHandler) {
+ return toBuilder().setBadRecordErrorHandler(errorHandler).build();
+ }
+
/** Don't write any output files if the PCollection is empty. */
public TypedWrite skipIfEmpty() {
return toBuilder().setSkipIfEmpty(true).build();
@@ -1083,6 +1100,9 @@ public WriteFilesResult expand(PCollection input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
+ if (getBadRecordErrorHandler() != null) {
+ write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
+ }
if (getSkipIfEmpty()) {
write = write.withSkipIfEmpty();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 91d6082eede4..7359141c5b87 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -49,6 +50,7 @@
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -62,6 +64,10 @@
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -166,6 +172,8 @@ public static WriteFiles())
+ .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
.build();
}
@@ -189,6 +197,10 @@ public static WriteFiles getShardingFunction();
+ public abstract ErrorHandler getBadRecordErrorHandler();
+
+ public abstract BadRecordRouter getBadRecordRouter();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -215,6 +227,12 @@ abstract Builder setSideInputs(
abstract Builder setShardingFunction(
@Nullable ShardingFunction shardingFunction);
+ abstract Builder setBadRecordErrorHandler(
+ ErrorHandler badRecordErrorHandler);
+
+ abstract Builder setBadRecordRouter(
+ BadRecordRouter badRecordRouter);
+
abstract WriteFiles build();
}
@@ -330,6 +348,15 @@ public WriteFiles withSkipIfEmpty() {
return toBuilder().setSkipIfEmpty(true).build();
}
+ /** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */
+ public WriteFiles withBadRecordErrorHandler(
+ ErrorHandler errorHandler) {
+ return toBuilder()
+ .setBadRecordErrorHandler(errorHandler)
+ .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
+ .build();
+ }
+
@Override
public void validate(PipelineOptions options) {
getSink().validate(options);
@@ -495,28 +522,39 @@ private WriteUnshardedBundlesToTempFiles(
@Override
public PCollection> expand(PCollection input) {
- if (getMaxNumWritersPerBundle() < 0) {
- return input
- .apply(
- "WritedUnshardedBundles",
- ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder))
- .withSideInputs(getSideInputs()))
- .setCoder(fileResultCoder);
- }
TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords");
TupleTag, UserT>> unwrittenRecordsTag =
new TupleTag<>("unwrittenRecords");
+ Coder inputCoder = input.getCoder();
+ if (getMaxNumWritersPerBundle() < 0) {
+ PCollectionTuple writeTuple =
+ input.apply(
+ "WritedUnshardedBundles",
+ ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder, inputCoder))
+ .withSideInputs(getSideInputs())
+ .withOutputTags(
+ writtenRecordsTag, TupleTagList.of(ImmutableList.of(BAD_RECORD_TAG))));
+ addErrorCollection(writeTuple);
+ return writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
+ }
+
PCollectionTuple writeTuple =
input.apply(
"WriteUnshardedBundles",
- ParDo.of(new WriteUnshardedTempFilesFn(unwrittenRecordsTag, destinationCoder))
+ ParDo.of(
+ new WriteUnshardedTempFilesFn(
+ unwrittenRecordsTag, destinationCoder, inputCoder))
.withSideInputs(getSideInputs())
- .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
+ .withOutputTags(
+ writtenRecordsTag,
+ TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG))));
+ addErrorCollection(writeTuple);
+
PCollection> writtenBundleFiles =
writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
// Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
// finalize to stay consistent with what WriteWindowedBundles does.
- PCollection> writtenSpilledFiles =
+ PCollectionTuple spilledWriteTuple =
writeTuple
.get(unwrittenRecordsTag)
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
@@ -529,7 +567,15 @@ public PCollection> expand(PCollection input) {
.apply("GroupUnwritten", GroupByKey.create())
.apply(
"WriteUnwritten",
- ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
+ ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder()))
+ .withSideInputs(getSideInputs())
+ .withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG)));
+
+ addErrorCollection(spilledWriteTuple);
+
+ PCollection> writtenSpilledFiles =
+ spilledWriteTuple
+ .get(writtenRecordsTag)
.setCoder(fileResultCoder)
.apply(
"DropShardNum",
@@ -556,6 +602,8 @@ private class WriteUnshardedTempFilesFn extends DoFn, UserT>> unwrittenRecordsTag;
private final Coder destinationCoder;
+ private final Coder inputCoder;
+
// Initialized in startBundle()
private @Nullable Map, Writer> writers;
@@ -563,9 +611,11 @@ private class WriteUnshardedTempFilesFn extends DoFn, UserT>> unwrittenRecordsTag,
- Coder destinationCoder) {
+ Coder destinationCoder,
+ Coder inputCoder) {
this.unwrittenRecordsTag = unwrittenRecordsTag;
this.destinationCoder = destinationCoder;
+ this.inputCoder = inputCoder;
}
@StartBundle
@@ -575,7 +625,9 @@ public void startBundle(StartBundleContext c) {
}
@ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ public void processElement(
+ ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver)
+ throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
PaneInfo paneInfo = c.pane();
// If we are doing windowed writes, we need to ensure that we have separate files for
@@ -583,7 +635,12 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
// destinations go to different writers.
// In the case of unwindowed writes, the window and the pane will always be the same, and
// the map will only have a single element.
- DestinationT destination = getDynamicDestinations().getDestination(c.element());
+ MaybeDestination maybeDestination =
+ getDestinationWithErrorHandling(c.element(), outputReceiver, inputCoder);
+ if (!maybeDestination.isValid) {
+ return;
+ }
+ DestinationT destination = maybeDestination.destination;
WriterKey key = new WriterKey<>(window, c.pane(), destination);
Writer writer = writers.get(key);
if (writer == null) {
@@ -607,15 +664,22 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
} else {
spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
}
- c.output(
- unwrittenRecordsTag,
- KV.of(
- ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum),
- c.element()));
+ outputReceiver
+ .get(unwrittenRecordsTag)
+ .output(
+ KV.of(
+ ShardedKey.of(
+ hashDestination(destination, destinationCoder), spilledShardNum),
+ c.element()));
return;
}
}
- writeOrClose(writer, getDynamicDestinations().formatRecord(c.element()));
+ OutputT formattedRecord =
+ formatRecordWithErrorHandling(c.element(), outputReceiver, inputCoder);
+ if (formattedRecord == null) {
+ return;
+ }
+ writeOrClose(writer, formattedRecord);
}
@FinishBundle
@@ -701,6 +765,56 @@ private static int hashDestination(
.asInt();
}
+ private static class MaybeDestination {
+ final DestinationT destination;
+ final boolean isValid;
+
+ MaybeDestination(DestinationT destination, boolean isValid) {
+ this.destination = destination;
+ this.isValid = isValid;
+ }
+ }
+ // Utility method to get the dynamic destination based on a record. Returns a MaybeDestination
+ // because some implementations of dynamic destinations return null, despite this being prohibited
+ // by the interface
+ private MaybeDestination getDestinationWithErrorHandling(
+ UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception {
+ try {
+ return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true);
+ } catch (Exception e) {
+ getBadRecordRouter()
+ .route(
+ outputReceiver, input, inputCoder, e, "Unable to get dynamic destination for record");
+ return new MaybeDestination<>(null, false);
+ }
+ }
+
+ // Utility method to format a record based on the dynamic destination. If the operation fails, and
+ // is output to the bad record router, this returns null
+ private @Nullable OutputT formatRecordWithErrorHandling(
+ UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception {
+ try {
+ return getDynamicDestinations().formatRecord(input);
+ } catch (Exception e) {
+ getBadRecordRouter()
+ .route(
+ outputReceiver,
+ input,
+ inputCoder,
+ e,
+ "Unable to format record for Dynamic Destination");
+ return null;
+ }
+ }
+
+ private void addErrorCollection(PCollectionTuple sourceTuple) {
+ getBadRecordErrorHandler()
+ .addErrorCollection(
+ sourceTuple
+ .get(BAD_RECORD_TAG)
+ .setCoder(BadRecord.getCoder(sourceTuple.getPipeline())));
+ }
+
private class WriteShardedBundlesToTempFiles
extends PTransform, PCollection>> {
private final Coder destinationCoder;
@@ -728,17 +842,32 @@ public PCollection> expand(PCollection input) {
? new RandomShardingFunction(destinationCoder)
: getShardingFunction();
- return input
- .apply(
+ TupleTag, UserT>> shardedRecords = new TupleTag<>("shardedRecords");
+ TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords");
+
+ PCollectionTuple shardedFiles =
+ input.apply(
"ApplyShardingKey",
- ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView))
- .withSideInputs(shardingSideInputs))
- .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
- .apply("GroupIntoShards", GroupByKey.create())
- .apply(
- "WriteShardsIntoTempFiles",
- ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
- .setCoder(fileResultCoder);
+ ParDo.of(
+ new ApplyShardingFunctionFn(
+ shardingFunction, numShardsView, input.getCoder()))
+ .withSideInputs(shardingSideInputs)
+ .withOutputTags(shardedRecords, TupleTagList.of(BAD_RECORD_TAG)));
+ addErrorCollection(shardedFiles);
+
+ PCollectionTuple writtenFiles =
+ shardedFiles
+ .get(shardedRecords)
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+ .apply("GroupIntoShards", GroupByKey.create())
+ .apply(
+ "WriteShardsIntoTempFiles",
+ ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder()))
+ .withSideInputs(getSideInputs())
+ .withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG)));
+ addErrorCollection(writtenFiles);
+
+ return writtenFiles.get(writtenRecordsTag).setCoder(fileResultCoder);
}
}
@@ -763,22 +892,18 @@ public PCollection>> expand(PCollection inp
//
// TODO(https://github.com/apache/beam/issues/20928): The implementation doesn't currently
// work with merging windows.
+ TupleTag> shardTag = new TupleTag<>("shardTag");
+
+ PCollectionTuple shardedElements =
+ input.apply(
+ "KeyedByDestinationHash",
+ ParDo.of(new KeyByDestinationHash(input.getCoder(), destinationCoder))
+ .withOutputTags(shardTag, TupleTagList.of(BAD_RECORD_TAG)));
+ addErrorCollection(shardedElements);
+
PCollection, Iterable>> shardedInput =
- input
- .apply(
- "KeyedByDestinationHash",
- ParDo.of(
- new DoFn>() {
- @ProcessElement
- public void processElement(@Element UserT element, ProcessContext context)
- throws Exception {
- getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
- DestinationT destination =
- getDynamicDestinations().getDestination(context.element());
- context.output(
- KV.of(hashDestination(destination, destinationCoder), element));
- }
- }))
+ shardedElements
+ .get(shardTag)
.setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
.apply(
"ShardAndBatch",
@@ -791,8 +916,9 @@ public void processElement(@Element UserT element, ProcessContext context)
org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
IterableCoder.of(input.getCoder())));
+ TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords");
// Write grouped elements to temp files.
- PCollection> tempFiles =
+ PCollectionTuple writtenFiles =
shardedInput
.apply(
"AddDummyShard",
@@ -816,7 +942,15 @@ public KV, Iterable> apply(
ShardedKeyCoder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder())))
.apply(
"WriteShardsIntoTempFiles",
- ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
+ ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder()))
+ .withSideInputs(getSideInputs())
+ .withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG)));
+
+ addErrorCollection(writtenFiles);
+
+ PCollection> tempFiles =
+ writtenFiles
+ .get(writtenRecordsTag)
.setCoder(fileResultCoder)
.apply(
"DropShardNum",
@@ -865,6 +999,32 @@ public void processElement(
}
}
+ private class KeyByDestinationHash extends DoFn> {
+
+ private final Coder inputCoder;
+
+ private final Coder destinationCoder;
+
+ public KeyByDestinationHash(Coder inputCoder, Coder destinationCoder) {
+ this.inputCoder = inputCoder;
+ this.destinationCoder = destinationCoder;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element UserT element, ProcessContext context, MultiOutputReceiver outputReceiver)
+ throws Exception {
+ getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
+ MaybeDestination maybeDestination =
+ getDestinationWithErrorHandling(context.element(), outputReceiver, inputCoder);
+ if (!maybeDestination.isValid) {
+ return;
+ }
+ DestinationT destination = maybeDestination.destination;
+ context.output(KV.of(hashDestination(destination, destinationCoder), element));
+ }
+ }
+
private class RandomShardingFunction implements ShardingFunction {
private final Coder destinationCoder;
@@ -903,15 +1063,20 @@ private class ApplyShardingFunctionFn extends DoFn
private final ShardingFunction shardingFn;
private final @Nullable PCollectionView numShardsView;
+ private final Coder inputCoder;
+
ApplyShardingFunctionFn(
ShardingFunction shardingFn,
- @Nullable PCollectionView numShardsView) {
+ @Nullable PCollectionView numShardsView,
+ Coder inputCoder) {
this.numShardsView = numShardsView;
this.shardingFn = shardingFn;
+ this.inputCoder = inputCoder;
}
@ProcessElement
- public void processElement(ProcessContext context) throws Exception {
+ public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver)
+ throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
final int shardCount;
if (numShardsView != null) {
@@ -927,7 +1092,12 @@ public void processElement(ProcessContext context) throws Exception {
+ " Got %s",
shardCount);
- DestinationT destination = getDynamicDestinations().getDestination(context.element());
+ MaybeDestination maybeDestination =
+ getDestinationWithErrorHandling(context.element(), outputReceiver, inputCoder);
+ if (!maybeDestination.isValid) {
+ return;
+ }
+ DestinationT destination = maybeDestination.destination;
ShardedKey shardKey =
shardingFn.assignShardKey(destination, context.element(), shardCount);
context.output(KV.of(shardKey, context.element()));
@@ -936,6 +1106,13 @@ public void processElement(ProcessContext context) throws Exception {
private class WriteShardsIntoTempFilesFn
extends DoFn, Iterable>, FileResult> {
+
+ private final Coder inputCoder;
+
+ public WriteShardsIntoTempFilesFn(Coder inputCoder) {
+ this.inputCoder = inputCoder;
+ }
+
private transient List> closeFutures = new ArrayList<>();
private transient List>> deferredOutput =
new ArrayList<>();
@@ -949,14 +1126,21 @@ private void readObject(java.io.ObjectInputStream in)
}
@ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ public void processElement(
+ ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver)
+ throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
// Since we key by a 32-bit hash of the destination, there might be multiple destinations
// in this iterable. The number of destinations is generally very small (1000s or less), so
// there will rarely be hash collisions.
Map> writers = Maps.newHashMap();
for (UserT input : c.element().getValue()) {
- DestinationT destination = getDynamicDestinations().getDestination(input);
+ MaybeDestination maybeDestination =
+ getDestinationWithErrorHandling(input, outputReceiver, inputCoder);
+ if (!maybeDestination.isValid) {
+ continue;
+ }
+ DestinationT destination = maybeDestination.destination;
Writer writer = writers.get(destination);
if (writer == null) {
String uuid = UUID.randomUUID().toString();
@@ -971,7 +1155,12 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
writer.open(uuid);
writers.put(destination, writer);
}
- writeOrClose(writer, getDynamicDestinations().formatRecord(input));
+
+ OutputT formattedRecord = formatRecordWithErrorHandling(input, outputReceiver, inputCoder);
+ if (formattedRecord == null) {
+ continue;
+ }
+ writeOrClose(writer, formattedRecord);
}
// Ensure that we clean-up any prior writers that were being closed as part of this bundle
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java
index e02965b72022..cf040470d608 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java
@@ -119,6 +119,10 @@ private void readObject(ObjectInputStream aInputStream)
@Override
public void addErrorCollection(PCollection errorCollection) {
+ if (isClosed()) {
+ throw new IllegalStateException(
+ "Error collections cannot be added after Error Handler is closed");
+ }
errorCollections.add(errorCollection);
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 39cb612f2d89..2db20b92f27f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -61,6 +61,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
@@ -78,6 +79,8 @@
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -634,6 +637,134 @@ private void testDynamicDestinationsHelper(boolean bounded, boolean emptyShards)
}
}
+ // Test FailingDynamicDestinations class. Expects user values to be string-encoded integers.
+ // Throws exceptions when trying to format records or get destinations based on the mod
+ // of the element
+ static class FailingTestDestinations extends DynamicDestinations {
+ private ResourceId baseOutputDirectory;
+
+ FailingTestDestinations(ResourceId baseOutputDirectory) {
+ this.baseOutputDirectory = baseOutputDirectory;
+ }
+
+ @Override
+ public String formatRecord(String record) {
+ int value = Integer.valueOf(record);
+ // deterministically fail to format 1/3rd of records
+ if (value % 3 == 0) {
+ throw new RuntimeException("Failed To Format Record");
+ }
+ return "record_" + record;
+ }
+
+ @Override
+ public Integer getDestination(String element) {
+ int value = Integer.valueOf(element);
+ // deterministically fail to find the destination for 1/7th of records
+ if (value % 7 == 0) {
+ throw new RuntimeException("Failed To Get Destination");
+ }
+ return value % 5;
+ }
+
+ @Override
+ public Integer getDefaultDestination() {
+ return 0;
+ }
+
+ @Override
+ public FilenamePolicy getFilenamePolicy(Integer destination) {
+ return new PerWindowFiles(
+ baseOutputDirectory.resolve("file_" + destination, StandardResolveOptions.RESOLVE_FILE),
+ "simple");
+ }
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFailingDynamicDestinationsBounded() throws Exception {
+ testFailingDynamicDestinationsHelper(true, false);
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesUnboundedPCollections.class})
+ public void testFailingDynamicDestinationsUnbounded() throws Exception {
+ testFailingDynamicDestinationsHelper(false, false);
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesUnboundedPCollections.class})
+ public void testFailingDynamicDestinationsAutosharding() throws Exception {
+ testFailingDynamicDestinationsHelper(false, true);
+ }
+
+ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autosharding)
+ throws IOException {
+ FailingTestDestinations dynamicDestinations =
+ new FailingTestDestinations(getBaseOutputDirectory());
+ SimpleSink sink =
+ new SimpleSink<>(getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED);
+
+ // Flag to validate that the pipeline options are passed to the Sink.
+ WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
+ options.setTestFlag("test_value");
+ Pipeline p = TestPipeline.create(options);
+
+ final int numInputs = 100;
+ long expectedFailures = 0;
+ List inputs = Lists.newArrayList();
+ for (int i = 0; i < numInputs; ++i) {
+ inputs.add(Integer.toString(i));
+ if (i % 7 == 0 || i % 3 == 0) {
+ expectedFailures++;
+ }
+ }
+ // Prepare timestamps for the elements.
+ List timestamps = new ArrayList<>();
+ for (long i = 0; i < inputs.size(); i++) {
+ timestamps.add(i + 1);
+ }
+
+ BadRecordErrorHandler> errorHandler =
+ p.registerBadRecordErrorHandler(new ErrorSinkTransform());
+ int numShards = autosharding ? 0 : 2;
+ WriteFiles writeFiles =
+ WriteFiles.to(sink).withNumShards(numShards).withBadRecordErrorHandler(errorHandler);
+
+ PCollection input = p.apply(Create.timestamped(inputs, timestamps));
+ WriteFilesResult res;
+ if (!bounded) {
+ input.setIsBoundedInternal(IsBounded.UNBOUNDED);
+ input = input.apply(Window.into(FixedWindows.of(Duration.standardDays(1))));
+ res = input.apply(writeFiles.withWindowedWrites());
+ } else {
+ res = input.apply(writeFiles);
+ }
+
+ errorHandler.close();
+
+ PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(expectedFailures);
+
+ res.getPerDestinationOutputFilenames().apply(new VerifyFilesExist<>());
+ p.run();
+
+ for (int i = 0; i < 5; ++i) {
+ ResourceId base =
+ getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE);
+ List expected = Lists.newArrayList();
+ for (int j = i; j < numInputs; j += 5) {
+ if (j % 3 != 0 && j % 7 != 0) {
+ expected.add("record_" + j);
+ }
+ }
+ checkFileContents(
+ base.toString(),
+ expected,
+ Optional.fromNullable(autosharding ? null : numShards),
+ bounded /* expectRemovedTempDirectory */);
+ }
+ }
+
@Test
public void testShardedDisplayData() {
DynamicDestinations dynamicDestinations =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java
new file mode 100644
index 000000000000..41367765b920
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.Duration;
+
+public class ErrorHandlingTestUtils {
+ public static class ErrorSinkTransform
+ extends PTransform, PCollection> {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized PCollection expand(
+ PCollection input) {
+ if (input.isBounded() == IsBounded.BOUNDED) {
+ return input.apply("Combine", Combine.globally(Count.combineFn()));
+ } else {
+ return input
+ .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
+ .apply("Combine", Combine.globally(Count.combineFn()).withoutDefaults());
+ }
+ }
+ }
+}
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
index a65db5a90bad..2e4939560ad1 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
@@ -69,6 +69,8 @@
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -337,6 +339,10 @@
* events.apply("WriteAvros", AvroIO.writeCustomTypeToGenericRecords()
* .to(new UserDynamicAvroDestinations(userToSchemaMap)));
* }
+ *
+ * Error handling for writing records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in {@link FileIO} for
+ * details on usage
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -1427,6 +1433,8 @@ public abstract static class TypedWrite
abstract AvroSink.@Nullable DatumWriterFactory getDatumWriterFactory();
+ abstract @Nullable ErrorHandler getBadRecordErrorHandler();
+
/**
* The codec used to encode the blocks in the Avro file. String value drawn from those in
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -1489,6 +1497,9 @@ abstract Builder setDynamicDestinations(
abstract Builder setDatumWriterFactory(
AvroSink.DatumWriterFactory datumWriterFactory);
+ abstract Builder setBadRecordErrorHandler(
+ @Nullable ErrorHandler badRecordErrorHandler);
+
abstract TypedWrite build();
}
@@ -1713,6 +1724,12 @@ public TypedWrite withMetadata(Map
return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
}
+ /** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */
+ public TypedWrite withBadRecordErrorHandler(
+ ErrorHandler errorHandler) {
+ return toBuilder().setBadRecordErrorHandler(errorHandler).build();
+ }
+
DynamicAvroDestinations resolveDynamicDestinations() {
DynamicAvroDestinations dynamicDestinations =
getDynamicDestinations();
@@ -1782,6 +1799,9 @@ public WriteFilesResult expand(PCollection input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
+ if (getBadRecordErrorHandler() != null) {
+ write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
+ }
return input.apply("Write", write);
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 5b976687f2c1..ab6ac52e318d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -44,7 +44,6 @@
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.io.kafka.KafkaIOTest.ErrorSinkTransform;
import org.apache.beam.sdk.io.kafka.KafkaIOTest.FailingLongSerializer;
import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFnTest.FailingDeserializer;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
@@ -77,6 +76,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform;
import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index b0df82bcdc19..9b15b86051f5 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -88,7 +88,6 @@
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
@@ -97,15 +96,13 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
@@ -145,10 +142,7 @@
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.checkerframework.checker.initialization.qual.Initialized;
-import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableWithSize;
import org.joda.time.Duration;
@@ -1472,18 +1466,6 @@ public void testSinkWithSerializationErrors() throws Exception {
}
}
- public static class ErrorSinkTransform
- extends PTransform, PCollection> {
-
- @Override
- public @UnknownKeyFor @NonNull @Initialized PCollection expand(
- PCollection input) {
- return input
- .apply("Window", Window.into(CalendarWindows.years(1)))
- .apply("Combine", Combine.globally(Count.combineFn()).withoutDefaults());
- }
- }
-
@Test
public void testValuesSink() throws Exception {
// similar to testSink(), but use values()' interface.