[SUPPORT] Merge into command fails when trying to use only few columns in source data while using partial data payload #11138
@jonvex you have better context on this?
@jonvex Any help on this?
@nsivabalan @jonvex Can you help on this?
@pravin1406 @jonvex Did we get a chance to fix this? Code to reproduce -
This part of the codebase leads to the failure. I tested removing reconcile schema from the overriding properties, and then it worked for me, but I'm not fully aware of the consequences of that change.
Hudi version -> 0.14.1
Spark version -> 3.2.0
Hadoop version -> 3.1.1
Hive version -> 3.1.1
Hi,
I want to use the partial data update payload. I have multiple sources that all write into the same Hudi table, and every source shares one common precombine field and record key.
With reconcile schema set to true and the payload set to the partial data payload, I can achieve this through the datasource API, since schema reconciliation takes care of conditioning my schema properly.
But the same does not work when using MERGE INTO with spark-sql. It gives me the error below.
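For reference, a minimal sketch of the datasource write path that works for me (table name, path, and column names here are illustrative, not the exact ones from my job):

```scala
// Hypothetical example: partial-update payload plus schema reconciliation
// via the datasource API, writing only a subset of the table's columns.
val partialDf = Seq((9, "pravin", "SFO"))
  .toDF("EventTime", "FullName", "City")

partialDf.write.format("hudi")
  .option("hoodie.table.name", "partial_update_4")
  .option("hoodie.datasource.write.recordkey.field", "FullName")
  .option("hoodie.datasource.write.precombine.field", "EventTime")
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.PartialUpdateAvroPayload")
  .option("hoodie.datasource.write.reconcile.schema", "true")
  .option("hoodie.datasource.write.operation", "upsert")
  .mode("append")
  .save("/tmp/pravin/partial_update_4")
```

With this path, the missing columns are reconciled against the table schema before the upsert, so the partial payload merges cleanly.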
2024-05-02 02:38:02,771 ERROR io.HoodieAppendHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=pravin partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20240502023620823, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}', newLocation='HoodieRecordLocation {instantTime=20240502023755261, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}'} java.lang.ArrayIndexOutOfBoundsException: 8 at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.isNullAt(SpecificInternalRow.scala:241) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply_0_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:128) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:118) at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:118) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getInsertValue(ExpressionPayload.scala:247) at org.apache.hudi.common.model.HoodieAvroRecord.shouldIgnore(HoodieAvroRecord.java:173) at org.apache.hudi.io.HoodieAppendHandle.prepareRecord(HoodieAppendHandle.java:254) at 
org.apache.hudi.io.HoodieAppendHandle.writeToBuffer(HoodieAppendHandle.java:592) at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:448) at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:83) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:338) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:260) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2024-05-02 02:38:03,375 ERROR hudi.HoodieSparkSqlWriterInternal: UPSERT failed with errors 2024-05-02 02:38:03,375 WARN hudi.HoodieSparkSqlWriterInternal: Closing write client org.apache.hudi.exception.HoodieException: Merge into Hoodie table command failed at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:441) at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) ... 47 elided
While looking at the code I found two problems. When I created this table with a CTAS query using all the relevant options, including table_properties, the table properties were not being set on the Hive table; I had to set hoodie.datasource.hive_sync.table_properties again with all the properties. But even then, reconcile schema did not seem to work.
After debugging and a code walkthrough, I found that reconcile schema is among the overriding properties. I read the comment there, but did not really understand the reasoning behind it. This has effectively blocked me from using spark-sql to partially update my records.
Can you explain why this override is there, and how we can make this work?
CTAS query:

```scala
spark.sql(
  "CREATE TABLE " + tablename + " USING org.apache.hudi " +
  " OPTIONS ( " +
  " primaryKey '" + recordkey + "', " +
  " path '/tmp/pravin/" + tablename + "', " +
  " hoodie.table.name '" + tablename + "', " +
  " hoodie.datasource.write.operation 'upsert', " +
  " hoodie.datasource.write.precombine.field '" + precombinekey + "', " +
  " hoodie.datasource.write.recordkey.field '" + recordkey + "', " +
  " hoodie.datasource.write.payload.class 'org.apache.hudi.common.model.PartialUpdateAvroPayload', " +
  " hoodie.datasource.write.table.type 'MERGE_ON_READ', " +
  " hoodie.enable.data.skipping 'true', " +
  " hoodie.datasource.write.reconcile.schema 'true', " +
  " hoodie.datasource.hive_sync.support_timestamp 'true', " +
  " hoodie.upsert.shuffle.parallelism '200', " +
  " hoodie.index.type 'SIMPLE', " +
  " hoodie.simple.index.update.partition.path 'true', " +
  " hoodie.datasource.write.hive_style_partitioning 'true', " +
  " hoodie.datasource.hive_sync.enable 'true', " +
  " hoodie.datasource.hive_sync.mode 'HMS', " +
  " hoodie.datasource.hive_sync.sync_comment 'true', " +
  " hoodie.datasource.hive_sync.database 'default', " +
  " hoodie.datasource.hive_sync.table_properties '" + tableProperties + "', " +
  " hoodie.datasource.hive_sync.table '" + tablename + "', " +
  " hoodie.schema.on.read.enable 'true' " +
  " ) as select * from merge_source ")
```
Input data:

```scala
val df = Seq((9, "qwertyuiop", "US", "1", "pravin", "abcd", "SFO"))
  .toDF("EventTime", "transactionId", "Country", "storeNbr", "FullName", "CompanyName", "City")
```
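One step not shown in the snippets here: for the CTAS and MERGE statements to resolve merge_source, the DataFrame has to be exposed as a temp view first. Presumably something like:

```scala
// Register the input DataFrame under the name that the
// CTAS and MERGE INTO statements reference as their source.
df.createOrReplaceTempView("merge_source")
```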
Merge command:

```scala
spark.sql(
  "merge into partial_update_4_rt as target " +
  "using merge_source as source " +
  "on target." + recordkey + " = source." + recordkey + " " +
  "when matched then update set City = source.City, CompanyName = source.CompanyName, EventTime = source.EventTime"
).show
```
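As a point of comparison (an untested assumption on my side, not a verified workaround): since the failure appears when the update set lists only a few columns, it may be worth checking whether assigning every column, via Hudi's `UPDATE SET *` shorthand, avoids the partial target row that ExpressionPayload seems to trip over:

```sql
-- Hypothetical full-column variant; <recordkey> stands in for the actual record key column.
MERGE INTO partial_update_4_rt AS target
USING merge_source AS source
ON target.<recordkey> = source.<recordkey>
WHEN MATCHED THEN UPDATE SET *
```

If this variant succeeds, it would point at the subset-of-columns projection as the trigger for the ArrayIndexOutOfBoundsException.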
@codope
@ad1happy2go