HDFS-17532. RBF: Allow router state store cache update to overwrite and delete in parallel #6839
💔 -1 overall
This message was automatically generated. |
@kokonguyen191 thanks for your report. This change makes sense.
The overwrite and delete operations should be handled asynchronously, so that the StateStores can be updated in time and unnecessary safe mode is avoided.
The main thread can submit these callables to a ThreadPoolExecutor without waiting for the result, so that it can return the normal records in time. The newRecords will then contain some pending or deleted records, which differs from the current logic, but I think it doesn't matter because those pending or deleted records are already expired in the newRecords.
For the newRecords we should use a ThreadPoolExecutor to submit these tasks and
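To make the "submit and do not wait" idea concrete, here is a minimal sketch. It does not use the Hadoop classes: `AsyncCleanupSketch`, `refresh`, and `drain` are hypothetical names, and a plain list of strings stands in for the State Store. It only illustrates that the caller gets its snapshot back while the deletion may still be pending on the worker thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the cache refresh path; not the Hadoop classes. */
class AsyncCleanupSketch {

  /**
   * Returns a snapshot of the store at once and deletes the expired records
   * in the background. The store must be safe for concurrent modification.
   */
  static List<String> refresh(List<String> store, List<String> expired,
      ExecutorService executor) {
    List<String> newRecords = new ArrayList<>(store);
    // Fire and forget: the Future returned by submit() is deliberately ignored,
    // so newRecords may still contain entries whose deletion is pending.
    executor.submit(() -> {
      store.removeAll(expired);
    });
    return newRecords;
  }

  /** Shuts the pool down and waits briefly; returns true if all tasks finished. */
  static boolean drain(ExecutorService executor) {
    executor.shutdown();
    try {
      return executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> store = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
    ExecutorService executor = Executors.newFixedThreadPool(2);
    List<String> snapshot = refresh(store, Arrays.asList("b"), executor);
    System.out.println("returned without waiting: " + snapshot);
    System.out.println("drained: " + drain(executor) + ", store now: " + store);
  }
}
```

The snapshot is taken before the deletion is submitted, which is why it can still list a record that the background task is about to remove.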
…nd delete in parallel
💔 -1 overall
This message was automatically generated. |
This version is clear. I have left some minor comments.
if (conf.getBoolean(
    RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_ASYNC_OVERRIDE,
    RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_ASYNC_OVERRIDE_DEFAULT)) {
  executor = new ThreadPoolExecutor(2, 2, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
You can refer to HDFS-16848 and change this configuration from a boolean to the number of threads.
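As a rough illustration of what a thread-count setting could look like, the sketch below builds the executor from an integer instead of a boolean. Everything in it is an assumption made for the example: the key name, the default of -1, the convention that a non-positive value keeps the synchronous path, and the use of `java.util.Properties` in place of the Hadoop `Configuration` class.

```java
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; key name and default are illustrative only. */
class ExecutorFromThreadCount {
  // Hypothetical key name, not an existing RBF configuration key.
  static final String ASYNC_THREADS_KEY = "example.store.async.override.max.threads";
  // Assumed default: a non-positive value keeps the synchronous path.
  static final int ASYNC_THREADS_DEFAULT = -1;

  /** Returns a fixed-size pool, or null when the configured count is not positive. */
  static ThreadPoolExecutor create(Properties conf) {
    int threads = Integer.parseInt(
        conf.getProperty(ASYNC_THREADS_KEY, String.valueOf(ASYNC_THREADS_DEFAULT)));
    if (threads <= 0) {
      return null; // callers fall back to running the work inline
    }
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        threads, threads, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    // Let idle core threads time out so an unused pool holds no threads.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}
```

With this shape a single setting both enables the asynchronous path and sizes the pool, so no separate boolean switch is needed.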
      throw new IOException(e);
    }
  }
}
public <R extends BaseRecord> List<R> handleOverwriteAndDelete(List<R> commitRecords,
    List<R> deleteRecords) throws IOException {
  List<R> result = null;
  try {
    // overwrite all expired records.
    if (commitRecords != null && !commitRecords.isEmpty()) {
      Callable<StateStoreOperationResult> overwriteCallable =
          () -> putAll(commitRecords, true, false);
      if (executor != null) {
        executor.submit(overwriteCallable);
      } else {
        overwriteCallable.call();
      }
    }
    // delete all deletable records.
    if (deleteRecords != null && !deleteRecords.isEmpty()) {
      Callable<Map<R, Boolean>> deletionCallable = () -> removeMultiple(deleteRecords);
      if (executor != null) {
        executor.submit(deletionCallable);
      } else {
        result = new ArrayList<>();
        Map<R, Boolean> removedRecords = deletionCallable.call();
        for (Map.Entry<R, Boolean> entry : removedRecords.entrySet()) {
          if (entry.getValue()) {
            result.add(entry.getKey());
          }
        }
      }
    }
  } catch (Exception e) {
    throw new IOException(e);
  }
  return result;
}
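One property of the method above is worth keeping in mind: when a task is handed to `executor.submit`, an exception thrown inside it is captured in the returned `Future` and not rethrown on the submitting thread, so the surrounding `catch (Exception e)` only covers the synchronous branch. The sketch below shows one possible way to make such failures visible. `SubmitFailureSketch` and `recordingFailures` are hypothetical names, and a counter stands in for whatever logging or metrics a real implementation would use.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper; not part of the PR. */
class SubmitFailureSketch {

  /** Wraps a task so a failure is counted on the worker thread before rethrowing. */
  static <T> Callable<T> recordingFailures(Callable<T> task, AtomicInteger failures) {
    return () -> {
      try {
        return task.call();
      } catch (Exception e) {
        // A real implementation would log here; the counter is a stand-in.
        failures.incrementAndGet();
        throw e;
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger failures = new AtomicInteger();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<String> failing = () -> {
      throw new IOException("simulated store failure");
    };
    // submit() itself does not throw; the failure stays inside the Future.
    Future<String> future = executor.submit(recordingFailures(failing, failures));
    try {
      future.get();
    } catch (ExecutionException e) {
      System.out.println("seen only via get(): " + e.getCause());
    }
    executor.shutdown();
    System.out.println("failures recorded by wrapper: " + failures.get());
  }
}
```

If the `Future` is discarded, as in the fire-and-forget branch, the wrapper is the only place where the failure is observed.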
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
LGTM +1
Merged. Thanks @kokonguyen191 for your report.
Description of PR
This ticket aims to allow the overwrite part and delete part of org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore#overrideExpiredRecords to run in parallel.
Sister ticket to HDFS-17529