Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel processing #132

Merged
merged 13 commits into from Jun 24, 2021
Merged

Conversation

balmukundblr
Copy link
Contributor

Description

Description

Please note- This is not a new PR- Original PR (apache/lucene-solr#2345) was raised on old apache/lucene-solr github repository. This is just a copy in new repo.

Lucene Benchmark Scaling Problem with Reuters Corpus

While Indexing 1 million documents with reuters21578 (plain text Document derived from reuters21578 corpus), we observed that with higher number of Index threads, the Index throughput does not scale and degrades. Existing implementation with synchronization block allows only one thread to pick up a document/file from list, at any given time – this code is part of getNextDocData() in ReutersContentSource.java. With multiple index threads, this becomes a thread contention bottleneck and does not allow the system CPU resource to be used efficiently.

Solution

We developed a strategy to distribute total number of files across multiple number of Indexing threads, so that these threads work independently and parallelly.

Tests

We mainly modified existing getNextDocData(), which is not altering functionality, hence not added any new test cases.

Passed existing tests

Checklist

Please review the following and check all that apply:

  • I have reviewed the guidelines for How to Contribute and my code conforms to the standards described there to the best of my ability.
  • I have created a Jira issue and added the issue ID to my pull request title.
  • I have given Lucene maintainers access to contribute to my PR branch. (optional but recommended)
  • I have developed this patch against the main branch.
  • I have run ./gradlew check.
  • I have added tests for my changes.

@balmukundblr
Copy link
Contributor Author

@mikemccand
We have raised a new PR as you suggested in new lucene github repo.

Thanks & Regards,
Balmukund

Copy link
Member

@mikemccand mikemccand left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks close! I'm a little worried about the corner case when number of threads exceeds number of documents.

Can you share what speedup you saw on what kind of concurrent computer with this versus mainline?

}

// Getting file index value which is set for each thread
int index = Integer.parseInt(Thread.currentThread().getName().substring(12));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is TaskSequence / ParallelTask the only place where new Threads are created in benchmarks?

Could you add a comment here pointing to ParallelTask.java explaining that we named/numbered the threads carefully, and that's why this parsing to int is safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-Yes, TaskSequence.java is only where new Index threads are created.

-We want to ensure that the name of Index threads maintains a guaranteed sequence and we explicitly setup thread names in TaskSequence.java. The thread name maintains "IndexThread-" pattern where is an integer. So, it is safe to parse the thread name to int.
We'll also add necessary comments in ReutersContentSource.java as well.


// Getting file index value which is set for each thread
int index = Integer.parseInt(Thread.currentThread().getName().substring(12));
int fIndex = index + threadIndex[index] * threadIndex.length;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add assert index >= 0 && index < threadIndex.length above this? This way if there is some thread naming bug, and assertions are enabled, we hit AssertionError before AIOOBE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. We'll incorporate this check.

}

// Check if this thread has exhausted its files
if (fIndex >= inputFilesSize) {
threadIndex[index] = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, in the case where number-of-threads is bigger than number-of-input-files, aren't we (always) setting the wrong index back to 0 here? Does that matter? Maybe add a dedicated test case so this new code is exercised?

@mikemccand
Copy link
Member

Thanks for the updates!

It looks like gradle check is upset -- if you run ./gradlew tidy it will re-format your changes and it should pass again!

Copy link
Member

@mikemccand mikemccand left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is really close! I left a few small comments. Thanks @balmukundblr!

int inFileSize = inputFiles.size();

//Modulo Operator covers all three possible senarios i.e. 1. If inputFiles.size() < Num Of Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads
int fileIndex = stride % inFileSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm do we already guard for the (degenerate) case of inFileSize == 0? If not can we add some protection here, e.g. maybe throw a clear exception that there is nothing to index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mike, its already handling in ReutersContentSource.java's setConfig(). Please find the code snippet for the same.
if (inputFiles.size() == 0) {
throw new RuntimeException("No txt files in dataDir: "+dataDir.toAbsolutePath());
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry Mike, i forgot to mention that i've tested with inFileSize == 0 and it throws expected exception.

t[index++] = new ParallelTask(task);
t[index] = new ParallelTask(task);
//Setting unique ThreadName with index value which is used in ReuersContentSource.java's getNextDocData()
t[index].setName("IndexThread-" + index);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, parallel tasks might be running queries too right? Maybe we should pick a more generic name? Maybe ParallelTaskThread-N?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Mike, did the required changes.

// prepare threads
int index = 0;
for (int k = 0; k < repetitions; k++) {
for (int i = 0; i < tasksArray.length; i++) {
final PerfTask task = tasksArray[i].clone();
t[index++] = new ParallelTask(task);
t[index] = new ParallelTask(task);
//Setting unique ThreadName with index value which is used in ReuersContentSource.java's getNextDocData()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you strengthen the comment to state that we should NOT change this thread name, unless we also fix the String -> int parsing logic in ReutersContentSource?

Actually, could we factor out this string part of the thread name into a static final String constant, e.g.static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";, and reference that constant from both places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated the required changes through adding it in Constants.java file and referred from both places.


//Modulo Operator covers all three possible senarios i.e. 1. If inputFiles.size() < Num Of Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads
int fileIndex = stride % inFileSize;
int iteration = stride / inFileSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for improving this logic -- much easier to understand now!

public int doLogic() throws Exception {
IndexWriter iw = getRunData().getIndexWriter();
if (iw != null) {
iw.flushNextBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flushes one thread; not all. I'm honestly not sure what the use-case is of that method. Did you mean to call iw.flush()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for delay response. We observed that post- processing was taking longer time because it was singly threaded. Also, it was depending upon the per-thread indexed data. Hence, we are explicitly flushing the per thread data as soon as it finishes the indexing process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this task is optional and can be used purely on need basis.

@balmukundblr
Copy link
Contributor Author

@mikemccand
Mike, We've committed the code 4 days before and it still in Checking phase. Ideally 'musedev' should not take longer time. Looks like, there is some problem is the checking process. It would be really helpful if you could advise us for the next steps.

@balmukundblr
Copy link
Contributor Author

@mikemccand
Mike, I was wondering, are there any suggestions to incorporate in this PR. Also, need your help to resolve this ** pending checks** issues, it has been more than 15 days and "musedev" still showing in pending state.

@mikemccand
Copy link
Member

it has been more than 15 days and "musedev" still showing in pending state.

Egads! I don't know why it's stuck in Pending. We can skip it -- I'll confirm gradlew check is happy.

Copy link
Member

@mikemccand mikemccand left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @balmukundblr this is a nice improvement in concurrency. I'll try to push soon!

@mikemccand mikemccand merged commit f1d54f7 into apache:main Jun 24, 2021
@mikemccand
Copy link
Member

Thanks @balmukundblr -- I merged this with Lucene's main branch and also backported to 8.x (for eventual future 8.10.0 release).

@balmukundblr
Copy link
Contributor Author

@mikemccand
Thank you very much for your great suggestions which really helped us to improve the code at greater level.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants