Skip to content

Commit

Permalink
OAK-10778 - Support downloading from Mongo in parallel. (#1435)
Browse files Browse the repository at this point in the history
  • Loading branch information
nfsantos committed May 13, 2024
1 parent 6f9cb9b commit 98206cb
Show file tree
Hide file tree
Showing 26 changed files with 2,524 additions and 829 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.util.List;

/**
* Stores diagnostic and performance information about indexing operations for reporting at the end of the indexing job.
*/
public interface IndexingReporter {
IndexingReporter NOOP = new IndexingReporter() {
@Override
Expand Down
19 changes: 12 additions & 7 deletions oak-run-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,21 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
<version>1.19.0</version>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- see https://issues.apache.org/jira/browse/OAK-7787 -->
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.jackrabbit.oak.index.indexer.document;

import com.codahale.metrics.MetricRegistry;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoDatabase;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.io.Closer;
Expand Down Expand Up @@ -190,6 +191,7 @@ private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState,
.withRootRevision(rootDocumentState.getRootRevision())
.withNodeStore(nodeStore)
.withMongoDocumentStore(getMongoDocumentStore())
.withMongoClientURI(getMongoClientURI())
.withMongoDatabase(getMongoDatabase())
.withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
nodeStore, getMongoDocumentStore(), traversalLog))
Expand Down Expand Up @@ -425,6 +427,10 @@ private MongoDocumentStore getMongoDocumentStore() {
return checkNotNull(indexHelper.getService(MongoDocumentStore.class));
}

private MongoClientURI getMongoClientURI() {
return checkNotNull(indexHelper.getService(MongoClientURI.class));
}

private MongoDatabase getMongoDatabase() {
return checkNotNull(indexHelper.getService(MongoDatabase.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoDatabase;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.guava.common.collect.Iterables;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class FlatFileNodeStoreBuilder {
private String checkpoint;
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
private IndexingReporter indexingReporter = IndexingReporter.NOOP;
private MongoClientURI mongoClientURI;

public enum SortStrategyType {
/**
Expand Down Expand Up @@ -177,6 +179,11 @@ public FlatFileNodeStoreBuilder withCheckpoint(String checkpoint) {
return this;
}

public FlatFileNodeStoreBuilder withMongoClientURI(MongoClientURI mongoClientURI) {
this.mongoClientURI = mongoClientURI;
return this;
}

public FlatFileNodeStoreBuilder withMongoDatabase(MongoDatabase mongoDatabase) {
this.mongoDatabase = mongoDatabase;
return this;
Expand Down Expand Up @@ -322,7 +329,7 @@ IndexStoreSortStrategy createSortStrategy(File dir) {
List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
List<String> indexNames = indexDefinitions.stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
indexingReporter.setIndexNames(indexNames);
return new PipelinedStrategy(mongoDocumentStore, mongoDatabase, nodeStore, rootRevision,
return new PipelinedStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
statisticsProvider, indexingReporter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.client.model.Filters;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;

final class DownloadRange {
import java.util.ArrayList;

public final class DownloadRange {
private final long lastModifiedFrom;
private final long lastModifiedTo;
private final long lastModifiedToInclusive;
private final String startAfterDocumentID;
private final boolean traversingInAscendingOrder;

public DownloadRange(long lastModifiedFrom, long lastModifiedTo, String startAfterDocumentID) {
if (lastModifiedTo < lastModifiedFrom) {
throw new IllegalArgumentException("Invalid range (" + lastModifiedFrom + ", " + lastModifiedTo + ")");
public DownloadRange(long lastModifiedFrom, long lastModifiedToInclusive, String startAfterDocumentID, boolean traversingInAscendingOrder) {
this.traversingInAscendingOrder = traversingInAscendingOrder;
if (!(lastModifiedFrom <= lastModifiedToInclusive)) {
throw new IllegalArgumentException("Invalid range (" + lastModifiedFrom + ", " + lastModifiedToInclusive + ")");
}
this.lastModifiedFrom = lastModifiedFrom;
this.lastModifiedTo = lastModifiedTo;
this.lastModifiedToInclusive = lastModifiedToInclusive;
this.startAfterDocumentID = startAfterDocumentID;
}

Expand All @@ -43,31 +48,34 @@ public long getLastModifiedFrom() {
return lastModifiedFrom;
}

public long getLastModifiedTo() {
return lastModifiedTo;
public long getLastModifiedToInclusive() {
return lastModifiedToInclusive;
}

public BsonDocument getFindQuery() {
String lastModifiedRangeQueryPart = "{$gte:" + lastModifiedFrom;
if (lastModifiedTo == Long.MAX_VALUE) {
lastModifiedRangeQueryPart += "}";
public Bson getFindQuery() {
ArrayList<Bson> filters = new ArrayList<>(3);
if (lastModifiedFrom == lastModifiedToInclusive) {
filters.add(Filters.eq(NodeDocument.MODIFIED_IN_SECS, lastModifiedFrom));
} else {
lastModifiedRangeQueryPart += ", $lt:" + lastModifiedTo + "}";
filters.add(Filters.gte(NodeDocument.MODIFIED_IN_SECS, lastModifiedFrom));
filters.add(Filters.lte(NodeDocument.MODIFIED_IN_SECS, lastModifiedToInclusive));
}
String idRangeQueryPart = "";
if (startAfterDocumentID != null) {
String condition = "{$gt:\"" + startAfterDocumentID + "\"}";
idRangeQueryPart = ", " + NodeDocument.ID + ":" + condition;
if (traversingInAscendingOrder) {
filters.add(Filters.gt(NodeDocument.ID, startAfterDocumentID));
} else {
filters.add(Filters.lt(NodeDocument.ID, startAfterDocumentID));
}
}
return BsonDocument.parse("{" + NodeDocument.MODIFIED_IN_SECS + ":" + lastModifiedRangeQueryPart
+ idRangeQueryPart + "}");
// If there is only one filter, do not wrap it in an $and
return filters.size() == 1 ? filters.get(0) : Filters.and(filters);
}

@Override
public String toString() {
return "DownloadRange{" +
"lastModifiedFrom=" + lastModifiedFrom +
", lastModifiedTo=" + lastModifiedTo +
", lastModifiedToInclusive=" + lastModifiedToInclusive +
", startAfterDocumentID='" + startAfterDocumentID + '\'' +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.LongAdder;

/**
* Aggregates statistics when downloading from Mongo with two threads
*/
public class DownloadStageStatistics {
public static final Logger LOG = LoggerFactory.getLogger(DownloadStageStatistics.class);

private final LongAdder totalEnqueueWaitTimeMillis = new LongAdder();
private final LongAdder documentsDownloadedTotal = new LongAdder();
private final LongAdder documentsDownloadedTotalBytes = new LongAdder();


public long getTotalEnqueueWaitTimeMillis() {
return totalEnqueueWaitTimeMillis.sum();
}

public long getDocumentsDownloadedTotal() {
return documentsDownloadedTotal.sum();
}

public long getDocumentsDownloadedTotalBytes() {
return documentsDownloadedTotalBytes.sum();
}

public void incrementTotalEnqueueWaitTimeMillis(long millis) {
this.totalEnqueueWaitTimeMillis.add(millis);
}

public void incrementDocumentsDownloadedTotal() {
this.documentsDownloadedTotal.increment();
}

public void incrementDocumentsDownloadedTotalBytes(long bytes) {
this.documentsDownloadedTotalBytes.add(bytes);
}

@Override
public String toString() {
return MetricsFormatter.newBuilder()
.add("totalEnqueueWaitTimeMillis", getTotalEnqueueWaitTimeMillis())
.add("documentsDownloadedTotal", getDocumentsDownloadedTotal())
.add("documentsDownloadedTotalBytes", getDocumentsDownloadedTotalBytes())
.build();
}

public void publishStatistics(StatisticsProvider statisticsProvider, IndexingReporter reporter, long durationMillis) {
LOG.info("Publishing download stage statistics");
MetricsUtils.addMetric(statisticsProvider, reporter,
PipelinedMetrics.OAK_INDEXER_PIPELINED_MONGO_DOWNLOAD_DURATION_SECONDS, durationMillis / 1000);
MetricsUtils.addMetric(statisticsProvider, reporter,
PipelinedMetrics.OAK_INDEXER_PIPELINED_DOCUMENTS_DOWNLOADED_TOTAL, getDocumentsDownloadedTotal());
MetricsUtils.addMetric(statisticsProvider, reporter,
PipelinedMetrics.OAK_INDEXER_PIPELINED_MONGO_DOWNLOAD_ENQUEUE_DELAY_PERCENTAGE,
PipelinedUtils.toPercentage(getTotalEnqueueWaitTimeMillis(), durationMillis)
);
MetricsUtils.addMetricByteSize(statisticsProvider, reporter,
PipelinedMetrics.OAK_INDEXER_PIPELINED_DOCUMENTS_DOWNLOADED_TOTAL_BYTES,
getDocumentsDownloadedTotalBytes());
}

public String formatStats(long durationMillis) {
String enqueueingDelayPercentage = PipelinedUtils.formatAsPercentage(getTotalEnqueueWaitTimeMillis(), durationMillis);
long durationSeconds = durationMillis / 1000;
return MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(durationSeconds))
.add("durationSeconds", durationSeconds)
.add("documentsDownloaded", getDocumentsDownloadedTotal())
.add("documentsDownloadedTotalBytes", getDocumentsDownloadedTotalBytes())
.add("dataDownloaded", IOUtils.humanReadableByteCountBin(getDocumentsDownloadedTotalBytes()))
.add("enqueueingDelayMillis", getTotalEnqueueWaitTimeMillis())
.add("enqueueingDelayPercentage", enqueueingDelayPercentage)
.build();
}
}

0 comments on commit 98206cb

Please sign in to comment.