Skip to content

Commit

Permalink
fix: Remove explicit partitions which are not particularly useful and…
Browse files Browse the repository at this point in the history
… add Watch workaround. (#675)

* fix: Remove explicit partitions which are not particularly useful and add Watch workaround.

Explicit partitions can always be added back, but this reduces the public API surface unless they're needed.

* fix: lint
  • Loading branch information
dpcollins-google committed Jun 8, 2021
1 parent 65ced46 commit 587c91c
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 27 deletions.
Expand Up @@ -185,8 +185,7 @@ private void handleConfig(long partitionCount) {
if (partitionCount < currentSize) {
log.atWarning().log(
"Received an unexpected decrease in partition count. Previous partition count %s, new count %s",
currentSize,
partitionCount);
currentSize, partitionCount);
return;
}
ImmutableMap.Builder<Partition, Publisher<MessageMetadata>> mapBuilder =
Expand Down
Expand Up @@ -34,7 +34,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -123,19 +122,8 @@ private TopicPath getTopicPath() {
@Override
public PCollection<SequencedMessage> expand(PBegin input) {
PCollection<SubscriptionPartition> subscriptionPartitions;
if (options.partitions().isEmpty()) {
subscriptionPartitions =
input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
} else {
subscriptionPartitions =
input.apply(
Create.of(
options.partitions().stream()
.map(
partition ->
SubscriptionPartition.of(options.subscriptionPath(), partition))
.collect(Collectors.toList())));
}
subscriptionPartitions =
input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));

return subscriptionPartitions.apply(
ParDo.of(
Expand Down
Expand Up @@ -41,9 +41,7 @@
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.Set;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

Expand All @@ -70,11 +68,6 @@ public abstract class SubscriberOptions implements Serializable {
/** Per-partition flow control parameters for this subscription. */
public abstract FlowControlSettings flowControlSettings();

/**
* A set of partitions. If empty, continuously poll the set of partitions using an admin client.
*/
public abstract Set<Partition> partitions();

/**
* The minimum wall time to pass before allowing bundle closure.
*
Expand Down Expand Up @@ -110,7 +103,6 @@ public abstract class SubscriberOptions implements Serializable {
public static Builder newBuilder() {
Builder builder = new AutoValue_SubscriberOptions.Builder();
return builder
.setPartitions(ImmutableSet.of())
.setFlowControlSettings(DEFAULT_FLOW_CONTROL)
.setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
}
Expand Down Expand Up @@ -205,8 +197,6 @@ public abstract static class Builder {
public abstract Builder setSubscriptionPath(SubscriptionPath path);

// Optional parameters
public abstract Builder setPartitions(Set<Partition> partitions);

public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);
Expand Down
Expand Up @@ -86,7 +86,9 @@ public PollResult<Partition> apply(TopicPath element, Context c) {
IntStream.range(0, partitionCount)
.mapToObj(Partition::of)
.collect(Collectors.toList());
return PollResult.incomplete(Instant.now(), partitions);
return PollResult.incomplete(Instant.now(), partitions)
// TODO(BEAM-12459): Remove when this is fixed upstream
.withWatermark(Instant.now());
}
})
.withPollInterval(pollDuration)
Expand Down

0 comments on commit 587c91c

Please sign in to comment.