Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnostics Improvements for quorumAckedLSN, CurrentReplicaSetSize, and replicaStatusList #39844

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,11 @@ public void directDiagnosticsOnException() throws Exception {
JsonNode replicaStatusList = storeResult.get("replicaStatusList");
assertThat(replicaStatusList.isArray()).isTrue();
assertThat(replicaStatusList.size()).isGreaterThan(0);
int quorumAcked = storeResult.get("quorumAckedLSN").asInt(-1);
assertThat(quorumAcked).isGreaterThan(0);
int currentReplicaSetSize = storeResult.get("currentReplicaSetSize").asInt(-1);
assertThat(currentReplicaSetSize).isGreaterThan(0);

}
}

Expand Down Expand Up @@ -1222,11 +1227,14 @@ private void validateRntbdStatistics(CosmosDiagnostics cosmosDiagnostics,
assertThat(responseStatisticsList.size()).isGreaterThan(0);
JsonNode storeResult = responseStatisticsList.get(0).get("storeResult");
assertThat(storeResult).isNotNull();

int replicaSetSize = storeResult.get("currentReplicaSetSize").asInt(-1);
assertThat(replicaSetSize).isGreaterThan(0);
JsonNode replicaStatusList = storeResult.get("replicaStatusList");
assertThat(replicaStatusList.isArray()).isTrue();
assertThat(replicaStatusList.size()).isGreaterThan(0);

assertThat(replicaStatusList.size()).isEqualTo(replicaSetSize);
String replicaStatusTxt = replicaStatusList.get(0).asText();
assertThat(replicaStatusTxt.contains("Primary")).isTrue();
assertThat(replicaStatusTxt.contains("Connected")).isTrue();
// validate serviceEndpointStatistics
JsonNode serviceEndpointStatistics = storeResult.get("serviceEndpointStatistics");
assertThat(serviceEndpointStatistics).isNotNull();
Expand Down Expand Up @@ -1502,14 +1510,14 @@ public void negativeE2ETimeoutWithQueryOperation() {

TestItem testItem = TestItem.createNewItem();
container.createItem(testItem).block();

CosmosQueryRequestOptions requestOptions = new CosmosQueryRequestOptions();
requestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(-1)).build()
);
CosmosPagedFlux<ObjectNode> flux = container.readAllItems(requestOptions, ObjectNode.class);
List<ObjectNode> results = flux.collectList().block();

fail("This should have failed with an exception");
} catch(OperationCancelledException cancelledException) {
assertThat(cancelledException).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public AddressSelectorWrapper validate() {
}

public AddressSelectorWrapper verifyVesolvePrimaryUriAsyncCount(int count) {
Mockito.verify(addressSelector, Mockito.times(count)).resolvePrimaryUriAsync(Mockito.any(), Mockito.anyBoolean());
Mockito.verify(addressSelector, Mockito.times(count)).resolvePrimaryUriAsync(Mockito.any(), Mockito.anyBoolean(), Mockito.anyMap());
return this;
}

Expand Down Expand Up @@ -314,7 +314,7 @@ public PrimaryReplicaMoveBuilder withPrimaryReplicaMove(Uri primaryURIBeforeForc
}

return Mono.just(primaryURIBeforeForceRefresh);
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean());
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean(), Mockito.anyMap());

Mockito.doAnswer((invocation -> {
capture(invocation);
Expand Down Expand Up @@ -482,7 +482,7 @@ public AddressSelectorWrapper build() {
Mockito.doAnswer((invocation) -> {
capture(invocation);
return Mono.just(primaryAddress);
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean());
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean(), Mockito.anyMap());

Mockito.doAnswer((invocation -> {
capture(invocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ public void readPrimaryAsync() {
request.requestContext.requestChargeTracker = new RequestChargeTracker();

Mockito.doReturn(Mono.just(primaryURI)).when(addressSelector).resolvePrimaryUriAsync(
Mockito.eq(request) , Mockito.eq(false));
Mockito.eq(request) , Mockito.eq(false), Mockito.anyMap());

StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.eq(primaryURI), Mockito.eq(request));
Expand Down Expand Up @@ -529,7 +529,7 @@ public void readPrimaryAsync_GoneFromReplica() {
request.requestContext.requestChargeTracker = new RequestChargeTracker();

Mockito.doReturn(Mono.just(primaryURI)).when(addressSelector).resolvePrimaryUriAsync(
Mockito.eq(request) , Mockito.eq(false));
Mockito.eq(request) , Mockito.eq(false), Mockito.anyMap());

Mockito.doReturn(Mono.error(ExceptionBuilder.create().asGoneException())).when(transportClient).invokeResourceOperationAsync(Mockito.eq(primaryURI), Mockito.eq(request));
StoreReader storeReader = new StoreReader(transportClient, addressSelector, sessionContainer);
Expand Down Expand Up @@ -586,7 +586,7 @@ public void readPrimaryAsync_Error() {
request.requestContext.requestChargeTracker = new RequestChargeTracker();

Mockito.doReturn(Mono.just(primaryURI)).when(addressSelector).resolvePrimaryUriAsync(
Mockito.eq(request) , Mockito.eq(false));
Mockito.eq(request) , Mockito.eq(false), Mockito.anyMap());

StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.eq(primaryURI), Mockito.eq(request));
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.59.0-beta.1 (Unreleased)

#### Features Added
* Added diagnostic fields for `quorumAckedLSN` and `currentReplicaSetSize`. Changed `replicaStatusList` to include all replicas and more information. - See [PR 39844](https://github.com/Azure/azure-sdk-for-java/pull/39844)
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
* Added public APIs `getCustomeSerializer` and `setCustomSerializer` to allow customers to specify custom payload transformations or serialization settings. - See [PR 38997](https://github.com/Azure/azure-sdk-for-java/pull/38997)

#### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
import java.util.stream.Collectors;

public class AddressSelector {
Expand All @@ -31,7 +32,12 @@ public Mono<List<Uri>> resolveAllUriAsync(
boolean forceRefresh) {
Mono<List<AddressInformation>> allReplicaAddressesObs = this.resolveAddressesAsync(request, forceRefresh);
return allReplicaAddressesObs.map(allReplicaAddresses -> allReplicaAddresses.stream().filter(a -> includePrimary || !a.isPrimary())
.map(a -> a.getPhysicalUri()).collect(Collectors.toList()));
.map(a -> {
if (includePrimary && a.isPrimary()) {
a.getPhysicalUri().setHealthStatusTuplePrimary(true);
}
return a.getPhysicalUri();
}).collect(Collectors.toList()));
}

public Mono<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
Expand All @@ -45,6 +51,20 @@ public Mono<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolea
});
}

public Mono<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh, Map<Uri, String> replicaStatuses) {
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
Mono<List<AddressInformation>> replicaAddressesObs = this.resolveAddressesAsync(request, forceAddressRefresh);
return replicaAddressesObs.flatMap(replicaAddresses -> {
try {
replicaAddresses.forEach(replica -> {
replicaStatuses.put(replica.getPhysicalUri(), replica.getPhysicalUri().getHealthStatusDiagnosticString());
});
return Mono.just(AddressSelector.getPrimaryUri(request, replicaAddresses));
} catch (Exception e) {
return Mono.error(e);
}
});
}

public static Uri getPrimaryUri(RxDocumentServiceRequest request, List<AddressInformation> replicaAddresses) throws GoneException {
AddressInformation primaryAddress = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -167,11 +170,16 @@ Mono<StoreResponse> writePrivateAsync(
Mono<List<AddressInformation>> replicaAddressesObs = this.addressSelector.resolveAddressesAsync(request, forceRefresh);
AtomicReference<Uri> primaryURI = new AtomicReference<>();
AtomicReference<List<String>> replicaStatusList = new AtomicReference<>();
Map<Uri, String> replicaStatuses = new ConcurrentHashMap<>();

return replicaAddressesObs.flatMap(replicaAddresses -> {
try {
List<URI> contactedReplicas = new ArrayList<>();
replicaAddresses.forEach(replicaAddress -> contactedReplicas.add(replicaAddress.getPhysicalUri().getURI()));
replicaAddresses.forEach(replicaAddress -> {
Uri uri = replicaAddress.getPhysicalUri();
contactedReplicas.add(uri.getURI());
replicaStatuses.put(uri, uri.getHealthStatusDiagnosticString());
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
});
BridgeInternal.setContactedReplicas(request.requestContext.cosmosDiagnostics, contactedReplicas);
return Mono.just(AddressSelector.getPrimaryUri(request, replicaAddresses));
} catch (GoneException e) {
Expand All @@ -196,8 +204,10 @@ Mono<StoreResponse> writePrivateAsync(
} catch (Exception e) {
return Mono.error(e);
}

replicaStatusList.set(Arrays.asList(primaryUri.getHealthStatusDiagnosticString()));
primaryUri.setHealthStatusTuplePrimary(true);
primaryUri.setHealthStatusTupleAttempting(true);
replicaStatuses.replace(primaryUri, primaryUri.getHealthStatusDiagnosticString());
replicaStatusList.set(new ArrayList<>(replicaStatuses.values()));

return this.transportClient.invokeResourceOperationAsync(primaryUri, request)
.doOnError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -216,16 +219,20 @@ private Flux<List<StoreResult>> readFromReplicas(List<StoreResult> resultCollect
return Flux.error(new GoneException());
}
List<Pair<Flux<StoreResponse>, Uri>> readStoreTasks = new ArrayList<>();

Map<Uri, String> replicaStatuses = new HashMap<>();
resolveApiResults.forEach(uri -> {
replicaStatuses.put(uri, uri.getHealthStatusDiagnosticString());
});
List<Uri> addressRandomPermutation = AddressEnumerator.getTransportAddresses(entity, resolveApiResults);

// The health status of the Uri will change as the time goes by
// what we really want to track is the health status snapshot at this moment
List<String> replicaStatusList =
addressRandomPermutation
.stream()
.map(uri -> uri.getHealthStatusDiagnosticString())
.collect(Collectors.toList());
addressRandomPermutation.forEach(uri -> {
uri.setHealthStatusTupleAttempting(true);
replicaStatuses.replace(uri, uri.getHealthStatusDiagnosticString());
});
// how to add primary
List<String> replicaStatusList = new ArrayList<>(replicaStatuses.values());

int startIndex = 0;

Expand Down Expand Up @@ -563,9 +570,10 @@ private Mono<ReadReplicaResult> readPrimaryInternalAsync(
return Mono.error(new GoneException());
}

Map<Uri, String> replicaStatuses = new ConcurrentHashMap<>();
Mono<Uri> primaryUriObs = this.addressSelector.resolvePrimaryUriAsync(
entity,
entity.requestContext.forceRefreshAddressCache);
entity.requestContext.forceRefreshAddressCache, replicaStatuses);

AtomicReference<List<String>> replicaStatusList = new AtomicReference<>();

Expand All @@ -588,7 +596,9 @@ private Mono<ReadReplicaResult> readPrimaryInternalAsync(
this.readFromStoreAsync(
primaryUri,
entity);
replicaStatusList.set(Arrays.asList(primaryUri.getHealthStatusDiagnosticString()));
primaryUri.setHealthStatusTuplePrimary(true);
replicaStatuses.replace(primaryUri, primaryUri.getHealthStatusDiagnosticString());
replicaStatusList.set(new ArrayList<>(replicaStatuses.values()));

return storeResponseObsAndUri.getLeft().flatMap(
storeResponse -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ public void serialize(StoreResultDiagnostics storeResultDiagnostics,
"storePhysicalAddress",
storeResultDiagnostics.storePhysicalAddress == null ? null : storeResultDiagnostics.storePhysicalAddress.getURIAsString());
jsonGenerator.writeNumberField("lsn", storeResultDiagnostics.lsn);
jsonGenerator.writeNumberField("quorumAckedLSN",storeResultDiagnostics.quorumAckedLSN);
jsonGenerator.writeNumberField("currentReplicaSetSize", storeResultDiagnostics.currentReplicaSetSize);
jsonGenerator.writeNumberField("globalCommittedLsn", storeResultDiagnostics.globalCommittedLSN);
jsonGenerator.writeStringField("partitionKeyRangeId", storeResponseDiagnostics.getPartitionKeyRangeId());
jsonGenerator.writeBooleanField("isValid", storeResultDiagnostics.isValid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.cpu.CpuMemoryReader;
import com.azure.cosmos.implementation.directconnectivity.addressEnumerator.AddressEnumerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -183,7 +184,25 @@ public boolean shouldRefreshHealthStatus() {
}

public String getHealthStatusDiagnosticString() {
return this.healthStatusTuple.get().diagnsoticString;
return this.healthStatusTuple.get().getDiagnsoticString();
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
}

public void setHealthStatusTupleAttempting(boolean attempting) {
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
this.healthStatusTuple.updateAndGet(previousStatusTuple ->
{
previousStatusTuple.setAttempting(attempting);
return previousStatusTuple;
}
);
}

public void setHealthStatusTuplePrimary(boolean primary) {
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
this.healthStatusTuple.updateAndGet(previousStatusTuple ->
{
previousStatusTuple.setPrimary(primary);
return previousStatusTuple;
}
);
}

@Override
Expand Down Expand Up @@ -230,12 +249,31 @@ public int getPriority() {
}

static class HealthStatusAndDiagnosticStringTuple {
private final String diagnsoticString;
private final HealthStatus status;
private final String ATTEMPTING = "Attempting";
private final String NOT_ATTEMPTING = "NotAttempting";
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
private final String PRIMARY = "Primary";
private final String SECONDARY = "Secondary";
private final URI uri;
private boolean isAttempting;
private boolean isPrimary;

public HealthStatusAndDiagnosticStringTuple(URI uri, HealthStatus status) {
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
this.diagnsoticString = uri.getPort() + ":" + status;
this.uri = uri;
this.status = status;
}

public String getDiagnsoticString() {
return uri.getPort() + ":" + (isPrimary ? PRIMARY : SECONDARY) + ":" +
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
status + "|" + (isAttempting ? ATTEMPTING : NOT_ATTEMPTING);
}

public void setAttempting(boolean attempting) {
isAttempting = attempting;
}

public void setPrimary(boolean primary) {
isPrimary = primary;
}
}
}