diff --git a/CHANGES.md b/CHANGES.md index 847532e855625..4b977bf3790d9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,7 @@ * Running multi-language pipelines locally no longer requires Docker. Instead, the same (generally auto-started) subprocess used to perform the expansion can also be used as the cross-language worker. +* Framework for adding Error Handlers to composite transforms added in Java ([#29164](https://github.com/apache/beam/pull/29164)) ## Breaking Changes diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 8f415e718e957..6829e0d6b232a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.ServiceLoader; import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -502,6 +503,12 @@ public RunnerApi.PTransform translate( SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray())); } + for (Entry annotation : + appliedPTransform.getTransform().getAnnotations().entrySet()) { + transformBuilder.putAnnotations( + annotation.getKey(), ByteString.copyFrom(annotation.getValue())); + } + return transformBuilder.build(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index bd0215e1326ea..50158f109cb00 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables.transform; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -43,6 +44,9 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PBegin; @@ -318,6 +322,7 @@ public PipelineResult run(PipelineOptions options) { LOG.debug("Running {} via {}", this, runner); try { validate(options); + validateErrorHandlers(); return runner.run(this); } catch (UserCodeException e) { // This serves to replace the stack with one that ends here and @@ -343,6 +348,13 @@ public SchemaRegistry getSchemaRegistry() { return schemaRegistry; } + public BadRecordErrorHandler registerBadRecordErrorHandler( + PTransform, OutputT> sinkTransform) { + BadRecordErrorHandler errorHandler = new BadRecordErrorHandler<>(sinkTransform, this); + errorHandlers.add(errorHandler); + return errorHandler; + } + ///////////////////////////////////////////////////////////////////////////// // Below here are operations that aren't normally called by users. 
@@ -511,6 +523,8 @@ public static OutputT applyTran private final Multimap> instancePerName = ArrayListMultimap.create(); private final PipelineOptions defaultOptions; + private final List> errorHandlers = new ArrayList<>(); + private Pipeline(TransformHierarchy transforms, PipelineOptions options) { this.transforms = transforms; this.defaultOptions = options; @@ -715,4 +729,14 @@ public boolean apply(@Nonnull final Map.Entry> input) { return input != null && input.getValue().size() == 1; } } + + private void validateErrorHandlers() { + for (ErrorHandler errorHandler : errorHandlers) { + if (!errorHandler.isClosed()) { + throw new IllegalStateException( + "One or more ErrorHandlers aren't closed, and this pipeline" + + "cannot be run. See the ErrorHandler documentation for expected usage"); + } + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java index 7b3dfb188033a..c0c3638b28dd5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java @@ -21,6 +21,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -216,6 +217,17 @@ public ResourceHints getResourceHints() { return resourceHints; } + /** Returns annotations map to provide additional hints to the runner. */ + public Map getAnnotations() { + return annotations; + } + + public PTransform addAnnotation( + @NonNull String annotationType, byte @NonNull [] annotation) { + annotations.put(annotationType, annotation); + return this; + } + ///////////////////////////////////////////////////////////////////////////// // See the note about about PTransform's fake Serializability, to @@ -229,6 +241,8 @@ public ResourceHints getResourceHints() { protected transient @NonNull ResourceHints resourceHints = ResourceHints.create(); + protected transient @NonNull Map annotations = new HashMap<>(); + protected PTransform() { this.name = null; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecord.java new file mode 100644 index 0000000000000..fd49078350c48 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecord.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.errorhandling; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.auto.value.AutoValue; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.Serializable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@AutoValue +@DefaultSchema(AutoValueSchema.class) +public abstract class BadRecord implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(BadRecord.class); + + /** Information about the record that failed. */ + public abstract Record getRecord(); + + /** Information about why the record failed. */ + public abstract Failure getFailure(); + + public static Builder builder() { + return new AutoValue_BadRecord.Builder(); + } + + public static Coder getCoder(Pipeline pipeline) { + try { + SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry(); + return SchemaCoder.of( + schemaRegistry.getSchema(BadRecord.class), + TypeDescriptor.of(BadRecord.class), + schemaRegistry.getToRowFunction(BadRecord.class), + schemaRegistry.getFromRowFunction(BadRecord.class)); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + public static BadRecord fromExceptionInformation( + RecordT record, + @Nullable Coder coder, + @Nullable Exception exception, + String description) + throws IOException { + Preconditions.checkArgumentNotNull(record); + + // Build up record information + BadRecord.Record.Builder recordBuilder = Record.builder(); + recordBuilder.addHumanReadableJson(record).addCoderAndEncodedRecord(coder, record); + + // Build up failure information + BadRecord.Failure.Builder failureBuilder = Failure.builder().setDescription(description); + + // It's possible for us to want to handle an error scenario where no actual exception object + // exists + if (exception != null) { + failureBuilder.setException(exception.toString()).addExceptionStackTrace(exception); + } + + return BadRecord.builder() + .setRecord(recordBuilder.build()) + .setFailure(failureBuilder.build()) + .build(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setRecord(Record record); + + public abstract Builder setFailure(Failure error); + + public abstract BadRecord build(); + } + + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class Record implements Serializable { + + /** The failing record, encoded as JSON. Will be null if serialization as JSON fails. */ + public abstract @Nullable String getHumanReadableJsonRecord(); + + /** + * Nullable to account for failing to encode, or if there is no coder for the record at the time + * of failure. 
+ */ + @SuppressWarnings("mutable") + public abstract byte @Nullable [] getEncodedRecord(); + + /** The coder for the record, or null if there is no coder. */ + public abstract @Nullable String getCoder(); + + public static Builder builder() { + return new AutoValue_BadRecord_Record.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setHumanReadableJsonRecord(@Nullable String jsonRecord); + + public Builder addHumanReadableJson(Object record) { + ObjectWriter objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter(); + try { + this.setHumanReadableJsonRecord(objectWriter.writeValueAsString(record)); + } catch (Exception e) { + LOG.error( + "Unable to serialize record as JSON. Human readable record attempted via .toString", + e); + try { + this.setHumanReadableJsonRecord(record.toString()); + } catch (Exception e2) { + LOG.error( + "Unable to serialize record via .toString. Human readable record will be null", e2); + } + } + return this; + } + + @SuppressWarnings("mutable") + public abstract Builder setEncodedRecord(byte @Nullable [] encodedRecord); + + public abstract Builder setCoder(@Nullable String coder); + + public Builder addCoderAndEncodedRecord(@Nullable Coder coder, T record) { + // We will sometimes not have a coder for a failing record, for example if it has already + // been + // modified within the dofn. + if (coder != null) { + this.setCoder(coder.toString()); + try { + this.setEncodedRecord(CoderUtils.encodeToByteArray(coder, record)); + } catch (IOException e) { + LOG.error( + "Unable to encode failing record using provided coder." + + " BadRecord will be published without encoded bytes", + e); + } + } + return this; + } + + public abstract Record build(); + } + } + + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class Failure implements Serializable { + + /** The exception itself, e.g. IOException. Null if there is a failure without an exception. */ + public abstract @Nullable String getException(); + + /** The full stacktrace. Null if there is a failure without an exception. */ + public abstract @Nullable String getExceptionStacktrace(); + + /** The description of what was being attempted when the failure occurred. 
*/ + public abstract String getDescription(); + + public static Builder builder() { + return new AutoValue_BadRecord_Failure.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setException(@Nullable String exception); + + public abstract Builder setExceptionStacktrace(@Nullable String stacktrace); + + public Builder addExceptionStackTrace(Exception exception) throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + PrintStream printStream = new PrintStream(stream, false, Charsets.UTF_8.name()); + exception.printStackTrace(printStream); + printStream.close(); + + this.setExceptionStacktrace(new String(stream.toByteArray(), Charsets.UTF_8)); + return this; + } + + public abstract Builder setDescription(String description); + + public abstract Failure build(); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouter.java new file mode 100644 index 0000000000000..7670baf5c6ff2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.errorhandling; + +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; + +public interface BadRecordRouter extends Serializable { + + BadRecordRouter THROWING_ROUTER = new ThrowingBadRecordRouter(); + + BadRecordRouter RECORDING_ROUTER = new RecordingBadRecordRouter(); + + TupleTag BAD_RECORD_TAG = new TupleTag<>(); + + void route( + MultiOutputReceiver outputReceiver, + RecordT record, + @Nullable Coder coder, + @Nullable Exception exception, + String description) + throws Exception; + + class ThrowingBadRecordRouter implements BadRecordRouter { + + @Override + public void route( + MultiOutputReceiver outputReceiver, + RecordT record, + @Nullable Coder coder, + @Nullable Exception exception, + String description) + throws Exception { + if (exception != null) { + throw exception; + } else { + Preconditions.checkArgumentNotNull(record); + String encodedRecord = + BadRecord.Record.builder() + .addHumanReadableJson(record) + .build() + .getHumanReadableJsonRecord(); + if (encodedRecord == null) { + encodedRecord = "Unable to serialize bad record"; + } + throw new RuntimeException("Encountered Bad Record: " + encodedRecord); + } + } + } + + class RecordingBadRecordRouter implements BadRecordRouter { + + @Override + public void route( + MultiOutputReceiver outputReceiver, + RecordT record, + @Nullable Coder coder, + @Nullable Exception exception, + String description) + throws Exception { + outputReceiver + .get(BAD_RECORD_TAG) + .output(BadRecord.fromExceptionInformation(record, coder, exception, description)); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java new file mode 100644 index 0000000000000..9e0298d885eb9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Error Handler is a utility object used for plumbing error PCollections to a configured sink.
+ * Error Handlers must be closed before a pipeline is run to properly pipe error collections to the
+ * sink, and the pipeline will be rejected if any handlers aren't closed.
+ *
+ * @param <ErrorT> The type of the error object. This will usually be a {@link BadRecord}, but can
+ *     be any type
+ * @param <OutputT> The return type of the sink PTransform.
+ *

+ * <p>Usage of Error Handlers:
+ *
+ * <p>Simple usage with one DLQ
+ *
+ * <pre>{@code
+ * PCollection records = ...;
+ * try (ErrorHandler errorHandler = pipeline.registerErrorHandler(SomeSink.write())) {
+ *   PCollection results = records.apply(SomeIO.write().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ *
+ * <p>Usage with multiple DLQ stages
+ *
+ * <pre>{@code
+ * PCollection records = ...;
+ * try (ErrorHandler errorHandler = pipeline.registerErrorHandler(SomeSink.write())) {
+ *   PCollection results = records.apply(SomeIO.write().withDeadLetterQueue(errorHandler))
+ *                         .apply(OtherTransform.builder().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ */ +public interface ErrorHandler extends AutoCloseable { + + void addErrorCollection(PCollection errorCollection); + + boolean isClosed(); + + @Nullable + OutputT getOutput(); + + class PTransformErrorHandler + implements ErrorHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PTransformErrorHandler.class); + private final PTransform, OutputT> sinkTransform; + + private final Pipeline pipeline; + + private final Coder coder; + + private final List> errorCollections = new ArrayList<>(); + + private @Nullable OutputT sinkOutput = null; + + private boolean closed = false; + + /** + * Constructs a new ErrorHandler, but should not be called directly. Instead, call + * pipeline.registerErrorHandler to ensure safe pipeline construction + */ + @Internal + public PTransformErrorHandler( + PTransform, OutputT> sinkTransform, + Pipeline pipeline, + Coder coder) { + this.sinkTransform = sinkTransform; + this.pipeline = pipeline; + this.coder = coder; + } + + @Override + public void addErrorCollection(PCollection errorCollection) { + errorCollections.add(errorCollection); + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public @Nullable OutputT getOutput() { + if (!this.isClosed()) { + throw new IllegalStateException( + "ErrorHandler must be finalized before the output can be returned"); + } + return sinkOutput; + } + + @Override + public void close() { + if (closed) { + throw new IllegalStateException( + "Error handler is already closed, and may not be closed twice"); + } + closed = true; + PCollection flattened; + if (errorCollections.isEmpty()) { + LOG.info("Empty list of error pcollections passed to ErrorHandler."); + flattened = pipeline.apply(Create.empty(coder)); + } else { + flattened = PCollectionList.of(errorCollections).apply(Flatten.pCollections()); + } + LOG.debug( + "{} error collections are being sent to {}", + errorCollections.size(), + sinkTransform.getName()); + String sinkTransformName = sinkTransform.getName(); + sinkOutput = + flattened + .apply( + "Record Error Metrics to " + sinkTransformName, + new WriteErrorMetrics(sinkTransformName)) + .apply( + "Write to error Sink", + sinkTransform.addAnnotation( + "FeatureMetric", "ErrorHandler".getBytes(StandardCharsets.UTF_8))); + } + + public static class WriteErrorMetrics + extends PTransform, PCollection> { + + private final Counter errorCounter; + + public WriteErrorMetrics(String sinkTransformName) { + errorCounter = Metrics.counter("ErrorMetrics", sinkTransformName + "-input"); + } + + @Override + public PCollection expand(PCollection input) { + return input.apply(ParDo.of(new CountErrors(errorCounter))); + } + + public static class CountErrors extends DoFn { + + private final Counter errorCounter; + + public CountErrors(Counter errorCounter) { + this.errorCounter = errorCounter; + } + + @ProcessElement + public void processElement(@Element ErrorT error, OutputReceiver receiver) { + errorCounter.inc(); + receiver.output(error); + } + } + } + } + + class BadRecordErrorHandler + extends PTransformErrorHandler { + + /** Constructs a new ErrorHandler for handling BadRecords. */ + @Internal + public BadRecordErrorHandler( + PTransform, OutputT> sinkTransform, Pipeline pipeline) { + super(sinkTransform, pipeline, BadRecord.getCoder(pipeline)); + } + } + + /** + * A default, placeholder error handler that exists to allow usage of .addErrorCollection() + * without effects. 
This enables more simple codepaths without checking for whether the user + * configured an error handler or not. + */ + @Internal + class DefaultErrorHandler + implements ErrorHandler { + + @Override + public void addErrorCollection(PCollection errorCollection) {} + + @Override + public boolean isClosed() { + throw new IllegalArgumentException( + "No Op handler should not be closed. This implies this IO is misconfigured."); + } + + @Override + public @Nullable OutputT getOutput() { + throw new IllegalArgumentException( + "No Op handler has no output. This implies this IO is misconfigured."); + } + + @Override + public void close() { + throw new IllegalArgumentException( + "No Op handler should not be closed. This implies this IO is misconfigured."); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/package-info.java new file mode 100644 index 0000000000000..c41e10143c12d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Provides utilities for handling errors in Pipelines. */ +@DefaultAnnotation(NonNull.class) +package org.apache.beam.sdk.transforms.errorhandling; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import org.checkerframework.checker.nullness.qual.NonNull; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java new file mode 100644 index 0000000000000..b279ad938dba8 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.errorhandling; + +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** + * Dummy PTransform that is configurable with a Bad Record Handler. TODO(johncasey) look to factor + * some of this out for easy use in other IOs + */ +public class BRHEnabledPTransform extends PTransform, PCollection> { + + private ErrorHandler errorHandler = new DefaultErrorHandler<>(); + + private BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER; + + private static final TupleTag RECORDS = new TupleTag<>(); + + public BRHEnabledPTransform() {} + + public BRHEnabledPTransform withBadRecordHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + this.badRecordRouter = BadRecordRouter.RECORDING_ROUTER; + return this; + } + + @Override + public PCollection expand(PCollection input) { + // TODO this pattern is a clunky. Look to improve this once we have ParDo level error handling. + PCollectionTuple pCollectionTuple = + input.apply( + "NoOpDoFn", + ParDo.of(new OddIsBad(badRecordRouter)) + .withOutputTags(RECORDS, TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG))); + + errorHandler.addErrorCollection( + pCollectionTuple + .get(BadRecordRouter.BAD_RECORD_TAG) + .setCoder(BadRecord.getCoder(input.getPipeline()))); + + return pCollectionTuple.get(RECORDS).setCoder(BigEndianIntegerCoder.of()); + } + + public static class OddIsBad extends DoFn { + + private final BadRecordRouter badRecordRouter; + + public OddIsBad(BadRecordRouter badRecordRouter) { + this.badRecordRouter = badRecordRouter; + } + + @ProcessElement + public void processElement(@Element Integer element, MultiOutputReceiver receiver) + throws Exception { + if (element % 2 == 0) { + receiver.get(RECORDS).output(element); + } else { + badRecordRouter.route( + receiver, + element, + BigEndianIntegerCoder.of(), + new RuntimeException("Integer was odd"), + "Integer was odd"); + } + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouterTest.java new file mode 100644 index 0000000000000..fb18a6077db1f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouterTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.errorhandling; + +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.function.BiFunction; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Record; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class BadRecordRouterTest { + + @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock private MultiOutputReceiver outputReceiver; + + @Mock private OutputReceiver badRecordOutputReceiver; + + private static final BiFunction< + BadRecord.Builder, BadRecord.Failure.Builder, ArgumentMatcher> + ignoreStacktraceMatcher = + (expectedBuilder, failure) -> + (ArgumentMatcher) + argument -> { + // This complex matcher means we don't need to maintain an expected stacktrace + String stackTrace = argument.getFailure().getExceptionStacktrace(); + failure.setExceptionStacktrace(stackTrace); + BadRecord expected = expectedBuilder.setFailure(failure.build()).build(); + return expected.equals(argument); + }; + + @Test + public void testThrowingHandlerWithException() throws Exception { + BadRecordRouter handler = BadRecordRouter.THROWING_ROUTER; + + thrown.expect(IOException.class); + + handler.route(outputReceiver, new Object(), null, new IOException(), "desc"); + } + + @Test + public void testThrowingHandlerWithNoException() throws Exception { + BadRecordRouter handler = BadRecordRouter.THROWING_ROUTER; + + thrown.expect(RuntimeException.class); + + handler.route(outputReceiver, new Object(), null, null, "desc"); + } + + @Test + public void testRecordingHandler() throws Exception { + when(outputReceiver.get(BAD_RECORD_TAG)).thenReturn(badRecordOutputReceiver); + + BadRecordRouter handler = BadRecordRouter.RECORDING_ROUTER; + + handler.route(outputReceiver, 5, BigEndianIntegerCoder.of(), new RuntimeException(), "desc"); + + BadRecord.Builder expectedBuilder = + BadRecord.builder() + .setRecord( + Record.builder() + .setHumanReadableJsonRecord("5") + .setEncodedRecord(new byte[] {0, 0, 0, 5}) + .setCoder("BigEndianIntegerCoder") + .build()); + + BadRecord.Failure.Builder failure = + BadRecord.Failure.builder() + .setException("java.lang.RuntimeException") + .setDescription("desc"); + + verify(badRecordOutputReceiver) + .output(ArgumentMatchers.argThat(ignoreStacktraceMatcher.apply(expectedBuilder, failure))); + } + + @Test + public void testNoCoder() throws Exception { + when(outputReceiver.get(BAD_RECORD_TAG)).thenReturn(badRecordOutputReceiver); + + BadRecordRouter handler = BadRecordRouter.RECORDING_ROUTER; + + 
handler.route(outputReceiver, 5, null, new RuntimeException(), "desc"); + + BadRecord.Builder expectedBuilder = + BadRecord.builder().setRecord(Record.builder().setHumanReadableJsonRecord("5").build()); + + BadRecord.Failure.Builder failure = + BadRecord.Failure.builder() + .setException("java.lang.RuntimeException") + .setDescription("desc"); + + verify(badRecordOutputReceiver) + .output(ArgumentMatchers.argThat(ignoreStacktraceMatcher.apply(expectedBuilder, failure))); + } + + @Test + public void testFailingCoder() throws Exception { + when(outputReceiver.get(BAD_RECORD_TAG)).thenReturn(badRecordOutputReceiver); + + BadRecordRouter handler = BadRecordRouter.RECORDING_ROUTER; + + Coder failingCoder = + new Coder() { + @Override + public void encode(Integer value, OutputStream outStream) + throws CoderException, IOException { + throw new CoderException("Failing Coder"); + } + + @Override + public Integer decode(InputStream inStream) throws CoderException, IOException { + return null; + } + + @Override + public List> getCoderArguments() { + return null; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} + }; + + handler.route(outputReceiver, 5, failingCoder, new RuntimeException(), "desc"); + + BadRecord.Builder expectedBuilder = + BadRecord.builder() + .setRecord( + Record.builder() + .setHumanReadableJsonRecord("5") + .setCoder(failingCoder.toString()) + .build()); + + BadRecord.Failure.Builder failure = + BadRecord.Failure.builder() + .setException("java.lang.RuntimeException") + .setDescription("desc"); + + verify(badRecordOutputReceiver) + .output(ArgumentMatchers.argThat(ignoreStacktraceMatcher.apply(expectedBuilder, failure))); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlerTest.java new file mode 100644 index 0000000000000..b0a5733cb1bd0 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlerTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.errorhandling; + +import java.util.Objects; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Record; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ErrorHandlerTest { + @Rule public final TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + @Category(NeedsRunner.class) + public void testNoUsageErrorHandlerUsage() throws Exception { + try (BadRecordErrorHandler> eh = + pipeline.registerBadRecordErrorHandler(new DummySinkTransform<>())) {} + + pipeline.run(); + } + + @Test + public void testUnclosedErrorHandlerUsage() { + + pipeline.registerBadRecordErrorHandler(new DummySinkTransform<>()); + + // Expected to be thrown because the error handler isn't closed + thrown.expect(IllegalStateException.class); + + pipeline.run(); + } + + @Test + public void testBRHEnabledPTransform() { + PCollection record = pipeline.apply(Create.of(1, 2, 3, 4)); + record.apply(new BRHEnabledPTransform()); + + // unhandled runtime exception thrown by the BRHEnabledPTransform + thrown.expect(RuntimeException.class); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testErrorHandlerWithBRHTransform() throws Exception { + PCollection record = pipeline.apply(Create.of(1, 2, 3, 4)); + DummySinkTransform transform = new DummySinkTransform<>(); + ErrorHandler> eh = + pipeline.registerBadRecordErrorHandler(transform); + record.apply(new BRHEnabledPTransform().withBadRecordHandler(eh)); + eh.close(); + PCollection badRecords = eh.getOutput(); + + // We use a more complex satisfies statement to ensure we don't need to preserve stacktraces + // in test cases + PAssert.that(badRecords) + .satisfies( + (records) -> { + int count = 0; + for (BadRecord badRecord : records) { + count++; + + Record r = null; + + if (Objects.equals(badRecord.getRecord().getHumanReadableJsonRecord(), "1")) { + r = + Record.builder() + .setHumanReadableJsonRecord("1") + .setEncodedRecord(new byte[] {0, 0, 0, 1}) + .setCoder("BigEndianIntegerCoder") + .build(); + } else { + r = + Record.builder() + .setHumanReadableJsonRecord("3") + .setEncodedRecord(new byte[] {0, 0, 0, 3}) + .setCoder("BigEndianIntegerCoder") + .build(); + } + + BadRecord.Builder expectedBuilder = BadRecord.builder().setRecord(r); + + BadRecord.Failure.Builder failure = + BadRecord.Failure.builder() + .setException("java.lang.RuntimeException: Integer was odd") + .setDescription("Integer was odd"); + + failure.setExceptionStacktrace(badRecord.getFailure().getExceptionStacktrace()); + expectedBuilder.setFailure(failure.build()); + Assert.assertEquals("Expect failure to match", expectedBuilder.build(), badRecord); + } + Assert.assertEquals("Expect 2 errors", 2, count); + return null; + }); + + pipeline.run().waitUntilFinish(); + } + + public static class DummySinkTransform extends PTransform, PCollection> { + + @Override + public 
PCollection expand(PCollection input) { + return input; + } + } +}
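
Taken together, the pieces in this PR compose as follows. The sketch below is illustrative only: the class names `ErrorHandlerUsage`, `LogBadRecords`, `FailOddIntegers`, and `OddIsBadFn` are hypothetical, and it simply mirrors the pattern exercised by `BRHEnabledPTransform` and `ErrorHandlerTest` above. A composite transform routes failures to `BadRecordRouter.BAD_RECORD_TAG` and hands that collection to the registered `ErrorHandler`, while the pipeline author registers and closes a `BadRecordErrorHandler` around the transform application.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ErrorHandlerUsage {

  /** Hypothetical sink; any PTransform consuming PCollection of BadRecord can back a handler. */
  static class LogBadRecords
      extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
    @Override
    public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
      return input; // a real sink would write to a dead-letter topic, table, file, etc.
    }
  }

  /** Error-aware composite: even integers pass through, odd ones become BadRecords. */
  static class FailOddIntegers
      extends PTransform<PCollection<Integer>, PCollection<Integer>> {
    private static final TupleTag<Integer> RECORDS = new TupleTag<Integer>() {};
    private final ErrorHandler<BadRecord, ?> errorHandler;

    FailOddIntegers(ErrorHandler<BadRecord, ?> errorHandler) {
      this.errorHandler = errorHandler;
    }

    @Override
    public PCollection<Integer> expand(PCollection<Integer> input) {
      PCollectionTuple result =
          input.apply(
              ParDo.of(new OddIsBadFn(BadRecordRouter.RECORDING_ROUTER))
                  .withOutputTags(RECORDS, TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
      // Hand the failure output to whichever handler the pipeline author registered.
      errorHandler.addErrorCollection(
          result
              .get(BadRecordRouter.BAD_RECORD_TAG)
              .setCoder(BadRecord.getCoder(input.getPipeline())));
      return result.get(RECORDS).setCoder(BigEndianIntegerCoder.of());
    }
  }

  /** DoFn mirroring OddIsBad from the PR's tests: routes odd elements as bad records. */
  static class OddIsBadFn extends DoFn<Integer, Integer> {
    private final BadRecordRouter router;

    OddIsBadFn(BadRecordRouter router) {
      this.router = router;
    }

    @ProcessElement
    public void processElement(@Element Integer element, MultiOutputReceiver receiver)
        throws Exception {
      if (element % 2 == 0) {
        receiver.get(FailOddIntegers.RECORDS).output(element);
      } else {
        router.route(
            receiver,
            element,
            BigEndianIntegerCoder.of(),
            new RuntimeException("Integer was odd"),
            "Integer was odd");
      }
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    PCollection<Integer> records = pipeline.apply(Create.of(1, 2, 3, 4));

    // Closing the handler (here via try-with-resources) flattens every error collection that
    // was added to it and applies the sink transform; Pipeline.run() rejects the pipeline if
    // any registered handler is left unclosed. errorHandler.getOutput() exposes the sink's
    // output after close() if further processing is needed.
    try (BadRecordErrorHandler<PCollection<BadRecord>> errorHandler =
        pipeline.registerBadRecordErrorHandler(new LogBadRecords())) {
      records.apply(new FailOddIntegers(errorHandler));
    }

    pipeline.run().waitUntilFinish();
  }
}
```

When no handler is registered, a transform built this way keeps today's fail-fast behavior: it defaults to `BadRecordRouter.THROWING_ROUTER` and a `DefaultErrorHandler`, as `BRHEnabledPTransform` does above.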