Optimizing Expand+Aggregate in SQLs with many count distinct [WIP] #10798
base: branch-24.06
Conversation
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
Force-pushed from d4a8261 to cdc867d
@revans2 @abellina @winningsix Can you please take a look at this PR? We're going to pack a debug build based on this PR.
@@ -72,11 +75,17 @@ case class GpuExpandExec(
    output: Seq[Attribute],
    child: SparkPlan)(
    useTieredProject: Boolean = false,
-   preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {
+   preprojectEnabled: Boolean = false,
Please add scaladoc documentation for the new arguments.
case s: GpuScalar =>
  if (!s.isValid && cachedNullVectors.get() != null) {
    if (!cachedNullVectors.get.containsKey(NullVecKey.apply(s.dataType, numRows))) {
      cachedNullVectors.get.put(NullVecKey.apply(s.dataType, numRows),
nit:
- cachedNullVectors.get.put(NullVecKey.apply(s.dataType, numRows),
+ cachedNullVectors.get.put(NullVecKey(s.dataType, numRows),
Also, could we create the key once and pass it to both containsKey and put?
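The suggestion above could look like the following sketch. It is written in plain Java for brevity (the real code is Scala and deals in GpuScalar/GpuColumnVector); the NullVecKey name mirrors the PR, while the String value and getOrBuild helper are purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a placeholder String stands in for GpuColumnVector.
class KeyReuse {
    record NullVecKey(String dataType, int numRows) {}

    static final Map<NullVecKey, String> cache = new HashMap<>();

    static String getOrBuild(String dataType, int numRows) {
        NullVecKey key = new NullVecKey(dataType, numRows); // built once
        if (!cache.containsKey(key)) {                      // same key object reused
            cache.put(key, "null-vector:" + dataType + ":" + numRows);
        }
        return cache.get(key);
    }
}
```

Besides avoiding the duplicate allocation, holding the key in a local also keeps the containsKey and put calls visibly operating on the same value.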
}

override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
  if (v.getRowCount > maxNulls) {
Do we really want to throw if we can't cache a vector that has too many rows to be cached? I think it would be better if it would skip the cache.
Also, the user needs to set the number of rows they expect not per column, but overall for all null columns that they might cache (that's how I understand maxNulls). I do not know how to set that number.
Could we maybe configure this via a byte limit? I.e. "cache up to X MB worth of null vectors". That seems easier to reason about, and we can tie it to memory limits. That said, in addition to maxNulls being a sum total of rows, it's also per concurrently executing task. The MB limit would be similar: "cache up to X MB worth of null vectors for this task". I am also having a hard time coming up with a good number here.
Ideally this is tied to the spill framework. The spill framework has no limits: if we made all these columns spillable, we'd be allowed to fill GPU memory with spillable nulls. I do not know if it makes sense to support limits for categories of objects in that framework, given this work.
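The byte-limit idea could be sketched as follows (plain Java, hypothetical names; byte arrays stand in for GPU buffers, and an oversized value skips the cache rather than throwing, per the comment above):

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: cap the cache at maxBytes and evict least-recently-used entries
// until the new value fits. A value larger than the whole budget is simply
// not cached instead of causing a failure.
class ByteBudgetCache<K> {
    private final long maxBytes;
    private long usedBytes = 0;
    // accessOrder = true: iteration goes from least- to most-recently used
    private final LinkedHashMap<K, byte[]> map = new LinkedHashMap<>(16, 0.75f, true);

    ByteBudgetCache(long maxBytes) { this.maxBytes = maxBytes; }

    void put(K key, byte[] value) {
        if (value.length > maxBytes) return; // too big for the budget: skip
        Iterator<Map.Entry<K, byte[]>> it = map.entrySet().iterator();
        while (it.hasNext() && usedBytes + value.length > maxBytes) {
            usedBytes -= it.next().getValue().length; // evict LRU entry
            it.remove();
        }
        map.put(key, value);
        usedBytes += value.length;
    }

    byte[] get(K key) { return map.get(key); }
    long used() { return usedBytes; }
}
```

Whether the budget is global or per task is a separate decision; the sketch only shows the eviction mechanics.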
I agree. We do not want to fail if the cache is configured badly. We want to run, but with possibly reduced performance. I would say that when this happens we clear the cache and return the GpuColumnVector.
override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
  if (v.getRowCount > maxNulls) {
      s"($maxNulls) is set too small to hold single vector with ${v.getRowCount} rows.")
  }
  val iter = entrySet().iterator()
  while (iter.hasNext && totalNulls > maxNulls - v.getRowCount) {
I am confused why we are going off of maxNulls and not the size of the data being cached? We have potentially multiple orders of magnitude difference in the amount of memory used to cache something. 100 BOOL8 null values would take up 113 bytes, but 100 DECIMAL128 null values would take up 1613 bytes. It is not that hard to figure out the size of the data needed, even for nested types. If you need help with this please let me know.
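For fixed-width types, a size estimate along these lines reproduces the figures quoted above (a sketch with hypothetical names; it counts data bytes plus one validity bit per row, ignoring the buffer padding cudf may add, and does not cover nested types):

```java
// Sketch: estimate a null vector's footprint from its element width rather
// than its row count alone.
class NullVecSizer {
    static long estimateBytes(int rows, int elementWidthBytes) {
        long dataBytes = (long) rows * elementWidthBytes;
        long validityBytes = (rows + 7) / 8; // 1 validity bit per row, rounded up
        return dataBytes + validityBytes;
    }
}
```

With rows = 100 this gives 113 bytes for a 1-byte BOOL8 and 1613 bytes for a 16-byte DECIMAL128, matching the comment.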
case class NullVecKey(d: DataType, n: Int)

class NullVecCache(private val maxNulls: Int)
  extends util.LinkedHashMap[NullVecKey, GpuColumnVector](100, 0.75f, true) {
I really don't understand why we are extending a map instead of wrapping it. Or, even better, using some other cache data structure built for this type of use case.
If we wrapped it, then we could get true LRU functionality and be able to reset the priority on a read. It would also let us not need to override remove so it throws; that API would just not exist.
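A wrapped version might look like this sketch (plain Java, illustrative names): the LinkedHashMap stays a private detail, its access-order mode gives true LRU on reads, removeEldestEntry handles eviction, and no remove() is exposed at all:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: wrap instead of extend, so callers only see get/put/size.
class LruNullVecCache<K, V> {
    private final int maxEntries;
    private final LinkedHashMap<K, V> map;

    LruNullVecCache(int maxEntries) {
        this.maxEntries = maxEntries;
        // accessOrder = true makes get() move entries to the most-recent end
        this.map = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > LruNullVecCache.this.maxEntries;
            }
        };
    }

    V get(K key) { return map.get(key); }           // refreshes LRU priority
    void put(K key, V value) { map.put(key, value); }
    int size() { return map.size(); }
}
```

Because remove() simply does not exist on the wrapper, there is nothing to override with an UnsupportedOperationException.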
override def remove(key: Any): GpuColumnVector = throw new UnsupportedOperationException()
}

val cachedNullVectors = new ThreadLocal[NullVecCache]()
Why are we using a thread local for this? This makes the GPU memory used dependent on the number of task threads in Spark. I really would prefer to have a set size in bytes that is for the entire cache.
// This is only for ExpandExec which will generate a lot of null vectors
case class NullVecKey(d: DataType, n: Int)

class NullVecCache(private val maxNulls: Int)
The data stored in the cache needs to be spillable in some form. Eventually it would be nice to make it so instead of spilling we can just delete the value from the cache, but in the short term we need to make sure that everything stored in the cache is spillable.
It would also be really nice to have a timeout of some kind. If an entry is unused for a specific amount of time it should be deleted to avoid adding more memory pressure to the system.
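The idle-timeout idea could be sketched like this (plain Java, hypothetical names; the clock is injected so the behavior is deterministic in tests, and spill integration is out of scope for the sketch):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

// Sketch: entries unused for longer than ttlMillis are dropped the next
// time the cache is touched, releasing memory pressure.
class TtlCache<K, V> {
    private final long ttlMillis;
    private final LongSupplier clock;
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> lastUsed = new HashMap<>();

    TtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    private void evictExpired() {
        long now = clock.getAsLong();
        Iterator<Map.Entry<K, Long>> it = lastUsed.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> e = it.next();
            if (now - e.getValue() > ttlMillis) {
                values.remove(e.getKey()); // idle too long: drop it
                it.remove();
            }
        }
    }

    void put(K key, V value) {
        evictExpired();
        values.put(key, value);
        lastUsed.put(key, clock.getAsLong());
    }

    V get(K key) {
        evictExpired();
        V v = values.get(key);
        if (v != null) lastUsed.put(key, clock.getAsLong()); // refresh
        return v;
    }

    int size() { return values.size(); }
}
```

A production version would evict on a timer or on memory pressure rather than only on access, but the bookkeeping is the same.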
      GpuColumnVector.from(s, numRows, s.dataType))
}

val ret = cachedNullVectors.get().get(NullVecKey.apply(s.dataType, numRows))
If we go with a single cache, or even evicting data on spill this will need to change because it would introduce a race condition. Could we please have the API either return an optional value, or not have a separate get and put APIs, but have it be a factory API where we just ask it to hand us back a null column, and it does it.
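The factory-style API could be sketched as follows (plain Java, illustrative names; an Integer stands in for the null column). computeIfAbsent is atomic per key, so there is no window between a lookup and an insert for another thread to race through:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: one entry point that hands back a value, building it on a miss.
// There are no separate get/put calls for callers to interleave badly.
class NullVecFactory<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();

    V obtain(K key, Function<K, V> build) {
        // build runs at most once per key, even under contention
        return cache.computeIfAbsent(key, build);
    }
}
```

An Optional-returning get would also avoid the race on the read side, but collapsing get and put into one factory call removes the race entirely.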
.doc("Max number of null scalars in null vectors to cache for GPU Expand. " +
  "If the number of null scalars exceeds this value, the null vectors will not be cached. " +
  "The value has to be positive for caching to be enabled.")
.internal().integerConf
What is the plan here? Both of these features/configs are disabled by default. Is the plan to work with some customers and see if enabling them helps, and then after that possibly enable them by default?
Please retarget to 24.08
Fixes #10799. This PR tries to optimize the Expand & Aggregate execs in the first stage of a SQL query with many count distinct measures.
The optimizations in this PR include: