Commit
Merge pull request #10557 from NVIDIA/branch-24.02
Merge branch 'branch-24.02' into main [skip ci]
NvTimLiu committed Mar 6, 2024
2 parents 440a5d6 + 2ac71eb commit 4b866f5
Showing 2 changed files with 19 additions and 15 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,5 @@
# Change log
Generated on 2024-03-05
Generated on 2024-03-06

## Release 24.02

@@ -46,6 +46,7 @@ Generated on 2024-03-05
### Bugs Fixed
|||
|:---|:---|
|[#10548](https://github.com/NVIDIA/spark-rapids/issues/10548)|[BUG] test_dpp_bypass / test_dpp_via_aggregate_subquery failures in CI Databricks 13.3|
|[#10530](https://github.com/NVIDIA/spark-rapids/issues/10530)|test_delta_merge_match_delete_only java.lang.OutOfMemoryError: GC overhead limit exceeded|
|[#10464](https://github.com/NVIDIA/spark-rapids/issues/10464)|[BUG] spark334 and spark342 shims missed in scala2.13 dist jar|
|[#10473](https://github.com/NVIDIA/spark-rapids/issues/10473)|[BUG] Leak when running RANK query|
@@ -123,6 +124,8 @@ Generated on 2024-03-05
### PRs
|||
|:---|:---|
|[#10551](https://github.com/NVIDIA/spark-rapids/pull/10551)|Try to make degenerative joins here impossible for these tests|
|[#10546](https://github.com/NVIDIA/spark-rapids/pull/10546)|Update changelog [skip ci]|
|[#10541](https://github.com/NVIDIA/spark-rapids/pull/10541)|Fix Delta log cache size settings during integration tests|
|[#10525](https://github.com/NVIDIA/spark-rapids/pull/10525)|Update changelog for v24.02.0 release [skip ci]|
|[#10465](https://github.com/NVIDIA/spark-rapids/pull/10465)|Add missed shims for scala2.13|
29 changes: 15 additions & 14 deletions integration_tests/src/main/python/dpp_test.py
@@ -28,21 +28,21 @@

def create_dim_table(table_name, table_format, length=500):
def fn(spark):
# Pick a random filter value, but make it constant for the whole
df = gen_df(spark, [
('key', IntegerGen(nullable=False, min_val=0, max_val=9, special_cases=[])),
('skey', IntegerGen(nullable=False, min_val=0, max_val=4, special_cases=[])),
('ex_key', IntegerGen(nullable=False, min_val=0, max_val=3, special_cases=[])),
('value', value_gen),
# specify nullable=False for `filter` to avoid generating invalid SQL with
# expression `filter = None` (https://github.com/NVIDIA/spark-rapids/issues/9817)
('filter', RepeatSeqGen(
IntegerGen(min_val=0, max_val=length, special_cases=[], nullable=False), length=length // 20))
('filter', RepeatSeqGen(IntegerGen(nullable=False), length=1))
], length)
df.cache()
df.write.format(table_format) \
.mode("overwrite") \
.saveAsTable(table_name)
return df.select('filter').where("value > 0").first()[0]
return df.select('filter').first()[0], df.select('ex_key').first()[0]

return with_cpu_session(fn)
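
For context beyond the hunk above: the helper now returns a pair instead of a single value, and the SQL templates take a fourth positional argument. A minimal usage sketch, assuming it runs in dpp_test.py's module scope; the table names below are illustrative, everything else is taken from the surrounding hunks.

    # Usage sketch (not part of the diff); "fact_sketch"/"dim_sketch" are placeholder names.
    fact_table, dim_table = "fact_sketch", "dim_sketch"
    create_fact_table(fact_table, "parquet", length=10000)

    # create_dim_table now returns both the dim filter value and an ex_key value.
    filter_val, ex_key_val = create_dim_table(dim_table, "parquet", length=2000)

    # The templates in _statements are filled with four positional arguments:
    # {0} fact table, {1} dim table, {2} dim filter value, {3} ex_key value.
    statement = _statements[0].format(fact_table, dim_table, filter_val, ex_key_val)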

@@ -146,7 +146,7 @@ def fn(spark):
dim_table AS (
SELECT dim.key as key, dim.value as value, dim.filter as filter
FROM {1} dim
WHERE ex_key = 3
WHERE ex_key = {3}
ORDER BY dim.key
)
SELECT key, max(value)
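
The hunk above shows the CTE-based template now also constraining the dimension CTE with WHERE ex_key = {3}. As general background (not part of the change), the feature these tests exercise is Spark's dynamic partition pruning: a fact table partitioned by the join key is joined to a filtered dimension table, and the dim-side predicate prunes fact partitions at runtime. A self-contained PySpark sketch, independent of the repo's helpers; the table names are illustrative:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # On by default in recent Spark versions; set explicitly here for clarity.
    spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

    # Fact table partitioned by the join key; dim table carries the filter column.
    spark.range(0, 10000) \
        .selectExpr("id % 10 AS key", "id AS value") \
        .write.mode("overwrite").format("parquet").partitionBy("key").saveAsTable("fact_sketch")
    spark.range(0, 10) \
        .selectExpr("id AS key", "id * 100 AS filter") \
        .write.mode("overwrite").format("parquet").saveAsTable("dim_sketch")

    # With DPP, only fact partitions whose key survives `d.filter = 300` are scanned;
    # the physical plan typically shows a dynamicpruningexpression on the fact scan.
    spark.sql("""
        SELECT f.key, sum(f.value)
        FROM fact_sketch f JOIN dim_sketch d ON f.key = d.key
        WHERE d.filter = 300
        GROUP BY f.key
    """).explain()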
@@ -181,8 +181,9 @@ def fn(spark):
def test_dpp_reuse_broadcast_exchange(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
create_fact_table(fact_table, store_format, length=10000)
filter_val = create_dim_table(dim_table, store_format, length=2000)
statement = _statements[s_index].format(fact_table, dim_table, filter_val)
filter_val, ex_key_val = create_dim_table(dim_table, store_format, length=2000)
statement = _statements[s_index].format(fact_table, dim_table, filter_val, ex_key_val)

if is_databricks113_or_later() and aqe_enabled == 'true':
# SubqueryBroadcastExec is unoptimized in Databricks 11.3 with EXECUTOR_BROADCAST
# See https://github.com/NVIDIA/spark-rapids/issues/7425
@@ -202,8 +203,8 @@ def test_dpp_reuse_broadcast_exchange(spark_tmp_table_factory, store_format, s_i
def test_dpp_reuse_broadcast_exchange_cpu_scan(spark_tmp_table_factory):
fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
create_fact_table(fact_table, 'parquet', length=10000)
filter_val = create_dim_table(dim_table, 'parquet', length=2000)
statement = _statements[0].format(fact_table, dim_table, filter_val)
filter_val, ex_key_val = create_dim_table(dim_table, 'parquet', length=2000)
statement = _statements[0].format(fact_table, dim_table, filter_val, ex_key_val)
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: spark.sql(statement),
# The existence of GpuSubqueryBroadcastExec indicates the reuse works on the GPU
@@ -226,8 +227,8 @@ def test_dpp_reuse_broadcast_exchange_cpu_scan(spark_tmp_table_factory):
def test_dpp_bypass(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
create_fact_table(fact_table, store_format)
filter_val = create_dim_table(dim_table, store_format)
statement = _statements[s_index].format(fact_table, dim_table, filter_val)
filter_val, ex_key_val = create_dim_table(dim_table, store_format)
statement = _statements[s_index].format(fact_table, dim_table, filter_val, ex_key_val)
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: spark.sql(statement),
# Bypass with a true literal, if we can not reuse broadcast exchange.
@@ -250,8 +251,8 @@ def test_dpp_bypass(spark_tmp_table_factory, store_format, s_index, aqe_enabled)
def test_dpp_via_aggregate_subquery(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
create_fact_table(fact_table, store_format)
filter_val = create_dim_table(dim_table, store_format)
statement = _statements[s_index].format(fact_table, dim_table, filter_val)
filter_val, ex_key_val = create_dim_table(dim_table, store_format)
statement = _statements[s_index].format(fact_table, dim_table, filter_val, ex_key_val)
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: spark.sql(statement),
# SubqueryExec appears if we plan extra subquery for DPP
@@ -271,8 +272,8 @@ def test_dpp_via_aggregate_subquery(spark_tmp_table_factory, store_format, s_ind
def test_dpp_skip(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
create_fact_table(fact_table, store_format)
filter_val = create_dim_table(dim_table, store_format)
statement = _statements[s_index].format(fact_table, dim_table, filter_val)
filter_val, ex_key_val = create_dim_table(dim_table, store_format)
statement = _statements[s_index].format(fact_table, dim_table, filter_val, ex_key_val)
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: spark.sql(statement),
# SubqueryExec appears if we plan extra subquery for DPP

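The plan-capture comments in the hunks above ("The existence of GpuSubqueryBroadcastExec indicates the reuse works on the GPU", "SubqueryExec appears if we plan extra subquery for DPP") refer to assertions whose trailing arguments this diff truncates. A hedged sketch of what such a call typically looks like; the keyword names exist_classes/non_exist_classes and the conf dictionary are assumptions, not shown in the diff:

    # Sketch only: the diff cuts each call off after the lambda, so the argument
    # names below are assumed rather than taken from the file. `statement` and
    # `aqe_enabled` come from the surrounding test, as shown in the hunks above.
    assert_cpu_and_gpu_are_equal_collect_with_capture(
        lambda spark: spark.sql(statement),
        # Pass when the GPU plan reuses the broadcast exchange for DPP...
        exist_classes='DynamicPruningExpression,GpuSubqueryBroadcastExec',
        # ...and does not fall back to planning a separate subquery.
        non_exist_classes='SubqueryExec',
        conf={'spark.sql.adaptive.enabled': aqe_enabled})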