Skip to content

Commit

Permalink
feat: Change restriction to OffsetByteRange to allow functioning with…
Browse files Browse the repository at this point in the history
… runnerv2. (#674)

* feat: Change restriction to OffsetByteRange to allow functioning with runnerv2.

* fix: Rebase and fix test

* fix: Rebase
  • Loading branch information
dpcollins-google committed Jun 15, 2021
1 parent 558d505 commit 1749ca9
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 69 deletions.
Expand Up @@ -103,7 +103,8 @@ private void onMessages(MessageResponse response) throws CheckedApiException {
checkState(
response.getMessagesCount() > 0,
String.format(
"Received an empty MessageResponse on stream with initial request %s.", initialRequest));
"Received an empty MessageResponse on stream with initial request %s.",
initialRequest));
List<SequencedMessage> messages =
response.getMessagesList().stream()
.map(SequencedMessage::fromProto)
Expand Down
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@AutoValue
@DefaultSchema(AutoValueSchema.class)
abstract class OffsetByteRange {
abstract OffsetRange getRange();

abstract long getByteCount();

static OffsetByteRange of(OffsetRange range, long byteCount) {
return new AutoValue_OffsetByteRange(range, byteCount);
}

static OffsetByteRange of(OffsetRange range) {
return of(range, 0);
}
}
Expand Up @@ -26,8 +26,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.joda.time.Duration;

Expand All @@ -43,23 +41,26 @@
* received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
* would return ProcessContinuation.resume().
*/
class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
implements HasProgress {
class OffsetByteRangeTracker extends TrackerWithProgress {
private final TopicBacklogReader backlogReader;
private final Duration minTrackingTime;
private final long minBytesReceived;
private final Stopwatch stopwatch;
private OffsetRange range;
private OffsetByteRange range;
private @Nullable Long lastClaimed;
private long byteCount = 0;

public OffsetByteRangeTracker(
OffsetRange range,
OffsetByteRange range,
TopicBacklogReader backlogReader,
Stopwatch stopwatch,
Duration minTrackingTime,
long minBytesReceived) {
checkArgument(range.getTo() == Long.MAX_VALUE);
checkArgument(
range.getRange().getTo() == Long.MAX_VALUE,
"May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
checkArgument(
range.getByteCount() == 0L,
"May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
this.backlogReader = backlogReader;
this.minTrackingTime = minTrackingTime;
this.minBytesReceived = minBytesReceived;
Expand All @@ -86,32 +87,32 @@ public boolean tryClaim(OffsetByteProgress position) {
position.lastOffset().value(),
lastClaimed);
checkArgument(
toClaim >= range.getFrom(),
toClaim >= range.getRange().getFrom(),
"Trying to claim offset %s before start of the range %s",
toClaim,
range);
// split() has already been called, truncating this range. No more offsets may be claimed.
if (range.getTo() != Long.MAX_VALUE) {
boolean isRangeEmpty = range.getTo() == range.getFrom();
boolean isValidClosedRange = nextOffset() == range.getTo();
if (range.getRange().getTo() != Long.MAX_VALUE) {
boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
checkState(
isRangeEmpty || isValidClosedRange,
"Violated class precondition: offset range improperly split. Please report a beam bug.");
return false;
}
lastClaimed = toClaim;
byteCount += position.batchBytes();
range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
return true;
}

@Override
public OffsetRange currentRestriction() {
public OffsetByteRange currentRestriction() {
return range;
}

private long nextOffset() {
checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
}

/**
Expand All @@ -123,29 +124,33 @@ private boolean receivedEnough() {
if (duration.isLongerThan(minTrackingTime)) {
return true;
}
if (byteCount >= minBytesReceived) {
if (currentRestriction().getByteCount() >= minBytesReceived) {
return true;
}
return false;
}

@Override
public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
// Cannot split a bounded range. This should already be completely claimed.
if (range.getTo() != Long.MAX_VALUE) {
if (range.getRange().getTo() != Long.MAX_VALUE) {
return null;
}
if (!receivedEnough()) {
return null;
}
range = new OffsetRange(currentRestriction().getFrom(), nextOffset());
return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE));
range =
OffsetByteRange.of(
new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
range.getByteCount());
return SplitResult.of(
this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
}

@Override
@SuppressWarnings("unboxing.of.nullable")
public void checkDone() throws IllegalStateException {
if (range.getFrom() == range.getTo()) {
if (range.getRange().getFrom() == range.getRange().getTo()) {
return;
}
checkState(
Expand All @@ -154,18 +159,18 @@ public void checkDone() throws IllegalStateException {
range);
long lastClaimedNotNull = checkNotNull(lastClaimed);
checkState(
lastClaimedNotNull >= range.getTo() - 1,
lastClaimedNotNull >= range.getRange().getTo() - 1,
"Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
lastClaimedNotNull,
range,
lastClaimedNotNull + 1,
range.getTo());
range.getRange().getTo());
}

@Override
public Progress getProgress() {
ComputeMessageStatsResponse stats =
this.backlogReader.computeMessageStats(Offset.of(nextOffset()));
return Progress.from(byteCount, stats.getMessageBytes());
return Progress.from(range.getByteCount(), stats.getMessageBytes());
}
}
Expand Up @@ -34,18 +34,14 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
private final SubscriptionPartitionProcessorFactory processorFactory;
private final SerializableFunction<SubscriptionPartition, InitialOffsetReader>
offsetReaderFactory;
private final SerializableBiFunction<
SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
private final SerializableBiFunction<SubscriptionPartition, OffsetByteRange, TrackerWithProgress>
trackerFactory;
private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;

PerSubscriptionPartitionSdf(
Duration maxSleepTime,
SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
SerializableBiFunction<
SubscriptionPartition,
OffsetRange,
RestrictionTracker<OffsetRange, OffsetByteProgress>>
SerializableBiFunction<SubscriptionPartition, OffsetByteRange, TrackerWithProgress>
trackerFactory,
SubscriptionPartitionProcessorFactory processorFactory,
SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
Expand All @@ -68,7 +64,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In

@ProcessElement
public ProcessContinuation processElement(
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
@Element SubscriptionPartition subscriptionPartition,
OutputReceiver<SequencedMessage> receiver)
throws Exception {
Expand Down Expand Up @@ -103,8 +99,18 @@ public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscrip
}

@NewTracker
public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(
@Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) {
public TrackerWithProgress newTracker(
@Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) {
return trackerFactory.apply(subscriptionPartition, range);
}

@GetSize
public double getSize(
@Element SubscriptionPartition subscriptionPartition,
@Restriction OffsetByteRange restriction) {
if (restriction.getRange().getTo() != Long.MAX_VALUE) {
return restriction.getByteCount();
}
return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining();
}
}
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -71,7 +70,7 @@ private Subscriber newSubscriber(

private SubscriptionPartitionProcessor newPartitionProcessor(
SubscriptionPartition subscriptionPartition,
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver)
throws ApiException {
checkSubscription(subscriptionPartition);
Expand All @@ -81,13 +80,13 @@ private SubscriptionPartitionProcessor newPartitionProcessor(
consumer ->
newSubscriber(
subscriptionPartition.partition(),
Offset.of(tracker.currentRestriction().getFrom()),
Offset.of(tracker.currentRestriction().getRange().getFrom()),
consumer),
options.flowControlSettings());
}

private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
SubscriptionPartition subscriptionPartition, OffsetRange initial) {
private TrackerWithProgress newRestrictionTracker(
SubscriptionPartition subscriptionPartition, OffsetByteRange initial) {
checkSubscription(subscriptionPartition);
return new OffsetByteRangeTracker(
initial,
Expand Down
Expand Up @@ -18,7 +18,6 @@

import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.Serializable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

Expand All @@ -27,6 +26,6 @@ interface SubscriptionPartitionProcessorFactory extends Serializable {

SubscriptionPartitionProcessor newProcessor(
SubscriptionPartition subscriptionPartition,
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver);
}
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
Expand All @@ -45,7 +44,7 @@

class SubscriptionPartitionProcessorImpl extends Listener
implements SubscriptionPartitionProcessor {
private final RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
private final OutputReceiver<SequencedMessage> receiver;
private final Subscriber subscriber;
private final SettableFuture<Void> completionFuture = SettableFuture.create();
Expand All @@ -54,7 +53,7 @@ class SubscriptionPartitionProcessorImpl extends Listener

@SuppressWarnings("methodref.receiver.bound.invalid")
SubscriptionPartitionProcessorImpl(
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver,
Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
FlowControlSettings flowControlSettings) {
Expand Down
@@ -0,0 +1,23 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;

public abstract class TrackerWithProgress
extends RestrictionTracker<OffsetByteRange, OffsetByteProgress> implements HasProgress {}
Expand Up @@ -59,7 +59,11 @@ public void setUp() {
when(ticker.read()).thenReturn(0L);
tracker =
new OffsetByteRangeTracker(
RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES);
OffsetByteRange.of(RANGE, 0),
reader,
Stopwatch.createUnstarted(ticker),
Duration.millis(500),
MIN_BYTES);
}

@Test
Expand All @@ -85,11 +89,15 @@ public void getProgressStatsFailure() {
public void claimSplitSuccess() {
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES)));
SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
assertEquals(10_001, splits.getPrimary().getTo());
assertEquals(10_001, splits.getResidual().getFrom());
assertEquals(Long.MAX_VALUE, splits.getResidual().getTo());
SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
OffsetByteRange primary = splits.getPrimary();
assertEquals(RANGE.getFrom(), primary.getRange().getFrom());
assertEquals(10_001, primary.getRange().getTo());
assertEquals(MIN_BYTES * 2, primary.getByteCount());
OffsetByteRange residual = splits.getResidual();
assertEquals(10_001, residual.getRange().getFrom());
assertEquals(Long.MAX_VALUE, residual.getRange().getTo());
assertEquals(0, residual.getByteCount());
assertEquals(splits.getPrimary(), tracker.currentRestriction());
tracker.checkDone();
assertNull(tracker.trySplit(IGNORED_FRACTION));
Expand All @@ -99,10 +107,10 @@ public void claimSplitSuccess() {
@SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
public void splitWithoutClaimEmpty() {
when(ticker.read()).thenReturn(100000000000000L);
SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
assertEquals(RANGE.getFrom(), splits.getPrimary().getTo());
assertEquals(RANGE, splits.getResidual());
SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom());
assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo());
assertEquals(RANGE, splits.getResidual().getRange());
assertEquals(splits.getPrimary(), tracker.currentRestriction());
tracker.checkDone();
assertNull(tracker.trySplit(IGNORED_FRACTION));
Expand Down

0 comments on commit 1749ca9

Please sign in to comment.