Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollback Bigtable throttling counter #32442

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.ResourceExhaustedException;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
Expand All @@ -41,7 +38,6 @@
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -78,8 +74,6 @@
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -1121,52 +1115,27 @@ public Write withMaxOutstandingBytes(long bytes) {
* always enabled on batch writes and limits the number of outstanding requests to the Bigtable
* server.
*
* <p>When enabled, will also set default {@link #withThrottlingReportTargetMs} to 1 minute.
* This enables runner react with increased latency in flush call due to flow control.
*
* <p>Does not modify this object.
*/
public Write withFlowControl(boolean enableFlowControl) {
BigtableWriteOptions options = getBigtableWriteOptions();
BigtableWriteOptions.Builder builder = options.toBuilder().setFlowControl(enableFlowControl);
if (enableFlowControl) {
builder = builder.setThrottlingReportTargetMs(60_000);
}
return toBuilder().setBigtableWriteOptions(builder.build()).build();
return toBuilder()
.setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled.
*
* <p>Will also set {@link #withThrottlingReportTargetMs} to the same value.
*/
/** Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled. */
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withThrottlingTargetMs can be preserved. It was a config existed in BigtableWriteOptions but missing the correspondent in transform configuration. It was a inconsistency in BigtableIO configuration setting and not related to throttling counter change.

public Write withThrottlingTargetMs(int throttlingTargetMs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you actually remove a public method? I suspect that you need to make it a no-op that logs a warning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I can and we should.

Only user actively set this method in their 2.59.0 pipeline will be affected. Users do not explicitly set this method, and upgrade from <=2.58.0 are unaffected. For those indeed use this method in 2.59.0 they have good reason to do so. We are removing a tested and functioning feature on request, make it no-op or warning then user won't aware this feature is gone in the next release.

In the past Beam has an "Experimental" annotation but voted to remove them (#26490). Now "new code is changeable/evolving by default" (see discussion link in that PR) especially true for a new API in single version not enabled by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the conceptual "binary compatibility" is a concern, I've add back the method, but throw instead of no-op or ignore there

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I was concerned about breaking binary compatibility. I

I dont see it throwing an exception though, nor do I think it should throw an exception. Please make it a no-op and log a warning instead.
Also we didnt expose ThrottlingTargetMs nor latency based throttling for a reason, it has some really sharp edge cases that are very hard to debug. Please make this a no-op as it was prior to your change

BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(
options
.toBuilder()
.setThrottlingTargetMs(throttlingTargetMs)
.setThrottlingReportTargetMs(throttlingTargetMs)
.build())
options.toBuilder().setThrottlingTargetMs(throttlingTargetMs).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write
* request latency exceeded the set value, the amount greater than the target will be considered
* as throttling time and report back to runner.
*
* <p>If not set, defaults to 3 min for completed batch request. Client side flowing control
* configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust
* the default value accordingly. Set to 0 to disable throttling time reporting.
*/
/** This configuration is removed in Beam 2.60.0, Do not use. */
public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please dont throw an exception, instead log a warning. We dont want to be in position where are user started using this flag in 2.59 and then their jobs start failing when they upgrade to 2.60. In other words throwing an UnsupportedOperationException is effectively the same as breaking binary compatibility

BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(
options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build())
.build();
throw new UnsupportedOperationException("withThrottlingReportTargetMs is removed");
}

public Write withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
Expand Down Expand Up @@ -1333,16 +1302,8 @@ private static class BigtableWriterFn
private final BigtableServiceFactory.ConfigId id;
private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
private final BadRecordRouter badRecordRouter;

private final Counter throttlingMsecs =
Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME);

private final int throttleReportThresMsecs;

private transient ConcurrentLinkedQueue<KV<BigtableWriteException, BoundedWindow>> badRecords =
null;
// Due to callback thread not supporting Beam metrics, Record pending metrics and report later.
private transient long pendingThrottlingMsecs;
private transient boolean reportedLineage;

// Assign serviceEntry in startBundle and clear it in tearDown.
Expand All @@ -1363,8 +1324,6 @@ private static class BigtableWriterFn
this.badRecordRouter = badRecordRouter;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
// a request completed more than this time will be considered throttled. Disabled if set to 0
throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000);
LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions);
}

Expand Down Expand Up @@ -1393,18 +1352,13 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
drainCompletedElementFutures();
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
Instant writeStart = Instant.now();
pendingThrottlingMsecs = 0;
CompletableFuture<Void> f =
bigtableWriter
.writeRecord(record)
// transform the next CompletionStage to have its own status
// this allows us to capture any unexpected errors in the handler
.handle(handleMutationException(record, window, writeStart));
.handle(handleMutationException(record, window));
outstandingWrites.add(f);
if (pendingThrottlingMsecs > 0) {
throttlingMsecs.inc(pendingThrottlingMsecs);
}
++recordsWritten;
seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
}
Expand All @@ -1420,39 +1374,14 @@ private void drainCompletedElementFutures() throws ExecutionException, Interrupt
}

private BiFunction<MutateRowResponse, Throwable, Void> handleMutationException(
KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, Instant writeStart) {
KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
if (isDataException(exception)) {
retryIndividualRecord(record, window);
} else {
// Exception due to resource unavailable or rate limited,
// including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
boolean isResourceException = false;
if (exception instanceof StatusRuntimeException) {
StatusRuntimeException se = (StatusRuntimeException) exception;
if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
|| io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) {
isResourceException = true;
}
} else if (exception instanceof DeadlineExceededException
|| exception instanceof ResourceExhaustedException) {
isResourceException = true;
}
if (isResourceException) {
pendingThrottlingMsecs = new Duration(writeStart, Instant.now()).getMillis();
}
failures.add(new BigtableWriteException(record, exception));
}
} else {
// add the excessive amount to throttling metrics if elapsed time > target latency
if (throttleReportThresMsecs > 0) {
long excessTime =
new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
if (excessTime > 0) {
pendingThrottlingMsecs = excessTime;
}
}
}
return null;
};
Expand Down Expand Up @@ -1489,7 +1418,6 @@ private static boolean isDataException(Throwable e) {
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
if (bigtableWriter != null) {
Instant closeStart = Instant.now();
try {
bigtableWriter.close();
} catch (IOException e) {
Expand All @@ -1498,7 +1426,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
// to the error queue. Bigtable will successfully write other failures in the batch,
// so this exception should be ignored
if (!(e.getCause() instanceof BatchingException)) {
throttlingMsecs.inc(new Duration(closeStart, Instant.now()).getMillis());
throw e;
}
}
Expand All @@ -1514,14 +1441,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
e);
}

// add the excessive amount to throttling metrics if elapsed time > target latency
if (throttleReportThresMsecs > 0) {
long excessTime =
new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
if (excessTime > 0) {
throttlingMsecs.inc(excessTime);
}
}
if (!reportedLineage) {
bigtableWriter.reportLineage();
reportedLineage = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ abstract class BigtableWriteOptions implements Serializable {
/** Returns the target latency if latency based throttling is enabled. */
abstract @Nullable Integer getThrottlingTargetMs();

/** Returns the target latency if latency based throttling report to runner is enabled. */
abstract @Nullable Integer getThrottlingReportTargetMs();

/** Returns true if batch write flow control is enabled. Otherwise return false. */
abstract @Nullable Boolean getFlowControl();

Expand Down Expand Up @@ -91,8 +88,6 @@ abstract static class Builder {

abstract Builder setThrottlingTargetMs(int targetMs);

abstract Builder setThrottlingReportTargetMs(int targetMs);

abstract Builder setFlowControl(boolean enableFlowControl);

abstract Builder setCloseWaitTimeout(Duration timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,21 +436,6 @@ public void testWriteValidationFailsMissingOptionsAndInstanceAndProject() {
write.expand(null);
}

@Test
public void testWriteClientRateLimitingAlsoSetReportMsecs() {
// client side flow control
BigtableIO.Write write = BigtableIO.write().withTableId("table").withFlowControl(true);
assertEquals(
60_000, (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));

// client side latency based throttling
int targetMs = 30_000;
write = BigtableIO.write().withTableId("table").withThrottlingTargetMs(targetMs);
assertEquals(
targetMs,
(int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
}

/** Helper function to make a single row mutation to be written. */
private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
ByteString rowKey = ByteString.copyFromUtf8(key);
Expand Down
Loading