Optimizing Expand+Aggregate in SQLs with many count distinct [WIP] #10798

Open
wants to merge 1 commit into base: branch-24.06

Conversation

@binmahone (Collaborator) commented on May 13, 2024

Fixes #10799. This PR optimizes the Expand and Aggregate execs in the first stage of a SQL query with many count distinct measures.

The optimizations in this PR include:

  1. Avoid allocating and initializing a large number of null vectors when doing Expand.
  2. Try to coalesce the expanded column batches before sending them to Aggregate.

@binmahone changed the title from "optimzing Expand+Aggregate in sqlw with many count distinct" to "optimzing Expand+Aggregate in sqlw with many count distinct [WIP]" on May 13, 2024
@binmahone (Collaborator Author) commented:

build

@winningsix changed the title from "optimzing Expand+Aggregate in sqlw with many count distinct [WIP]" to "Optimzing Expand+Aggregate in sqlw with many count distinct [WIP]" on May 13, 2024
@binmahone changed the title from "Optimzing Expand+Aggregate in sqlw with many count distinct [WIP]" to "Optimzing Expand+Aggregate in sqls with many count distinct [WIP]" on May 13, 2024
@sameerz added the performance label (A performance related task/issue) on May 13, 2024
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone (Collaborator Author) commented:

build

@binmahone (Collaborator Author) commented:

@revans2 @abellina @winningsix Can you please take a look at this PR? We're going to pack a debug build based on it.

@@ -72,11 +75,17 @@ case class GpuExpandExec(
output: Seq[Attribute],
child: SparkPlan)(
useTieredProject: Boolean = false,
preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {
preprojectEnabled: Boolean = false,
Collaborator:

Please add scaladoc documentation for the new arguments.

case s: GpuScalar =>
if (!s.isValid && cachedNullVectors.get() != null) {
if (!cachedNullVectors.get.containsKey(NullVecKey.apply(s.dataType, numRows))) {
cachedNullVectors.get.put(NullVecKey.apply(s.dataType, numRows),
Collaborator:

Nit:

Suggested change
cachedNullVectors.get.put(NullVecKey.apply(s.dataType, numRows),
cachedNullVectors.get.put(NullVecKey(s.dataType, numRows),

Collaborator:

Also, could we create the key once and pass it to both containsKey and put?
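
A minimal sketch of that suggestion, assembled from the fragments quoted in this review (illustrative only, not the PR's exact code): build the key once and reuse it for the lookup, the insert, and the later get.

    case s: GpuScalar =>
      if (!s.isValid && cachedNullVectors.get() != null) {
        // Build the key once and reuse it for containsKey, put, and get.
        val key = NullVecKey(s.dataType, numRows)
        if (!cachedNullVectors.get.containsKey(key)) {
          cachedNullVectors.get.put(key, GpuColumnVector.from(s, numRows, s.dataType))
        }
        val ret = cachedNullVectors.get().get(key)
        // ...
      }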

}

override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
if (v.getRowCount > maxNulls) {
Collaborator:

Do we really want to throw if a vector has too many rows to be cached?

I think it would be better to just skip the cache.

Also, the user needs to set the number of rows they expect not per column, but overall for all of the null columns that they might cache (that's how I understand maxNulls). I do not know how to pick that number.

Could we maybe configure this via a byte limit, i.e. "cache up to X MB worth of null vectors"? That seems easier to reason about, and we can tie it to memory limits. That said, in addition to maxNulls being a sum total of rows, it is also per concurrently executing task, so the MB limit would be similar: "cache up to X MB worth of null vectors for this task". I am also having a hard time coming up with a good number here.

Ideally this is tied to the spill framework. The spill framework has no limits: if we made all of these spillable columns, we would be allowed to fill GPU memory with spillable nulls. I do not know whether it makes sense to support limits for categories of objects in that framework, in light of this work.

Collaborator:

@revans2 @jlowe it would be good to get your take on this.

Collaborator:

I agree. We do not want to fail if the cache is configured badly. We want to keep running, possibly with reduced performance. I would say that when this happens we clear the cache and return the GpuColumnVector.
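
A generic sketch of that fallback (illustrative only; the helper name and shapes are hypothetical, and the budget could be counted in rows or bytes): instead of throwing when a value can never fit, clear the cache and hand the value straight back so the query keeps running.

    object NullVecCacheHelpers {
      // Hypothetical helper, not spark-rapids code: cache the value if it fits
      // within the budget, otherwise clear the cache and return it uncached.
      def putOrPassThrough[K, V](cache: java.util.LinkedHashMap[K, V], key: K, value: V,
          sizeOf: V => Long, maxBudget: Long): V = {
        if (sizeOf(value) > maxBudget) {
          cache.clear() // do not throw; stop caching and let the caller use the value directly
        } else {
          cache.put(key, value)
        }
        value
      }
    }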

}

override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
if (v.getRowCount > maxNulls) {
s"($maxNulls) is set too small to hold single vector with ${v.getRowCount} rows.")
}
val iter = entrySet().iterator()
while (iter.hasNext && totalNulls > maxNulls - v.getRowCount) {
Collaborator:

I am confused: why are we going off of maxNulls and not the size of the data being cached? We have potentially multiple orders of magnitude of difference in the amount of memory used to cache something. 100 BOOL8 null values would take up 113 bytes, but 100 DECIMAL128 null values would take up 1613 bytes. It is not that hard to figure out the size of the data needed, even for nested types. If you need help with this, please let me know.
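
For reference, the arithmetic behind those numbers (a generic sketch, not spark-rapids code; it assumes one validity bit per row rounded up to whole bytes, ignores any allocation padding, and nested types would need per-child buffers on top of this):

    object NullColumnSizing {
      // Approximate size of an all-null fixed-width column:
      // data buffer (rows * element width) plus a validity bitmap (one bit per row).
      def approxNullColumnBytes(rows: Long, elementBytes: Long): Long =
        rows * elementBytes + (rows + 7) / 8

      def main(args: Array[String]): Unit = {
        println(approxNullColumnBytes(100, 1))  // BOOL8:      113
        println(approxNullColumnBytes(100, 16)) // DECIMAL128: 1613
      }
    }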

case class NullVecKey(d: DataType, n: Int)

class NullVecCache(private val maxNulls: Int)
extends util.LinkedHashMap[NullVecKey, GpuColumnVector](100, 0.75f, true) {
Collaborator:

I really don't understand why we are extending a map instead of wrapping it, or, even better, using some other cache data structure built for this type of use case.

If we wrapped it, then we could get true LRU functionality and be able to refresh the priority on a read. It would also mean we would not need to override remove so that it throws; that API would simply not exist.
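
A minimal, generic sketch of that idea (illustrative only, not the PR's code): wrap a java.util.LinkedHashMap with accessOrder = true so reads refresh LRU priority, and keep the map private so there is no remove() to forbid.

    import java.util.{LinkedHashMap => JLinkedHashMap}

    // Generic LRU cache that wraps (rather than extends) LinkedHashMap.
    class LruCache[K, V](maxEntries: Int) {
      private val map = new JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
        // Evict the least-recently-used entry once the configured capacity is exceeded.
        override def removeEldestEntry(eldest: java.util.Map.Entry[K, V]): Boolean =
          size() > maxEntries
      }

      def get(key: K): Option[V] = Option(map.get(key)) // refreshes LRU order
      def put(key: K, value: V): Unit = map.put(key, value)
      def clear(): Unit = map.clear()
      // No remove() is exposed, so nothing needs to throw UnsupportedOperationException.
    }

If a single instance of something like this, bounded by bytes rather than entries, were shared across tasks instead of being thread-local, it would also address the per-thread memory concern raised further down.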

override def remove(key: Any): GpuColumnVector = throw new UnsupportedOperationException()
}

val cachedNullVectors = new ThreadLocal[NullVecCache]()
Collaborator:

Why are we using a thread local for this? It makes the GPU memory used dependent on the number of task threads in Spark. I would really prefer a set size in bytes for the entire cache.

// This is only for ExpandExec which will generate a lot of null vectors
case class NullVecKey(d: DataType, n: Int)

class NullVecCache(private val maxNulls: Int)
Collaborator:

The data stored in the cache needs to be spillable in some form. Eventually it would be nice if, instead of spilling, we could just delete the value from the cache, but in the short term we need to make sure that everything stored in the cache is spillable.

It would also be really nice to have a timeout of some kind: if an entry is unused for a specific amount of time, it should be deleted to avoid adding more memory pressure to the system.
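
A generic sketch of the timeout part (illustrative only; the spill-framework integration is out of scope here and this does not attempt to model any spark-rapids APIs): track when each entry was last used and drop anything idle longer than a TTL before serving requests.

    // Hypothetical TTL cache, not spark-rapids code.
    class ExpiringCache[K, V](ttlNanos: Long) {
      private case class Entry(value: V, var lastUsed: Long)
      private val map = scala.collection.mutable.LinkedHashMap.empty[K, Entry]

      // Drop entries that have been idle longer than the TTL.
      private def evictIdle(now: Long): Unit = {
        val expired = map.collect { case (k, e) if now - e.lastUsed > ttlNanos => k }
        expired.foreach(map.remove)
      }

      def get(key: K): Option[V] = {
        val now = System.nanoTime()
        evictIdle(now)
        map.get(key).map { e => e.lastUsed = now; e.value }
      }

      def put(key: K, value: V): Unit = {
        val now = System.nanoTime()
        evictIdle(now)
        map.put(key, Entry(value, now))
      }
    }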

GpuColumnVector.from(s, numRows, s.dataType))
}

val ret = cachedNullVectors.get().get(NullVecKey.apply(s.dataType, numRows))
Collaborator:

If we go with a single cache, or even evict data on spill, this will need to change because it would introduce a race condition. Could we please have the API either return an optional value, or not have separate get and put APIs at all, but instead be a factory API where we just ask it to hand us back a null column and it does?
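
A generic sketch of that factory shape (illustrative only; the class and method names are hypothetical, not spark-rapids APIs): callers supply how to build the value, so there is no containsKey/get/put sequence that can race when the cache is shared or entries can be evicted concurrently.

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.{Function => JFunction}

    // Hypothetical factory-style cache: one call either returns the cached value
    // or atomically builds and caches it.
    class NullVectorFactory[K, V] {
      private val map = new ConcurrentHashMap[K, V]()

      def getOrCreate(key: K)(create: K => V): V =
        map.computeIfAbsent(key, new JFunction[K, V] {
          override def apply(k: K): V = create(k)
        })
    }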

.doc("Max number of null scalar in null vectors to cache for GPU Expand. " +
"If the number of null scala exceeds this value, the null vectors will not be cached." +
"The value has to be positive for caching to be enabled.")
.internal().integerConf
Collaborator:

What is the plan here? Both of these features/configs are disabled by default. Is the plan to work with some customers and see if enabling them helps, and then after that possibly enable them by default?

@sameerz (Collaborator) commented on May 29, 2024

Please retarget to 24.08
