Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the WritePartitioner to exactly match cascading #1805

Merged
merged 3 commits into from Feb 17, 2018

Conversation

johnynek
Copy link
Collaborator

We were failing the laws previously, sometimes taking
more than n + 1 steps where cascading took n. By
improving the logic, we fix those bugs and reach
actually exactly matching cascading.

This should allow use batching to bypass any
case of cascading taking too long to plan.

We were failing the laws previously, sometimes taking
more than n + 1 steps where cascading took n. By
improving the logic, we fix those bugs and reach
actually exactly matching cascading.

This should allow use batching to bypass any
case of cascading taking too long to plan.
@johnynek
Copy link
Collaborator Author

@non helped me get the algorithms right here. Thanks!

@johnynek
Copy link
Collaborator Author

fixes #1804

@@ -215,80 +215,163 @@ object WritePartitioner {
}
}

/**
* If cascading would conside the current pipe as a Logical reduce
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conside => consider?

@@ -80,7 +81,7 @@ class WritePartitionerTest extends FunSuite with PropertyChecks {
}
}

forAll(TypedPipeGen.genWithFakeSources)(afterPartitioningEachStepIsSize1(_))
//forAll(TypedPipeGen.genWithFakeSources)(afterPartitioningEachStepIsSize1(_))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented test?

@@ -127,6 +207,6 @@ class WritePartitionerTest extends FunSuite with PropertyChecks {
.waitFor(Config.empty, Local(true)).get.isEmpty)
}

forAll(TypedPipeGen.genWithIterableSources)(partitioningDoesNotChange(_))
//forAll(TypedPipeGen.genWithIterableSources)(partitioningDoesNotChange(_))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented test?

/**
* This is a lattice value that tracks
* what we have seen below a given TypedPipe as
* we recurse up.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is super neet/nice, could you expand the comments here -- don't think i'd have followed other than I know what your aiming to do.

(Motivation/why we are tracking/what we are aiming to do with it)

rec(src)
case (Fork(src@SourcePipe(_)), rec) =>
case ((cp: CounterPipe[a], bs), rec) =>
mat.map(rec((cp.pipe, bs | Write)))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is a counter pipe | Write ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't need to be, Write is the bottom, so x | Write = x.

@ianoc
Copy link
Collaborator

ianoc commented Feb 16, 2018

lgtm

@johnynek johnynek merged commit 4bbeada into develop Feb 17, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants