OAK-10778 - Support downloading from Mongo in parallel. #1435
Conversation
…roperty: oak.indexer.pipelined.mongoParallelDump.
… to be used for tests.
… declaration where this makes code more clear.
…every 10k to every 20k.
…nd $lte(Long.MAX_VALUE) instead of $exists(_modified). $exists also checks for the property being equal to null, which cannot be verified just by looking at an index, because indexes in MongoDB do not contain null values. Using $exists requires retrieving the full document from the document store, which dramatically slows down the traversal.
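To illustrate the difference, the shape of the two filters can be sketched with plain maps mirroring their BSON form (illustrative only; the actual code uses the MongoDB Java driver's filter builders, and the comments restate the commit's reasoning):

```java
import java.util.Map;

public class FilterShapes {
    public static void main(String[] args) {
        // Range filter on _modified: every matching value is present in the
        // index, so Mongo can answer this predicate from the index alone.
        Map<String, Object> rangeFilter =
                Map.of("_modified", Map.of("$gte", 0L, "$lte", Long.MAX_VALUE));

        // $exists also matches documents where _modified is null, which an
        // index cannot prove, so the server must fetch each full document.
        Map<String, Object> existsFilter =
                Map.of("_modified", Map.of("$exists", true));

        System.out.println(rangeFilter.containsKey("_modified"));
        System.out.println(existsFilter.containsKey("_modified"));
    }
}
```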
…on. Ensures that both download threads are shut down gracefully. Small refactoring.
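A minimal sketch of the graceful-shutdown pattern the commit describes, assuming the two download threads run on an ExecutorService (the names and timeout here are illustrative, not the PR's actual fields):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService downloadThreads = Executors.newFixedThreadPool(2);
        downloadThreads.submit(() -> { /* ascending download */ });
        downloadThreads.submit(() -> { /* descending download */ });

        downloadThreads.shutdown();                 // stop accepting new tasks
        if (!downloadThreads.awaitTermination(10, TimeUnit.SECONDS)) {
            downloadThreads.shutdownNow();          // interrupt stragglers
        }
        System.out.println(downloadThreads.isTerminated());
    }
}
```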
connectedToPrimaryThreads.clear();
lastSeenClusterDescription.getServerDescriptions().stream()
        .filter(ServerDescription::isPrimary)
        .map(ServerDescription::getAddress)
        .forEach(primaryAddress -> {
            for (var entry : serverAddressHashMap.entrySet()) {
                if (entry.getValue().equals(primaryAddress)) {
                    connectedToPrimaryThreads.add(entry.getKey());
                }
            }
        });
To be consistent in terms of style, you could write this logic in a fully functional way. It would require connectedToPrimaryThreads to be non-final, though. It's a matter of taste.
private Set<Long> connectedToPrimaryThreads = new HashSet<>();
Suggested change:

connectedToPrimaryThreads.clear();
lastSeenClusterDescription.getServerDescriptions().stream()
        .filter(ServerDescription::isPrimary)
        .map(ServerDescription::getAddress)
        .forEach(primaryAddress -> {
            for (var entry : serverAddressHashMap.entrySet()) {
                if (entry.getValue().equals(primaryAddress)) {
                    connectedToPrimaryThreads.add(entry.getKey());
                }
            }
        });

connectedToPrimaryThreads = lastSeenClusterDescription.getServerDescriptions().stream()
        .filter(ServerDescription::isPrimary)
        .map(ServerDescription::getAddress)
        .flatMap(primaryAddress -> serverAddressHashMap.entrySet().stream()
                .filter(entry -> primaryAddress.equals(entry.getValue()))
                .map(Map.Entry::getKey))
        .collect(Collectors.toSet());
I initially wrote the code all as streams, but I did not like the stream-within-a-stream nesting and the use of flatMap. I feel that the forEach loop is a bit more clear.
@@ -170,9 +170,9 @@ static List<String> mergeIndexAndCustomExcludePaths(List<String> indexExcludedPa
        return indexExcludedPaths;
    }

    var excludedUnion = new HashSet<>(indexExcludedPaths);
out of curiosity: I have seen you have replaced vars in multiple places. Any specific reason?
No specific reason, I'm still unsure about when to use var vs an explicit type. In many cases keeping the type annotation makes the code clearer and does not significantly increase verbosity, for instance:
var i = 1;
int i = 1;
The second option makes it clear that it is an int, while the first is ambiguous.
In other cases, even if the type annotation is longer, I still somewhat prefer that it appears at least one time. For instance, here I feel it's better to use var:
var bi = new DownloadPosition(batch[i].getModified(), batch[i].getId());
Because the type is explicit on the right hand side.
But here:
FindIterable<NodeDocument> mongoIterable = dbCollection
.find(findQuery)
.sort(sortOrder);
I think it's nicer to have the type annotation because the right hand side does not contain the type.
But it's just my gut feeling of what seems more clear and easy to read, I'm not following any set of best-practices. It would be interesting to have a discussion about the use of var.
…ch any documents. Add additional comments.
…ment to reduce its size.
while (cursor.hasNext()) {
    NodeDocument next = cursor.next();
    String id = next.getId();
    this.nextLastModified = next.getModified();
My editor warns me that this might produce a NullPointerException. And we use this in a few other places.
Good catch, this would indeed be a problem when doing a column traversal, because the query used for that was downloading all documents, even those without the _modified field. We do not need these documents to build the FFS, so it is safe to filter them out in the Mongo query. That way, there is no need for a null check here. This code is on the critical path, so it should be kept as lean as possible.
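The trade-off can be sketched with a stand-in for NodeDocument (a hypothetical record, not the real class): because the Mongo query already excludes documents without _modified, the hot loop can read the field without a null check.

```java
import java.util.List;

public class ModifiedFilterSketch {
    // Stand-in for NodeDocument: modified() is null when the document
    // has no _modified field.
    record Doc(String id, Long modified) {}

    public static void main(String[] args) {
        List<Doc> allDocuments = List.of(
                new Doc("/a", 100L), new Doc("/b", null), new Doc("/c", 200L));

        // Equivalent of constraining _modified in the Mongo query itself:
        // documents without the field never reach the download loop.
        List<Doc> fromQuery = allDocuments.stream()
                .filter(d -> d.modified() != null)
                .toList();

        long lastModified = 0;
        for (Doc d : fromQuery) {
            lastModified = d.modified(); // safe: nulls were excluded upstream
        }
        System.out.println(lastModified);
    }
}
```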
…traversal (retry on connection errors false). Documents without the _modified field are not needed to build the FFS, and this way there is no need for a null check when calling getModified on the documents.
A Mongo cluster (REPLICA_SET) usually consists of one primary and two secondaries. This PR adds support for downloading in parallel from the two secondaries.

Adds a new boolean system property, oak.indexer.pipelined.mongoParallelDump, which defaults to false. Parallel download can be enabled only when oak.indexer.pipelined.retryOnConnectionErrors is true. (Parallel download requires ordered traversals on Mongo, which are enabled by retry on connection errors.)

Goals of this PR
Implementation
One of the difficulties of parallel downloading is partitioning the range of documents among the download threads. We do not know in advance the distribution of the documents over the range of keys (_modified, _id) used for the download, so it is challenging to distribute them evenly among the download threads. This PR sidesteps the problem by having one thread download in ascending order and the other in descending order. This has the limitation that we can only have 2 parallel threads, but it fits nicely with the typical configuration of a Mongo cluster with two secondaries. Adding more parallel downloads would provide a smaller increase in overall download speed and would risk overloading the replicas. The two download threads coordinate to detect when the ranges they have downloaded have crossed, and stop the download at that point.
To distribute the threads among replicas, this PR creates a custom implementation of ServerSelector. The Mongo Java driver calls the registered ServerSelectors whenever it needs to open a connection to a Mongo cluster, to obtain a list of eligible servers. The default implementations are derived from the readPreference settings, but it is possible to create and register a custom implementation. This PR uses an implementation that allows connections only to secondaries and keeps track of which thread last received a given secondary, so that this secondary is not given to any other thread.

There is also a mechanism to disconnect from a replica that was promoted from secondary to primary. This happens whenever there is a scale up/down: Mongo will first take one secondary down and replace it with a new one, then the other secondary, and before taking down the primary, it promotes one of the new secondaries to primary. Therefore, we may have connected to a secondary which in the meantime was promoted to primary. Since this is very likely to happen during a scale up/down, it was important to detect promotions of secondaries to primary and disconnect. This is done by listening to ClusterListener events and having each download thread periodically check whether the replica it is using is now the primary. If it is, the thread disconnects and tries to establish a new connection, which will be redirected to a secondary.
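The thread-to-secondary affinity could be modeled roughly as below (plain strings stand in for the driver's ServerAddress, and the class name is invented; the real implementation plugs into the driver's ServerSelector and ClusterListener APIs):

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class SecondaryAffinitySketch {
    // threadId -> address of the secondary that thread is currently using
    private final Map<Long, String> threadToSecondary = new ConcurrentHashMap<>();

    // From the eligible secondaries, pick one that no other thread is using.
    Optional<String> selectFor(long threadId, List<String> secondaries) {
        Optional<String> free = secondaries.stream()
                .filter(s -> !threadToSecondary.containsValue(s))
                .findFirst();
        free.ifPresent(s -> threadToSecondary.put(threadId, s));
        return free;
    }

    // Called when a secondary is promoted to primary: drop the mapping so
    // the affected thread reconnects to another secondary.
    void onPromotedToPrimary(String address) {
        threadToSecondary.values().remove(address);
    }

    public static void main(String[] args) {
        var selector = new SecondaryAffinitySketch();
        List<String> secondaries = List.of("sec-1:27017", "sec-2:27017");
        System.out.println(selector.selectFor(1L, secondaries).orElse("none"));
        System.out.println(selector.selectFor(2L, secondaries).orElse("none"));
        selector.onPromotedToPrimary("sec-1:27017");
        System.out.println(selector.selectFor(3L, secondaries).orElse("none"));
    }
}
```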
Performance results
System 1
Sequential download:
Parallel download
System 2
Sequential download:
Parallel download
Note on risk of slowing down the Mongo cluster
When parallel download is enabled, both secondaries will be under load. There may be a concern that this will slow down writes, which need an ack from the primary and a secondary. I do not think this would be a problem in common cases. A single download connection should not be enough to saturate a node: Mongo seems to allocate only one thread to handle a connection, so as long as the Mongo node has at least 2 cores, the download query will not take up all the CPU. The pressure on IOPS, disk throughput and network bandwidth created by the downloader may in some situations get close to the limit, but even there this pressure is not sustained all the time. The protocol used by the Mongo Java driver is synchronous request/response, and the client only sends the request for the next batch of results after the current one is parsed and iterated over, so during this time the Mongo server is idle. I have observed that IOPS and disk throughput usually stay below 80% on Mongo, which leaves some headroom to process other queries.
Additional changes
This PR has a large change set in part because it refactors the PipelinedMongoDownloadTask in order to make it more concise and cohesive: