sparklyr errors when arrow is enabled in Databricks Runtime Environment 14.3 LTS #3435

Open
hfkist opened this issue Apr 22, 2024 · 4 comments
Labels
databricks Issues related to Databricks connection mode

Comments

hfkist commented Apr 22, 2024

I have encountered a problem with sparklyr interacting with arrow in Databricks Runtime 14.3 LTS. The example code below, which creates and drops a table and displays a data frame, fails in DBR 14.3 (arrow 12.0.1, sparklyr 1.8.1); the error message is pasted at the bottom. The same example code runs successfully if either:

  • the code is run in DBR 13.3 LTS (which uses arrow version 10.0.1 and sparklyr version 1.7.9)
  • the "sparklyr.arrow" option is set to FALSE instead of TRUE in the example code below

I have also experimented with installing and loading other specific versions of arrow and sparklyr within DBR 14.3, but none of those combinations ran the example successfully. Any help would be appreciated.
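For completeness, here is how I confirm which versions are actually loaded in the session — a minimal sketch using base R plus sparklyr's own spark_version() helper, run after the spark_connect() call in the example below:

packageVersion("arrow")       # 12.0.1 on DBR 14.3, 10.0.1 on DBR 13.3
packageVersion("sparklyr")    # 1.8.1 on DBR 14.3, 1.7.9 on DBR 13.3
sparklyr::spark_version(sc)   # the Spark version behind the connection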

Thanks!


Example Code:

location <- "/usr/local/lib/R/site-library"
library(sparklyr, lib.loc = location)
library(arrow, lib.loc = location)
library(dplyr, lib.loc = location)

options(sparklyr.arrow = TRUE)
sc <- spark_connect(method = "databricks")

sparklyr::sdf_sql(sc,"CREATE TABLE IF NOT EXISTS test_sparklyr AS (SELECT 2024 as test)")
sparklyr::sdf_sql(sc,"DROP TABLE IF EXISTS test_sparklyr") #code still fails in DBR 14.3 if this line is excluded
df <- sparklyr::sdf_sql(sc,"SELECT 2024 as test")
df %>% select(test) %>% display()

This produces the following error:

Error in db_query_fields.DBIConnection(con, ...) : Can't query fields.
Caused by error in `value[[3L]]()`:
! Failed to fetch data: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 (TID 33) (10.105.82.14 executor driver): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(Lorg/apache/spark/sql/types/StructType;)Lorg/apache/spark/sql/catalyst/encoders/ExpressionEncoder;
	at sparklyr.ArrowConvertersImpl.toBatchIterator(arrowconverters.scala:120)
	at sparklyr.ArrowConvertersImpl.toBatchIterator(arrowconverters.scala:67)
	at sparklyr.ArrowConverters$.$anonfun$toArrowDataset$1(arrowconverters.scala:249)
	at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:226)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:957)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:960)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:852)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3798)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3707)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3707)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1613)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1598)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1598)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4044)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3944)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1276)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1264)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2997)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:303)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:299)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:384)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:381)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:122)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:131)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:90)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:78)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:549)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:540)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:557)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:400)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:400)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:318)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:558)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:555)
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3758)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4707)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3725)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4698)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1079)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4696)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:376)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:659)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$wi
Error in `db_query_fields.DBIConnection()`:
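For comparison, the identical session completes without error when arrow is disabled before connecting (the second bullet above) — a minimal sketch of the working variant, everything else unchanged from the example:

options(sparklyr.arrow = FALSE)
sc <- spark_connect(method = "databricks")
df <- sparklyr::sdf_sql(sc, "SELECT 2024 as test")
df %>% select(test) %>% display()   # succeeds with arrow disabled

Given the java.lang.NoSuchMethodError on RowEncoder$.apply at the top of the trace, my guess — and it is only a guess — is that the sparklyr Scala extension shipped with 1.8.1 was compiled against an older Spark encoder API than the Spark version bundled in DBR 14.3.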
@edgararuiz edgararuiz added the databricks Issues related to Databricks connection mode label Apr 27, 2024
edgararuiz (Collaborator) commented:

Hi @hfkist, shouldn't this be:

df <- tbl(test_sparklyr)
df %>% select(test) %>% display()


hfkist commented May 8, 2024

Hey @edgararuiz, your suggested code didn't quite work for me, but modified as below it produces the same output as the code in my example.

df <- tbl(sc,"test_sparklyr")
df %>% select(test) %>% display()

Indeed, I tried both this code and the code from my original example with options(sparklyr.arrow = FALSE), and both work and return the same output. With options(sparklyr.arrow = TRUE), this code still produces an error similar to the one in my original post.

edgararuiz (Collaborator) commented:

Ok, what does simply running tbl(sc, "test_sparklyr") return?


hfkist commented May 9, 2024

I first created the "test_sparklyr" table with arrow disabled, and then restarted my cluster and ran the following code:

location <- "/usr/local/lib/R/site-library"
library(sparklyr, lib.loc = location)
library(arrow, lib.loc = location)
library(dplyr, lib.loc = location)

options(sparklyr.arrow = TRUE)
sc <- spark_connect(method = "databricks")

tbl(sc, "test_sparklyr")

which returned the following error:

Error in db_query_fields.DBIConnection(con, ...) : Can't query fields.
Caused by error in `value[[3L]]()`:
! Failed to fetch data: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.105.82.15 executor driver): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(Lorg/apache/spark/sql/types/StructType;)Lorg/apache/spark/sql/catalyst/encoders/ExpressionEncoder;
	at sparklyr.ArrowConvertersImpl.toBatchIterator(arrowconverters.scala:120)
	at sparklyr.ArrowConvertersImpl.toBatchIterator(arrowconverters.scala:67)
	at sparklyr.ArrowConverters$.$anonfun$toArrowDataset$1(arrowconverters.scala:249)
	at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:226)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:957)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:960)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:852)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3870)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1685)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1670)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1670)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4116)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4028)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4016)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1348)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1336)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2997)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:303)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:299)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:384)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:381)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:122)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:131)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:90)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:78)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:549)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:540)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:557)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:400)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:400)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:318)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:558)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:555)
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3758)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4707)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3725)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4698)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1081)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4696)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:669)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withN
Error in `db_query_fields.DBIConnection()`:
