Skip to content

Commit

Permalink
fix: Fix PubsubLiteUnboundedSource to create n partitions not partiti…
Browse files Browse the repository at this point in the history
…ons of n size (#313)
  • Loading branch information
dpcollins-google committed Oct 21, 2020
1 parent ed9d961 commit dbebc4b
Showing 1 changed file with 11 additions and 3 deletions.
Expand Up @@ -30,9 +30,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
Expand All @@ -50,9 +50,17 @@ class PubsubLiteUnboundedSource extends UnboundedSource<SequencedMessage, Offset
@Override
public List<? extends UnboundedSource<SequencedMessage, OffsetCheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) {
ArrayList<ArrayList<Partition>> partitionPartitions = new ArrayList<>(desiredNumSplits);
for (int i = 0; i < desiredNumSplits; i++) {
partitionPartitions.add(new ArrayList<>());
}
int counter = 0;
for (Partition partition : subscriberOptions.partitions()) {
partitionPartitions.get(counter % desiredNumSplits).add(partition);
++counter;
}
ImmutableList.Builder<PubsubLiteUnboundedSource> builder = ImmutableList.builder();
for (List<Partition> partitionSubset :
Iterables.partition(subscriberOptions.partitions(), desiredNumSplits)) {
for (List<Partition> partitionSubset : partitionPartitions) {
if (partitionSubset.isEmpty()) continue;
try {
builder.add(
Expand Down

0 comments on commit dbebc4b

Please sign in to comment.