Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the scalability of the join between the LHS and GroupBys by breaking up the join #621

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

mears-stripe
Copy link
Collaborator

@mears-stripe mears-stripe commented Nov 22, 2023

Summary

Improve the scalability of the join between the LHS and GroupBys by breaking up the join. Previously, when joining together a large number of GroupBys, the Spark job could get stuck.

Why / Goal

Prevent the Spark job from getting stuck when joining the LHS with a large number of GroupBys.

Test Plan

Checklist

N.A.

Reviewers

@@ -67,6 +67,7 @@ class Join(joinConf: api.Join,
extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) {

private val bootstrapTable = joinConf.metaData.bootstrapTable
private val joinsAtATime = 8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this consume a spark conf param - via tableUtils?

@@ -324,6 +324,9 @@ case class TableUtils(sparkSession: SparkSession) {
df
}

def addJoinBreak(dataFrame: DataFrame): DataFrame =
dataFrame.cache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TableUtils has a cache_level param and a wrap with cache method that does exception handling to release the resources claimed by the cache. I think we should use that here.

case (partialDf, (rightPart, rightDf)) => joinWithLeft(partialDf, rightDf, rightPart)
case (partialDf, ((rightPart, rightDf), i)) =>
val next = joinWithLeft(partialDf, rightDf, rightPart)
if (((i + 1) % joinsAtATime) == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have 24 parts - there will be 3 cache points - at 8, 16, 24

16 should evict the 8 cache. 24 shouldn't cache since it is the last one.

@qiyang0221
Copy link

Does the PR mean we will break up the batch request into mini batch request and fetch them parallel? @nikhilsimha

@nikhilsimha
Copy link
Contributor

Does the PR mean we will break up the batch request into mini batch request and fetch them parallel? @nikhilsimha

This basically only applies to spark offline jobs Yang.

@mears-stripe mears-stripe changed the title Break up joins Improve the scalability of the join between the LHS and GroupBys by breaking up the join Nov 23, 2023
@mears-stripe
Copy link
Collaborator Author

Does the PR mean we will break up the batch request into mini batch request and fetch them parallel? @nikhilsimha

This basically only applies to spark offline jobs Yang.

I added some details to the PR description.

And sorry, the PR is still a WIP. I'm working on getting the CI setup to work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants