Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Insert into a jdbc table will cause exception when parse lineage in spark #6314

Open
2 of 4 tasks
Jack1007 opened this issue Apr 17, 2024 · 1 comment
Open
2 of 4 tasks
Labels
kind:bug This is a clearly a bug priority:major

Comments

@Jack1007
Copy link

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

spark version:3.2.3
kyuubi version:1.8.0
spark lineage compile: mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.2.3

source table :

CREATE TABLE `test`.`ta` (
  `id` INT,
  `name` STRING)
USING text;

destination table:

CREATE TEMPORARY VIEW test1_view 
USING org.apache.spark.sql.jdbc 
OPTIONS 
(url 'jdbc:mysql://xxxx:3306/test', 
    user 'test', 
    password 'xxxxxx', 
    dbtable 'test.test1', 
    columns 'id,name');

sql:

insert into test1_view select id,name from test.ta;

In the driver log, spark lineage parser throw exception like this:

24/04/17 10:21:42 WARN SparkSQLLineageParseHelper: Extract Statement[483] columns lineage failed.
scala.MatchError: (id#20,{id#17}) (of class scala.Tuple2)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.$anonfun$extractColumnsLineage$10(SparkSQLLineageParseHelper.scala:265)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.extractColumnsLineage(SparkSQLLineageParseHelper.scala:265)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse(SparkSQLLineageParseHelper.scala:54)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse$(SparkSQLLineageParseHelper.scala:52)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.parse(SparkSQLLineageParseHelper.scala:510)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.$anonfun$transformToLineage$1(SparkSQLLineageParseHelper.scala:516)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.transformToLineage(SparkSQLLineageParseHelper.scala:516)
	at org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener.onSuccess(SparkOperationLineageQueryExecutionListener.scala:34)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1433)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

The relational code:

      case p if p.nodeName == "InsertIntoDataSourceCommand" =>
        val logicalRelation = getField[LogicalRelation](plan, "logicalRelation")
        val table = logicalRelation
          .catalogTable.map(t => getV1TableName(t.qualifiedName)).getOrElse("")
        extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
          case (k, v) if table.nonEmpty =>
            k.withName(s"$table.${k.name}") -> v
        }

The table val in the code must be None or Empty,so case match failed.But why the table val is empty?

Affects Version(s)

1.8.0

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

24/04/17 10:21:42 WARN SparkSQLLineageParseHelper: Extract Statement[483] columns lineage failed.
scala.MatchError: (id#20,{id#17}) (of class scala.Tuple2)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.$anonfun$extractColumnsLineage$10(SparkSQLLineageParseHelper.scala:265)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.extractColumnsLineage(SparkSQLLineageParseHelper.scala:265)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse(SparkSQLLineageParseHelper.scala:54)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse$(SparkSQLLineageParseHelper.scala:52)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.parse(SparkSQLLineageParseHelper.scala:510)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.$anonfun$transformToLineage$1(SparkSQLLineageParseHelper.scala:516)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.transformToLineage(SparkSQLLineageParseHelper.scala:516)
	at org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener.onSuccess(SparkOperationLineageQueryExecutionListener.scala:34)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1433)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

  • Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • No. I cannot submit a PR at this time.
@Jack1007 Jack1007 added kind:bug This is a clearly a bug priority:major labels Apr 17, 2024
Copy link

Hello @Jack1007,
Thanks for finding the time to report the issue!
We really appreciate the community's efforts to improve Apache Kyuubi.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind:bug This is a clearly a bug priority:major
Projects
None yet
Development

No branches or pull requests

1 participant