Thanks for filing an issue with us! Below are some guidelines when filing an issue. In general, the more detail the better!
Feature Requests:
What issue are you trying to solve? Authenticating to Event Hubs from Scala with a service principal and client secret.
How do you want to solve it?
What is your use case for this feature? Following the Event Hubs AAD documentation, the Scala code should authenticate to Event Hubs, obtain a token, and build an EventHubsConf. I am using Maven with Scala. I confirmed that the AuthBySecretCallBackWithParams class is in my JAR, and I put it in the same package as EventHubsConf. However, whichever package I place it in, it is not found.
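The use case comes down to handing the callback a tenant, a client ID, and a secret. Below is a minimal sketch of how those values might be grouped, assuming the callback reads them from a Map[String, Object] under the keys authority, clientId, and clientSecret. Those key names are hypothetical, so match whatever your callback class actually reads. ServicePrincipal and scopeFor are illustrative helpers, not connector API. The scope follows the Azure AD client-credentials convention of appending /.default to the resource URI.

```scala
// Hypothetical holder for the service-principal values; not part of the connector API.
final case class ServicePrincipal(tenantId: String, clientId: String, clientSecret: String) {
  // Azure AD authority URL for a single tenant.
  def authorityUrl: String = s"https://login.microsoftonline.com/$tenantId"

  // Parameter map for the callback. ASSUMPTION: these key names are illustrative
  // and must match whatever your callback class reads.
  def toCallbackParams: Map[String, Object] = Map(
    "authority"    -> tenantId,
    "clientId"     -> clientId,
    "clientSecret" -> clientSecret
  )

  // Keep the secret out of logs and exception messages.
  override def toString: String = s"ServicePrincipal($tenantId, $clientId, ***)"
}

object ServicePrincipal {
  // Client-credentials scope for a resource URI, e.g. https://eventhubs.azure.net/.default
  def scopeFor(resource: String): String =
    (if (resource.endsWith("/")) resource else resource + "/") + ".default"
}
```

Overriding toString is a small precaution: a case class would otherwise print the secret whenever the object is logged.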
Bug Report: the class loader cannot find the AuthBySecretCallBackWithParams class.
Actual behavior: a ClassNotFoundException is thrown for AuthBySecretCallBackWithParams regardless of which package the class is placed in:
java.lang.ClassNotFoundException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:640)
at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:638)
at scala.Option.map(Option.scala:146)
at org.apache.spark.eventhubs.EventHubsConf.aadAuthCallback(EventHubsConf.scala:638)
at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:73)
at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:71)
at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
at org.apache.spark.eventhubs.utils.RetryUtils$.org$apache$spark$eventhubs$utils$RetryUtils$$retryHelper$1(RetryUtils.scala:116)
at org.apache.spark.eventhubs.utils.RetryUtils$.retryScala(RetryUtils.scala:149)
at org.apache.spark.eventhubs.utils.RetryUtils$.retryJava(RetryUtils.scala:91)
at org.apache.spark.eventhubs.client.ClientConnectionPool.org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient(ClientConnectionPool.scala:69)
at org.apache.spark.eventhubs.client.ClientConnectionPool$.borrowClient(ClientConnectionPool.scala:170)
at org.apache.spark.eventhubs.client.EventHubsClient.org$apache$spark$eventhubs$client$EventHubsClient$$client(EventHubsClient.scala:62)
at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
at org.apache.spark.sql.eventhubs.EventHubsSource.org$apache$spark$sql$eventhubs$EventHubsSource$$partitionCount(EventHubsSource.scala:81)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply$mcJ$sp(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:95)
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:268)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:269)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
org.apache.spark.sql.streaming.StreamingQueryException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
=== Streaming Query ===
Expected behavior: the Spark job running on Synapse Spark streams from Event Hubs.
Spark version: Apache Spark 2.4.0, Scala 2.11
spark-eventhubs artifactId and version: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.21
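The trace shows EventHubsConf.aadAuthCallback resolving the callback by name through Class.forName, and the lookup fails in sun.misc.Launcher$AppClassLoader. A JAR can contain a class and that class can still be invisible to the loader doing the lookup. The small diagnostic sketch below checks which loaders can see a given class name. ClassVisibility is a hypothetical helper, not part of the connector, and it assumes you can run Scala in the same session as the job.

```scala
object ClassVisibility {
  // True if `loader` can resolve `className`. initialize = false avoids running static initializers.
  def canLoad(className: String, loader: ClassLoader): Boolean =
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      // ClassNotFoundException: the name is unknown to this loader and its parents.
      // LinkageError: the class was found but one of its dependencies was not.
      case _: ClassNotFoundException | _: LinkageError => false
    }

  // One line per labelled loader, e.g. "context loader: visible".
  def report(className: String, loaders: Seq[(String, ClassLoader)]): Seq[String] =
    loaders.map { case (label, loader) =>
      val status = if (canLoad(className, loader)) "visible" else "NOT visible"
      s"$label: $status"
    }
}
```

In a Synapse notebook, one could pass classOf[org.apache.spark.eventhubs.EventHubsConf].getClassLoader and Thread.currentThread.getContextClassLoader as the two loaders. If only the context loader reports the callback as visible, that would point to a class-loader mismatch between the connector and the callback JAR rather than a missing class.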
@seblea here is what I did to tackle the "class not found" issue in Synapse Spark. Hopefully this helps you or others.
Solution 1
Download all dependency JARs after including only azure-{eventhubs, msal4j} as dependencies:
Run mvn dependency:copy-dependencies -DoutputDirectory=<PATH>
This produced 40+ JARs at the time of testing.
Add all of these JARs to the workspace libraries and upload them to the Spark pool.
Caveat (JAR conflicts): some of the downloaded JARs duplicate libraries that already exist in the Spark pool.
Exception:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.13.4 requires Jackson Databind version >= 2.13.0 and < 2.14.0 - Found jackson-databind version 2.12.1 at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
Workaround: remove the conflicting libraries one at a time.
Issue: this requires manual work and is not ideal for package management.
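The exception in Solution 1 encodes a simple rule: jackson-module-scala 2.13.4 only accepts a jackson-databind version in [2.13.0, 2.14.0). The sketch below applies that same-major.minor check to two version strings, which can help spot a conflict before uploading JARs. JacksonCompat is a hypothetical helper, and the rule is inferred from the error message rather than taken from Jackson's source.

```scala
object JacksonCompat {
  // Parses "major.minor.patch", ignoring suffixes such as "-SNAPSHOT"; missing parts default to 0.
  def parse(version: String): (Int, Int, Int) = {
    val parts = version.takeWhile(c => c.isDigit || c == '.').split('.').filter(_.nonEmpty).map(_.toInt)
    def at(i: Int): Int = if (i < parts.length) parts(i) else 0
    (at(0), at(1), at(2))
  }

  // Rule inferred from the error message: jackson-module-scala X.Y.z accepts
  // jackson-databind >= X.Y.0 and < X.(Y+1).0, i.e. the same major.minor line.
  def isCompatible(scalaModuleVersion: String, databindVersion: String): Boolean = {
    val (sMajor, sMinor, _) = parse(scalaModuleVersion)
    val (dMajor, dMinor, _) = parse(databindVersion)
    sMajor == dMajor && sMinor == dMinor
  }
}
```

At runtime, new com.fasterxml.jackson.databind.ObjectMapper().version().toString reports the databind version that was actually loaded, which can be fed into the check.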
Solution 2
Include azure-{eventhubs, msal4j} as dependencies.
To identify which library prefixes to rename, look for package patterns in the downloaded dependency paths printed to the console by the IDE or by mvn.
Shade those patterns using relocations in the maven-shade-plugin configuration. The shaded patterns used were:
shaded.com.fasterxml.jackson.databind
shaded.com.microsoft.azure
shaded.net.minidev
Exclude the following META-INF files, since they can cause compilation issues (use with discretion and inform the customer):
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
META-INF/*.MF
Issue:
This solution removes some of the metadata files from the final JAR file, which can cause issues with some tools or libraries.
It is recommended to keep the metadata files for better compatibility and troubleshooting.
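To make the relocation and filter lists in Solution 2 concrete, here is a small model of their effect on class names and JAR entries. It is a sketch of the rewriting rule, not the maven-shade-plugin itself. The original package prefixes are assumed from the shaded names, and ShadeModel is a hypothetical helper.

```scala
object ShadeModel {
  // ASSUMPTION: original prefixes inferred from the shaded names listed in Solution 2.
  val relocations: Seq[(String, String)] = Seq(
    "com.fasterxml.jackson.databind" -> "shaded.com.fasterxml.jackson.databind",
    "com.microsoft.azure"            -> "shaded.com.microsoft.azure",
    "net.minidev"                    -> "shaded.net.minidev"
  )

  // Rewrites a fully qualified class name if it falls under a relocated package.
  def relocate(className: String): String =
    relocations.collectFirst {
      case (from, to) if className == from || className.startsWith(from + ".") =>
        to + className.stripPrefix(from)
    }.getOrElse(className)

  // Files directly under META-INF matching *.SF, *.DSA, *.RSA or *.MF are filtered out.
  private val droppedSuffixes = Seq(".SF", ".DSA", ".RSA", ".MF")

  def isDropped(entry: String): Boolean =
    entry.startsWith("META-INF/") && {
      val name = entry.stripPrefix("META-INF/")
      !name.contains("/") && droppedSuffixes.exists(suffix => name.endsWith(suffix))
    }
}
```

In this model, org.apache.spark.eventhubs classes, including the callback, keep their names, because no relocation covers that prefix.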
Streaming code from the original report:
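import org.apache.spark.eventhubs.{AuthBySecretCallBackWithParams, ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import java.net.URI
// eventHubConfigurations holds all parameters; sparkSession, eventHubsConf, TimeStampFunction,
// eventHubcheckpointPath and eventHubSinkPath are defined elsewhere
sparkSession.readStream.format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
  .options(eventHubsConf.toMap).load()
  .writeStream
  .option("checkpointLocation", eventHubcheckpointPath) // the checkpoint belongs to the streaming query, not the batch write
  // foreachBatch writes each micro-batch to Delta itself, so no sink format is set on writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.withColumn("IngestionTimeStamp", lit(TimeStampFunction()))
      .write.format("delta").mode("append").save(eventHubSinkPath)
  }
  .start()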