Skip to content

Commit

Permalink
Merge pull request #858 from rishabhdaim/OAK-10114
Browse files Browse the repository at this point in the history
OAK-10114 : created new query API with projections to limit memory us…
  • Loading branch information
mreutegg committed Mar 16, 2023
2 parents b38b913 + 2296d43 commit ee3fb35
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.partition;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.reverse;
import static java.util.Collections.singletonList;
import static java.util.Objects.nonNull;
Expand All @@ -32,6 +33,7 @@
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder.MANY_CHILDREN_THRESHOLD;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS_RESOLUTION;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.PATH;
import static org.apache.jackrabbit.oak.plugins.document.Path.ROOT;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
Expand Down Expand Up @@ -3415,7 +3417,7 @@ private void diffManyChildren(JsopWriter w, Path path,
LOG.debug("diffManyChildren: path: {}, fromRev: {}, toRev: {}", path, fromRev, toRev);

for (NodeDocument doc : store.query(Collection.NODES, fromKey, toKey,
NodeDocument.MODIFIED_IN_SECS, minValue, Integer.MAX_VALUE)) {
NodeDocument.MODIFIED_IN_SECS, minValue, Integer.MAX_VALUE, newArrayList(PATH))) {
paths.add(doc.getPath());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
Expand All @@ -26,6 +28,8 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static com.google.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
import static org.apache.jackrabbit.oak.plugins.document.Throttler.NO_THROTTLING;

/**
Expand Down Expand Up @@ -486,4 +490,54 @@ default int getNodeNameLimit() {
default Throttler throttler() {
return NO_THROTTLING;
}

/**
* Get a list of documents with only projected fields (as mentioned in projections param)
* along with "_id" field and where the key is greater than a start value and
* less than an end value <em>and</em> the given "indexed property" is greater
* or equals the specified value.
* <p>
* The indexed property can either be a {@link Long} value, in which case numeric
* comparison applies, or a {@link Boolean} value, in which case "false" is mapped
* to "0" and "true" is mapped to "1".
* <p>
* The returned documents are sorted by key and are immutable.
*
* @param <T> the document type
* @param collection the collection
* @param fromKey the start value (excluding)
* @param toKey the end value (excluding)
* @param indexedProperty the name of the indexed property (optional)
* @param startValue the minimum value of the indexed property
* @param limit the maximum number of entries to return
* @param projection {@link List} of projected keys (optional). Keep this empty to fetch all fields on document.
* @return the list (possibly empty)
* @throws DocumentStoreException if the operation failed. E.g. because of
* an I/O error.
*/
@NotNull
default <T extends Document> List<T> query(final Collection<T> collection,
final String fromKey,
final String toKey,
final String indexedProperty,
final long startValue,
final int limit,
final List<String> projection) throws DocumentStoreException {

final List<T> list = query(collection, fromKey, toKey, indexedProperty, startValue, limit);

if (projection == null || projection.isEmpty()) {
return list;
}

final Set<String> projectedSet = newHashSet(projection);
projectedSet.add(ID);

return list.stream().map(t -> {
T newDocument = collection.newDocument(this);
t.deepCopy(newDocument);
newDocument.keySet().retainAll(projectedSet);
return newDocument;
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.Sets.difference;
import static com.mongodb.client.model.Projections.include;
import static java.lang.Integer.MAX_VALUE;
import static java.util.Collections.emptyList;
import static org.apache.jackrabbit.oak.plugins.document.DocumentStoreException.asDocumentStoreException;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.DELETED_ONCE;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
Expand Down Expand Up @@ -722,8 +724,19 @@ public <T extends Document> List<T> query(Collection<T> collection,
String indexedProperty,
long startValue,
int limit) {
return queryWithRetry(collection, fromKey, toKey, indexedProperty,
startValue, limit, maxQueryTimeMS);
return query(collection, fromKey, toKey, indexedProperty, startValue, limit, emptyList());
}

@NotNull
@Override
public <T extends Document> List<T> query(final Collection<T> collection,
final String fromKey,
final String toKey,
final String indexedProperty,
final long startValue,
final int limit,
final List<String> projection) throws DocumentStoreException {
return queryWithRetry(collection, fromKey, toKey, indexedProperty, startValue, limit, projection, maxQueryTimeMS);
}

/**
Expand All @@ -737,6 +750,7 @@ private <T extends Document> List<T> queryWithRetry(Collection<T> collection,
String indexedProperty,
long startValue,
int limit,
List<String> projection,
long maxQueryTime) {
int numAttempts = queryRetries + 1;
MongoException ex = null;
Expand All @@ -746,7 +760,7 @@ private <T extends Document> List<T> queryWithRetry(Collection<T> collection,
}
try {
return queryInternal(collection, fromKey, toKey,
indexedProperty, startValue, limit, maxQueryTime);
indexedProperty, startValue, limit, projection, maxQueryTime);
} catch (MongoException e) {
ex = e;
}
Expand All @@ -767,6 +781,7 @@ protected <T extends Document> List<T> queryInternal(Collection<T> collection,
String indexedProperty,
long startValue,
int limit,
List<String> projection,
long maxQueryTime) {
log("query", fromKey, toKey, indexedProperty, startValue, limit);

Expand Down Expand Up @@ -802,7 +817,7 @@ && canUseModifiedTimeIdx(startValue)) {
boolean isSlaveOk = false;
int resultSize = 0;
CacheChangesTracker cacheChangesTracker = null;
if (parentId != null && collection == Collection.NODES) {
if (parentId != null && collection == Collection.NODES && (projection == null || projection.isEmpty())) {
cacheChangesTracker = nodesCache.registerTracker(fromKey, toKey);
}
try {
Expand All @@ -823,6 +838,11 @@ && canUseModifiedTimeIdx(startValue)) {
} else {
result = dbCollection.find(query);
}

if (projection != null && !projection.isEmpty()) {
result.projection(include(projection));
}

result.sort(BY_ID_ASC);
if (limit >= 0) {
result.limit(limit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ public final <T extends Document> List<T> query(Collection<T> collection,
return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
}

@Override
public <T extends Document> List<T> query(final Collection<T> collection, final String fromKey, final String toKey,
final String indexedProperty, final long startValue, final int limit,
final List<String> projection) {
performLeaseCheck();
return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit, projection);
}

@Override
public final <T extends Document> void remove(Collection<T> collection, String key) {
performLeaseCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ public List<T> call() throws Exception {
}
}

@Override
@NotNull
public <T extends Document> List<T> query(final Collection<T> collection, final String fromKey, final String toKey,
final String indexedProperty, final long startValue, final int limit,
final List<String> projection) throws DocumentStoreException {
try {
logMethod("query", collection, fromKey, toKey, indexedProperty, startValue, limit, projection.toString());
return logResult(() -> store.query(collection, fromKey, toKey, indexedProperty, startValue, limit, projection));
} catch (Exception e) {
logException(e);
throw convert(e);
}
}

@Override
public <T extends Document> void remove(Collection<T> collection, String key) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ public synchronized <T extends Document> List<T> query(final Collection<T> colle
return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
}

@Override
@NotNull
public synchronized <T extends Document> List<T> query(final Collection<T> collection, final String fromKey,
final String toKey, final String indexedProperty,
final long startValue, final int limit,
final List<String> projection) throws DocumentStoreException {
return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit, projection);
}

@Override
public synchronized <T extends Document> void remove(Collection<T> collection, String key) {
store.remove(collection, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ public <T extends Document> List<T> query(final Collection<T> collection, final
return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
}

@Override
@NotNull
public <T extends Document> List<T> query(final Collection<T> collection, final String fromKey, final String toKey,
final String indexedProperty, final long startValue, final int limit,
final List<String> projection) throws DocumentStoreException {
return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit, projection);
}

@Override
public <T extends Document> void remove(Collection<T> collection, String key) {
long throttlingTime = performThrottling(collection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,24 @@ public <T extends Document> List<T> query(Collection<T> collection,
}
}

@Override
@NotNull
public <T extends Document> List<T> query(final Collection<T> collection, final String fromKey, final String toKey,
final String indexedProperty, final long startValue, final int limit,
final List<String> projection) throws DocumentStoreException {
try {
long start = now();
List<T> result = base.query(collection, fromKey, toKey, indexedProperty, startValue, limit, projection);
updateAndLogTimes("query3", start, 0, size(result));
if (logCommonCall()) {
logCommonCall(start, "query3 " + collection + " " + fromKey + " " + toKey + " " + indexedProperty + " " + startValue + " " + limit + " " + projection.toString());
}
return result;
} catch (Exception e) {
throw convert(e);
}
}

@Override
public <T extends Document> void remove(Collection<T> collection, String key) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.junit.Rule;
import org.junit.Test;

import com.google.common.collect.Maps;

import static com.google.common.collect.Lists.newArrayList;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
Expand All @@ -43,6 +46,9 @@ public class MongoDocumentStoreIT extends AbstractMongoConnectionTest {
private static final int NUM_THREADS = 3;
private static final int UPDATES_PER_THREAD = 10;

@Rule
public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();

@Test
public void concurrent() throws Exception {
final long time = System.currentTimeMillis();
Expand Down Expand Up @@ -204,4 +210,62 @@ public void createWithNull() throws Exception {
Map<Revision, String> valueMap = doc.getValueMap("p");
assertEquals(3, valueMap.size());
}

@Test
public void queryWithProjection() {
DocumentStore docStore = mk.getDocumentStore();
DocumentNodeStore store = builderProvider.newBuilder().setAsyncDelay(0).getNodeStore();
Revision rev = Revision.newRevision(0);
List<UpdateOp> inserts = new ArrayList<UpdateOp>();
for (int i = 0; i < 10; i++) {
DocumentNodeState n = new DocumentNodeState(store, Path.fromString("/node-" + i),
new RevisionVector(rev));
inserts.add(n.asOperation(rev));
}
docStore.create(Collection.NODES, inserts);
List<NodeDocument> docs = docStore.query(Collection.NODES,
Utils.getKeyLowerLimit(Path.ROOT), Utils.getKeyUpperLimit(Path.ROOT), null, 0,
20, newArrayList(MODIFIED_IN_SECS));
// since _id is mandatory, so data size should be 2
docs.forEach(d -> assertEquals(2 , d.keySet().size()));
assertEquals(10, docs.size());
}

@Test
public void queryWithEmptyProjection() {
DocumentStore docStore = mk.getDocumentStore();
DocumentNodeStore store = builderProvider.newBuilder().setAsyncDelay(0).getNodeStore();
Revision rev = Revision.newRevision(0);
List<UpdateOp> inserts = new ArrayList<UpdateOp>();
for (int i = 0; i < 10; i++) {
DocumentNodeState n = new DocumentNodeState(store, Path.fromString("/node-" + i),
new RevisionVector(rev));
inserts.add(n.asOperation(rev));
}
docStore.create(Collection.NODES, inserts);
List<NodeDocument> docs = docStore.query(Collection.NODES,
Utils.getKeyLowerLimit(Path.ROOT), Utils.getKeyUpperLimit(Path.ROOT), null, 0,
20, newArrayList());
docs.forEach(d -> assertEquals(4 , d.keySet().size()));
assertEquals(10, docs.size());
}

@Test
public void queryWithNullProjection() {
DocumentStore docStore = mk.getDocumentStore();
DocumentNodeStore store = builderProvider.newBuilder().setAsyncDelay(0).getNodeStore();
Revision rev = Revision.newRevision(0);
List<UpdateOp> inserts = new ArrayList<UpdateOp>();
for (int i = 0; i < 10; i++) {
DocumentNodeState n = new DocumentNodeState(store, Path.fromString("/node-" + i),
new RevisionVector(rev));
inserts.add(n.asOperation(rev));
}
docStore.create(Collection.NODES, inserts);
List<NodeDocument> docs = docStore.query(Collection.NODES,
Utils.getKeyLowerLimit(Path.ROOT), Utils.getKeyUpperLimit(Path.ROOT), null, 0,
20, null);
docs.forEach(d -> assertEquals(4 , d.keySet().size()));
assertEquals(10, docs.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,11 @@ protected <T extends Document> List<T> queryInternal(Collection<T> collection,
String indexedProperty,
long startValue,
int limit,
List<String> projection,
long maxQueryTime) {
maybeFail();
return super.queryInternal(collection, fromKey, toKey,
indexedProperty, startValue, limit, maxQueryTime);
indexedProperty, startValue, limit, projection, maxQueryTime);
}

private void maybeFail() {
Expand Down

0 comments on commit ee3fb35

Please sign in to comment.