Skip to content

Commit

Permalink
Improve documentation and replace use of var keyword by explicit type…
Browse files Browse the repository at this point in the history
… declaration where this makes code more clear.
  • Loading branch information
nfsantos committed Apr 26, 2024
1 parent 45623cf commit 934bea6
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.concurrent.atomic.LongAdder;

/**
* Used to aggregate statistics when downloading from Mongo with two threads
* Aggregates statistics when downloading from Mongo with two threads
*/
public class DownloadStageStatistics {
public static final Logger LOG = LoggerFactory.getLogger(DownloadStageStatistics.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public DownloadPosition getLowerRangeTop() {
* The batch must be in ascending order of (_modified, _id).
* <p>
* Updates the lower range top to b[i].
*
* @param batch The batch of documents to be added to the lower range, must be in ascending order
*/
public synchronized int extendLowerRange(NodeDocument[] batch, int sizeOfBatch) {
// batch must be in ascending order
Expand All @@ -103,6 +105,12 @@ public synchronized int extendLowerRange(NodeDocument[] batch, int sizeOfBatch)
return 0;
}

/**
* Extends the upper range of downloaded documents with the documents in the given batch and returns the index of
* the first/highest document in this batch that was already downloaded by the ascending download thread.
*
* @param batch The batch of documents to be added to the upper range, must be in descending order
*/
public synchronized int extendUpperRange(NodeDocument[] batch, int sizeOfBatch) {
// batch must be in descending order
int i = sizeOfBatch - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ static List<String> mergeIndexAndCustomExcludePaths(List<String> indexExcludedPa
return indexExcludedPaths;
}

var excludedUnion = new HashSet<>(indexExcludedPaths);
HashSet<String> excludedUnion = new HashSet<>(indexExcludedPaths);
excludedUnion.addAll(customExcludedPaths);
var mergedExcludes = new ArrayList<String>();
ArrayList<String> mergedExcludes = new ArrayList<>();
for (String testPath : excludedUnion) {
// Add a path only if it is not a subpath of any other path in the list
if (excludedUnion.stream().noneMatch(p -> PathUtils.isAncestor(p, testPath))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
this.reporter.addConfig(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING_MAX_PATHS, String.valueOf(regexPathFilteringMaxNumberOfPaths));
this.regexPathFilterFactory = new MongoRegexPathFilterFactory(regexPathFilteringMaxNumberOfPaths);

var parallelDumpConfig = ConfigHelper.getSystemPropertyAsBoolean(OAK_INDEXER_PIPELINED_MONGO_PARALLEL_DUMP, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_PARALLEL_DUMP);
boolean parallelDumpConfig = ConfigHelper.getSystemPropertyAsBoolean(OAK_INDEXER_PIPELINED_MONGO_PARALLEL_DUMP, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_PARALLEL_DUMP);
if (parallelDumpConfig && !retryOnConnectionErrors) {
LOG.warn("Parallel dump requires " + OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS + " to be set to true, but it is false. Disabling parallel dump.");
this.parallelDump = false;
Expand All @@ -256,7 +256,7 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
);
this.reporter.addConfig(OAK_INDEXER_PIPELINED_MONGO_CUSTOM_EXCLUDE_ENTRIES_REGEX, customExcludeEntriesRegex);

var excludePathsString = ConfigHelper.getSystemPropertyAsString(
String excludePathsString = ConfigHelper.getSystemPropertyAsString(
OAK_INDEXER_PIPELINED_MONGO_CUSTOM_EXCLUDED_PATHS,
DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CUSTOM_EXCLUDED_PATHS
).trim();
Expand All @@ -271,7 +271,7 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
.map(String::trim)
.collect(Collectors.toList());
}
var invalidPaths = customExcludedPaths.stream()
List<String> invalidPaths = customExcludedPaths.stream()
.filter(p -> !PathUtils.isValid(p) || !PathUtils.isAbsolute(p) || PathUtils.denotesRoot(p))
.collect(Collectors.toList());
if (!invalidPaths.isEmpty()) {
Expand Down Expand Up @@ -422,10 +422,10 @@ private void downloadWithRetryOnConnectionErrors() throws InterruptedException,

// The current thread will download in ascending order. We launch a separate thread to download in
// descending order.
var originalName = Thread.currentThread().getName();
String originalName = Thread.currentThread().getName();
Thread.currentThread().setName(THREAD_NAME_PREFIX + "-ascending");
try {
var descendingDownloadThread = new Thread(() -> {
Thread descendingDownloadThread = new Thread(() -> {
try {
descendingDownloadTask.download(mongoFilter);
descendingDownloadTask.reportFinalResults();
Expand Down Expand Up @@ -490,6 +490,10 @@ public boolean downloadInAscendingOrder() {
}
}

/**
* Downloads a given range from Mongo. Instances of this class should be used for downloading a single range.
* To download multiple ranges, create multiple instances of this class.
*/
private class DownloadTask {
private final DownloadOrder downloadOrder;
private final DownloadStageStatistics downloadStatics;
Expand Down Expand Up @@ -523,14 +527,14 @@ private void download(Bson mongoQueryFilter) throws InterruptedException, Timeou
if (lastIdDownloaded == null) {
// lastIdDownloaded is null only when starting the download or if there is a connection error
// before anything is downloaded
var firstRange = new DownloadRange(0, Long.MAX_VALUE, null, downloadOrder.downloadInAscendingOrder());
DownloadRange firstRange = new DownloadRange(0, Long.MAX_VALUE, null, downloadOrder.downloadInAscendingOrder());
downloadRange(firstRange, mongoQueryFilter, downloadOrder);
} else {
LOG.info("Recovering from broken connection, finishing downloading documents with _modified={}", nextLastModified);
var partialLastModifiedRange = new DownloadRange(nextLastModified, nextLastModified, lastIdDownloaded, downloadOrder.downloadInAscendingOrder());
DownloadRange partialLastModifiedRange = new DownloadRange(nextLastModified, nextLastModified, lastIdDownloaded, downloadOrder.downloadInAscendingOrder());
downloadRange(partialLastModifiedRange, mongoQueryFilter, downloadOrder);
// Downloaded everything from _nextLastModified. Continue with the next timestamp for _modified
var nextRange = downloadOrder.downloadInAscendingOrder() ?
DownloadRange nextRange = downloadOrder.downloadInAscendingOrder() ?
new DownloadRange(nextLastModified + 1, Long.MAX_VALUE, null, true) :
new DownloadRange(0, nextLastModified - 1, null, false);
downloadRange(nextRange, mongoQueryFilter, downloadOrder);
Expand Down Expand Up @@ -620,7 +624,7 @@ void download(FindIterable<NodeDocument> mongoIterable) throws InterruptedExcept
downloadStageStatistics.incrementDocumentsDownloadedTotalBytes(docSize);
if (batchSize >= maxBatchSizeBytes || nextIndex == batch.length) {
LOG.trace("Enqueuing block with {} elements, estimated size: {} bytes", nextIndex, batchSize);
var downloadCompleted = tryEnqueueCopy(batch, nextIndex);
boolean downloadCompleted = tryEnqueueCopy(batch, nextIndex);
if (downloadCompleted) {
LOG.info("Download of range with download order {} completed, intersected with other download.", downloadOrder);
return;
Expand Down Expand Up @@ -673,13 +677,13 @@ private boolean tryEnqueueCopy(NodeDocument[] batch, int sizeOfBatch) throws Tim
boolean completedDownload = false;
int effectiveSize = sizeOfBatch;
if (downloadOrder != DownloadOrder.UNDEFINED) {
var sizeOfRangeNotAdded = downloadOrder == DownloadOrder.ASCENDING ?
int sizeOfRangeNotAdded = downloadOrder == DownloadOrder.ASCENDING ?
mongoParallelDownloadCoordinator.extendLowerRange(batch, sizeOfBatch) :
mongoParallelDownloadCoordinator.extendUpperRange(batch, sizeOfBatch);
if (sizeOfRangeNotAdded != sizeOfBatch) {
completedDownload = true;
effectiveSize = sizeOfRangeNotAdded;
var firstAlreadySeen = batch[sizeOfRangeNotAdded];
NodeDocument firstAlreadySeen = batch[sizeOfRangeNotAdded];
LOG.info("Download complete, reached already seen document: {}: {}", firstAlreadySeen.getModified(), firstAlreadySeen.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ public File createSortedStoreFile() throws IOException {
throw new RuntimeException(ex);
}
}
var elapsedSeconds = start.elapsed(TimeUnit.SECONDS);
long elapsedSeconds = start.elapsed(TimeUnit.SECONDS);
LOG.info("[TASK:PIPELINED-DUMP:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(elapsedSeconds))
.add("durationSeconds", elapsedSeconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private void testPipelinedStrategy(Map<String, String> settings,
settings.forEach(System::setProperty);

try (MongoTestBackend rwStore = createNodeStore(false)) {
var rwNodeStore = rwStore.documentNodeStore;
DocumentNodeStore rwNodeStore = rwStore.documentNodeStore;
contentBuilder.accept(rwNodeStore);
MongoTestBackend roStore = createNodeStore(true);

Expand Down Expand Up @@ -416,7 +416,7 @@ private void testSuccessfulDownload(Predicate<String> pathPredicate, List<PathFi
// documents, even if they do not match the includedPaths.
result = result.stream()
.filter(s -> {
var name = s.split("\\|")[0];
String name = s.split("\\|")[0];
return name.length() < Utils.PATH_LONG;
})
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_MONGO_PARALLEL_DUMP;
Expand Down Expand Up @@ -103,7 +104,7 @@ public void mongoDisconnectTest() throws Exception {
Path resultWithoutInterruption;
LOG.info("Creating a FFS: reference run without failures.");
try (MongoTestBackend roBackend = PipelineITUtil.createNodeStore(true, mongoUri, builderProvider)) {
var strategy = createStrategy(roBackend);
PipelinedStrategy strategy = createStrategy(roBackend);
resultWithoutInterruption = strategy.createSortedStoreFile().toPath();
}

Expand All @@ -112,7 +113,7 @@ public void mongoDisconnectTest() throws Exception {
try (MongoTestBackend roBackend = PipelineITUtil.createNodeStore(true, mongoUri, builderProvider)) {
LimitData cutConnectionUpstream = proxy.toxics()
.limitData("CUT_CONNECTION_UPSTREAM", ToxicDirection.DOWNSTREAM, 30000L);
var scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
try {
scheduleExecutor.schedule(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.codahale.metrics.Counter;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import joptsimple.internal.Strings;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
Expand Down Expand Up @@ -232,7 +234,7 @@ private void testSuccessfulDownload(Predicate<String> pathPredicate, List<PathFi
File file = pipelinedStrategy.createSortedStoreFile();


var counters = statsProvider.getRegistry().getCounters();
SortedMap<String, Counter> counters = statsProvider.getRegistry().getCounters();

long nDocumentsMatchingFilter = regexPathFiltering ?
numberOfFilteredDocuments(roStore.mongoDatabase, "/content/dam") :
Expand Down

0 comments on commit 934bea6

Please sign in to comment.