[Cosmos][VectorSearch] Non Streaming Order By Query #39897

Merged
Changes from 35 commits
46 commits
540a16d
Initial changes
aayush3011 Apr 24, 2024
6f49c75
Initial changes
aayush3011 Apr 25, 2024
97509eb
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 Apr 25, 2024
528a0eb
[Cosmos][VectorIndex]Adding changes for vectorIndex and vectorEmbeddi…
aayush3011 May 2, 2024
86b36d3
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 May 3, 2024
a979c11
Initial changes
aayush3011 May 3, 2024
e2756a5
Initial changes
aayush3011 May 3, 2024
8be2277
Initial changes
aayush3011 May 3, 2024
e491b9d
Resolving comments
aayush3011 May 7, 2024
151bb50
Fixing build issues
aayush3011 May 7, 2024
73afd5b
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 8, 2024
148cba5
[Cosmos][VectorSearch] Non Streaming Order By Query (#40085)
aayush3011 May 8, 2024
3b0d751
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 9, 2024
87572f7
Merge branch 'feature/vector_search' into users/akataria/nonStreaming…
aayush3011 May 9, 2024
df7e838
[Cosmos][VectorSearch] Non Streaming Order By Query (#40096)
aayush3011 May 9, 2024
179f904
Initial changes
aayush3011 May 9, 2024
5602e33
Merge branch 'users/akataria/nonStreamingOrderBy' of github.com:aayus…
aayush3011 May 9, 2024
36ab9b7
Merge branch 'feature/vector_search' into users/akataria/nonStreaming…
aayush3011 May 9, 2024
70639b5
Initial changes
aayush3011 May 10, 2024
c45c3a5
Fixes
aayush3011 May 10, 2024
6c255ee
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 10, 2024
9d427e6
Users/akataria/vectorindexing (#40117)
aayush3011 May 10, 2024
1cadb1b
Merge branch 'feature/VectorSearch' into users/akataria/nonStreamingO…
aayush3011 May 10, 2024
0f1be0c
Users/akataria/non streaming order by (#40118)
aayush3011 May 10, 2024
d4dcad2
Fixing some merge issues
aayush3011 May 10, 2024
cdaa5bc
Fixing some merge issues
aayush3011 May 10, 2024
dfa8b64
Fixing some merge issues
aayush3011 May 10, 2024
7549cbe
Resolving comments
aayush3011 May 14, 2024
c6e2376
Merge branch 'feature/VectorSearch' into users/akataria/nonStreamingO…
aayush3011 May 14, 2024
30d8370
Users/akataria/vectorindexing (#40158)
aayush3011 May 14, 2024
8279358
Users/akataria/non streaming order by (#40159)
aayush3011 May 14, 2024
dd9d13b
Fixing build issue
aayush3011 May 14, 2024
d966af4
Merge branch 'feature/VectorSearch' into users/akataria/nonStreamingO…
aayush3011 May 14, 2024
9eb9208
Fixing build issue
aayush3011 May 14, 2024
1aaecef
Merge branch 'users/akataria/nonStreamingOrderBy' of github.com:aayus…
aayush3011 May 14, 2024
8bf4f8d
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 May 16, 2024
943cc4c
Adding tests
aayush3011 May 16, 2024
1db9b31
Adding capability for CI pipeline
aayush3011 May 16, 2024
2770b0f
Updating the PQ logic
aayush3011 May 17, 2024
7002362
Resolving comments, adding new test cases
aayush3011 May 17, 2024
508e94a
Adding argument to run emulator tests
aayush3011 May 17, 2024
822bd67
fixing emulator test pipeline
aayush3011 May 17, 2024
46fe7cb
fixing emulator test pipeline
aayush3011 May 17, 2024
5657b75
Adding logging for variable AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY
aayush3011 May 17, 2024
015a77c
Adding logging for variable AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY
aayush3011 May 17, 2024
f87be45
fixing emulator test pipeline
aayush3011 May 18, 2024

14 changes: 12 additions & 2 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -3,6 +3,17 @@
### 4.60.0-beta.1 (Unreleased)

#### Features Added
* Added `cosmosVectorEmbeddingPolicy` in `cosmosContainerProperties` and `vectorIndexes` in `indexPolicy` to support vector search in CosmosDB - See [PR 39379](https://github.com/Azure/azure-sdk-for-java/pull/39379)

* Added support for non-streaming OrderBy query and a query feature `NonStreamingOrderBy` to support Vector Search queries. - See [PR 39897](https://github.com/Azure/azure-sdk-for-java/pull/39897/)

#### Breaking Changes

@@ -11,10 +22,9 @@
#### Other Changes

### 4.59.0 (2024-04-27)

#### Features Added
* Added public APIs `getCustomItemSerializer` and `setCustomItemSerializer` to allow customers to specify custom payload transformations or serialization settings. - See [PR 38997](https://github.com/Azure/azure-sdk-for-java/pull/38997) and [PR 39933](https://github.com/Azure/azure-sdk-for-java/pull/39933)

#### Other Changes
* Load Blackbird or Afterburner into the ObjectMapper depending upon Java version and presence of modules in classpath. Make Afterburner and Blackbird optional maven dependencies. See - [PR 39689](https://github.com/Azure/azure-sdk-for-java/pull/39689)

@@ -166,6 +166,13 @@ public class Configs {
public static final String MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = "COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED";
private static final int DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1;

private static final String MAX_ITEM_COUNT_FOR_VECTOR_SEARCH = "COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH";
public static final int DEFAULT_MAX_ITEM_COUNT_FOR_VECTOR_SEARCH = 50000;

private static final String AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY = "COSMOS.AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY";

private static final boolean DEFAULT_AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY = false;
Member

Suggested change
private static final boolean DEFAULT_AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY = false;
private static final boolean DEFAULT_AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY = true;

Member Author

We discussed this yesterday in the vector search scrum with Hari. For now we will keep the default value as false, meaning users who want the older query behavior will have to set this environment variable to true.


public static final int MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1;

public static final String TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS = "COSMOS.TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS";
@@ -484,6 +491,20 @@ public static int getMaxRetriesInLocalRegionWhenRemoteRegionPreferred() {
MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED);
}

public static int getMaxItemCountForVectorSearch() {
return Integer.parseInt(System.getProperty(MAX_ITEM_COUNT_FOR_VECTOR_SEARCH,
firstNonNull(
emptyToNull(System.getenv().get(MAX_ITEM_COUNT_FOR_VECTOR_SEARCH)),
String.valueOf(DEFAULT_MAX_ITEM_COUNT_FOR_VECTOR_SEARCH))));
}

public static boolean getAzureCosmosNonStreamingOrderByDisabled() {
return Boolean.parseBoolean(System.getProperty(AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY,
firstNonNull(
emptyToNull(System.getenv().get(AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY)),
String.valueOf(DEFAULT_AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY))));
}

public static Duration getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred() {
return
Duration.ofMillis(Math.max(
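
The two getters added to `Configs` above resolve their value in the same order: a JVM system property wins, then a non-empty environment variable, then the compiled-in default. The following is a minimal, self-contained sketch of that lookup order, not the SDK code itself. The class `NonStreamingOrderByConfigSketch` and its `resolve` helper are hypothetical names introduced for illustration, and the system properties and environment are passed in as plain `Properties` and `Map` objects so the behavior can be exercised without touching the real process environment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class NonStreamingOrderByConfigSketch {
    // Key strings copied from the diff above.
    public static final String MAX_ITEM_KEY = "COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH";
    public static final String DISABLE_KEY = "COSMOS.AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY";

    // Hypothetical helper: system property first, then non-empty env var, then default.
    public static String resolve(Properties props, Map<String, String> env,
                                 String key, String defaultValue) {
        String fromEnv = env.get(key);
        String fallback = (fromEnv == null || fromEnv.isEmpty()) ? defaultValue : fromEnv;
        return props.getProperty(key, fallback);
    }

    public static int maxItemCount(Properties props, Map<String, String> env) {
        return Integer.parseInt(resolve(props, env, MAX_ITEM_KEY, "50000"));
    }

    public static boolean nonStreamingOrderByDisabled(Properties props, Map<String, String> env) {
        return Boolean.parseBoolean(resolve(props, env, DISABLE_KEY, "false"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        Map<String, String> env = new HashMap<>();

        // Nothing configured: both getters fall back to the defaults.
        System.out.println(maxItemCount(props, env));
        System.out.println(nonStreamingOrderByDisabled(props, env));

        // Opting back into the older query behavior via the environment.
        env.put(DISABLE_KEY, "true");
        System.out.println(nonStreamingOrderByDisabled(props, env));

        // A system property overrides the environment variable.
        props.setProperty(DISABLE_KEY, "false");
        System.out.println(nonStreamingOrderByDisabled(props, env));
    }
}
```

With the default of `false` discussed in the review thread, non-streaming order-by stays enabled unless a user explicitly sets the flag to `true`.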
@@ -120,6 +120,15 @@ public static final class Properties {
public static final String SPATIAL_INDEXES = "spatialIndexes";
public static final String TYPES = "types";

// Vector Embedding Policy
public static final String VECTOR_EMBEDDING_POLICY = "vectorEmbeddingPolicy";
public static final String VECTOR_INDEXES = "vectorIndexes";
public static final String VECTOR_EMBEDDINGS = "vectorEmbeddings";
public static final String VECTOR_INDEX_TYPE = "type";
public static final String VECTOR_DATA_TYPE = "dataType";
public static final String VECTOR_DIMENSIONS = "dimensions";
public static final String DISTANCE_FUNCTION = "distanceFunction";

// Unique index.
public static final String UNIQUE_KEY_POLICY = "uniqueKeyPolicy";
public static final String UNIQUE_KEYS = "uniqueKeys";
@@ -25,6 +25,7 @@ public final class CosmosQueryRequestOptionsImpl extends CosmosQueryRequestOptio
private boolean queryPlanRetrievalDisallowed;
private boolean emptyPageDiagnosticsEnabled;
private String queryName;
private Integer maxItemCountForVectorSearch;
private List<CosmosDiagnostics> cancelledRequestDiagnosticsTracker = new ArrayList<>();

/**
@@ -62,6 +63,7 @@ public CosmosQueryRequestOptionsImpl(CosmosQueryRequestOptionsImpl options) {
this.queryName = options.queryName;
this.feedRange = options.feedRange;
this.cancelledRequestDiagnosticsTracker = options.cancelledRequestDiagnosticsTracker;
this.maxItemCountForVectorSearch = options.maxItemCountForVectorSearch;
}

/**
@@ -196,6 +198,29 @@ public CosmosQueryRequestOptionsImpl setMaxItemCount(Integer maxItemCount) {
return this;
}

/**
* Gets the maximum item count to fetch during non-streaming order by queries.
*
* @return the max number of items for vector search.
*/
public Integer getMaxItemCountForVectorSearch() {
if (this.maxItemCountForVectorSearch == null) {
this.maxItemCountForVectorSearch = Configs.DEFAULT_MAX_ITEM_COUNT_FOR_VECTOR_SEARCH;
}
return this.maxItemCountForVectorSearch;
}

/**
* Sets the maximum item count to fetch during non-streaming order by queries.
*
* @param maxItemCountForVectorSearch the max number of items for vector search.
* @return the CosmosQueryRequestOptions.
*/
public CosmosQueryRequestOptionsImpl setMaxItemCountForVectorSearch(Integer maxItemCountForVectorSearch) {
this.maxItemCountForVectorSearch = maxItemCountForVectorSearch;
return this;
}

/**
* Gets the request continuation token.
*
@@ -6,10 +6,11 @@
import com.azure.cosmos.CosmosItemSerializer;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.SerializableWrapper;
import com.azure.cosmos.models.ClientEncryptionPolicy;
import com.azure.cosmos.models.ChangeFeedPolicy;
import com.azure.cosmos.models.ClientEncryptionPolicy;
import com.azure.cosmos.models.ComputedProperty;
import com.azure.cosmos.models.ConflictResolutionPolicy;
import com.azure.cosmos.models.CosmosVectorEmbeddingPolicy;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKeyDefinition;
@@ -24,6 +25,8 @@
import java.util.Collection;
import java.util.Collections;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* Represents a document collection in the Azure Cosmos DB database service. A collection is a named logical container
* for documents.
@@ -40,6 +43,7 @@ public final class DocumentCollection extends Resource {
private UniqueKeyPolicy uniqueKeyPolicy;
private PartitionKeyDefinition partitionKeyDefinition;
private ClientEncryptionPolicy clientEncryptionPolicyInternal;
private CosmosVectorEmbeddingPolicy cosmosVectorEmbeddingPolicy;

/**
* Constructor.
@@ -410,6 +414,33 @@ public void setClientEncryptionPolicy(ClientEncryptionPolicy value) {
this.set(Constants.Properties.CLIENT_ENCRYPTION_POLICY, value, CosmosItemSerializer.DEFAULT_SERIALIZER);
}

/**
* Gets the Vector Embedding Policy containing paths for embeddings along with path-specific settings for the item
* used in performing vector search on the items in a collection in the Azure CosmosDB database service.
*
* @return the Vector Embedding Policy.
*/
public CosmosVectorEmbeddingPolicy getVectorEmbeddingPolicy() {
if (this.cosmosVectorEmbeddingPolicy == null) {
if (super.has(Constants.Properties.VECTOR_EMBEDDING_POLICY)) {
this.cosmosVectorEmbeddingPolicy = super.getObject(Constants.Properties.VECTOR_EMBEDDING_POLICY,
CosmosVectorEmbeddingPolicy.class);
}
}
return this.cosmosVectorEmbeddingPolicy;
}

/**
* Sets the Vector Embedding Policy containing paths for embeddings along with path-specific settings for the item
* used in performing vector search on the items in a collection in the Azure CosmosDB database service.
*
* @param value the Vector Embedding Policy.
*/
public void setVectorEmbeddingPolicy(CosmosVectorEmbeddingPolicy value) {
checkNotNull(value, "cosmosVectorEmbeddingPolicy cannot be null");
this.set(Constants.Properties.VECTOR_EMBEDDING_POLICY, value, CosmosItemSerializer.DEFAULT_SERIALIZER);
}

public void populatePropertyBag() {
super.populatePropertyBag();
if (this.indexingPolicy == null) {
@@ -281,6 +281,8 @@ void setCancelledRequestDiagnosticsTracker(
Integer getMaxItemCount(CosmosQueryRequestOptions options);

String getRequestContinuation(CosmosQueryRequestOptions options);

Integer getMaxItemCountForVectorSearch(CosmosQueryRequestOptions options);
}
}

@@ -4,9 +4,11 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRange;
@@ -48,6 +50,12 @@
*/
public class DocumentQueryExecutionContextFactory {

private static final ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.CosmosQueryRequestOptionsAccessor qryOptAccessor = ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.getCosmosQueryRequestOptionsAccessor();

private final static int PageSizeFactorForTop = 5;
private static final Logger logger = LoggerFactory.getLogger(DocumentQueryExecutionContextFactory.class);
private static Mono<Utils.ValueHolder<DocumentCollection>> resolveCollection(DiagnosticsClientContext diagnosticsClientContext,
@@ -239,7 +247,8 @@ private static boolean canCacheQuery(QueryInfo queryInfo) {
&& !queryInfo.hasTop()
&& !queryInfo.hasOffset()
&& !queryInfo.hasDCount()
&& !queryInfo.hasOrderBy();
&& !queryInfo.hasOrderBy()
&& !queryInfo.hasNonStreamingOrderBy();
}

private static boolean isScopedToSinglePartition(CosmosQueryRequestOptions cosmosQueryRequestOptions) {
@@ -358,6 +367,38 @@ public static <T> Flux<? extends IDocumentQueryExecutionContext<T>> createSpecia

boolean getLazyFeedResponse = queryInfo.hasTop();

// We need to compute the optimal initial page size for non-streaming order-by queries
if (queryInfo.hasNonStreamingOrderBy()) {
// Validate the TOP or LIMIT for non-streaming order-by queries
if (!queryInfo.hasTop() && !queryInfo.hasLimit() && queryInfo.getTop() < 0 && queryInfo.getLimit() < 0) {
throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST,
"Executing a vector search query without TOP or LIMIT can consume a large number of RUs " +
"very fast and have long runtimes. Please ensure you are using one of the above two filters " +
"with your vector search query.");
}
// Validate the size of TOP or LIMIT against MaxItemSizeForVectorSearch
int maxLimit = Math.max(queryInfo.hasTop() ? queryInfo.getTop() : 0,
queryInfo.hasLimit() ? queryInfo.getLimit() : 0);
int maxItemSizeForVectorSearch = Math.max(Configs.getMaxItemCountForVectorSearch(),
qryOptAccessor.getMaxItemCountForVectorSearch(cosmosQueryRequestOptions));
if (maxLimit > maxItemSizeForVectorSearch) {
throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST,
"Executing a vector search query with TOP or LIMIT larger than the maxItemSizeForVectorSearch " +
"is not allowed");
}
// Set initialPageSize based on the smallest of TOP or LIMIT
if (queryInfo.hasTop() || queryInfo.hasLimit()) {
int pageSizeWithTopOrLimit = Math.min(queryInfo.hasTop() ? queryInfo.getTop() : Integer.MAX_VALUE,
queryInfo.hasLimit() && queryInfo.hasOffset() ?
queryInfo.getLimit() + queryInfo.getOffset() : Integer.MAX_VALUE);
if (initialPageSize > 0) {
initialPageSize = Math.min(pageSizeWithTopOrLimit, initialPageSize);
} else {
initialPageSize = pageSizeWithTopOrLimit;
}
}
}

// We need to compute the optimal initial page size for order-by queries
if (queryInfo.hasOrderBy()) {
int top;
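
The non-streaming order-by hunk above does three things in sequence: it rejects queries with neither TOP nor LIMIT, rejects a TOP or LIMIT above the configured maximum, and then caps the initial page size at the smaller of TOP and OFFSET + LIMIT. The sketch below restates that flow in isolation so the arithmetic can be checked by hand. It is an illustration under assumptions, not the SDK implementation: `QueryShape` is a hypothetical stand-in for `QueryInfo` (a `null` field means the clause is absent), the "neither clause present" check is a simplification of the condition in the diff, and `IllegalArgumentException` is swapped in for `NonStreamingOrderByBadRequestException`.

```java
public class NonStreamingOrderByValidationSketch {

    // Hypothetical stand-in for QueryInfo; null means the clause is not in the query.
    public static final class QueryShape {
        final Integer top;
        final Integer limit;
        final Integer offset;

        public QueryShape(Integer top, Integer limit, Integer offset) {
            this.top = top;
            this.limit = limit;
            this.offset = offset;
        }
    }

    // Returns the initial page size; requestedPageSize <= 0 means "not set by the caller".
    public static int computeInitialPageSize(QueryShape q, int maxItemCountForVectorSearch,
                                             int requestedPageSize) {
        // Step 1: a vector search query must be bounded by TOP or LIMIT.
        if (q.top == null && q.limit == null) {
            throw new IllegalArgumentException("vector search query requires TOP or LIMIT");
        }
        // Step 2: the larger of TOP and LIMIT must not exceed the configured maximum.
        int maxLimit = Math.max(q.top != null ? q.top : 0, q.limit != null ? q.limit : 0);
        if (maxLimit > maxItemCountForVectorSearch) {
            throw new IllegalArgumentException("TOP or LIMIT exceeds the configured maximum");
        }
        // Step 3: cap the page size at the smaller of TOP and OFFSET + LIMIT.
        int fromTop = q.top != null ? q.top : Integer.MAX_VALUE;
        int fromLimit = (q.limit != null && q.offset != null)
            ? q.limit + q.offset
            : Integer.MAX_VALUE;
        int bound = Math.min(fromTop, fromLimit);
        return requestedPageSize > 0 ? Math.min(bound, requestedPageSize) : bound;
    }

    public static void main(String[] args) {
        // TOP 10 with no explicit page size.
        System.out.println(computeInitialPageSize(new QueryShape(10, null, null), 50000, -1));
        // OFFSET 5 LIMIT 20 with no explicit page size.
        System.out.println(computeInitialPageSize(new QueryShape(null, 20, 5), 50000, -1));
        // TOP 10 with a caller-supplied page size of 5.
        System.out.println(computeInitialPageSize(new QueryShape(10, null, null), 50000, 5));
    }
}
```

OFFSET is added to LIMIT because the skipped items still have to be fetched before the requested window can be returned.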
@@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.CosmosException;

public class NonStreamingOrderByBadRequestException extends CosmosException {

private static final long serialVersionUID = 1L;

/**
* Creates a new instance of the NonStreamingOrderByBadRequestException class.
*
* @param statusCode the http status code of the response.
* @param errorMessage the error message.
*/
public NonStreamingOrderByBadRequestException(int statusCode, String errorMessage) {
super(statusCode, errorMessage);
}
}
@@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import reactor.core.publisher.Mono;

import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

public class NonStreamingOrderByDocumentProducer extends DocumentProducer<Document> {
private final OrderbyRowComparer<Document> consumeComparer;

NonStreamingOrderByDocumentProducer(
OrderbyRowComparer<Document> consumeComparer,
IDocumentQueryClient client,
String collectionResourceId,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<Document>>> executeRequestFunc,
FeedRangeEpkImpl feedRange,
String collectionLink,
Supplier<DocumentClientRetryPolicy> createRetryPolicyFunc,
Class<Document> resourceType,
UUID correlatedActivityId,
int initialPageSize,
String initialContinuationToken,
int top,
Supplier<String> operationContextTextProvider) {
super(client, collectionResourceId, cosmosQueryRequestOptions, createRequestFunc, executeRequestFunc,
collectionLink, createRetryPolicyFunc, resourceType, correlatedActivityId, initialPageSize,
initialContinuationToken, top, feedRange, operationContextTextProvider);
this.consumeComparer = consumeComparer;
Member

split handling?

Member

I think it uses produceAsync from the base DocumentProducer which is split-proof.

Member

@xinlian12 May 17, 2024

But during createChildDocumentProducerOnSplit, when creating the child documentProducer, should we create NonStreamingOrderByDocumentProducer? By default, it only creates DocumentProducer

@aayush3011 can we add a split test to verify? Thanks.

}
}