[BUG][Spark] Deletion Vectors can cause AMBIGUOUS_REFERENCE errors on MERGE #2943

Open
whitleykeith opened this issue Apr 22, 2024 · 1 comment
Labels: bug

whitleykeith commented Apr 22, 2024

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

TL;DR: Certain MERGE operations consistently fail when deletion vectors are enabled, though more investigation is needed into why these specific MERGEs are affected.

For context, we have a system that incrementally takes snapshots from upstream JDBC sources and writes them into Delta. This system ultimately creates a DF that looks something like this:

+----+------+------+------------+
| id | col1 | col2 | _is_delete |
+----+------+------+------------+
|  1 | val1 | val2 | False      |
|  2 | val3 | val4 | False      |
|  3 | NULL | NULL | True       |
+----+------+------+------------+

The _is_delete column is a temporary column in this DF that indicates whether a row should be deleted from the Delta table. This DF is then MERGEd into our existing snapshot table (we would have written a normal snapshot if the table didn't exist yet), updating/deleting the necessary rows. We do this in one MERGE so we have a single transaction per snapshot, and this works well for our tables across the board.

We recently enabled deletion vectors for the performance benefits and have since noticed sparse but unavoidable errors. The core error is Reference `filePath` is ambiguous, could be: [`filePath`, `filePath`, `filePath`], and the stack trace (pasted below) indicates this happens while building the DV.

We've noticed the following:

  • This does not happen to every table. In fact, it's only happening to one of them at the moment.
  • The affected table does not have a column called filePath, but it does have one called path.
  • MERGEs on the affected table fail consistently while DVs are enabled (i.e. the error is not transient).
  • MERGEs on the affected table succeed when DVs are disabled (toggled as sketched after this list).
  • It does not seem to be related to the size of the Delta table; the affected table is pretty small.
  • The failure happens even if there are no deletes to apply.
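
For reference, deletion vectors can be turned off per table via a table property; a minimal sketch of how we toggle them, with my_table as a placeholder table name:

      // Hedged sketch: disable deletion vectors on one table so new MERGEs stop writing DVs.
      // `my_table` is a placeholder name; delta.enableDeletionVectors is the standard property.
      spark.sql("""
        ALTER TABLE my_table
        SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')
      """)

Note that flipping the property only stops new DVs from being written; DVs already present stay until the affected files are rewritten (e.g. via REORG TABLE my_table APPLY (PURGE)).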

Steps to reproduce

This is the merge command we use:

      deltaTable
        .as("current")
        .merge(
          rows.as("rows"),
          // idCol, minId, and maxId are defined elsewhere in our snapshot job
          s"current.${idCol} = rows.${idCol} and current.${idCol} >= ${minId} and current.${idCol} <= ${maxId}"
        )
        .whenMatched("rows._is_delete = true")
        .delete()       // rows flagged as deleted upstream are deleted here
        .whenMatched()
        .updateAll()    // all other matched rows are updated in place
        .whenNotMatched("rows._is_delete is null")
        .insertAll()    // new rows are inserted
        .execute()
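
We haven't isolated a minimal reproduction yet, but based on the observations above a sketch of one would look roughly like this (table/column names are made up; the key assumptions are a target column named path and deletion vectors enabled, and we haven't confirmed this exact snippet triggers the error):

      // Hypothetical repro sketch, mirroring our pipeline on a toy table.
      import io.delta.tables.DeltaTable

      spark.sql(
        """CREATE TABLE target_snapshot (id INT, path STRING) USING delta
          |TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')""".stripMargin)

      spark.range(0, 100)
        .selectExpr("CAST(id AS INT) AS id", "CAST(id AS STRING) AS path")
        .write.format("delta").mode("append").saveAsTable("target_snapshot")

      // Incoming snapshot rows with the temporary _is_delete column described above.
      val rows = spark.range(0, 100)
        .selectExpr(
          "CAST(id AS INT) AS id",
          "CAST(id AS STRING) AS path",
          "id % 10 = 0 AS _is_delete")

      DeltaTable.forName("target_snapshot").as("current")
        .merge(rows.as("rows"), "current.id = rows.id")
        .whenMatched("rows._is_delete = true").delete()
        .whenMatched().updateAll()
        .whenNotMatched("rows._is_delete is null").insertAll()
        .execute()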

Observed results

Full failure of the MERGE operation

Expected results

Successful MERGE operation

Further details

Stacktrace:

org.apache.spark.sql.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `filePath` is ambiguous, could be: [`filePath`, `filePath`, `filePath`].
 	at org.apache.spark.sql.errors.QueryCompilationErrors$.ambiguousReferenceError(QueryCompilationErrors.scala:1938)
 	at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:377)
 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:144)
 	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpressionByPlanChildren$1(ColumnResolutionHelper.scala:364)
 	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$3(ColumnResolutionHelper.scala:157)
 	at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
 	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$1(ColumnResolutionHelper.scala:164)
 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
 	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.innerResolve$1(ColumnResolutionHelper.scala:135)
 	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpression(ColumnResolutionHelper.scala:194)
 	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:371)
 	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:357)
 	at org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate$.resolveExpressionByPlanChildren(ResolveReferencesInAggregate.scala:49)
 	at org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate$.$anonfun$apply$1(ResolveReferencesInAggregate.scala:61)
 	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
 	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
 	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
 	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
 	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
 	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
 	at org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate$.apply(ResolveReferencesInAggregate.scala:61)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1602)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1494)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:32)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1494)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1469)
 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
 	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
 	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
 	at scala.collection.immutable.List.foldLeft(List.scala:91)
 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
 	at scala.collection.immutable.List.foreach(List.scala:431)
 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
 	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
 	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
 	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
 	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
 	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
 	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
 	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
 	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
 	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
 	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
 	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
 	at org.apache.spark.sql.RelationalGroupedDataset.toDF(RelationalGroupedDataset.scala:75)
 	at org.apache.spark.sql.RelationalGroupedDataset.agg(RelationalGroupedDataset.scala:244)
 	at org.apache.spark.sql.delta.commands.DeletionVectorBitmapGenerator$DeletionVectorSet.computeResult(DMLWithDeletionVectorsHelper.scala:307)
 	at org.apache.spark.sql.delta.commands.DeletionVectorBitmapGenerator$.buildDeletionVectors(DMLWithDeletionVectorsHelper.scala:359)
 	at org.apache.spark.sql.delta.commands.DeletionVectorBitmapGenerator$.buildRowIndexSetsForFilesMatchingCondition(DMLWithDeletionVectorsHelper.scala:406)
 	at org.apache.spark.sql.delta.commands.merge.ClassicMergeExecutor.$anonfun$writeDVs$1(ClassicMergeExecutor.scala:503)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.executeThunk$1(MergeIntoCommandBase.scala:423)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$recordMergeOperation$7(MergeIntoCommandBase.scala:440)
 	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
 	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
 	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.withStatusCode(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$recordMergeOperation$6(MergeIntoCommandBase.scala:440)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
 	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
 	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.recordMergeOperation(MergeIntoCommandBase.scala:437)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.recordMergeOperation$(MergeIntoCommandBase.scala:401)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.commands.merge.ClassicMergeExecutor.writeDVs(ClassicMergeExecutor.scala:473)
 	at org.apache.spark.sql.delta.commands.merge.ClassicMergeExecutor.writeDVs$(ClassicMergeExecutor.scala:467)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.writeDVs(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$5(MergeIntoCommand.scala:132)
 	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
 	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
 	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.withStatusCode(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:132)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2$adapted(MergeIntoCommand.scala:83)
 	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:223)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$1(MergeIntoCommand.scala:83)
 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
 	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
 	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
 	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runMerge(MergeIntoCommand.scala:81)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$run$1(MergeIntoCommandBase.scala:138)
 	at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries(MergeIntoMaterializeSource.scala:106)
 	at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries$(MergeIntoMaterializeSource.scala:94)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runWithMaterializedSourceLostRetries(MergeIntoCommand.scala:60)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run(MergeIntoCommandBase.scala:138)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run$(MergeIntoCommandBase.scala:113)
 	at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:60)

Environment information

  • Delta Lake version: 3.1.0
  • Spark version: 3.5.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
whitleykeith added the bug label on Apr 22, 2024

whitleykeith (Author) commented:

I believe this is an issue with tables that have a column named path: the error consistently happens on those tables, and the DV-building code (DMLWithDeletionVectorsHelper.scala, per the stack trace above) appears to rename a path column internally:

      val joinedDf = matchedRowsDf.join(filePathToDVDf, joinExpr, "leftOuter")
        .drop(FILE_NAME_COL)
        .withColumnRenamed("path", FILE_NAME_COL)
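
To illustrate the kind of conflict I suspect, here is a plain-Spark sketch (not the Delta code path, names made up): joining two DataFrames that both expose a filePath column and then grouping on that name fails with the same error class seen in the stack trace above.

      // Hypothetical illustration only: two inputs both carrying `filePath`.
      val left  = spark.range(5).selectExpr("id", "CAST(id AS STRING) AS filePath")
      val right = spark.range(5).selectExpr("id", "CAST(id AS STRING) AS filePath")

      left.join(right, Seq("id"), "left_outer")
        .groupBy("filePath")
        .count()   // fails analysis: [AMBIGUOUS_REFERENCE] Reference `filePath` is ambiguous

If the target table's own path column ends up renamed to the same internal name as the file-path column the DV builder adds, that would explain the ambiguity.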
