
OAK-10778 - Support downloading from Mongo in parallel. #1435

Merged
merged 24 commits into apache:trunk on May 13, 2024

Conversation

nfsantos
Contributor

@nfsantos nfsantos commented Apr 25, 2024

A Mongo cluster (REPLICA_SET) usually consists of one primary and two secondaries. This PR adds support for downloading in parallel from the two secondaries.

Adds a new boolean system property, oak.indexer.pipelined.mongoParallelDump, which defaults to false.

Parallel download can be enabled only when oak.indexer.pipelined.retryOnConnectionErrors is true. (Parallel download requires ordered traversals on Mongo which are enabled by retry on connection errors.)
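
For example, parallel download could be enabled with JVM system properties like these (a sketch; the exact startup command depends on how the indexing job is launched):

    -Doak.indexer.pipelined.retryOnConnectionErrors=true \
    -Doak.indexer.pipelined.mongoParallelDump=true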

Goals of this PR

  • Do not download from the primary, because this is more likely to slow down the Mongo cluster and affect other workloads (reads/writes by Oak).
  • Use one and only one connection at a time to each secondary, to avoid overloading any given replica. More on this in the notes below.
  • Gracefully handle scale-up/down operations and failures in general, pausing the second download when there is only one secondary available and reconnecting when all secondaries are up.

Implementation

One of the difficulties of parallel downloading is partitioning the range of documents among the download threads. We do not know in advance how the documents are distributed over the range of keys (_modified, _id) used for the download, so it is challenging to split them evenly among the download threads. This PR sidesteps the problem by having one thread download in ascending order and the other in descending order. This limits us to 2 parallel threads, but that fits nicely with the typical configuration of a Mongo cluster with two secondaries. Adding more parallel downloads would provide a smaller increase in overall download speed and would risk overloading the replicas.

The two download threads coordinate to detect when the ranges they have downloaded cross, and stop the download at that point.
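
A minimal sketch of this coordination, using the DownloadPosition type that appears later in this PR (variable names are illustrative, not the PR's exact code):

    import java.util.concurrent.atomic.AtomicReference;

    // Positions are compared in traversal order: (_modified, _id).
    record DownloadPosition(long modified, String id) implements Comparable<DownloadPosition> {
        @Override
        public int compareTo(DownloadPosition other) {
            int c = Long.compare(this.modified, other.modified);
            return c != 0 ? c : this.id.compareTo(other.id);
        }
    }

    // Shared between the two threads, updated after every downloaded batch.
    AtomicReference<DownloadPosition> highestAscending = new AtomicReference<>();
    AtomicReference<DownloadPosition> lowestDescending = new AtomicReference<>();

    // In the ascending thread, after downloading a batch ending at 'pos':
    highestAscending.set(pos);
    DownloadPosition descending = lowestDescending.get();
    if (descending != null && pos.compareTo(descending) >= 0) {
        // The ranges have crossed: together the two threads have seen every
        // document at least once, so both can stop downloading.
    }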

To distribute the threads among the replicas, this PR provides a custom implementation of ServerSelector. The Mongo Java driver calls the registered ServerSelectors to obtain a list of eligible servers whenever it needs to open a connection to the cluster. The default implementations are derived from the readPreference settings, but it is possible to create and register a custom implementation. This PR uses an implementation that allows connections only to secondaries and keeps track of which thread last received a given secondary, so that this secondary is not given to any other thread.
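
A minimal sketch of such a selector (names are illustrative, and the bookkeeping is simplified relative to the PR):

    import com.mongodb.ServerAddress;
    import com.mongodb.connection.ClusterDescription;
    import com.mongodb.connection.ServerDescription;
    import com.mongodb.selector.ServerSelector;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collectors;

    // Offers only secondaries to the driver, and never a secondary that is
    // currently assigned to a different download thread.
    class SecondariesOnlyServerSelector implements ServerSelector {

        private final Map<Long, ServerAddress> secondaryByThread = new ConcurrentHashMap<>();

        @Override
        public List<ServerDescription> select(ClusterDescription clusterDescription) {
            long threadId = Thread.currentThread().getId();
            var takenByOthers = secondaryByThread.entrySet().stream()
                    .filter(e -> e.getKey() != threadId)
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toSet());
            List<ServerDescription> eligible = clusterDescription.getServerDescriptions().stream()
                    .filter(ServerDescription::isSecondary)
                    .filter(sd -> !takenByOthers.contains(sd.getAddress()))
                    .collect(Collectors.toList());
            if (!eligible.isEmpty()) {
                // Remember which secondary this thread received, so that it
                // is not offered to any other thread.
                secondaryByThread.put(threadId, eligible.get(0).getAddress());
            }
            return eligible; // empty => the driver waits and retries selection
        }
    }

It could be registered with MongoClientSettings.builder().applyToClusterSettings(b -> b.serverSelector(new SecondariesOnlyServerSelector())). Returning an empty list makes the driver wait (up to the server selection timeout) and retry, which is one way the second download pauses while only one secondary is available.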

There is also a mechanism to disconnect from a replica that was promoted from secondary to primary. This happens during scale up/down: Mongo first takes one secondary down and replaces it with a new one, then does the same with the other secondary, and before taking down the primary it promotes one of the new secondaries to primary. We may therefore be connected to a secondary that has in the meantime been promoted to primary. Since this is very likely to happen during a scale up/down, it is important to detect promotions of secondaries to primary and disconnect. This is done by listening to ClusterListener events and having each download thread periodically check whether the replica it is using is now the primary. If it is, the thread disconnects and tries to establish a new connection, which will be redirected to a secondary.
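
A minimal sketch of the promotion detection (illustrative names; the PR combines this with per-thread periodic checks):

    import com.mongodb.ServerAddress;
    import com.mongodb.connection.ServerDescription;
    import com.mongodb.event.ClusterDescriptionChangedEvent;
    import com.mongodb.event.ClusterListener;

    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.stream.Collectors;

    class PrimaryTracker implements ClusterListener {

        private final AtomicReference<Set<ServerAddress>> primaries = new AtomicReference<>(Set.of());

        @Override
        public void clusterDescriptionChanged(ClusterDescriptionChangedEvent event) {
            // Keep an up-to-date view of which servers are currently primary.
            primaries.set(event.getNewDescription().getServerDescriptions().stream()
                    .filter(ServerDescription::isPrimary)
                    .map(ServerDescription::getAddress)
                    .collect(Collectors.toSet()));
        }

        // Called periodically by each download thread with the address it is
        // connected to; a true result means: disconnect and reconnect.
        boolean wasPromotedToPrimary(ServerAddress inUse) {
            return primaries.get().contains(inUse);
        }
    }

The listener can be registered alongside the server selector, via applyToClusterSettings(b -> b.addClusterListener(new PrimaryTracker())).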

Performance results

System 1

Sequential download:

Timings:
  Mongo dump: 12:09:11
  Merge sort: 00:05:28
  Build FFS (Dump+Merge): 12:14:40

Parallel download:

Timings:
  Mongo dump: 06:14:16
  Merge sort: 00:05:26
  Build FFS (Dump+Merge): 06:19:46

System 2

Sequential download:

Timings:
  Mongo dump: 00:14:10
  Merge sort: 00:02:19
  Build FFS (Dump+Merge): 00:16:29

Parallel download:

Timings:
  Mongo dump: 00:06:41
  Merge sort: 00:02:13
  Build FFS (Dump+Merge): 00:08:54
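
In both systems, parallel download roughly halves the Mongo dump time (12:09:11 → 06:14:16 and 00:14:10 → 00:06:41), while the merge sort phase is unchanged.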

Note on risk of slowing down the Mongo cluster

When parallel download is enabled, both secondaries will be under load. One concern is whether this will slow down writes, which need an ack from the primary and from a secondary. I do not think this would be a problem in common cases. A single download connection should not be enough to saturate a node: Mongo seems to allocate only one thread to handle a connection, so as long as the Mongo node has at least 2 cores, the download query will not take up all the CPU. The pressure on IOPS, disk throughput and network bandwidth created by the downloader may in some situations get close to the limit, but even then the pressure is not sustained continuously. The protocol used by the Mongo Java driver is synchronous request/response, and the client only sends the request for the next batch of results after the current one is parsed and iterated over, so during this time the Mongo server is idle. I have observed that IOPS and disk throughput usually stay below 80% on Mongo, which leaves some headroom to process other queries.

Additional changes

  • Improve the test of recovery from Mongo disconnections. The previous test relied on Mockito to simulate a connection failure, which was very complex and tedious to write. This PR replaces it with a test that uses a real Mongo server inside a Docker container and Toxiproxy to simulate a connection failure (see the sketch after this list).
  • Create a parameterized version of the PipelinedIT tests which tests all combinations of: regex path filtering, parallel download, and retry on connection errors.
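
A minimal sketch of such a test setup, assuming Testcontainers with its ToxiproxyContainer module and the toxiproxy-java client (image names, versions, and wiring here are illustrative; the PR's actual test may differ):

    import eu.rekawek.toxiproxy.Proxy;
    import eu.rekawek.toxiproxy.ToxiproxyClient;
    import org.testcontainers.containers.MongoDBContainer;
    import org.testcontainers.containers.Network;
    import org.testcontainers.containers.ToxiproxyContainer;

    Network network = Network.newNetwork();
    MongoDBContainer mongo = new MongoDBContainer("mongo:6.0")
            .withNetwork(network)
            .withNetworkAliases("mongo");
    ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")
            .withNetwork(network);
    mongo.start();
    toxiproxy.start();

    // All Mongo traffic from the test goes through the proxy.
    ToxiproxyClient client = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
    Proxy proxy = client.createProxy("mongo", "0.0.0.0:8666", "mongo:27017");
    String mongoUri = "mongodb://" + toxiproxy.getHost() + ":" + toxiproxy.getMappedPort(8666);

    // Simulate a connection failure mid-download, then restore connectivity.
    proxy.disable();
    // ... assert that the downloader detects the failure and retries ...
    proxy.enable();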

This PR has a large change set in part because it refactors the PipelinedMongoDownloadTask in order to make it more concise and cohesive:

  • It moves the logic to handle regex filtering out to a separate class.
  • It creates a new DownloadTask class containing the logic to do the actual download from Mongo, separating it from the logic to set up and launch the download threads.

@nfsantos nfsantos changed the title OAK-10778 - Support downloading from Mongo in parallel. Adds new boolean system property: oak.indexer.pipelined.mongoParallelDump. OAK-10778 - Support downloading from Mongo in parallel. Apr 25, 2024
…nd $lte(Long.MAX_VALUE) instead of $exists(_modified). $exists also checks for the property being equal to null, which cannot be verified just by looking at an index, because indexes in MongoDB do not contain null values. Using $exists requires retrieving the full document from the column store, which dramatically slows down the traversal.
…on. Ensures that both download threads are shut down gracefully.

Small refactoring.
Comment on lines +141 to +151
connectedToPrimaryThreads.clear();
lastSeenClusterDescription.getServerDescriptions().stream()
        .filter(ServerDescription::isPrimary)
        .map(ServerDescription::getAddress)
        .forEach(primaryAddress -> {
            for (var entry : serverAddressHashMap.entrySet()) {
                if (entry.getValue().equals(primaryAddress)) {
                    connectedToPrimaryThreads.add(entry.getKey());
                }
            }
        });
Contributor

To be consistent in terms of style, you could write this logic in a fully functional way. This would require connectedToPrimaryThreads to be non-final, though. It's a matter of taste.

private Set<Long> connectedToPrimaryThreads = new HashSet<>();
Suggested change
connectedToPrimaryThreads.clear();
lastSeenClusterDescription.getServerDescriptions().stream()
        .filter(ServerDescription::isPrimary)
        .map(ServerDescription::getAddress)
        .forEach(primaryAddress -> {
            for (var entry : serverAddressHashMap.entrySet()) {
                if (entry.getValue().equals(primaryAddress)) {
                    connectedToPrimaryThreads.add(entry.getKey());
                }
            }
        });
connectedToPrimaryThreads = lastSeenClusterDescription.getServerDescriptions().stream()
        .filter(ServerDescription::isPrimary)
        .map(ServerDescription::getAddress)
        .flatMap(primaryAddress -> serverAddressHashMap.entrySet().stream()
                .filter(entry -> primaryAddress.equals(entry.getValue()))
                .map(Map.Entry::getKey))
        .collect(Collectors.toSet());

Contributor Author

I initially wrote the code entirely with streams, but I did not like the stream-within-a-stream nesting and the use of flatMap. I feel that the forEach loop is a bit clearer.

@@ -170,9 +170,9 @@ static List<String> mergeIndexAndCustomExcludePaths(List<String> indexExcludedPa
return indexExcludedPaths;
}

var excludedUnion = new HashSet<>(indexExcludedPaths);
Contributor

Out of curiosity: I have seen that you replaced var in multiple places. Any specific reason?

Contributor Author

@nfsantos nfsantos May 8, 2024


No specific reason, I'm still unsure about when to use var vs. an explicit type. In many cases, keeping the type annotation makes the code clearer and does not significantly increase verbosity, for instance:

var i = 1;

int i = 1;

The second option makes it clear that it is an int, while the first is ambiguous.

In other cases, even if the type annotation is longer, I still somewhat prefer that the type appears at least once. For instance, here I feel it's better to use var:

var bi = new DownloadPosition(batch[i].getModified(), batch[i].getId());

Because the type is explicit on the right-hand side.

But here:

            FindIterable<NodeDocument> mongoIterable = dbCollection
                    .find(findQuery)
                    .sort(sortOrder);

I think it's nicer to have the type annotation, because the right-hand side does not contain the type.

But it's just my gut feeling of what seems clearer and easier to read; I'm not following any set of best practices. It would be interesting to have a discussion about the use of var.

while (cursor.hasNext()) {
    NodeDocument next = cursor.next();
    String id = next.getId();
    this.nextLastModified = next.getModified();
Contributor

My editor warns me that this might produce a NullPointerException. And we use this in a few other places.

Contributor Author

Good catch. This would indeed be a problem when doing a column traversal, because the query used for that was downloading all documents, even those without the _modified field. We do not need these documents to build the FFS, so it is safe to filter them out in the Mongo query. That way, there is no need for a null check here. This code is on the critical path, so it should be kept as lean as possible.
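
A minimal sketch of the query-side change (illustrative, following the commit message below rather than the PR's exact code): replacing $exists with a $gte/$lte range on _modified, which the index can answer without fetching full documents:

    import static com.mongodb.client.model.Filters.and;
    import static com.mongodb.client.model.Filters.gte;
    import static com.mongodb.client.model.Filters.lte;

    import org.bson.conversions.Bson;

    // Matches exactly the documents with a numeric _modified value; documents
    // where the field is missing or null do not match a numeric range, so no
    // null check is needed downstream.
    Bson findQuery = and(
            gte("_modified", 0L),
            lte("_modified", Long.MAX_VALUE));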

…traversal (retry on connection errors false). Documents without the _modified field are not needed to build the FFS, and this way there is no need for a null check when calling getModified on the documents.
@nfsantos nfsantos merged commit 98206cb into apache:trunk May 13, 2024
1 of 2 checks passed
@nfsantos nfsantos deleted the OAK-10778 branch May 13, 2024 07:11