[SUPPORT] Hudi fails ACID verification test #11170

Closed
matthijseikelenboom opened this issue May 7, 2024 · 11 comments
Labels: data-consistency (phantoms, duplicates, write skew, inconsistent snapshot), feature-enquiry (feature enquiries/requests or improvement ideas), on-call-triaged, priority:critical (production down; pipelines stalled; need help asap)


matthijseikelenboom commented May 7, 2024

Describe the problem you faced

For work, we needed concurrent read/write support for our data lake, which uses Spark. We were noticing some inconsistencies, so we wrote a test that verifies whether a solution like Hudi adheres to ACID. We found, however, that Hudi fails this test.

Now, it may be that we've misconfigured Hudi, or that there is some mistake in the test code.

My question is whether someone could take a look at it and perhaps explain what is going wrong here.

To Reproduce

How to run the test, and its findings, are described in the README of the repository, but here is a short rundown.

Steps to reproduce the behavior:

  1. Check out the repo: hudi-acid-verification
  2. Start Docker if it is not already running
  3. Run the test TransactionManagerTest.java (for example via the command below)
  4. Observe that the writers break down and that very few transactions are processed
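A typical invocation, assuming the project uses the standard Maven layout with the Surefire plugin (the authoritative command is in the repository README):

    mvn test -Dtest=TransactionManagerTest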

Expected behavior

  1. I expect the writers not to break down
  2. I expect the full number of transactions to be executed

Environment Description

  • Hudi version : 0.14.1

  • Spark version : 3.4.2

  • Hive version : 4.0.0-beta-1

  • Hadoop version : 3.2.2

  • Storage (HDFS/S3/GCS..) : NTFS (Windows), APFS (macOS) & HDFS

  • Running on Docker? (yes/no) : No

Additional context
It's worth noting that other solutions, Iceberg and Delta Lake, have been tested the same way. Iceberg also did not pass this test; Delta Lake did.

Stacktrace

24/05/07 21:49:38 ERROR TransactionWriter: Exception in writer.
org.example.writer.TransactionFailedException: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback file:/tmp/lakehouse/concurrencytestdb.db/acid_verification commits 20240507214932607
	at org.example.writer.TransactionWriter.wrapOrRethrowException(TransactionWriter.java:190)
	at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:184)
	at org.example.writer.TransactionWriter.updateTransaction(TransactionWriter.java:143)
	at org.example.writer.TransactionWriter.lambda$handleTransaction$0(TransactionWriter.java:89)
	at org.example.writer.TransactionWriter.withRetryOnException(TransactionWriter.java:109)
	at org.example.writer.TransactionWriter.handleTransaction(TransactionWriter.java:83)
	at org.example.writer.TransactionWriter.run(TransactionWriter.java:70)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback file:/tmp/lakehouse/concurrencytestdb.db/acid_verification commits 20240507214932607
	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1065)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1012)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:940)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:922)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:917)
	at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:941)
	at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:222)
	at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:940)
	at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:933)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:501)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:439)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
	at org.example.writer.TransactionWriter.lambda$updateTransaction$2(TransactionWriter.java:160)
	at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:181)
	... 5 more
Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20240507214932607, please rollback greater commits first
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:179)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:218)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:111)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:138)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:298)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1048)
	... 51 more
24/05/07 21:49:38 INFO TransactionWriter: acid-writer-1 finished.
@danny0405 added the feature-enquiry and data-consistency labels on May 8, 2024
@ad1happy2go (Contributor) commented:

@matthijseikelenboom I don't see any lock-related configuration in your setup, and you are running two parallel writers, so you need to configure a lock provider for writes. Hudi follows the optimistic concurrency control (OCC) principle.
The multi-writer setup is documented here: https://hudi.apache.org/docs/concurrency_control/#model-c-multi-writer
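For illustration, a minimal multi-writer OCC block in hudi-defaults.conf could look like this (the property names come from the Hudi concurrency-control docs; the lock provider shown is just one option and has to fit your deployment):

    # allow several writers to commit concurrently under optimistic concurrency control
    hoodie.write.concurrency.mode=optimistic_concurrency_control
    # let the cleaner handle failed writes lazily instead of eagerly rolling them back
    hoodie.cleaner.policy.failed.writes=LAZY
    # a lock provider shared by all writers, e.g. the file-system based one
    hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider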

Let me know in case I am missing anything. Thanks a lot.

@matthijseikelenboom (Author) commented:

@ad1happy2go Ah yes, you're right. I seem to have forgotten to add the hudi-defaults.conf file to this project. I've added it to my repository and ran the test again. It gets further along, but still breaks down.

Stacktrace (Be warned, it's a big one):

ERROR! : Failed to upsert for commit time 20240508114518478
24/05/08 11:45:18 ERROR TransactionWriter: Exception in writer.
java.lang.RuntimeException: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240508114518478
	at org.example.writer.TransactionWriter.wrapOrRethrowException(TransactionWriter.java:192)
	at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:184)
	at org.example.writer.TransactionWriter.updateTransaction(TransactionWriter.java:143)
	at org.example.writer.TransactionWriter.lambda$handleTransaction$0(TransactionWriter.java:89)
	at org.example.writer.TransactionWriter.withRetryOnException(TransactionWriter.java:109)
	at org.example.writer.TransactionWriter.handleTransaction(TransactionWriter.java:83)
	at org.example.writer.TransactionWriter.run(TransactionWriter.java:70)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240508114518478
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:70)
	at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
	at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:439)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
	at org.example.writer.TransactionWriter.lambda$updateTransaction$2(TransactionWriter.java:160)
	at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:181)
	... 5 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 490.0 failed 1 times, most recent failure: Lost task 1.0 in stage 490.0 (TID 1367) (192.168.1.21 executor driver): com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
Serialization trace:
isDeleteComputed (org.apache.spark.sql.hudi.command.payload.ExpressionPayload)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:225)
	at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:48)
	at org.apache.hudi.common.model.HoodieRecord.write(HoodieRecord.java:356)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:274)
	at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:727)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1548)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1458)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1522)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
	at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:65)
	at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:43)
	at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:396)
	at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:62)
	at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:380)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:74)
	at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:508)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
	at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:540)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:75)
	... 41 more
Caused by: java.lang.reflect.InvocationTargetException
	at jdk.internal.reflect.GeneratedConstructorAccessor37.newInstance(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:51)
	... 50 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile int java.util.concurrent.atomic.AtomicBoolean.value accessible: module java.base does not "opens java.util.concurrent.atomic" to unnamed module @7c29daf3
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
	at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
	at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.buildValidFields(FieldSerializer.java:283)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:216)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:157)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:150)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:134)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:130)
	... 55 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
	at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
	at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:147)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:118)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:55)
	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:37)
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:59)
	... 49 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
Serialization trace:
isDeleteComputed (org.apache.spark.sql.hudi.command.payload.ExpressionPayload)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:225)
	at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:48)
	at org.apache.hudi.common.model.HoodieRecord.write(HoodieRecord.java:356)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:274)
	at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:727)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1548)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1458)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1522)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
	at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:65)
	at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:43)
	at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:396)
	at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:62)
	at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:380)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:74)
	at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:508)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
	at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:540)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:75)
	... 41 more
Caused by: java.lang.reflect.InvocationTargetException
	at jdk.internal.reflect.GeneratedConstructorAccessor37.newInstance(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:51)
	... 50 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile int java.util.concurrent.atomic.AtomicBoolean.value accessible: module java.base does not "opens java.util.concurrent.atomic" to unnamed module @7c29daf3
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
	at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
	at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.buildValidFields(FieldSerializer.java:283)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:216)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:157)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:150)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:134)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:130)
	... 55 more
24/05/08 11:45:18 INFO TransactionWriter: acid-writer-0 finished.

@ad1happy2go (Contributor) commented:

@matthijseikelenboom It looks like there are some library conflicts in the project. I need to reproduce it.

@ad1happy2go (Contributor) commented:

@matthijseikelenboom I noticed you are using Java 17 for this. Hudi 0.14.1 doesn't support Java 17 yet; newer Hudi versions will.

A reference to a similar Java 17 issue with Kryo: EsotericSoftware/kryo#885
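(As a side note, the root InaccessibleObjectException in the trace is the JDK module system refusing reflective access. JVM flags of the following form are the standard workaround for that particular error, although opening packages by hand may still not be enough to make Hudi 0.14.1 fully work on Java 17:)

    --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED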

Can you try with Java 8 once? Thanks.

@codope added the priority:critical label on May 9, 2024
@matthijseikelenboom (Author) commented:

Okay, sure. The original test was written with Java 11, but I updated it to 17 because Spark 3.4.2 supports it, so I thought, why not.

Is it known that Hudi (or Kryo) also doesn't work with Java 11, and is that why you suggest Java 8?

@matthijseikelenboom (Author) commented:

@ad1happy2go I've pushed a new branch to the repo in which the project is downgraded to Java 8. When running the test there, the writers no longer seem to fail, but the verification test still fails.

[screenshot: verification test output]

@ad1happy2go (Contributor) commented:

@matthijseikelenboom I tried to run it locally but am again seeing issues. Can we connect? If you are on the Apache Hudi Slack, you can ping me ("Aditya Goenka").

@ad1happy2go (Contributor) commented:

@matthijseikelenboom I was able to test successfully. There were two issues:

  1. InProcessLockProvider doesn't work for multiple writers here, so use FileSystemBasedLockProvider in TransactionWriter.java:

dataSet.write().format("hudi")
                    .option("hoodie.table.name", tableName)
                    .option("hoodie.datasource.write.recordkey.field", "primaryKeyValue")
                    .option("hoodie.datasource.write.partitionpath.field", "partitionKeyValue")
                    .option("hoodie.datasource.write.precombine.field", "dataValue")
                    .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
                    .mode(SaveMode.Append)
                    .save(tablePath);

  2. Along with the refresh, also run an MSCK repair in ReaderThread, so that partitions created by the writers are registered with the metastore:

session.sql("REFRESH TABLE " + fullyQualifiedTableName);
session.sql("MSCK REPAIR TABLE " + fullyQualifiedTableName);

@ad1happy2go (Contributor) commented:

@matthijseikelenboom Please let us know if it works for you as well. Thanks.

@matthijseikelenboom (Author) commented:

Tested and verified. Closing the issue.

More info

The solution has been tested on:

  • Java 8 ✅
  • Java 11 ✅
  • Java 17 ❌ (As of this moment, Hudi doesn't support this version)

@ad1happy2go (Contributor) commented:

Thanks @matthijseikelenboom for the update.
