Skip to content

Commit

Permalink
fix: Two QOL issues with PubsubLiteIO (#552)
Browse files Browse the repository at this point in the history
* fix: Two QOL issues with PubsubLiteIO

- UUIDs generated randomly are not valid UTF-8 strings so they cannot be read from the CPS shim by default
- Restrictions on bundle closing make it hard to test PSL with windowing pipelines

* fix: Two QOL issues with PubsubLiteIO

- UUIDs generated randomly are not valid UTF-8 strings so they cannot be read from the CPS shim by default
- Restrictions on bundle closing make it hard to test PSL with windowing pipelines
  • Loading branch information
dpcollins-google committed Mar 17, 2021
1 parent 39e04e4 commit d4682b6
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
Expand Up @@ -43,8 +43,6 @@
import org.joda.time.Duration;

class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1);

private final SubscriberOptions options;

SubscribeTransform(SubscriberOptions options) {
Expand Down Expand Up @@ -90,7 +88,7 @@ private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracke
initial,
options.getBacklogReader(subscriptionPartition.partition()),
Stopwatch.createUnstarted(),
MAX_SLEEP_TIME.multipliedBy(3).dividedBy(4),
options.minBundleTimeout(),
LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
}

Expand Down Expand Up @@ -136,7 +134,8 @@ public PCollection<SequencedMessage> expand(PBegin input) {
return subscriptionPartitions.apply(
ParDo.of(
new PerSubscriptionPartitionSdf(
MAX_SLEEP_TIME,
// Ensure we read for at least 5 seconds more than the bundle timeout.
options.minBundleTimeout().plus(Duration.standardSeconds(5)),
this::newInitialOffsetReader,
this::newRestrictionTracker,
this::newPartitionProcessor,
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.io.Serializable;
import java.util.Set;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

@AutoValue
public abstract class SubscriberOptions implements Serializable {
Expand All @@ -51,6 +52,8 @@ public abstract class SubscriberOptions implements Serializable {

private static final long MEBIBYTE = 1L << 20;

private static final Duration MIN_BUNDLE_TIMEOUT = Duration.standardMinutes(1);

public static final FlowControlSettings DEFAULT_FLOW_CONTROL =
FlowControlSettings.builder()
.setMessagesOutstanding(Long.MAX_VALUE)
Expand All @@ -69,6 +72,14 @@ public abstract class SubscriberOptions implements Serializable {
*/
public abstract Set<Partition> partitions();

/**
* The minimum wall time to pass before allowing bundle closure.
*
* <p>Setting this to too small of a value will result in increased compute costs and lower
* throughput per byte. Immediate timeouts (Duration.ZERO) may be useful for testing.
*/
public abstract Duration minBundleTimeout();

/**
* A factory to override subscriber creation entirely and delegate to another method. Primarily
* useful for testing.
Expand All @@ -95,7 +106,10 @@ public abstract class SubscriberOptions implements Serializable {

public static Builder newBuilder() {
Builder builder = new AutoValue_SubscriberOptions.Builder();
return builder.setPartitions(ImmutableSet.of()).setFlowControlSettings(DEFAULT_FLOW_CONTROL);
return builder
.setPartitions(ImmutableSet.of())
.setFlowControlSettings(DEFAULT_FLOW_CONTROL)
.setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
}

public abstract Builder toBuilder();
Expand Down Expand Up @@ -188,6 +202,8 @@ public abstract static class Builder {

public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);

// Used in unit tests
abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);

Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.ByteString;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Base64;
import java.util.UUID;
import org.apache.beam.sdk.coders.DefaultCoder;

Expand Down Expand Up @@ -48,6 +49,9 @@ public static Uuid random() {
} catch (IOException e) {
throw new RuntimeException("Should never have an IOException since there is no io.", e);
}
return Uuid.of(output.toByteString());
// Encode to Base64 so the random UUIDs are valid if consumed from the Cloud Pub/Sub client.
return Uuid.of(
ByteString.copyFrom(
Base64.getEncoder().encode(output.toByteString().asReadOnlyByteBuffer())));
}
}

0 comments on commit d4682b6

Please sign in to comment.