Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Implement Assigner, which delivers partition assignments to a P…
…artitionAssignmentReceiver (#133) * [feat] Import and build new Pub/Sub Lite assignment protos. Also fix a build issue and reformat all java files. * [feat] Implement Assigner, which delivers partition assignments to a PartitionAssignmentReceiver. * Add a comment to handleInitialResponse
- Loading branch information
1 parent
2d5242f
commit a4485d9
Showing
7 changed files
with
377 additions
and
0 deletions.
There are no files selected for viewing
22 changes: 22 additions & 0 deletions
22
...le-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/Assigner.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsublite.internal.wire; | ||
|
||
import com.google.api.core.ApiService; | ||
|
||
/** An Assigner is responsible for handling partition assignments for a subscribing client. */ | ||
public interface Assigner extends ApiService {} |
83 changes: 83 additions & 0 deletions
83
...loud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsublite.internal.wire; | ||
|
||
import com.google.cloud.pubsublite.internal.CloseableMonitor; | ||
import com.google.cloud.pubsublite.internal.ProxyService; | ||
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignment; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.errorprone.annotations.concurrent.GuardedBy; | ||
import io.grpc.Status; | ||
import io.grpc.StatusException; | ||
|
||
public class AssignerImpl extends ProxyService | ||
implements Assigner, RetryingConnectionObserver<PartitionAssignment> { | ||
@GuardedBy("monitor.monitor") | ||
private final RetryingConnection<ConnectedAssigner> connection; | ||
|
||
@GuardedBy("monitor.monitor") | ||
private final PartitionAssignmentReceiver receiver; | ||
|
||
private final CloseableMonitor monitor = new CloseableMonitor(); | ||
|
||
@VisibleForTesting | ||
AssignerImpl( | ||
PartitionAssignmentServiceStub stub, | ||
ConnectedAssignerFactory factory, | ||
InitialPartitionAssignmentRequest initialRequest, | ||
PartitionAssignmentReceiver receiver) | ||
throws StatusException { | ||
this.receiver = receiver; | ||
this.connection = | ||
new RetryingConnectionImpl<>( | ||
stub::assignPartitions, | ||
factory, | ||
PartitionAssignmentRequest.newBuilder().setInitial(initialRequest).build(), | ||
this); | ||
addServices(this.connection); | ||
} | ||
|
||
@Override | ||
protected void start() {} | ||
|
||
@Override | ||
protected void stop() {} | ||
|
||
@Override | ||
protected void handlePermanentError(StatusException error) {} | ||
|
||
@Override | ||
public void triggerReinitialize() { | ||
try (CloseableMonitor.Hold h = monitor.enter()) { | ||
connection.reinitialize(); | ||
} | ||
} | ||
|
||
@Override | ||
public Status onClientResponse(PartitionAssignment value) { | ||
try (CloseableMonitor.Hold h = monitor.enter()) { | ||
receiver.DeliverAssignment(value); | ||
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack)); | ||
} catch (StatusException e) { | ||
return e.getStatus(); | ||
} | ||
return Status.OK; | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
...pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssigner.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsublite.internal.wire; | ||
|
||
public interface ConnectedAssigner extends AutoCloseable { | ||
// Acknowledge an outstanding assignment. | ||
void ack(); | ||
} |
24 changes: 24 additions & 0 deletions
24
...ite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsublite.internal.wire; | ||
|
||
import com.google.cloud.pubsublite.proto.PartitionAssignment; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest; | ||
|
||
interface ConnectedAssignerFactory | ||
extends SingleConnectionFactory< | ||
PartitionAssignmentRequest, PartitionAssignment, PartitionAssignment, ConnectedAssigner> {} |
92 changes: 92 additions & 0 deletions
92
...ublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsublite.internal.wire; | ||
|
||
import static com.google.cloud.pubsublite.internal.Preconditions.checkState; | ||
|
||
import com.google.cloud.pubsublite.internal.CloseableMonitor; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignment; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentAck; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest; | ||
import com.google.errorprone.annotations.concurrent.GuardedBy; | ||
import io.grpc.Status; | ||
import io.grpc.StatusException; | ||
import io.grpc.stub.StreamObserver; | ||
|
||
public class ConnectedAssignerImpl | ||
extends SingleConnection<PartitionAssignmentRequest, PartitionAssignment, PartitionAssignment> | ||
implements ConnectedAssigner { | ||
private final CloseableMonitor monitor = new CloseableMonitor(); | ||
|
||
@GuardedBy("monitor.monitor") | ||
boolean outstanding = false; | ||
|
||
private ConnectedAssignerImpl( | ||
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory, | ||
StreamObserver<PartitionAssignment> clientStream, | ||
PartitionAssignmentRequest initialRequest) { | ||
super(streamFactory, clientStream); | ||
initialize(initialRequest); | ||
} | ||
|
||
static class Factory implements ConnectedAssignerFactory { | ||
@Override | ||
public ConnectedAssigner New( | ||
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory, | ||
StreamObserver<PartitionAssignment> clientStream, | ||
PartitionAssignmentRequest initialRequest) { | ||
return new ConnectedAssignerImpl(streamFactory, clientStream, initialRequest); | ||
} | ||
} | ||
|
||
// SingleConnection implementation. | ||
@Override | ||
protected Status handleInitialResponse(PartitionAssignment response) { | ||
// The assignment stream is server-initiated by sending a PartitionAssignment. The | ||
// initial response from the server is handled identically to other responses. | ||
return handleStreamResponse(response); | ||
} | ||
|
||
@Override | ||
protected Status handleStreamResponse(PartitionAssignment response) { | ||
try (CloseableMonitor.Hold h = monitor.enter()) { | ||
checkState( | ||
!outstanding, | ||
"Received assignment from the server while there was an assignment outstanding."); | ||
outstanding = true; | ||
} catch (StatusException e) { | ||
return e.getStatus(); | ||
} | ||
sendToClient(response); | ||
return Status.OK; | ||
} | ||
|
||
// ConnectedAssigner implementation. | ||
@Override | ||
public void ack() { | ||
try (CloseableMonitor.Hold h = monitor.enter()) { | ||
checkState(outstanding, "Client acknowledged when there was no request outstanding."); | ||
outstanding = false; | ||
} catch (StatusException e) { | ||
setError(e.getStatus()); | ||
} | ||
sendToStream( | ||
PartitionAssignmentRequest.newBuilder() | ||
.setAck(PartitionAssignmentAck.getDefaultInstance()) | ||
.build()); | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
.../src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionAssignmentReceiver.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsublite.internal.wire; | ||
|
||
import com.google.cloud.pubsublite.proto.PartitionAssignment; | ||
|
||
/** | ||
* A receiver for partition assignments. All updates to reflect the assignment should be performed | ||
* inline. | ||
*/ | ||
public interface PartitionAssignmentReceiver { | ||
void DeliverAssignment(PartitionAssignment assignment); | ||
} |
107 changes: 107 additions & 0 deletions
107
...-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/AssignerImplTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsublite.internal.wire; | ||
|
||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.eq; | ||
import static org.mockito.Mockito.doAnswer; | ||
import static org.mockito.Mockito.verifyNoMoreInteractions; | ||
|
||
import com.google.api.core.ApiService.Listener; | ||
import com.google.cloud.pubsublite.CloudRegion; | ||
import com.google.cloud.pubsublite.CloudZone; | ||
import com.google.cloud.pubsublite.ProjectNumber; | ||
import com.google.cloud.pubsublite.SubscriptionName; | ||
import com.google.cloud.pubsublite.SubscriptionPaths; | ||
import com.google.cloud.pubsublite.internal.FakeApiService; | ||
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignment; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc; | ||
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.StatusException; | ||
import io.grpc.inprocess.InProcessChannelBuilder; | ||
import io.grpc.stub.StreamObserver; | ||
import io.grpc.testing.GrpcCleanupRule; | ||
import org.junit.Before; | ||
import org.junit.Rule; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.junit.runners.JUnit4; | ||
import org.mockito.Mock; | ||
import org.mockito.MockitoAnnotations; | ||
import org.mockito.Spy; | ||
|
||
@RunWith(JUnit4.class) | ||
public class AssignerImplTest { | ||
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); | ||
|
||
private static PartitionAssignmentRequest initialRequest() { | ||
try { | ||
return PartitionAssignmentRequest.newBuilder() | ||
.setInitial( | ||
InitialPartitionAssignmentRequest.newBuilder() | ||
.setSubscription( | ||
SubscriptionPaths.newBuilder() | ||
.setProjectNumber(ProjectNumber.of(12345)) | ||
.setZone(CloudZone.of(CloudRegion.of("us-east1"), 'a')) | ||
.setSubscriptionName(SubscriptionName.of("some_subscription")) | ||
.build() | ||
.value())) | ||
.build(); | ||
} catch (StatusException e) { | ||
throw e.getStatus().asRuntimeException(); | ||
} | ||
} | ||
|
||
abstract static class ConnectedAssignerFakeService extends FakeApiService | ||
implements ConnectedAssigner {} | ||
|
||
@Spy private ConnectedAssignerFakeService connectedAssigner; | ||
@Mock private ConnectedAssignerFactory assignerFactory; | ||
|
||
@Mock private PartitionAssignmentReceiver receiver; | ||
@Mock private Listener permanentErrorHandler; | ||
|
||
private Assigner assigner; | ||
private StreamObserver<PartitionAssignment> leakedResponseObserver; | ||
|
||
@Before | ||
public void setUp() throws Exception { | ||
MockitoAnnotations.initMocks(this); | ||
doAnswer( | ||
args -> { | ||
leakedResponseObserver = args.getArgument(1); | ||
return connectedAssigner; | ||
}) | ||
.when(assignerFactory) | ||
.New(any(), any(), eq(initialRequest())); | ||
ManagedChannel channel = | ||
grpcCleanup.register( | ||
InProcessChannelBuilder.forName("localhost:12345").directExecutor().build()); | ||
PartitionAssignmentServiceStub unusedStub = PartitionAssignmentServiceGrpc.newStub(channel); | ||
assigner = | ||
new AssignerImpl(unusedStub, assignerFactory, initialRequest().getInitial(), receiver); | ||
} | ||
|
||
@Test | ||
public void construct_CallsFactoryNew() { | ||
verifyNoMoreInteractions(assignerFactory); | ||
verifyNoMoreInteractions(connectedAssigner); | ||
} | ||
} |