Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnostics Improvements for quorumAckedLSN, CurrentReplicaSetSize, and replicaStatusList #39844

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -983,9 +983,12 @@ public void directDiagnosticsOnException() throws Exception {
assertThat(responseStatisticsList.size()).isGreaterThan(0);
JsonNode storeResult = responseStatisticsList.get(0).get("storeResult");
assertThat(storeResult).isNotNull();
int currentReplicaSetSize = storeResult.get("currentReplicaSetSize").asInt(-1);
assertThat(currentReplicaSetSize).isEqualTo(-1);
JsonNode replicaStatusList = storeResult.get("replicaStatusList");
assertThat(replicaStatusList.isArray()).isTrue();
assertThat(replicaStatusList.size()).isGreaterThan(0);
assertThat(replicaStatusList.isObject()).isTrue();
int quorumAcked = storeResult.get("quorumAckedLSN").asInt(-1);
assertThat(quorumAcked).isEqualTo(-1);
}
}

Expand Down Expand Up @@ -1228,11 +1231,15 @@ private void validateRntbdStatistics(CosmosDiagnostics cosmosDiagnostics,
assertThat(responseStatisticsList.size()).isGreaterThan(0);
JsonNode storeResult = responseStatisticsList.get(0).get("storeResult");
assertThat(storeResult).isNotNull();

int replicaSetSize = storeResult.get("currentReplicaSetSize").asInt(-1);
assertThat(replicaSetSize).isGreaterThan(0);
JsonNode replicaStatusList = storeResult.get("replicaStatusList");
assertThat(replicaStatusList.isArray()).isTrue();
assertThat(replicaStatusList.size()).isGreaterThan(0);

assertThat(replicaStatusList.isObject()).isTrue();
int replicasNum = replicaStatusList.get(Uri.ATTEMPTING).size() + replicaStatusList.get(Uri.IGNORING).size();
assertThat(replicasNum).isEqualTo(replicaSetSize);
String replicaStatusTxt = replicaStatusList.get(Uri.ATTEMPTING).get(0).asText();
assertThat(replicaStatusTxt.contains("P")).isTrue();
assertThat(replicaStatusTxt.contains("Connected")).isTrue();
// validate serviceEndpointStatistics
JsonNode serviceEndpointStatistics = storeResult.get("serviceEndpointStatistics");
assertThat(serviceEndpointStatistics).isNotNull();
Expand Down Expand Up @@ -1508,14 +1515,14 @@ public void negativeE2ETimeoutWithQueryOperation() {

TestItem testItem = TestItem.createNewItem();
container.createItem(testItem).block();

CosmosQueryRequestOptions requestOptions = new CosmosQueryRequestOptions();
requestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(-1)).build()
);
CosmosPagedFlux<ObjectNode> flux = container.readAllItems(requestOptions, ObjectNode.class);
List<ObjectNode> results = flux.collectList().block();

fail("This should have failed with an exception");
} catch(OperationCancelledException cancelledException) {
assertThat(cancelledException).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public AddressSelectorWrapper validate() {
}

public AddressSelectorWrapper verifyVesolvePrimaryUriAsyncCount(int count) {
Mockito.verify(addressSelector, Mockito.times(count)).resolvePrimaryUriAsync(Mockito.any(), Mockito.anyBoolean());
Mockito.verify(addressSelector, Mockito.times(count)).resolvePrimaryUriAsync(Mockito.any(), Mockito.anyBoolean(), Mockito.any());
return this;
}

Expand Down Expand Up @@ -314,7 +314,7 @@ public PrimaryReplicaMoveBuilder withPrimaryReplicaMove(Uri primaryURIBeforeForc
}

return Mono.just(primaryURIBeforeForceRefresh);
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean());
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean(), Mockito.any());

Mockito.doAnswer((invocation -> {
capture(invocation);
Expand Down Expand Up @@ -482,7 +482,7 @@ public AddressSelectorWrapper build() {
Mockito.doAnswer((invocation) -> {
capture(invocation);
return Mono.just(primaryAddress);
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean());
}).when(addressSelector).resolvePrimaryUriAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean(), Mockito.any());

Mockito.doAnswer((invocation -> {
capture(invocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -498,7 +502,7 @@ public void readPrimaryAsync() {
request.requestContext.requestChargeTracker = new RequestChargeTracker();

Mockito.doReturn(Mono.just(primaryURI)).when(addressSelector).resolvePrimaryUriAsync(
Mockito.eq(request) , Mockito.eq(false));
Mockito.eq(request) , Mockito.eq(false), Mockito.any());

StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.eq(primaryURI), Mockito.eq(request));
Expand Down Expand Up @@ -529,7 +533,7 @@ public void readPrimaryAsync_GoneFromReplica() {
request.requestContext.requestChargeTracker = new RequestChargeTracker();

Mockito.doReturn(Mono.just(primaryURI)).when(addressSelector).resolvePrimaryUriAsync(
Mockito.eq(request) , Mockito.eq(false));
Mockito.eq(request) , Mockito.eq(false), Mockito.any());

Mockito.doReturn(Mono.error(ExceptionBuilder.create().asGoneException())).when(transportClient).invokeResourceOperationAsync(Mockito.eq(primaryURI), Mockito.eq(request));
StoreReader storeReader = new StoreReader(transportClient, addressSelector, sessionContainer);
Expand Down Expand Up @@ -586,7 +590,7 @@ public void readPrimaryAsync_Error() {
request.requestContext.requestChargeTracker = new RequestChargeTracker();

Mockito.doReturn(Mono.just(primaryURI)).when(addressSelector).resolvePrimaryUriAsync(
Mockito.eq(request) , Mockito.eq(false));
Mockito.eq(request) , Mockito.eq(false), Mockito.any());

StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.eq(primaryURI), Mockito.eq(request));
Expand Down Expand Up @@ -636,13 +640,15 @@ public void canParseLongLsn() {
.withGlobalCommittedLsn(bigLsn)
.build();

Map<String, Set<String>> replicaStatusList = new HashMap<>();
replicaStatusList.put(Uri.ATTEMPTING, new HashSet<>(Arrays.asList(primaryURI.getHealthStatusDiagnosticString())));
StoreResult result = storeReader.createStoreResult(
storeResponse,
null,
false,
false,
null,
Arrays.asList(primaryURI.getHealthStatusDiagnosticString()));
replicaStatusList);
assertThat(result.globalCommittedLSN).isEqualTo(bigLsn);
assertThat(result.lsn).isEqualTo(bigLsn);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#### Other Changes
* Load Blackbird or Afterburner into the ObjectMapper depending upon Java version and presence of modules in classpath. Make Afterburner and Blackbird optional maven dependencies. See - [PR 39689](https://github.com/Azure/azure-sdk-for-java/pull/39689)
* Added diagnostic fields for `quorumAckedLSN` and `currentReplicaSetSize`. Changed `replicaStatusList` to include all replicas and more information. - See [PR 39844](https://github.com/Azure/azure-sdk-for-java/pull/39844)

### 4.53.5-hotfix (2024-04-25)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -139,7 +141,7 @@ public class CosmosException extends AzureException {
/***
* All selectable replica status.
*/
private final List<String> replicaStatusList = new ArrayList<>();
private final Map<String, Set<String>> replicaStatusList = new HashMap<>();

/**
* Fault injection ruleId
Expand Down Expand Up @@ -610,7 +612,7 @@ List<String> getFaultInjectionEvaluationResults() {
return this.faultInjectionEvaluationResults;
}

List<String> getReplicaStatusList() {
Map<String, Set<String>> getReplicaStatusList() {
return this.replicaStatusList;
}

Expand All @@ -626,7 +628,7 @@ public CosmosException createCosmosException(int statusCode, Exception innerExce
}

@Override
public List<String> getReplicaStatusList(CosmosException cosmosException) {
public Map<String, Set<String>> getReplicaStatusList(CosmosException cosmosException) {
return cosmosException.getReplicaStatusList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1424,7 +1425,7 @@ public static void setCosmosExceptionAccessor(final CosmosExceptionAccessor newA

public interface CosmosExceptionAccessor {
CosmosException createCosmosException(int statusCode, Exception innerException);
List<String> getReplicaStatusList(CosmosException cosmosException);
Map<String, Set<String>> getReplicaStatusList(CosmosException cosmosException);
CosmosException setRntbdChannelStatistics(CosmosException cosmosException, RntbdChannelStatistics rntbdChannelStatistics);
RntbdChannelStatistics getRntbdChannelStatistics(CosmosException cosmosException);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
import com.azure.cosmos.implementation.Strings;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class AddressSelector {
Expand All @@ -34,6 +39,28 @@ public Mono<List<Uri>> resolveAllUriAsync(
.map(a -> a.getPhysicalUri()).collect(Collectors.toList()));
}

/**
 * Resolves the replica addresses for the request and returns two URI lists
 * wrapped in a single outer list.
 *
 * <p>Index 0 of the result holds the physical URIs of the replicas that pass
 * the primary filter (non-primary replicas, plus the primary when
 * {@code includePrimary} is {@code true}). Index 1 holds the physical URIs of
 * every resolved replica, regardless of the filter.
 *
 * @param request the request whose replica addresses are resolved
 * @param includePrimary whether the primary replica is kept in the first list
 * @param forceRefresh forwarded unchanged to {@code resolveAddressesAsync}
 * @return a {@link Mono} emitting a two-element list: filtered URIs, then all URIs
 */
public Mono<List<List<Uri>>> resolveAndStoreAllUriAsync(
    RxDocumentServiceRequest request,
    boolean includePrimary,
    boolean forceRefresh) {
    return this.resolveAddressesAsync(request, forceRefresh).map(addresses -> {
        List<Uri> selectableUris = new ArrayList<>();
        List<Uri> everyUri = new ArrayList<>();

        // Single pass: each address always lands in everyUri, and additionally
        // in selectableUris when it passes the primary filter.
        for (AddressInformation address : addresses) {
            boolean selectable = !address.isPrimary() || includePrimary;
            if (selectable) {
                selectableUris.add(address.getPhysicalUri());
            }
            everyUri.add(address.getPhysicalUri());
        }

        // Order matters to callers: [0] = filtered replicas, [1] = all replicas.
        List<List<Uri>> result = new ArrayList<>();
        result.add(selectableUris);
        result.add(everyUri);
        return result;
    });
}

public Mono<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
Mono<List<AddressInformation>> replicaAddressesObs = this.resolveAddressesAsync(request, forceAddressRefresh);
return replicaAddressesObs.flatMap(replicaAddresses -> {
Expand All @@ -45,6 +72,19 @@ public Mono<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolea
});
}

/**
 * Resolves the primary replica URI for the request and, as a side effect,
 * records the health-status diagnostic string of every non-primary replica
 * into {@code replicaStatuses}.
 *
 * @param request the request whose replica addresses are resolved
 * @param forceAddressRefresh forwarded unchanged to {@code resolveAddressesAsync}
 * @param replicaStatuses caller-supplied set that receives the diagnostic
 *                        strings of the non-primary replicas
 * @return a {@link Mono} emitting the primary URI, or an error {@link Mono}
 *         carrying any exception raised while collecting statuses or
 *         selecting the primary
 */
public Mono<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh, Set<String> replicaStatuses) {
    return this.resolveAddressesAsync(request, forceAddressRefresh).flatMap(addresses -> {
        try {
            // Statuses are collected before primary selection, so they are
            // recorded even when getPrimaryUri subsequently throws.
            for (AddressInformation address : addresses) {
                if (address.isPrimary()) {
                    continue;
                }
                replicaStatuses.add(address.getPhysicalUri().getHealthStatusDiagnosticString());
            }
            Uri primary = AddressSelector.getPrimaryUri(request, addresses);
            return Mono.just(primary);
        } catch (Exception failure) {
            // Surface the failure through the reactive pipeline instead of throwing.
            return Mono.error(failure);
        }
    });
}

public static Uri getPrimaryUri(RxDocumentServiceRequest request, List<AddressInformation> replicaAddresses) throws GoneException {
AddressInformation primaryAddress = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -166,12 +171,19 @@ Mono<StoreResponse> writePrivateAsync(

Mono<List<AddressInformation>> replicaAddressesObs = this.addressSelector.resolveAddressesAsync(request, forceRefresh);
AtomicReference<Uri> primaryURI = new AtomicReference<>();
AtomicReference<List<String>> replicaStatusList = new AtomicReference<>();
Map<String, Set<String>> replicaStatusList = new ConcurrentHashMap<>();
Set<String> replicaStatuses = Collections.newSetFromMap(new ConcurrentHashMap<>());

return replicaAddressesObs.flatMap(replicaAddresses -> {
try {
List<URI> contactedReplicas = new ArrayList<>();
replicaAddresses.forEach(replicaAddress -> contactedReplicas.add(replicaAddress.getPhysicalUri().getURI()));
replicaAddresses.forEach(replicaAddress -> {
Uri uri = replicaAddress.getPhysicalUri();
contactedReplicas.add(uri.getURI());
if (!uri.isPrimary()) {
replicaStatuses.add(uri.getHealthStatusDiagnosticString());
}
});
BridgeInternal.setContactedReplicas(request.requestContext.cosmosDiagnostics, contactedReplicas);
return Mono.just(AddressSelector.getPrimaryUri(request, replicaAddresses));
} catch (GoneException e) {
Expand All @@ -196,8 +208,8 @@ Mono<StoreResponse> writePrivateAsync(
} catch (Exception e) {
return Mono.error(e);
}

replicaStatusList.set(Arrays.asList(primaryUri.getHealthStatusDiagnosticString()));
replicaStatusList.put(Uri.IGNORING, replicaStatuses);
replicaStatusList.put(Uri.ATTEMPTING, new HashSet<>(Arrays.asList(primaryUri.getHealthStatusDiagnosticString())));

return this.transportClient.invokeResourceOperationAsync(primaryUri, request)
.doOnError(
Expand All @@ -220,7 +232,7 @@ Mono<StoreResponse> writePrivateAsync(
false,
false,
primaryUri,
replicaStatusList.get());
replicaStatusList);
String value = ex != null ?
ex
.getResponseHeaders()
Expand Down Expand Up @@ -257,7 +269,7 @@ Mono<StoreResponse> writePrivateAsync(
false,
false,
primaryURI.get(),
replicaStatusList.get());
replicaStatusList);
return barrierForGlobalStrong(request, response);
})
.doFinally(signalType -> {
Expand All @@ -269,7 +281,7 @@ Mono<StoreResponse> writePrivateAsync(
request,
false,
false,
replicaStatusList.get());
replicaStatusList);
});
} else {

Expand Down