Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmark custom #2345

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Expand Up @@ -52,6 +52,8 @@ private static final class DateFormatInfo {
private ArrayList<Path> inputFiles = new ArrayList<>();
private int nextFile = 0;
private int iteration = 0;
private int[] threadIndex;
private volatile boolean threadIndexCreated;

@Override
public void setConfig(Config config) {
Expand Down Expand Up @@ -102,19 +104,43 @@ public void close() throws IOException {
public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
Path f = null;
String name = null;
synchronized (this) {
if (nextFile >= inputFiles.size()) {
// exhausted files, start a new round, unless forever set to false.
if (!forever) {
throw new NoMoreDataException();
}
nextFile = 0;
iteration++;
}
f = inputFiles.get(nextFile++);
name = f.toRealPath() + "_" + iteration;
int inputFilesSize = inputFiles.size();

/*
* synchronized (this) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just delete this old code? You are replacing it with a more concurrent version, yay!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will delete the commented codes.

* if (nextFile >= inputFiles.size()) { // exhausted files, start a new round, unless forever set to false.
* if (!forever) {
* throw new NoMoreDataException();
* }
* nextFile = 0;
* iteration++;
* }
* f = inputFiles.get(nextFile++);
* name = f.toRealPath() + "_" +iteration;
* }
*/
if (!threadIndexCreated) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (threadIndexCreated == false) { instead (to reduce chance of accidental future refactoring bugs)? This likely won't pass our code style checker (gradle precommit).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do the required changes.

createThreadIndex();
}

int index = (int) Thread.currentThread().getId() % threadIndex.length;
int fIndex = index + threadIndex[index] * threadIndex.length;
threadIndex[index]++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused how this approach ensures that we will indeed index every document in the inputFiles?

Thread.currentThread().getId() % threadIndex.length is not guaranteed to reach every possible int from 0 .. threadIndex.length?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, getId() is controlled by JVM but in our case, all threadIndex are getting initialized at once. Hence, there is high chance of getting guaranteed sequence of thread id, as we also observed. However, we understand your concern and tweaked our code in such a way that it guaranteed to reach every possible int from 0 .. threadIndex.length. We achieved it by setting a unique thread name and parsing the same for calculating the index value.


// Sanity check, if # threads is greater than # input files, wrap index
if (index >= inputFilesSize) index %= inputFilesSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move the index %= inputFilesSize to newline and inside { ... } body?


// Check if this thread has exhausted its files
if (fIndex >= inputFilesSize) {
threadIndex[index] = 0;
fIndex = index + threadIndex[index] * threadIndex.length;
threadIndex[index]++;
iteration++;
}

f = inputFiles.get(fIndex);
name = f.toRealPath() + "_" + iteration;

try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
// First line is the date, 3rd is the title, rest is body
String dateStr = reader.readLine();
Expand Down Expand Up @@ -146,4 +172,11 @@ public synchronized void resetInputs() throws IOException {
nextFile = 0;
iteration = 0;
}

private synchronized void createThreadIndex() {
if (!threadIndexCreated) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

== false instead? Or maybe change to assert threadIndexCreated == false since you also check this up above with a real if already?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do the required changes.

threadIndex = new int[getConfig().getNumThreads()];
threadIndexCreated = true;
}
}
}
Expand Up @@ -334,6 +334,7 @@ private int doParallelTasks() throws Exception {

initTasksArray();
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
this.getRunData().getConfig().setNumThreads(t.length);
// prepare threads
int index = 0;
for (int k = 0; k < repetitions; k++) {
Expand Down
Expand Up @@ -54,6 +54,7 @@ public class Config {
private HashMap<String, Object> valByRound = new HashMap<>();
private HashMap<String, String> colForValByRound = new HashMap<>();
private String algorithmText;
private int numThreads = 1;

/**
* Read both algorithm and config properties.
Expand Down Expand Up @@ -113,6 +114,14 @@ public Config(Properties props) {
}
}

public void setNumThreads(int numThreads) {
this.numThreads = numThreads;
}

public int getNumThreads() {
return numThreads;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void printProps() {
System.out.println("------------> config properties:");
Expand Down