Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-10778 - Support downloading from Mongo in parallel. #1435

Merged
merged 24 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e903c7a
Support downloading from Mongo in parallel. Adds new boolean system p…
nfsantos Apr 25, 2024
a074ea0
Add missing license headers.
nfsantos Apr 25, 2024
5f58aa9
Add missing registration of MongoClientURI to whiteboard in failing t…
nfsantos Apr 25, 2024
65387ce
Retrieve from system properties the version of the Mongo docker image…
nfsantos Apr 25, 2024
45623cf
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos Apr 26, 2024
934bea6
Improve documentation and replace use of var keyword by explicit type…
nfsantos Apr 26, 2024
3440733
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos Apr 29, 2024
55b1157
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos Apr 29, 2024
c412f2f
Reuse the name defined in MongoDockerRule for the Docker image for in…
nfsantos Apr 29, 2024
b2684bd
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos Apr 30, 2024
051cd51
Reduce frequency of logging progress messages in the downloader from …
nfsantos Apr 30, 2024
f5fd28c
When downloading the full range of values in _modified, use $gte(0) a…
nfsantos May 2, 2024
3fd5c3c
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos May 6, 2024
63254c2
Improve error handling when parallel download fails with some excepti…
nfsantos May 7, 2024
70f431e
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos May 7, 2024
a1bc963
Apply review comments.
nfsantos May 8, 2024
8aceb22
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos May 10, 2024
9e1a1a3
Add a test for Pipelined strategy where the mongo filter does not mat…
nfsantos May 10, 2024
d76d5fe
Make NodeDocumentCodec thread safe and simplify logic of switch state…
nfsantos May 10, 2024
0d6f607
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos May 10, 2024
7dad006
Address review comments.
nfsantos May 10, 2024
d7d2f12
Simplify logic.
nfsantos May 10, 2024
1dcb944
Fix: Download only documents with _modified also when doing a column …
nfsantos May 10, 2024
d3f2377
Merge remote-tracking branch 'upstream/trunk' into OAK-10778
nfsantos May 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.util.List;

/**
* Stores diagnostic and performance information about indexing operations for reporting at the end of the indexing job.
*/
public interface IndexingReporter {
IndexingReporter NOOP = new IndexingReporter() {
@Override
Expand Down
19 changes: 12 additions & 7 deletions oak-run-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,21 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
<version>1.19.0</version>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- see https://issues.apache.org/jira/browse/OAK-7787 -->
<groupId>javax.activation</groupId>
nfsantos marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>activation</artifactId>
<version>1.1.1</version>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.jackrabbit.oak.index.indexer.document;

import com.codahale.metrics.MetricRegistry;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoDatabase;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.io.Closer;
Expand Down Expand Up @@ -190,6 +191,7 @@ private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState,
.withRootRevision(rootDocumentState.getRootRevision())
.withNodeStore(nodeStore)
.withMongoDocumentStore(getMongoDocumentStore())
.withMongoClientURI(getMongoClientURI())
.withMongoDatabase(getMongoDatabase())
.withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
nodeStore, getMongoDocumentStore(), traversalLog))
Expand Down Expand Up @@ -425,6 +427,10 @@ private MongoDocumentStore getMongoDocumentStore() {
return checkNotNull(indexHelper.getService(MongoDocumentStore.class));
}

/**
 * Looks up the {@link MongoClientURI} service through the index helper.
 *
 * @return the registered MongoClientURI; the lookup result is passed through
 *         checkNotNull before being returned, so a missing service is rejected there
 */
private MongoClientURI getMongoClientURI() {
    MongoClientURI registeredUri = indexHelper.getService(MongoClientURI.class);
    return checkNotNull(registeredUri);
}

/**
 * Looks up the {@link MongoDatabase} service through the index helper.
 *
 * @return the registered MongoDatabase; the lookup result is passed through
 *         checkNotNull before being returned, so a missing service is rejected there
 */
private MongoDatabase getMongoDatabase() {
    MongoDatabase registeredDatabase = indexHelper.getService(MongoDatabase.class);
    return checkNotNull(registeredDatabase);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoDatabase;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.guava.common.collect.Iterables;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class FlatFileNodeStoreBuilder {
private String checkpoint;
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
private IndexingReporter indexingReporter = IndexingReporter.NOOP;
private MongoClientURI mongoClientURI;

public enum SortStrategyType {
/**
Expand Down Expand Up @@ -177,6 +179,11 @@ public FlatFileNodeStoreBuilder withCheckpoint(String checkpoint) {
return this;
}

/**
 * Stores the given Mongo client URI in this builder.
 *
 * @param mongoClientURI the value assigned to the builder's mongoClientURI field; it is stored as given,
 *                       without validation
 * @return this builder, so that calls can be chained
 */
public FlatFileNodeStoreBuilder withMongoClientURI(MongoClientURI mongoClientURI) {
this.mongoClientURI = mongoClientURI;
return this;
}

/**
 * Stores the given Mongo database handle in this builder.
 *
 * @param mongoDatabase the value assigned to the builder's mongoDatabase field; it is stored as given,
 *                      without validation
 * @return this builder, so that calls can be chained
 */
public FlatFileNodeStoreBuilder withMongoDatabase(MongoDatabase mongoDatabase) {
this.mongoDatabase = mongoDatabase;
return this;
Expand Down Expand Up @@ -322,7 +329,7 @@ IndexStoreSortStrategy createSortStrategy(File dir) {
List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
List<String> indexNames = indexDefinitions.stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
indexingReporter.setIndexNames(indexNames);
return new PipelinedStrategy(mongoDocumentStore, mongoDatabase, nodeStore, rootRevision,
return new PipelinedStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
statisticsProvider, indexingReporter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.client.model.Filters;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;

final class DownloadRange {
import java.util.ArrayList;

public final class DownloadRange {
private final long lastModifiedFrom;
private final long lastModifiedTo;
private final long lastModifiedToInclusive;
private final String startAfterDocumentID;
private final boolean traversingInAscendingOrder;

public DownloadRange(long lastModifiedFrom, long lastModifiedTo, String startAfterDocumentID) {
if (lastModifiedTo < lastModifiedFrom) {
throw new IllegalArgumentException("Invalid range (" + lastModifiedFrom + ", " + lastModifiedTo + ")");
public DownloadRange(long lastModifiedFrom, long lastModifiedToInclusive, String startAfterDocumentID, boolean traversingInAscendingOrder) {
this.traversingInAscendingOrder = traversingInAscendingOrder;
if (!(lastModifiedFrom <= lastModifiedToInclusive)) {
throw new IllegalArgumentException("Invalid range (" + lastModifiedFrom + ", " + lastModifiedToInclusive + ")");
}
this.lastModifiedFrom = lastModifiedFrom;
this.lastModifiedTo = lastModifiedTo;
this.lastModifiedToInclusive = lastModifiedToInclusive;
this.startAfterDocumentID = startAfterDocumentID;
}

Expand All @@ -43,31 +48,34 @@ public long getLastModifiedFrom() {
return lastModifiedFrom;
}

public long getLastModifiedTo() {
return lastModifiedTo;
public long getLastModifiedToInclusive() {
return lastModifiedToInclusive;
}

public BsonDocument getFindQuery() {
String lastModifiedRangeQueryPart = "{$gte:" + lastModifiedFrom;
if (lastModifiedTo == Long.MAX_VALUE) {
lastModifiedRangeQueryPart += "}";
public Bson getFindQuery() {
ArrayList<Bson> filters = new ArrayList<>(3);
if (lastModifiedFrom == lastModifiedToInclusive) {
filters.add(Filters.eq(NodeDocument.MODIFIED_IN_SECS, lastModifiedFrom));
} else {
lastModifiedRangeQueryPart += ", $lt:" + lastModifiedTo + "}";
filters.add(Filters.gte(NodeDocument.MODIFIED_IN_SECS, lastModifiedFrom));
filters.add(Filters.lte(NodeDocument.MODIFIED_IN_SECS, lastModifiedToInclusive));
}
String idRangeQueryPart = "";
if (startAfterDocumentID != null) {
String condition = "{$gt:\"" + startAfterDocumentID + "\"}";
idRangeQueryPart = ", " + NodeDocument.ID + ":" + condition;
if (traversingInAscendingOrder) {
filters.add(Filters.gt(NodeDocument.ID, startAfterDocumentID));
} else {
filters.add(Filters.lt(NodeDocument.ID, startAfterDocumentID));
}
}
return BsonDocument.parse("{" + NodeDocument.MODIFIED_IN_SECS + ":" + lastModifiedRangeQueryPart
+ idRangeQueryPart + "}");
// If there is only one filter, do not wrap it in an $and
return filters.size() == 1 ? filters.get(0) : Filters.and(filters);
}

@Override
public String toString() {
return "DownloadRange{" +
"lastModifiedFrom=" + lastModifiedFrom +
", lastModifiedTo=" + lastModifiedTo +
", lastModifiedToInclusive=" + lastModifiedToInclusive +
", startAfterDocumentID='" + startAfterDocumentID + '\'' +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.LongAdder;

/**
 * Aggregates the statistics of the stage that downloads documents from Mongo.
 * <p>
 * Every counter is a {@link LongAdder}, so the increment methods may be called by more than one
 * download thread. The getters return the sum of the counter at the time of the call.
 */
public class DownloadStageStatistics {
    public static final Logger LOG = LoggerFactory.getLogger(DownloadStageStatistics.class);

    private final LongAdder totalEnqueueWaitTimeMillis = new LongAdder();
    private final LongAdder documentsDownloadedTotal = new LongAdder();
    private final LongAdder documentsDownloadedTotalBytes = new LongAdder();

    // ---- enqueue wait time ------------------------------------------------------------------

    /**
     * Adds to the accumulated enqueue wait time.
     *
     * @param millis the amount to add, in milliseconds
     */
    public void incrementTotalEnqueueWaitTimeMillis(long millis) {
        totalEnqueueWaitTimeMillis.add(millis);
    }

    /**
     * @return the accumulated enqueue wait time, in milliseconds
     */
    public long getTotalEnqueueWaitTimeMillis() {
        return totalEnqueueWaitTimeMillis.sum();
    }

    // ---- number of documents ----------------------------------------------------------------

    /**
     * Counts one more downloaded document.
     */
    public void incrementDocumentsDownloadedTotal() {
        documentsDownloadedTotal.increment();
    }

    /**
     * @return the number of documents counted so far
     */
    public long getDocumentsDownloadedTotal() {
        return documentsDownloadedTotal.sum();
    }

    // ---- size of documents ------------------------------------------------------------------

    /**
     * Adds to the accumulated size of the downloaded documents.
     *
     * @param bytes the amount to add, in bytes
     */
    public void incrementDocumentsDownloadedTotalBytes(long bytes) {
        documentsDownloadedTotalBytes.add(bytes);
    }

    /**
     * @return the accumulated size of the downloaded documents, in bytes
     */
    public long getDocumentsDownloadedTotalBytes() {
        return documentsDownloadedTotalBytes.sum();
    }

    // ---- reporting --------------------------------------------------------------------------

    /**
     * @return the three raw counters, rendered with {@link MetricsFormatter}
     */
    @Override
    public String toString() {
        return MetricsFormatter.newBuilder()
                .add("totalEnqueueWaitTimeMillis", getTotalEnqueueWaitTimeMillis())
                .add("documentsDownloadedTotal", getDocumentsDownloadedTotal())
                .add("documentsDownloadedTotalBytes", getDocumentsDownloadedTotalBytes())
                .build();
    }

    /**
     * Publishes the download stage metrics through {@link MetricsUtils}: the duration in seconds, the
     * number of documents downloaded, the enqueue delay as a percentage of the duration, and the total
     * size downloaded.
     *
     * @param statisticsProvider passed to MetricsUtils together with each metric
     * @param reporter           passed to MetricsUtils together with each metric
     * @param durationMillis     duration of the download stage in milliseconds; it is reported in whole
     *                           seconds (integer division by 1000)
     */
    public void publishStatistics(StatisticsProvider statisticsProvider, IndexingReporter reporter, long durationMillis) {
        LOG.info("Publishing download stage statistics");

        // Integer division: any fraction of a second is dropped.
        long durationSeconds = durationMillis / 1000;
        MetricsUtils.addMetric(statisticsProvider, reporter,
                PipelinedMetrics.OAK_INDEXER_PIPELINED_MONGO_DOWNLOAD_DURATION_SECONDS,
                durationSeconds);

        MetricsUtils.addMetric(statisticsProvider, reporter,
                PipelinedMetrics.OAK_INDEXER_PIPELINED_DOCUMENTS_DOWNLOADED_TOTAL,
                getDocumentsDownloadedTotal());

        MetricsUtils.addMetric(statisticsProvider, reporter,
                PipelinedMetrics.OAK_INDEXER_PIPELINED_MONGO_DOWNLOAD_ENQUEUE_DELAY_PERCENTAGE,
                PipelinedUtils.toPercentage(getTotalEnqueueWaitTimeMillis(), durationMillis));

        MetricsUtils.addMetricByteSize(statisticsProvider, reporter,
                PipelinedMetrics.OAK_INDEXER_PIPELINED_DOCUMENTS_DOWNLOADED_TOTAL_BYTES,
                getDocumentsDownloadedTotalBytes());
    }

    /**
     * Builds a one-string summary of the download stage, containing the duration, the document count,
     * the downloaded size (raw and human readable) and the enqueueing delay (raw and as a percentage
     * of the duration).
     *
     * @param durationMillis duration of the download stage in milliseconds
     * @return the summary rendered with {@link MetricsFormatter}
     */
    public String formatStats(long durationMillis) {
        // Whole seconds only; the remainder of the division is dropped.
        long durationSeconds = durationMillis / 1000;
        String enqueueingDelayPercentage =
                PipelinedUtils.formatAsPercentage(getTotalEnqueueWaitTimeMillis(), durationMillis);

        return MetricsFormatter.newBuilder()
                .add("duration", FormattingUtils.formatToSeconds(durationSeconds))
                .add("durationSeconds", durationSeconds)
                .add("documentsDownloaded", getDocumentsDownloadedTotal())
                .add("documentsDownloadedTotalBytes", getDocumentsDownloadedTotalBytes())
                .add("dataDownloaded", IOUtils.humanReadableByteCountBin(getDocumentsDownloadedTotalBytes()))
                .add("enqueueingDelayMillis", getTotalEnqueueWaitTimeMillis())
                .add("enqueueingDelayPercentage", enqueueingDelayPercentage)
                .build();
    }
}