
Prepare sourceCall by scheme #37

Open: wants to merge 1 commit into base 2.6

Conversation

@dieu commented Nov 28, 2018

Hello,

We are using version 2.6.1 (unfortunately) in https://github.com/twitter/scalding and have encountered a problem with the combination of TupleEntrySchemeIterator (from cascading) and "bad" record readers like Elephant Bird's. More details in twitter/scalding#1887.

So, we began reading some data directly from the submitter node via HadoopTupleEntrySchemeIterator, which extends TupleEntrySchemeIterator.

TupleEntrySchemeIterator uses inputIterator (which returns a recordReader for a particular split, https://github.com/Cascading/cascading/blob/2.6.1/cascading-hadoop/src/main/shared/cascading/tap/hadoop/io/HadoopTupleEntrySchemeIterator.java#L52) and iterates through the recordReaders to get the data from each split. The problem is that TupleEntrySchemeIterator does not invoke createKey and createValue on the next recordReader; it does so only once, in the constructor.

That works fine in most cases, but not with complex recordReaders that read data inside createKey and createValue.
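A minimal, self-contained sketch of the failure mode. All names here are hypothetical stand-ins for Hadoop's RecordReader contract and the iterator loop, not the real Cascading/Hadoop classes:

```java
import java.util.ArrayList;
import java.util.List;

public class SourcePrepareSketch {
    // Simplified stand-in for the old mapred RecordReader contract (hypothetical names).
    public interface SimpleRecordReader {
        StringBuilder createKey();          // stateful readers may do real work here
        boolean next(StringBuilder key);    // mutates the key created above
    }

    // A "bad" reader in the Elephant Bird style: createKey() performs per-reader
    // initialization, so it must be called once for every reader, not once overall.
    public static class StatefulReader implements SimpleRecordReader {
        private final String splitData;
        private boolean initialized = false;
        private boolean consumed = false;

        public StatefulReader(String splitData) { this.splitData = splitData; }

        public StringBuilder createKey() {
            initialized = true;             // meaningful side effect inside createKey()
            return new StringBuilder();
        }

        public boolean next(StringBuilder key) {
            if (!initialized)
                throw new IllegalStateException("createKey() was never called on this reader");
            if (consumed)
                return false;
            key.setLength(0);
            key.append(splitData);
            consumed = true;
            return true;
        }
    }

    // Mimics the iterator's loop over splits. With prepareEachReader=false the key is
    // created only once (the pre-patch behavior); with true it is created for every
    // reader, which is what the proposed sourcePrepare call achieves.
    public static List<String> readAll(List<SimpleRecordReader> readers, boolean prepareEachReader) {
        List<String> out = new ArrayList<>();
        StringBuilder key = null;
        for (SimpleRecordReader reader : readers) {
            if (key == null || prepareEachReader)
                key = reader.createKey();
            while (reader.next(key))
                out.add(key.toString());
        }
        return out;
    }
}
```

With a single split the two modes behave identically, which is why the bug only surfaced once multiple splits were read from the submitter node.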

@@ -165,6 +165,7 @@ private TupleEntry getNext() throws IOException
     if( !hasWaiting && inputIterator.hasNext() )
       {
       sourceCall.setInput( wrapInput( inputIterator.next() ) );
+      scheme.sourcePrepare( flowProcess, sourceCall );


Is there going to be a big hit from calling this each time? Is this the right fix, vs. fixing it in elephant-bird?

@dieu (Author)

This will only be hit when we switch between recordReaders.

Unfortunately, it's impossible to fix every possible recordReader; many of them are written in the same way as the ones used in Elephant Bird. I think it's a problem with the "elegant" design of RecordReader in hadoop: createKey and createValue create mutable objects which are passed to the next method, which is supposed to mutate them.

@cwensel (Member) commented Nov 29, 2018

I fear re-calling sourcePrepare will have a large number of side effects, due to the fact that this is where the ‘context’ object is initialized.

Alone, not really a big deal: the additional GC would probably go unnoticed unless there are lots of parts. But anyone maintaining state, or initializing external resources, will immediately notice. Or worse, not notice, since the contract for the last 10 years has been that sourcePrepare is called once.

But this is probably related to why I introduced sourceRePrepare in Cascading 3.x in order to overcome this oversight.
http://docs.cascading.org/cascading/3.3/javadoc/cascading-core/cascading/scheme/Scheme.html#sourceRePrepare(cascading.flow.FlowProcess,%20cascading.scheme.SourceCall).
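As a rough illustration of that lifecycle, here is a simplified mimic (hypothetical stand-ins for Scheme and SourceCall, not the real Cascading 3.x classes): sourcePrepare runs once before the first read, while sourceRePrepare runs each time the underlying input is swapped.

```java
import java.util.List;

public class RePrepareSketch {
    // Simplified stand-in for Cascading's SourceCall: carries the current input and context.
    public static class Call {
        public Object input;
        public Object context;
    }

    // Simplified stand-in for a Scheme with the 3.x-style split lifecycle.
    public static class MimicScheme {
        public int prepares = 0;
        public int rePrepares = 0;

        public void sourcePrepare(Call call) {
            prepares++;
            call.context = "context-for-" + call.input;  // e.g. key/value built from the reader
        }

        public void sourceRePrepare(Call call) {
            rePrepares++;
            call.context = "context-for-" + call.input;  // refresh per-input state
        }
    }

    // Mimics an iterator switching between splits: prepare once, then re-prepare
    // for each subsequent input.
    public static MimicScheme drive(List<Object> inputs) {
        MimicScheme scheme = new MimicScheme();
        Call call = new Call();
        boolean first = true;
        for (Object input : inputs) {
            call.input = input;
            if (first) {
                scheme.sourcePrepare(call);
                first = false;
            } else {
                scheme.sourceRePrepare(call);
            }
        }
        return scheme;
    }
}
```

This keeps the "called once" guarantee of sourcePrepare intact while still giving schemes a hook that fires on every split change.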

I suggest you upgrade to the 3.x line as 2.7 is already 40 months old, and 2.6 is 46 months old.

@dieu (Author) commented Nov 29, 2018

> I fear re-calling sourcePrepare will have a large number of side effects due the fact this is where the ‘context’ object is initialized.
>
> Alone, not really a big deal, the additional GC would probably go unnoticed unless there are lots of parts. But anyone maintaining state, or initializing external resources will immediately notice.

I believe it's not a big deal, because we invoke this initialization only when we switch splits.

> Or worse, not notice since the contract for the last 10 years is that sourcePrepare is called once.

I'm afraid that not everybody follows this. By the way, do you know where this restriction is defined?

> I suggest you upgrade to the 3.x line as 2.7 is already 40 months old, and 2.6 is 46 months old.

Unfortunately, we cannot upgrade cascading to 3.x at this moment. We definitely will try to come back to this, but not in the current release.

Thanks for your response.

@cwensel (Member) commented Nov 29, 2018

From the javadoc of sourcePrepare:

> Method sourcePrepare is used to initialize resources needed during each call of source(cascading.flow.FlowProcess, SourceCall).
> This method is guaranteed to be called once before the first invocation of source(FlowProcess, SourceCall).
>
> Be sure to place any initialized objects in the SourceContext so each instance will remain thread-safe.

@dieu (Author) commented Nov 29, 2018

> From the javadoc of sourcePrepare:
>
> Method sourcePrepare is used to initialize resources needed during each call of source(cascading.flow.FlowProcess, SourceCall).
> This method is guaranteed to be called once before the first invocation of source(FlowProcess, SourceCall).
>
> Be sure to place any initialized objects in the SourceContext so each instance will remain thread-safe.

Sorry, I misunderstood; I was thinking about the createKey and createValue contract.

So, basically, sourcePrepare creates the context by invoking createKey and createValue on the RecordReader (in most cases that I saw). But the problem is that some RecordReaders are stateful and have meaningful code inside createKey and createValue, so we cannot say that we can call it once; otherwise some RecordReaders will not be initialized properly. At the same time, we cannot fix every possible RecordReader.

@cwensel (Member) commented Nov 29, 2018

So yeah, they didn't do such a good job documenting the RecordReader contract for the stable/old MR APIs (the mapred package). You will notice the newer/experimental RecordReader API leaves little for debate (the mapreduce package). K/V instance reuse is up to the implementation.
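The difference between the two contracts can be sketched roughly like this. These are paraphrased shapes of the `mapred` vs. `mapreduce` RecordReader interfaces, simplified to plain generics (the real interfaces also declare getPos/getProgress/initialize/close and throw IOException):

```java
public class RecordReaderContracts {
    // Shape of the old, stable mapred API (simplified): the caller creates the key
    // and value once, and next() mutates them, which invites instance reuse.
    public interface OldStyleReader<K, V> {
        K createKey();
        V createValue();
        boolean next(K key, V value);
    }

    // Shape of the newer mapreduce API (simplified): the reader owns the key/value
    // lifecycle, so whether instances are reused is explicitly up to the implementation.
    public interface NewStyleReader<K, V> {
        boolean nextKeyValue();
        K getCurrentKey();
        V getCurrentValue();
    }

    // The two consumption patterns, for a single record each.
    public static String readOld(OldStyleReader<StringBuilder, StringBuilder> reader) {
        StringBuilder key = reader.createKey();     // caller-created, mutated by next()
        StringBuilder value = reader.createValue();
        return reader.next(key, value) ? key + "=" + value : null;
    }

    public static String readNew(NewStyleReader<String, String> reader) {
        return reader.nextKeyValue()
            ? reader.getCurrentKey() + "=" + reader.getCurrentValue()
            : null;
    }
}
```

In the old shape nothing says whether createKey/createValue may do per-reader work, which is exactly the ambiguity this thread is about.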

But 10 years ago, we assumed you only ever needed to instantiate the key/values once as GC pause times have always been a critical slowdown.

And for the last 10 years I haven't seen any complaints. The sourceRePrepare was added to solve a different set of issues.

@dieu (Author) commented Nov 29, 2018

> So yeah, they didn’t do such a good job documenting the RecordReader contract for the stable/old MR apis (mapred package). You will notice the newer/experimental RecordReader api leaves little for debate (mapreduce package). K/V instance reuse is up to the implementation.

Yes, but unfortunately we have what we have: the old RecordReader API is still in place and abused by many implementations.

> But 10 years ago, we assumed you only ever needed to instantiate the key/values once as GC pause times have always been a critical slowdown.

I'm not sure why it's so critical, since we are initializing keys/values once per split. It should be tiny compared to the keys/values from the data.

> And for the last 10 years I haven’t seen any complaints. The sourceRePrepare was added to solve a different set of issues.

Here we are :) We didn't see this error before because we previously used TupleEntrySchemeIterator with only a single split (we always created a snapshot of the data into a single file before reading it on the submitter node).

@dieu (Author) commented Dec 3, 2018

I would really appreciate it if you (@cwensel) could merge this PR and make a 2.6.4 release.

I understand that we are really far behind the modern version of cascading, but unfortunately we cannot upgrade cascading and scalding at the same time. Once we get the recent version of scalding into production, we can come back to upgrading the cascading version and not end up in this situation again.

This PR is, so far, the last problem preventing us from releasing a recent version of scalding.

Thanks.

@cwensel (Member) commented Dec 4, 2018

As mentioned before, this changes the contract of sourcePrepare, so I will not merge the PR.

@dieu (Author) commented Dec 4, 2018

> As mentioned before, this changes the contract of sourcePrepare, so I will not merge the PR.

Could you suggest a workaround for us (for instance, ParquetRecordReader will not work with the current code) without upgrading to the next major version of cascading?

@cwensel (Member) commented Dec 5, 2018

This is confusing, since there are Cascading 2 and Cascading 3 versions of the Parquet Scheme. And the build dependency for Cascading 2 is for version 2.5.3.

@oscar-stripe

What he means is that getting the TupleEntryIterator outside of a Hadoop job is not working.

3 participants