
issue with reading the data using hudi streamer #11263

Closed
Pavan792reddy opened this issue May 20, 2024 · 4 comments

Comments
@Pavan792reddy

Pavan792reddy commented May 20, 2024

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced
I have tried to load messages from Pulsar into Spark using Hoodie Streamer, but I am seeing unexpected behaviour in the error log.


Environment Description

  • Hudi version :

  • Spark version :3.3.2

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) :GCS

  • Running on Docker? (yes/no) :NO


Stacktrace


spark-submit
--master 'local[*]'
--deploy-mode client
--class org.apache.hudi.utilities.streamer.HoodieStreamer
--packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.4'
--repositories https://repo.maven.apache.org/maven2
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
/home/pavankumar_reddy/hudi-utilities-slim-bundle_2.12-0.14.0.jar
--source-class org.apache.hudi.utilities.sources.PulsarSource
--source-ordering-field when
--target-base-path gs://test-hudi-searce-q6/hudi_data/avroschema_STREAM
--target-table avroschema_stream
--hoodie-conf hoodie.datasource.write.recordkey.field=id
--hoodie-conf hoodie.datasource.write.partitionpath.field=id
--table-type COPY_ON_WRITE
--op UPSERT
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
--hoodie-conf hoodie.streamer.source.pulsar.topic=persistent://mytenant/mynamespace/avroschema
--hoodie-conf hoodie.streamer.source.pulsar.endpoint.service.url=http://localhost::6650
--hoodie-conf hoodie.streamer.source.pulsar.endpoint.admin.url=http://localhost::8080

Error:

https://repo.maven.apache.org/maven2 added as a remote repository with the name: repo-1
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
io.streamnative.connectors#pulsar-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cb5638c3-36ce-4d04-ae3a-fbf707f05694;1.0
confs: [default]
found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in central
found io.streamnative.connectors#pulsar-spark-connector_2.12;3.1.1.4 in central
found io.swagger#swagger-annotations;1.5.21 in central
found org.slf4j#slf4j-api;1.7.25 in central
found com.sun.activation#javax.activation;1.2.0 in central
found org.slf4j#jul-to-slf4j;1.7.25 in central
found org.checkerframework#checker-qual;2.0.0 in central
found org.codehaus.mojo#animal-sniffer-annotations;1.14 in central
:: resolution report :: resolve 346ms :: artifacts dl 16ms
:: modules in use:
com.sun.activation#javax.activation;1.2.0 from central in [default]
io.streamnative.connectors#pulsar-spark-connector_2.12;3.1.1.4 from central in [default]
io.swagger#swagger-annotations;1.5.21 from central in [default]
org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from central in [default]
org.checkerframework#checker-qual;2.0.0 from central in [default]
org.codehaus.mojo#animal-sniffer-annotations;1.14 from central in [default]
org.slf4j#jul-to-slf4j;1.7.25 from central in [default]
org.slf4j#slf4j-api;1.7.25 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 8 | 0 | 0 | 0 || 8 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-cb5638c3-36ce-4d04-ae3a-fbf707f05694
confs: [default]
0 artifacts copied, 8 already retrieved (0kB/10ms)
Exception in thread "main" java.lang.NoSuchFieldError: SENSITIVE_CONFIG_KEYS_FILTER
at org.apache.hudi.utilities.streamer.HoodieStreamer.(HoodieStreamer.java:110)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:219)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:938)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1061)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1070)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


@ad1happy2go
Contributor

@Pavan792reddy This looks like a library version conflict between the utilities jar and the Spark bundle: the --packages line pulls hudi-spark3.4-bundle while the environment runs Spark 3.3.2. Can you check hudi-utilities-slim-bundle_2.12-0.14.0.jar?
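As a sketch of a consistent pairing (assuming a Spark 3.3.x runtime; the jar path and connector version are taken from the command above), the Spark bundle pulled via --packages should target the same Spark minor version as the cluster:

```shell
# Illustrative only: pair the slim utilities bundle with the hudi-spark
# bundle that matches the Spark runtime (3.3.x here), instead of mixing
# a spark3.4 bundle with a Spark 3.3.2 installation.
spark-submit \
  --master 'local[*]' \
  --deploy-mode client \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --packages 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.4' \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  /home/pavankumar_reddy/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
  --source-class org.apache.hudi.utilities.sources.PulsarSource \
  ...
```

The remaining streamer flags are unchanged from the original command; the key point is that the slim utilities bundle carries no Spark integration of its own and relies entirely on the hudi-spark bundle matching the runtime.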

@Pavan792reddy
Author

Pavan792reddy commented May 22, 2024

@ad1happy2go I have made all the changes and it was working as expected. Now the script is failing with the error below.

spark-submit --master 'local[*]' --deploy-mode client --packages 'org.apache.hudi:hudi-spark3.1-bundle_2.12:0.14.1,io.streamnative.connectors:pulsar-spark-connector_2.12:3.2.0.2' --repositories https://repo.maven.apache.org/maven2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --jars '/home/pavankumar_reddy/hudi-spark3.1-bundle_2.12-0.14.1.jar,/home/pavankumar_reddy/hudi-utilities_2.12-0.14.1.jar' --class org.apache.hudi.utilities.streamer.HoodieStreamer ls /home/pavankumar_reddy/hudi-utilities-slim-bundle_2.12-0.14.1.jar --source-class org.apache.hudi.utilities.sources.PulsarSource --source-ordering-field when --target-base-path gs://pulsarstreamer-test/hudi_data/avroschema_stream --target-table avroschema_stre --hoodie-conf hoodie.datasource.write.recordkey.field=id --hoodie-conf hoodie.datasource.write.partitionpath.field=id --table-type COPY_ON_WRITE --op UPSERT --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator --hoodie-conf hoodie.streamer.source.pulsar.topic=persistent://mytenant/mynamespace/avroschema --hoodie-conf hoodie.streamer.source.pulsar.endpoint.service.url=pulsar://localhost:6650 --hoodie-conf hoodie.streamer.source.pulsar.endpoint.admin.url=pulsar://localhost:8080 --continuous

24/05/22 13:00:51 INFO org.apache.pulsar.client.impl.ConnectionPool: [[id: 0x6e32bfc9, L:/10.128.0.70:55298 - R:10.128.0.40/10.128.0.40:6650]] Connected to server
24/05/22 13:00:51 INFO org.apache.pulsar.client.impl.ClientCnx: [id: 0x6e32bfc9, L:/10.128.0.70:55298 - R:10.128.0.40/10.128.0.40:6650] Connected through proxy to target broker at localhost:6650
24/05/22 13:00:51 INFO org.apache.pulsar.client.impl.ConsumerImpl: [persistent://mytenant/mynamespace/avroschema][spark-pulsar-batch-97273cbf-ccc7-4e63-9c0c-60642c1ff1ed-persistent://mytenant/mynamespace/avroschema] Subscribing to topic on cnx [id: 0x6e32bfc9, L:/10.128.0.70:55298 - R:10.128.0.40/10.128.0.40:6650], consumerId 0
24/05/22 13:00:51 INFO org.apache.pulsar.client.impl.ConsumerImpl: [persistent://mytenant/mynamespace/avroschema][spark-pulsar-batch-97273cbf-ccc7-4e63-9c0c-60642c1ff1ed-persistent://mytenant/mynamespace/avroschema] Subscribed to topic on 10.128.0.40/10.128.0.40:6650 -- consumer: 0
24/05/22 13:00:51 ERROR org.apache.hudi.utilities.streamer.HoodieStreamer: Shutting down delta-sync due to exception
java.lang.UnsupportedOperationException: MessageId is null
at org.apache.pulsar.client.impl.MessageIdImpl.compareTo(MessageIdImpl.java:214)
at org.apache.pulsar.client.impl.MessageIdImpl.compareTo(MessageIdImpl.java:32)
at org.apache.pulsar.client.impl.ConsumerImpl.hasMoreMessages(ConsumerImpl.java:2291)
at org.apache.pulsar.client.impl.ConsumerImpl.hasMessageAvailableAsync(ConsumerImpl.java:2237)
at org.apache.pulsar.client.impl.ConsumerImpl.hasMessageAvailable(ConsumerImpl.java:2181)
at org.apache.spark.sql.pulsar.PulsarHelper.getUserProvidedMessageId(PulsarHelper.scala:451)
at org.apache.spark.sql.pulsar.PulsarHelper.$anonfun$fetchCurrentOffsets$1(PulsarHelper.scala:415)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:193)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.pulsar.PulsarHelper.fetchCurrentOffsets(PulsarHelper.scala:408)
at org.apache.spark.sql.pulsar.PulsarHelper.actualOffsets(PulsarHelper.scala:398)
at org.apache.spark.sql.pulsar.PulsarProvider.$anonfun$createRelation$2(PulsarProvider.scala:143)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2622)
at org.apache.spark.sql.pulsar.PulsarProvider.createRelation(PulsarProvider.scala:136)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
at org.apache.hudi.utilities.sources.PulsarSource.fetchNextBatch(PulsarSource.java:131)
at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:44)
at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:161)
at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:629)
at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:525)
at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:498)
at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:404)
at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:767)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
24/05/22 13:00:51 INFO org.apache.hudi.utilities.streamer.HoodieStreamer: Delta Sync shutdown. Error ?true
24/05/22 13:00:51 INFO org.apache.hudi.utilities.streamer.HoodieStreamer: Ingestion completed. Has error: true
24/05/22 13:00:51 ERROR org.apache.hudi.async.HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: MessageId is null
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)

@Pavan792reddy
Author

The message id is being produced from the Pulsar topic, but it comes through as the `__messageId` column, not as `MessageId`.

@ad1happy2go
Contributor

Using the schema registry fixed this issue. Discussed in this thread - https://apache-hudi.slack.com/archives/C4D716NPQ/p1716384858692059
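For readers hitting the same error: a hedged sketch of registering an Avro schema for the topic with the Pulsar schema registry via `pulsar-admin schemas upload`. The field list below is an illustrative guess based on the record key `id` and ordering field `when` used in this thread; adjust it to match the actual payload.

```shell
# Write a schema definition file in the format pulsar-admin expects:
# a JSON wrapper whose "schema" property holds the Avro schema as a string.
# Field names/types here are illustrative, not taken from the real topic.
cat > schema.json <<'EOF'
{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"avroschema\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"when\",\"type\":\"long\"}]}",
  "properties": {}
}
EOF

# Upload it so the schema registry serves a schema for the topic.
pulsar-admin schemas upload persistent://mytenant/mynamespace/avroschema -f schema.json
```

Once the topic has a registered schema, the connector can decode the payload into typed columns instead of failing while resolving message ids.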

@codope codope closed this as completed May 31, 2024