
[BUG] Optimize command fails when using liquid clustering on local Delta Lake & PySpark #3087

Closed
donielix opened this issue May 13, 2024 · 3 comments · Fixed by #3109
Labels: bug (Something isn't working)

Comments

donielix commented May 13, 2024

Bug

Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

Describe the problem

When attempting to optimize a Delta Table configured with liquid clustering, an error occurs during the execution of the optimize().executeCompaction() method.

Steps to reproduce

```python
from pyspark.sql import SparkSession
from delta.pip_utils import configure_spark_with_delta_pip
from delta.tables import DeltaTable

# Initialize a SparkSession configured with Delta
builder = (
    SparkSession.builder.config(
        "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .enableHiveSupport()
)
spark = configure_spark_with_delta_pip(
    spark_session_builder=builder
).getOrCreate()

# Initialize an empty Delta table with liquid clustering
dt = (
    DeltaTable.createIfNotExists(spark)
    .tableName("testtable")
    .addColumn("id", dataType="bigint", nullable=False)
    .addColumn("date", dataType="date", nullable=False)
    .addColumn("name", dataType="string", nullable=False)
    .addColumn("amount", dataType="double")
    .addColumn("year_month", dataType="string", nullable=False)
    .clusterBy("year_month")
    .execute()
)

# Push some test data into the newly created Delta table
spark.sql(
    """
    INSERT INTO testtable VALUES
    (1, '2024-01-01', 'Jack', 30.5, '2024-01'),
    (2, '2024-02-10', 'Claude', 11.2, '2024-02'),
    (3, '2024-02-25', 'Mick', 10.1, '2024-02')
    """
)

# Optimize the Delta table (this triggers the error)
dt.optimize().executeCompaction()
```
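
The same failure should also be reproducible through the SQL entry point, since it goes through the same OPTIMIZE command path (not tested here; the traceback below comes from the Python API call above):

```python
# Untested assumption: the SQL OPTIMIZE path hits the same clustering code
spark.sql("OPTIMIZE testtable")
```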

Observed results

When running the above snippet, I get the following error traceback.


```
Py4JJavaError Traceback (most recent call last)
Cell In[9], line 1
----> 1 dt.optimize().executeCompaction()

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/delta/tables.py:1391, in DeltaOptimizeBuilder.executeCompaction(self)
   1382 @since(2.0)  # type: ignore[arg-type]
   1383 def executeCompaction(self) -> DataFrame:
   1384     """
   1385     Compact the small files in selected partitions.
   1386
   1387     :return: DataFrame containing the OPTIMIZE execution metrics
   1388     :rtype: pyspark.sql.DataFrame
   1389     """
   1390     return DataFrame(
-> 1391         self._jbuilder.executeCompaction(),
   1392         getattr(self._spark, "_wrapped", self._spark)  # type: ignore[attr-defined]
   1393     )

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o123.executeCompaction.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:387)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$1(OptimizeTableCommand.scala:276)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordFrameProfile(OptimizeTableCommand.scala:217)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordOperation(OptimizeTableCommand.scala:217)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordDeltaOperation(OptimizeTableCommand.scala:217)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.optimize(OptimizeTableCommand.scala:255)
at org.apache.spark.sql.delta.commands.OptimizeTableCommand.run(OptimizeTableCommand.scala:180)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset(AnalysisHelper.scala:92)
at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset$(AnalysisHelper.scala:91)
at io.delta.tables.DeltaOptimizeBuilder.toDataset(DeltaOptimizeBuilder.scala:43)
at io.delta.tables.DeltaOptimizeBuilder.$anonfun$execute$1(DeltaOptimizeBuilder.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.delta.DeltaTableUtils$.withActiveSession(DeltaTable.scala:470)
at io.delta.tables.DeltaOptimizeBuilder.execute(DeltaOptimizeBuilder.scala:85)
at io.delta.tables.DeltaOptimizeBuilder.executeCompaction(DeltaOptimizeBuilder.scala:67)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.AssertionError: assertion failed: Cannot do Hilbert clustering by zero or one column!
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.sql.delta.skipping.HilbertClustering$.getClusteringExpression(MultiDimClustering.scala:108)
at org.apache.spark.sql.delta.skipping.SpaceFillingCurveClustering.cluster(MultiDimClustering.scala:78)
at org.apache.spark.sql.delta.skipping.SpaceFillingCurveClustering.cluster$(MultiDimClustering.scala:68)
at org.apache.spark.sql.delta.skipping.HilbertClustering$.cluster(MultiDimClustering.scala:106)
at org.apache.spark.sql.delta.skipping.MultiDimClustering$.cluster(MultiDimClustering.scala:59)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.runOptimizeBinJob(OptimizeTableCommand.scala:428)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$6(OptimizeTableCommand.scala:277)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
... 8 more
```

Expected results

I'd expect the optimize command to run successfully.
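
Per the docstring visible in the traceback, executeCompaction returns a DataFrame of OPTIMIZE execution metrics, so a successful run would look roughly like:

```python
metrics = dt.optimize().executeCompaction()  # should return, not raise
metrics.show()  # OPTIMIZE execution metrics
```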

Further details

Environment information

  • Python version: 3.10.14
  • Delta Lake version: 3.2.0
  • Spark version: 3.5.1
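
For anyone reproducing, one way to confirm these versions from a running session (importlib.metadata is in the standard library; "delta-spark" is assumed to be the installed PyPI package name):

```python
from importlib.metadata import version

print(spark.version)           # Spark version, e.g. 3.5.1
print(version("delta-spark"))  # Delta Lake version, e.g. 3.2.0
```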

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
donielix added the bug label on May 13, 2024
vkorukanti (Collaborator)

@donielix Could you paste the full error callstack? cc. @zedtang

donielix (Author)

> @donielix Could you paste the full error callstack? cc. @zedtang

Updated the issue with the traceback.

zedtang (Collaborator) commented May 17, 2024

Thanks for reporting! It's due to Hilbert clustering not supporting clustering on a single column; we should fall back to zorder in that case.

I sent out a fix: #3109
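
Until the fix lands, a workaround that follows directly from this diagnosis is to give Hilbert clustering at least two clustering columns. A minimal sketch, assuming the diagnosis above (the table name testtable_multi is made up for illustration):

```python
dt2 = (
    DeltaTable.createIfNotExists(spark)
    .tableName("testtable_multi")   # hypothetical second table
    .addColumn("id", dataType="bigint", nullable=False)
    .addColumn("year_month", dataType="string", nullable=False)
    .clusterBy("year_month", "id")  # two clustering columns, so the
    .execute()                      # single-column assertion cannot fire
)
dt2.optimize().executeCompaction()  # expected to succeed
```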

vkorukanti pushed a commit that referenced this issue May 17, 2024

## Description
Fall back to zorder when clustering on a single column, because Hilbert clustering doesn't support one column.

Resolves #3087 

## How was this patch tested?
New unit test.
zedtang added a commit to zedtang/delta that referenced this issue May 20, 2024 (delta-io#3109)

## Description
Fall back to zorder when clustering on a single column, because Hilbert clustering doesn't support one column.

Resolves delta-io#3087 

## How was this patch tested?
New unit test.
tdas pushed a commit that referenced this issue May 20, 2024 (#3121)


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Fall back to zorder when clustering on a single column, because Hilbert clustering doesn't support one column.

Resolves #3087 
## How was this patch tested?

New unit test.
## Does this PR introduce _any_ user-facing changes?

No
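
For readers skimming the thread, here is a minimal Python sketch of the rule this PR describes. The actual change lives in Delta's Scala code; pick_clustering_curve is an illustrative name only:

```python
def pick_clustering_curve(clustering_columns: list[str]) -> str:
    # Hilbert clustering asserts on fewer than two columns (see the
    # AssertionError in the traceback above), so a single clustering
    # column must fall back to zorder.
    return "hilbert" if len(clustering_columns) >= 2 else "zorder"

assert pick_clustering_curve(["year_month"]) == "zorder"
assert pick_clustering_curve(["year_month", "id"]) == "hilbert"
```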