Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Batch commit requests #883

Merged
merged 2 commits into from Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,12 +18,12 @@

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;
import java.util.List;

// A MultiPartitionSubscriber wraps multiple subscribers into a single ApiService that can be
// interacted with. If any single subscriber fails, all others are stopped.
public class MultiPartitionSubscriber extends TrivialProxyService implements Subscriber {
public class MultiPartitionSubscriber extends ProxyService implements Subscriber {
public static Subscriber of(List<Subscriber> subscribers) throws ApiException {
return new MultiPartitionSubscriber(subscribers);
}
Expand Down
Expand Up @@ -26,13 +26,13 @@
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
// publisher. It encodes a MessageMetadata object in the response string.
public class WrappingPublisher extends TrivialProxyService implements Publisher {
public class WrappingPublisher extends ProxyService implements Publisher {
private final com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher;
private final MessageTransformer<PubsubMessage, Message> transformer;

Expand Down
@@ -0,0 +1,45 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;

public final class MoreApiFutures {
private MoreApiFutures() {}

public static <T> void connectFutures(
ApiFuture<T> source, SettableApiFuture<? super T> toConnect) {
ApiFutures.addCallback(
source,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable throwable) {
toConnect.setException(throwable);
}

@Override
public void onSuccess(T t) {
toConnect.set(t);
}
},
SystemExecutors.getFuturesExecutor());
}
}
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -41,7 +42,13 @@ public abstract class ProxyService extends AbstractApiService {
private final List<ApiService> services = new ArrayList<>();
private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false);

protected ProxyService() {}
protected <T extends ApiService> ProxyService(Collection<T> services) {
addServices(services);
}

protected ProxyService(ApiService... services) throws ApiException {
this(Arrays.asList(services));
}

// Add a new ApiServices to this. Requires that all of them are in state NEW and this is in state
// NEW.
Expand All @@ -59,13 +66,13 @@ protected final void addServices(ApiService... services) throws ApiException {
}

// Method to be called on service start after dependent services start.
protected abstract void start() throws CheckedApiException;
protected void start() throws CheckedApiException {}
// Method to be called on service stop before dependent services stop.
protected abstract void stop() throws CheckedApiException;
protected void stop() throws CheckedApiException {}

// Method to be called for class-specific permanent error handling after trying to stop all other
// services. May not throw.
protected abstract void handlePermanentError(CheckedApiException error);
protected void handlePermanentError(CheckedApiException error) {}

// Tries to stop all dependent services and sets this service into the FAILED state.
protected final void onPermanentError(CheckedApiException error) {
Expand Down

This file was deleted.

Expand Up @@ -22,9 +22,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;

class ApiExceptionCommitter extends TrivialProxyService implements Committer {
class ApiExceptionCommitter extends ProxyService implements Committer {
private final Committer committer;

ApiExceptionCommitter(Committer committer) throws ApiException {
Expand Down
Expand Up @@ -21,11 +21,11 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import java.io.IOException;

public class ApiExceptionPublisher<T> extends TrivialProxyService implements Publisher<T> {
public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<T> {
private final Publisher<T> publisher;

ApiExceptionPublisher(Publisher<T> publisher) throws ApiException {
Expand Down
Expand Up @@ -20,7 +20,6 @@

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiService;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.common.collect.ImmutableList;
Expand All @@ -31,7 +30,7 @@ public class ApiServiceUtils {

private ApiServiceUtils() {}

public static ApiService backgroundResourceAsApiService(BackgroundResource resource) {
public static ApiService autoCloseableAsApiService(AutoCloseable resource) {
return new AbstractApiService() {
@Override
protected void doStart() {
Expand Down
Expand Up @@ -16,13 +16,13 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
Expand All @@ -33,7 +33,7 @@
import java.util.HashSet;
import java.util.Set;

public class AssignerImpl extends TrivialProxyService
public class AssignerImpl extends ProxyService
implements Assigner, RetryingConnectionObserver<PartitionAssignment> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

Expand Down Expand Up @@ -72,7 +72,7 @@ public AssignerImpl(
new ConnectedAssignerImpl.Factory(),
initialRequest,
receiver);
addServices(backgroundResourceAsApiService(client));
addServices(autoCloseableAsApiService(client));
}

@Override
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.MoreApiFutures;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Optional;
import java.util.concurrent.Future;

public class BatchingCommitter extends ProxyService implements Committer {
private final Committer underlying;

@GuardedBy("this")
private SettableApiFuture<Void> currentFuture = SettableApiFuture.create();

@GuardedBy("this")
private Optional<Offset> currentOffset = Optional.empty();

BatchingCommitter(Committer underlying, AlarmFactory alarmFactory) {
super(underlying);
this.underlying = underlying;
Future<?> alarm = alarmFactory.newAlarm(this::flush);
addServices(ApiServiceUtils.autoCloseableAsApiService(() -> alarm.cancel(false)));
}

@Override
public synchronized ApiFuture<Void> commitOffset(Offset offset) {
currentOffset = Optional.of(offset);
return currentFuture;
}

@Override
public void waitUntilEmpty() throws CheckedApiException {
flush();
underlying.waitUntilEmpty();
}

@Override
protected void stop() {
flush();
}

private synchronized void flush() {
if (!currentOffset.isPresent()) {
return;
}
ApiFuture<Void> underlyingFuture = underlying.commitOffset(currentOffset.get());
MoreApiFutures.connectFutures(underlyingFuture, currentFuture);
currentOffset = Optional.empty();
currentFuture = SettableApiFuture.create();
}
}
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -81,7 +81,7 @@ public CommitterImpl(CursorServiceClient client, InitialCommitCursorRequest requ
stream -> client.streamingCommitCursorCallable().splitCall(stream),
new ConnectedCommitterImpl.Factory(),
request);
addServices(backgroundResourceAsApiService(client));
addServices(autoCloseableAsApiService(client));
}

// ProxyService implementation.
Expand All @@ -94,9 +94,6 @@ protected void handlePermanentError(CheckedApiException error) {
}
}

@Override
protected void start() {}

@Override
protected void stop() {
try (CloseableMonitor.Hold h = monitor.enter()) {
Expand Down
Expand Up @@ -19,8 +19,10 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import java.time.Duration;

@AutoValue
public abstract class CommitterSettings {
Expand Down Expand Up @@ -54,6 +56,8 @@ public Committer instantiate() {
.setPartition(partition().value())
.build();
return new ApiExceptionCommitter(
new CommitterImpl(serviceClient(), initialCommitCursorRequest));
new BatchingCommitter(
new CommitterImpl(serviceClient(), initialCommitCursorRequest),
AlarmFactory.create(Duration.ofMillis(50))));
}
}
Expand Up @@ -200,9 +200,6 @@ private void handleConfig(long partitionCount) {
}
}

@Override
protected void start() {}

@Override
protected void stop() {
Optional<PartitionsWithRouting> current;
Expand Down
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -136,7 +136,7 @@ public PublisherImpl(
Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())),
initialRequest,
batchingSettings);
addServices(backgroundResourceAsApiService(client));
addServices(autoCloseableAsApiService(client));
}

@GuardedBy("monitor.monitor")
Expand Down