Skip to content

Commit

Permalink
fix: simplify AckSetTrackerImpl and make acks after shutdown not caus…
Browse files Browse the repository at this point in the history
…e a permanent error (#872)

* fix: simplify AckSetTrackerImpl and make acks after shutdown not cause a permanent error

* fix: comment
  • Loading branch information
dpcollins-google committed Sep 13, 2021
1 parent 23765cb commit 98ceff0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 76 deletions.
10 changes: 10 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Expand Up @@ -132,6 +132,16 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
</difference>
<difference>
<differenceType>4001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>5001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>6000</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
Expand Down
Expand Up @@ -24,120 +24,117 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class AckSetTrackerImpl extends TrivialProxyService implements AckSetTracker {
// Receipt represents an unacked message. It can be cleared, which will cause the ack to be
// ignored.
public class AckSetTrackerImpl extends ProxyService implements AckSetTracker {
private static final GoogleLogger LOGGER = GoogleLogger.forEnclosingClass();

// Receipt represents an unacked message. If the tracker generation is incremented, the ack will
// be ignored.
private static class Receipt {
final Offset offset;
final long generation;

private final CloseableMonitor m = new CloseableMonitor();

@GuardedBy("m.monitor")
private boolean wasAcked = false;
private final AtomicBoolean wasAcked = new AtomicBoolean();

@GuardedBy("m.monitor")
private Optional<AckSetTrackerImpl> tracker;
private final AckSetTrackerImpl tracker;

Receipt(Offset offset, AckSetTrackerImpl tracker) {
Receipt(Offset offset, long generation, AckSetTrackerImpl tracker) {
this.offset = offset;
this.tracker = Optional.of(tracker);
}

void clear() {
try (CloseableMonitor.Hold h = m.enter()) {
tracker = Optional.empty();
}
this.generation = generation;
this.tracker = tracker;
}

void onAck() {
try (CloseableMonitor.Hold h = m.enter()) {
if (!tracker.isPresent()) {
return;
}
if (wasAcked) {
CheckedApiException e =
new CheckedApiException("Duplicate acks are not allowed.", Code.FAILED_PRECONDITION);
tracker.get().onPermanentError(e);
throw e.underlying;
}
wasAcked = true;
tracker.get().onAck(offset);
void onAck() throws ApiException {
if (wasAcked.getAndSet(true)) {
CheckedApiException e =
new CheckedApiException("Duplicate acks are not allowed.", Code.FAILED_PRECONDITION);
tracker.onPermanentError(e);
throw e.underlying;
}
tracker.onAck(offset, generation);
}
}

private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
@GuardedBy("this")
private final Committer committer;

@GuardedBy("monitor.monitor")
@GuardedBy("this")
private final Deque<Receipt> receipts = new ArrayDeque<>();

@GuardedBy("monitor.monitor")
@GuardedBy("this")
private final PriorityQueue<Offset> acks = new PriorityQueue<>();

@GuardedBy("this")
private long generation = 0L;

@GuardedBy("this")
private boolean shutdown = false;

public AckSetTrackerImpl(Committer committer) throws ApiException {
super(committer);
this.committer = committer;
addServices(committer);
}

// AckSetTracker implementation.
@Override
public Runnable track(SequencedMessage message) throws CheckedApiException {
final Offset messageOffset = message.offset();
try (CloseableMonitor.Hold h = monitor.enter()) {
checkArgument(
receipts.isEmpty() || receipts.peekLast().offset.value() < messageOffset.value());
Receipt receipt = new Receipt(messageOffset, this);
receipts.addLast(receipt);
return receipt::onAck;
}
public synchronized Runnable track(SequencedMessage message) throws CheckedApiException {
checkArgument(
receipts.isEmpty() || receipts.peekLast().offset.value() < message.offset().value());
Receipt receipt = new Receipt(message.offset(), generation, this);
receipts.addLast(receipt);
return receipt::onAck;
}

@Override
public void waitUntilCommitted() throws CheckedApiException {
List<Receipt> receiptsCopy;
try (CloseableMonitor.Hold h = monitor.enter()) {
receiptsCopy = ImmutableList.copyOf(receipts);
public synchronized void waitUntilCommitted() throws CheckedApiException {
++generation;
receipts.clear();
acks.clear();
committer.waitUntilEmpty();
}

private synchronized void onAck(Offset offset, long generation) {
if (shutdown) {
LOGGER.atFine().log("Dropping ack after tracker shutdown.");
return;
}
// Clearing receipts here avoids deadlocks due to locks acquired in different order.
receiptsCopy.forEach(Receipt::clear);
try (CloseableMonitor.Hold h = monitor.enter()) {
receipts.clear();
acks.clear();
committer.waitUntilEmpty();
if (generation != this.generation) {
LOGGER.atFine().log("Dropping ack from wrong generation (admin seek occurred).");
return;
}
acks.add(offset);
Optional<Offset> prefixAckedOffset = Optional.empty();
while (!receipts.isEmpty()
&& !acks.isEmpty()
&& receipts.peekFirst().offset.value() == acks.peek().value()) {
prefixAckedOffset = Optional.of(acks.remove());
receipts.removeFirst();
}
// Convert from last acked to first unacked.
if (prefixAckedOffset.isPresent()) {
ApiFuture<?> future = committer.commitOffset(Offset.of(prefixAckedOffset.get().value() + 1));
ExtractStatus.addFailureHandler(future, this::onPermanentError);
}
}

private void onAck(Offset offset) {
try (CloseableMonitor.Hold h = monitor.enter()) {
acks.add(offset);
Optional<Offset> prefixAckedOffset = Optional.empty();
while (!receipts.isEmpty()
&& !acks.isEmpty()
&& receipts.peekFirst().offset.value() == acks.peek().value()) {
prefixAckedOffset = Optional.of(acks.remove());
receipts.removeFirst();
}
// Convert from last acked to first unacked.
if (prefixAckedOffset.isPresent()) {
ApiFuture<?> future =
committer.commitOffset(Offset.of(prefixAckedOffset.get().value() + 1));
ExtractStatus.addFailureHandler(future, this::onPermanentError);
}
}
@Override
protected void start() throws CheckedApiException {}

@Override
protected synchronized void stop() throws CheckedApiException {
shutdown = true;
}

@Override
protected void handlePermanentError(CheckedApiException error) {}
}
Expand Up @@ -131,4 +131,13 @@ public void waitUntilCommittedDiscardsPendingAcks() throws Exception {
ack.run();
verify(committer, never()).commitOffset(any());
}

@Test
public void ackAfterShutdown() throws Exception {
Runnable ack = tracker.track(messageForOffset(1));

tracker.stopAsync().awaitTerminated();
ack.run();
verify(committer, never()).commitOffset(any());
}
}

0 comments on commit 98ceff0

Please sign in to comment.