
NUTCH-2793 indexer-csv: make it work in distributed mode #534

Draft · wants to merge 1 commit into base: master
Conversation

@pmezard (Contributor) commented Jun 10, 2020

Before the change, the output file name was hard-coded to "nutch.csv". When running in distributed mode, multiple reducers would clobber each other's output.

After the change, the filename is taken from the first open(cfg, name) initialization call, where name is a unique file name generated by IndexerOutputFormat, derived from Hadoop's FileOutputFormat. The CSV files are now named like part-r-000xx.
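For illustration, a minimal sketch of the idea behind the change (simplified and hypothetical; the class, constructor, and field names below are not the actual CSVIndexWriter code):

```java
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch: the writer keeps the task-specific name passed to
// open() and uses it for the output file instead of a hard-coded "nutch.csv".
public class CsvWriterSketch {
  private final Path outputDir;   // from the "outpath" parameter
  private OutputStream csvout;

  public CsvWriterSketch(Path outputDir) {
    this.outputDir = outputDir;
  }

  public void open(Configuration conf, String name) throws IOException {
    FileSystem fs = outputDir.getFileSystem(conf);
    // "name" is unique per reduce task (e.g. "part-r-00012"), so parallel
    // reducers no longer clobber each other's output file.
    csvout = fs.create(new Path(outputDir, name));
  }
}
```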

@sebastian-nagel left a comment:


Thanks, @pmezard! The indexer-csv plugin was initially intended more as a debugging tool and perhaps a quick export utility for local mode only (CSV is not really a format for big data). Lifting the local-mode limitation would require a couple of substantial changes to the IndexWriter interface, especially if we want to reliably support any filesystem supported by Hadoop. You may have a look at this description of the committer architecture to get an insight into the requirements of atomic commits etc.

@@ -192,7 +189,7 @@ protected int find(String value, int start) {

@Override
public void open(Configuration conf, String name) throws IOException {
@sebastian-nagel:

This method is deprecated since the switch to the XML-based index writer configuration (see NUTCH-1480 and the wiki page IndexWriters). "name" was just an arbitrary name, not a file name indicating a task-specific output path. We would need a method which takes both the IndexWriterParams and the output path. This would require changes in the IndexWriter interface and also in the classes IndexWriters and IndexerMapReduce. I'm also not sure whether the output path alone is sufficient. We'll eventually need an OutputCommitter and need to think about situations where we have multiple index writers (e.g. via exchanges). See also the discussion in NUTCH-1541.

outpath | Output path / directory (local filesystem path, relative to current working directory) | csvindexwriter
@sebastian-nagel:

still "local filesystem"? Ev. we could the outpath to overcome the problem of multiple index writers.

@pmezard (author):

Sorry, I did not understand that, could you elaborate?

@sebastian-nagel:

Sorry, I mixed two points together:

  • the description would also need a change, as it will not be a path on the local filesystem when running in distributed mode
  • there is also the open question of how to allow two index writers to write output to the filesystem:
    • in local mode this would require that each outpath points to a different directory
    • in distributed mode we could use outpath to write into distinct output directories or distinct subdirectories of one job-specific output directory

@pmezard commented Jun 10, 2020

What are the backward compatibility requirements for nutch? Is it OK to just change the interface and implement what you suggest? Should it be best-effort to keep things BC? Or is it impossible to implement such a change at this point?

@sebastian-nagel

Is it OK to just change the interface and implement what you suggest?

Yes, that's OK. We'll put a notice about the breaking change in the release notes, so that users who have their own indexer plugin know they have to adapt it.

Should it be best-effort to keep things BC?

We could try to only extend the IndexWriter interface and provide default do-nothing implementations for newly added methods, as most index writers do not write data to the filesystem.
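A minimal sketch of that approach (the added method name is hypothetical, and the import assumes the existing Nutch IndexWriterParams class):

```java
import java.io.IOException;

import org.apache.nutch.indexer.IndexWriterParams;

// Hypothetical sketch: a newly added method with a do-nothing default, so
// existing index writers (Solr, Elasticsearch, ...) that never write to
// the filesystem compile and behave unchanged.
public interface IndexWriterSketch {

  // existing method, unchanged
  void open(IndexWriterParams params) throws IOException;

  // hypothetical new method passing the task-specific output name
  // (e.g. "part-r-00012"); the default simply ignores it
  default void setTaskOutputName(String name) throws IOException {
    // do nothing: most writers send documents to a remote index
    // instead of writing files
  }
}
```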

@pmezard commented Jun 11, 2020

OK, there is a lot to unpack. Let me try to rephrase my naive understanding of the issue, how I intended to fix it, and what is wrong with it.

What I saw is that indexing to CSV worked locally but failed in a distributed setup (with only 3 nodes). The reduce step emitted errors when writing data to GCS. At the end, there was something containing roughly a third of the expected dataset. I assumed I had 3 reducers overwriting each other, with only one winner at the end (or a mix of winning output blocks). So I thought: "if only I could map the CSVIndexWriter output file to a reducer to separate each reducer's output, that would solve the issue".

What you are saying is:

  • In addition to distributed mode requiring the writers' output to be separated, there is a lot of complexity involved in dealing with eventually consistent object stores (I will assume that GCS works roughly like S3). Ideally we would like the reducers' output to appear in the outpath only if the tasks or jobs succeed, which involves the committer logic you referenced. But in an initial implementation we may not care about that. If the indexing fails, partial output will be left in outpath and such is life (I am OK with that).
  • I assumed that NutchAction writes in a given reducer are serialized. It is not clear to me whether this is correct or not.
  • Exchanges introduce additional complexity in that a single NutchAction can be handled by more than one writer. I do not see what the issue would be with this, assuming each writer's output is separated. If I have 2 writers with an outpath set to "out1" and "out2", in a reducer generating a "part-r-0001", the actions would go either into "out1/part-r-0001" or "out2/part-r-0001" or both. I do not see overlapping writes there.
  • Same reasoning with "there is also the open question how to allow two index writers writing output to the filesystem": again, I assume the writers have distinct output "directories" and the active reducer defines a unique output file name, so the combination of both should be unique.
  • About ""name" was just an arbitrary name not a file name indicating a task-specific output path": maybe, but does anything prevent it from being used that way? getUniqueFile seems suitable here.

With this current understanding, I would now implement it like this (a sketch of the last point follows the list):

  • Kill the open(Configuration cfg, String name) method, if possible (I haven't checked the code yet).
  • Refactor open(IndexWriterParams params) into open(IndexWriterParams params, String name), where name would be the same thing passed to the other method.
  • In CSVIndexWriter, use name directly and drop the filename kludge I introduced.
  • Maybe implement a fallback of the previous method to the new one with a dummy argument.
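For the last point, a minimal sketch of how such a fallback could look (the method names and the dummy value are assumptions, not an agreed-upon API; the import assumes the existing Nutch IndexWriterParams class):

```java
import java.io.IOException;

import org.apache.nutch.indexer.IndexWriterParams;

// Hypothetical sketch: the two-argument open() becomes the primary method
// and the previous single-argument open() forwards to it with a dummy name.
public interface IndexWriterRefactoredSketch {

  // new primary method; file-producing writers use "name" for their output
  void open(IndexWriterParams params, String name) throws IOException;

  // previous method, kept as a default so existing callers still work;
  // it forwards to the new method with a dummy argument
  default void open(IndexWriterParams params) throws IOException {
    open(params, "part-r-00000");
  }
}
```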

How far am I?

@sebastian-nagel

Thanks for the exhaustive listing. I have only a few points to add.

I assumed that NutchAction writes in a given reducer are serialized. It is not clear to me whether this is correct or not.

The MapReduce framework takes care of data serialization and concurrency issues: the reduce() method is never called concurrently within one task. Tasks run in parallel, and that's why every task needs its own output (part-r-nnnnn). The name of the output file (the nnnnn number) is also determined by the framework; that's important to avoid duplicated output if a task is restarted.

writers have distinct output "directories" and the active reducer defines a unique output file name, so the combination of both should be unique.

I think we need 3 components:

  • the task-specific file or folder (part-r-nnnnn)
  • a unique folder per index writer (e.g. the name or a path defined in index-writers.xml)
  • a job-specific output location - you do not want to change the index-writers.xml for that if you run another indexing job

In short, the path of a task output might look like: job-output/indexer-csv-1/part-r-00000.csv

getUniqueFile

You mean [ParseOutputFormat::getUniqueFile](https://github.com/apache/nutch/blob/59d0d9532abdac409e123ab103a506cfb0df790a/src/java/org/apache/nutch/parse/ParseOutputFormat.java#L120)? ParseOutputFormat and FetcherOutputFormat are good examples, as they write output into multiple segment subdirectories. However, there are no plugins involved there which determine whether output is written to the filesystem or not.
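To make the three components concrete, a small sketch of how a task output path could be assembled; only FileOutputFormat.getUniqueFile() is existing Hadoop API, the helper itself and its names are hypothetical:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical helper combining the three components discussed above.
public class TaskOutputPathSketch {

  // e.g. buildTaskOutput(ctx, new Path("job-output"), "indexer-csv-1")
  //      -> job-output/indexer-csv-1/part-r-00000.csv
  public static Path buildTaskOutput(TaskAttemptContext context,
      Path jobOutput, String writerId) {
    // task-specific file name, derived from the task id by the framework
    String taskFile = FileOutputFormat.getUniqueFile(context, "part", ".csv");
    // writer-specific folder below the job-specific output location
    return new Path(new Path(jobOutput, writerId), taskFile);
  }
}
```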

Maybe implement a fallback of the previous method to the new one with a dummy argument

That could be done using default method implementations in Java 8 interfaces. Note: Nutch now requires Java 8, but it started with Java 1.4 and there is still a lot of code not using Java 8 features.

Also, to keep the indexer usable: most index writers (Solr, Elasticsearch, etc.) do not write output to the filesystem, so if nothing is written to the filesystem, IndexingJob should not require an output location as a command-line argument.

@pmezard commented Jun 12, 2020

Thank you for the details.

One thing I wonder is whether it would be possible to use each index writer's identifier from index-writers.xml as its writer-specific path, at least by default. It would be unique by construction, which reduces the amount of configuration a bit. Drawbacks:

  • The identifier may be arbitrary and not compatible with filesystem/object-store path constraints. Not sure how hard that would be to detect, or whether it is a real problem in practice.
  • Said identifiers are a bit ugly, like indexer_csv_1. Maybe we can change them. Or maybe that's not an issue.

@sebastian-nagel

Yes, we could use the identifier, but since we already have the param "outpath", why not use it? The other constraints should be documented.

@sebastian-nagel marked this pull request as draft on June 21, 2022.