Skip to content

Commit

Permalink
Feature/dead letter queue core (apache#29164)
Browse files Browse the repository at this point in the history
* Update 2.50 release notes to include new Kafka topicPattern feature

* Create groovy class for io performance tests
Create gradle task and github actions config for GCS using this.

* delete unnecessary class

* fix env call

* fix call to gradle

* run on hosted runner for testing

* add additional checkout

* add destination for triggered tests

* move env variables to correct location

* try uploading against separate dataset

* try without a user

* update branch checkout, try to view the failure log

* run on failure

* update to use correct BigQuery instance

* convert to matrix

* add result reporting

* add failure clause

* remove failure clause, update to run on self-hosted

* address comments, clean up build

* clarify branching

* Add error handling base implementation & test DLQ enabled class

* Add test cases

* apply spotless

* Fix Checkstyles

* Fix Checkstyles

* make DLH serializable

* rename dead letter to bad record

* make DLH serializable

* Change bad record router name, and use multioutputreceiver instead of process context

* Refactor BadRecord to be nested

* clean up checkstyle

* Update error handler test

* Add metrics for counting error records, and for measuring feature usage

* apply spotless

* fix checkstyle

* make metric reporting static

* spotless

* Rework annotations to be an explicit label on a PTransform, instead of using java annotations

* fix checkstyle

* Address comments

* Address comments

* Fix test cases, spotless

* remove flattening without error collections

* fix nullness

* spotless + encoding issues

* spotless

* throw error when error handler isn't used

* add concrete bad record error handler class

* spotless, fix test category

* fix checkstyle

* clean up comments

* fix test case

* remove "failing transform" field on bad record, add note to CHANGES.md

* fix failing test cases

* fix failing test cases

* apply spotless

* apply final comments

* apply final comments

* apply final comments
  • Loading branch information
johnjcasey authored and Naireen committed Jan 3, 2024
1 parent 0d90138 commit 943c4c1
Show file tree
Hide file tree
Showing 11 changed files with 1,005 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* Running multi-language pipelines locally no longer requires Docker.
Instead, the same (generally auto-started) subprocess used to perform the
expansion can also be used as the cross-language worker.
* Framework for adding Error Handlers to composite transforms added in Java ([#29164](https://github.com/apache/beam/pull/29164))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
Expand Down Expand Up @@ -502,6 +503,12 @@ public RunnerApi.PTransform translate(
SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray()));
}

for (Entry<String, byte[]> annotation :
appliedPTransform.getTransform().getAnnotations().entrySet()) {
transformBuilder.putAnnotations(
annotation.getKey(), ByteString.copyFrom(annotation.getValue()));
}

return transformBuilder.build();
}
}
Expand Down
24 changes: 24 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables.transform;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand All @@ -43,6 +44,9 @@
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PBegin;
Expand Down Expand Up @@ -318,6 +322,7 @@ public PipelineResult run(PipelineOptions options) {
LOG.debug("Running {} via {}", this, runner);
try {
validate(options);
validateErrorHandlers();
return runner.run(this);
} catch (UserCodeException e) {
// This serves to replace the stack with one that ends here and
Expand All @@ -343,6 +348,13 @@ public SchemaRegistry getSchemaRegistry() {
return schemaRegistry;
}

/**
 * Creates a {@link BadRecordErrorHandler} bound to this pipeline and records it in this
 * pipeline's list of error handlers.
 *
 * <p>Every handler recorded here is checked by {@code validateErrorHandlers()} when the pipeline
 * is run; a handler that is not closed at that point causes the run to fail with an {@link
 * IllegalStateException}.
 *
 * @param sinkTransform the transform that consumes the collected {@link BadRecord}s
 * @param <OutputT> the output type produced by {@code sinkTransform}
 * @return the newly created handler
 */
public <OutputT extends POutput> BadRecordErrorHandler<OutputT> registerBadRecordErrorHandler(
    PTransform<PCollection<BadRecord>, OutputT> sinkTransform) {
  final BadRecordErrorHandler<OutputT> handler =
      new BadRecordErrorHandler<>(sinkTransform, this);
  this.errorHandlers.add(handler);
  return handler;
}

/////////////////////////////////////////////////////////////////////////////
// Below here are operations that aren't normally called by users.

Expand Down Expand Up @@ -511,6 +523,8 @@ public static <InputT extends PInput, OutputT extends POutput> OutputT applyTran
private final Multimap<String, PTransform<?, ?>> instancePerName = ArrayListMultimap.create();
private final PipelineOptions defaultOptions;

// Handlers created through registerBadRecordErrorHandler. validateErrorHandlers() requires each
// of them to be closed before the pipeline is handed to the runner.
private final List<ErrorHandler<?, ?>> errorHandlers = new ArrayList<>();

private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
this.transforms = transforms;
this.defaultOptions = options;
Expand Down Expand Up @@ -715,4 +729,14 @@ public boolean apply(@Nonnull final Map.Entry<K, Collection<V>> input) {
return input != null && input.getValue().size() == 1;
}
}

/**
 * Verifies that every {@link ErrorHandler} registered on this pipeline has been closed.
 *
 * @throws IllegalStateException if any registered handler reports that it is not closed
 */
private void validateErrorHandlers() {
  for (ErrorHandler<?, ?> errorHandler : errorHandlers) {
    if (!errorHandler.isClosed()) {
      // The trailing space after "pipeline" matters: the two literals are concatenated, and
      // without it the message reads "this pipelinecannot be run".
      throw new IllegalStateException(
          "One or more ErrorHandlers aren't closed, and this pipeline "
              + "cannot be run. See the ErrorHandler documentation for expected usage");
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
Expand Down Expand Up @@ -216,6 +217,17 @@ public ResourceHints getResourceHints() {
return resourceHints;
}

/**
 * Returns the annotations attached to this transform, which provide additional hints to the
 * runner.
 *
 * <p>The returned map is the transform's own backing map, not a copy.
 *
 * @return the annotation payloads keyed by annotation type
 */
public Map<String, byte[]> getAnnotations() {
  return this.annotations;
}

/**
 * Attaches an annotation to this transform. Any payload previously stored under the same
 * {@code annotationType} is replaced.
 *
 * @param annotationType the key identifying the kind of annotation
 * @param annotation the raw annotation payload
 * @return this transform, so calls can be chained
 */
public PTransform<InputT, OutputT> addAnnotation(
    @NonNull String annotationType, byte @NonNull [] annotation) {
  this.annotations.put(annotationType, annotation);
  return this;
}

/////////////////////////////////////////////////////////////////////////////

// See the note about PTransform's fake Serializability, to
Expand All @@ -229,6 +241,8 @@ public ResourceHints getResourceHints() {

protected transient @NonNull ResourceHints resourceHints = ResourceHints.create();

// Annotation payloads keyed by annotation type; written by addAnnotation and exposed through
// getAnnotations. Starts out empty.
protected transient @NonNull Map<String, byte @NonNull []> annotations = new HashMap<>();

/** Creates a transform without an explicit name; the {@code name} field is set to null. */
protected PTransform() {
this.name = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.errorhandling;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.auto.value.AutoValue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A record that could not be processed, bundled with a description of the failure.
 *
 * <p>A {@code BadRecord} has two parts: a {@link Record} describing the failing element (a human
 * readable rendering, the encoded bytes, and the coder used) and a {@link Failure} describing what
 * went wrong (the exception, its stack trace, and a description of the attempted operation).
 */
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract class BadRecord implements Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(BadRecord.class);

  // Shared pretty-printing JSON writer. Jackson's ObjectWriter is immutable and thread-safe, so a
  // single instance is reused instead of building a new ObjectMapper for every failing record.
  private static final ObjectWriter JSON_WRITER =
      new ObjectMapper().writer().withDefaultPrettyPrinter();

  /** Information about the record that failed. */
  public abstract Record getRecord();

  /** Information about why the record failed. */
  public abstract Failure getFailure();

  /** Returns a new, empty builder for a {@link BadRecord}. */
  public static Builder builder() {
    return new AutoValue_BadRecord.Builder();
  }

  /**
   * Builds a schema-based coder for {@link BadRecord} from the given pipeline's schema registry.
   *
   * @param pipeline the pipeline whose {@link SchemaRegistry} is consulted
   * @return a {@link SchemaCoder} for {@link BadRecord}
   * @throws RuntimeException wrapping the {@link NoSuchSchemaException} if the registry cannot
   *     provide a schema for {@link BadRecord}
   */
  public static Coder<BadRecord> getCoder(Pipeline pipeline) {
    try {
      SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry();
      return SchemaCoder.of(
          schemaRegistry.getSchema(BadRecord.class),
          TypeDescriptor.of(BadRecord.class),
          schemaRegistry.getToRowFunction(BadRecord.class),
          schemaRegistry.getFromRowFunction(BadRecord.class));
    } catch (NoSuchSchemaException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Builds a {@link BadRecord} from a failing element and the circumstances of its failure.
   *
   * @param record the failing element; checked with {@code Preconditions.checkArgumentNotNull}
   * @param coder the coder for {@code record}, or null if none is available; when null, neither
   *     the coder name nor the encoded bytes are populated
   * @param exception the exception that caused the failure, or null if the failure has no
   *     associated exception; when null, the exception and stack trace fields are left unset
   * @param description what was being attempted when the failure occurred
   * @param <RecordT> the type of the failing element
   * @return the assembled {@link BadRecord}
   * @throws IOException if rendering the exception's stack trace fails
   */
  public static <RecordT> BadRecord fromExceptionInformation(
      RecordT record,
      @Nullable Coder<RecordT> coder,
      @Nullable Exception exception,
      String description)
      throws IOException {
    Preconditions.checkArgumentNotNull(record);

    // Build up record information
    BadRecord.Record.Builder recordBuilder = Record.builder();
    recordBuilder.addHumanReadableJson(record).addCoderAndEncodedRecord(coder, record);

    // Build up failure information
    BadRecord.Failure.Builder failureBuilder = Failure.builder().setDescription(description);

    // It's possible for us to want to handle an error scenario where no actual exception object
    // exists
    if (exception != null) {
      failureBuilder.setException(exception.toString()).addExceptionStackTrace(exception);
    }

    return BadRecord.builder()
        .setRecord(recordBuilder.build())
        .setFailure(failureBuilder.build())
        .build();
  }

  /** Builder for {@link BadRecord}. */
  @AutoValue.Builder
  public abstract static class Builder {

    public abstract Builder setRecord(Record record);

    public abstract Builder setFailure(Failure error);

    public abstract BadRecord build();
  }

  /** Describes the element that failed: a readable form, its encoded bytes, and its coder. */
  @AutoValue
  @DefaultSchema(AutoValueSchema.class)
  public abstract static class Record implements Serializable {

    /** The failing record, encoded as JSON. Will be null if serialization as JSON fails. */
    public abstract @Nullable String getHumanReadableJsonRecord();

    /**
     * Nullable to account for failing to encode, or if there is no coder for the record at the time
     * of failure.
     */
    @SuppressWarnings("mutable")
    public abstract byte @Nullable [] getEncodedRecord();

    /** The coder for the record, or null if there is no coder. */
    public abstract @Nullable String getCoder();

    /** Returns a new, empty builder for a {@link Record}. */
    public static Builder builder() {
      return new AutoValue_BadRecord_Record.Builder();
    }

    /** Builder for {@link Record}. */
    @AutoValue.Builder
    public abstract static class Builder {

      public abstract Builder setHumanReadableJsonRecord(@Nullable String jsonRecord);

      /**
       * Populates the human readable form of {@code record} on a best-effort basis.
       *
       * <p>JSON serialization is tried first. If that throws, {@code record.toString()} is used
       * instead. If that also throws, the field is left unset. Failures are logged and never
       * propagated.
       *
       * @param record the failing element
       * @return this builder
       */
      public Builder addHumanReadableJson(Object record) {
        try {
          this.setHumanReadableJsonRecord(JSON_WRITER.writeValueAsString(record));
        } catch (Exception e) {
          LOG.error(
              "Unable to serialize record as JSON. Human readable record attempted via .toString",
              e);
          try {
            this.setHumanReadableJsonRecord(record.toString());
          } catch (Exception e2) {
            LOG.error(
                "Unable to serialize record via .toString. Human readable record will be null", e2);
          }
        }
        return this;
      }

      @SuppressWarnings("mutable")
      public abstract Builder setEncodedRecord(byte @Nullable [] encodedRecord);

      public abstract Builder setCoder(@Nullable String coder);

      /**
       * Records the coder's string form and the bytes of {@code record} encoded with it.
       *
       * <p>Does nothing when {@code coder} is null. If encoding throws an {@link IOException}, the
       * failure is logged and the encoded bytes are left unset; the coder name is still recorded.
       *
       * @param coder the coder for {@code record}, or null if none is available
       * @param record the failing element
       * @param <T> the type of the failing element
       * @return this builder
       */
      public <T> Builder addCoderAndEncodedRecord(@Nullable Coder<T> coder, T record) {
        // We will sometimes not have a coder for a failing record, for example if it has already
        // been modified within the dofn.
        if (coder != null) {
          this.setCoder(coder.toString());
          try {
            this.setEncodedRecord(CoderUtils.encodeToByteArray(coder, record));
          } catch (IOException e) {
            LOG.error(
                "Unable to encode failing record using provided coder."
                    + " BadRecord will be published without encoded bytes",
                e);
          }
        }
        return this;
      }

      public abstract Record build();
    }
  }

  /** Describes why an element failed: the exception, its stack trace, and a description. */
  @AutoValue
  @DefaultSchema(AutoValueSchema.class)
  public abstract static class Failure implements Serializable {

    /** The exception itself, e.g. IOException. Null if there is a failure without an exception. */
    public abstract @Nullable String getException();

    /** The full stacktrace. Null if there is a failure without an exception. */
    public abstract @Nullable String getExceptionStacktrace();

    /** The description of what was being attempted when the failure occurred. */
    public abstract String getDescription();

    /** Returns a new, empty builder for a {@link Failure}. */
    public static Builder builder() {
      return new AutoValue_BadRecord_Failure.Builder();
    }

    /** Builder for {@link Failure}. */
    @AutoValue.Builder
    public abstract static class Builder {

      public abstract Builder setException(@Nullable String exception);

      public abstract Builder setExceptionStacktrace(@Nullable String stacktrace);

      /**
       * Renders the stack trace of {@code exception} as UTF-8 text and stores it on this builder.
       *
       * @param exception the exception whose stack trace is recorded
       * @return this builder
       * @throws IOException if the UTF-8 {@link PrintStream} cannot be created
       */
      public Builder addExceptionStackTrace(Exception exception) throws IOException {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        // try-with-resources closes (and thereby flushes) the stream even if printing throws.
        try (PrintStream printStream = new PrintStream(stream, false, Charsets.UTF_8.name())) {
          exception.printStackTrace(printStream);
        }

        this.setExceptionStacktrace(new String(stream.toByteArray(), Charsets.UTF_8));
        return this;
      }

      public abstract Builder setDescription(String description);

      public abstract Failure build();
    }
  }
}
Loading

0 comments on commit 943c4c1

Please sign in to comment.