
sparklyr invocation chooses incorrect overloaded method #3360

Open
joveyuan-db opened this issue Aug 1, 2023 · 6 comments
Labels: databricks (Issues related to Databricks connection mode)

Comments

@joveyuan-db (Contributor)

sparklyr's invocation logic does not correctly handle overloaded methods that take Array and Map parameter types. Specifically, consider the following example:

class C {
  def f(args: Array[_]): Unit = {
    println(args.nonEmpty)
  }

  def f(args: Map[String, Any]): Unit = {}
}

val index = findMatchedSignature(classOf[C].getMethods.filter(_.getName == "f").map(_.getParameterTypes), Array(Map()))
classOf[C].getMethods.filter(_.getName == "f")(index.get).invoke(new C, Map())

findMatchedSignature chooses the first method, so we end up invoking f(args: Array[_]) with Map(). Checking whether the array is non-empty then throws a MatchError. The issue is not specific to the map being empty.

[screenshot of the resulting error]
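For context (a quick check of mine, not part of the original report), inspecting the erased parameter types shows why both overloads look compatible with a Map: Array[_] erases to java.lang.Object, to which any Map instance is assignable, so assignability alone cannot tell the two apart.

// Print the erased parameter types of both f overloads (exact rendering may
// vary by Scala version).
classOf[C].getMethods
  .filter(_.getName == "f")
  .foreach(m => println(m.getParameterTypes.mkString(", ")))
// Roughly:
//   class java.lang.Object                     <- f(args: Array[_])
//   interface scala.collection.immutable.Map   <- f(args: Map[String, Any])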

I believe this is the root cause of why parameterized queries break on Databricks:

# TODO: DATABRICKS restore when figuring out how to do parameters
# sdf <- invoke(hive_context(conn), "sql", sql, as.environment(params))

Spark 3.5 further overloads the sql method (https://github.com/apache/spark/blob/eb4152dfcd6e9ef42750e7bac5238efe641458d2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L641-L660): newer Spark versions have both a sql(sqlText: String, args: Array[_]) and a sql(sqlText: String, args: Map[String, Any]) signature, and sparklyr's matcher picks the wrong one.
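One possible direction for a fix (a minimal sketch of mine, not sparklyr's actual implementation) is to rank the signatures that findMatchedSignature already considers compatible and prefer the more specific parameter type over a fully erased one, since Array[_] erases to java.lang.Object in bytecode:

// Score how specifically a candidate signature matches the supplied arguments.
def specificity(parameterTypes: Array[Class[_]], args: Array[Object]): Int =
  parameterTypes.zip(args).map { case (p, a) =>
    if (a == null) 0
    else if (p == a.getClass) 3                                          // exact runtime class
    else if (p != classOf[Object] && p.isAssignableFrom(a.getClass)) 2   // proper supertype, e.g. scala.collection.immutable.Map
    else if (p == classOf[Object]) 1                                     // matches anything after erasure
    else 0
  }.sum

Picking the compatible signature with the highest score would select sql(sqlText: String, args: Map[String, Any]) over the erased sql(sqlText: String, args: Array[_]) when the caller passes a Map.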

@edgararuiz (Collaborator)

Hi, is this using the sparklyr dev version, or the CRAN release?

@joveyuan-db (Contributor, Author)

This was using sparklyr 1.8.2, although I suspect it affects other sparklyr versions including dev.

I copied the findMatchedSignature method from sparklyr's Scala sources (reproduced below) into a Scala REPL and was able to confirm the issue on the toy example above:

def findMatchedSignature(
    parameterTypesOfMethods: Array[Array[Class[_]]],
    args: Array[Object],
    logger: Logger): Option[Int] = {
  val numArgs = args.length
  var mismatches = Array("")
  for (index <- 0 until parameterTypesOfMethods.length) {
    val parameterTypes = parameterTypesOfMethods(index)
    if (parameterTypes.length == numArgs) {
      var argMatched = true
      var i = 0
      while (i < numArgs && argMatched) {
        val parameterType = parameterTypes(i)
        if (parameterType.isAssignableFrom(classOf[Seq[Any]]) &&
            args(i) != null && args(i).getClass.isArray) {
          // The case that the parameter type is a Scala Seq and the argument
          // is a Java array is considered matching. The array will be converted
          // to a Seq later if this method is matched.
        } else if (parameterTypes(i).isAssignableFrom(classOf[Array[Float]]) &&
            (args(i) != null && args(i).getClass == classOf[Array[java.lang.Float]])) {
          // Array[java.lang.Float] is compatible with Array[scala.Float]
        } else if ((parameterType == classOf[Char] ||
            parameterType == classOf[java.lang.Character]) &&
            args(i) != null && args(i).isInstanceOf[String]) {
          // Parameter type is Char and argument is String.
          // Check that the string has length 1.
          if (args(i).asInstanceOf[String].length != 1) argMatched = false
        } else if (parameterType == classOf[Short] && args(i) != null &&
            args(i).isInstanceOf[Integer]) {
          // Parameter type is Short and argument is Integer.
        } else if (parameterType == classOf[Long] && args(i) != null &&
            args(i).isInstanceOf[Integer]) {
          // Parameter type is Long and argument is Integer.
          // This is done for backwards compatibility.
        } else if (args(i) != null &&
            ((args(i).isInstanceOf[java.lang.Float] && parameterType == classOf[Float]) ||
             (args(i).isInstanceOf[Float] && parameterType == classOf[java.lang.Float]))) {
          // Consider java.lang.Float as compatible with scala.Float
        } else {
          var parameterWrapperType = parameterType
          // Convert native parameters to Object types as args is Array[Object] here
          if (parameterType.isPrimitive) {
            parameterWrapperType = parameterType match {
              case java.lang.Integer.TYPE => classOf[java.lang.Integer]
              case java.lang.Long.TYPE => classOf[java.lang.Double]
              case java.lang.Double.TYPE => classOf[java.lang.Double]
              case java.lang.Boolean.TYPE => classOf[java.lang.Boolean]
              case _ => parameterType
            }
          }
          if (args(i) != null &&
              parameterWrapperType.isAssignableFrom(args(i).getClass)) {
            // If the parameter type in question is assignable from args(i), then
            // consider args(i) a match.
          } else if ((parameterType.isPrimitive || args(i) != null) &&
              !parameterWrapperType.isInstance(args(i))) {
            argMatched = false
            mismatches = mismatches ++ Array("method instance " + index + " with " +
              parameterTypes.length + " parameters for parameter " + i)
          }
        }
        i = i + 1
      }
      if (argMatched) {
        // Convert args if needed
        val parameterTypes = parameterTypesOfMethods(index)
        (0 until numArgs).map { i =>
          if (parameterTypes(i) == classOf[Seq[Any]] &&
              args(i) != null && args(i).getClass.isArray) {
            // Convert a Java array to scala Seq
            args(i) = args(i).asInstanceOf[Array[_]].toSeq
          } else if (parameterTypes(i).isAssignableFrom(classOf[Array[Float]]) &&
              (args(i) != null && args(i).getClass == classOf[Array[java.lang.Float]])) {
            args(i) = args(i).asInstanceOf[Array[java.lang.Float]].map(x => x.asInstanceOf[Float]).toArray
          } else if ((parameterTypes(i) == classOf[Char] ||
              parameterTypes(i) == classOf[java.lang.Character]) &&
              args(i) != null && args(i).isInstanceOf[String]) {
            // Convert String to Char
            args(i) = new java.lang.Character(args(i).asInstanceOf[String](0))
          } else if (parameterTypes(i) == classOf[Short] &&
              args(i) != null && args(i).isInstanceOf[Integer]) {
            // Convert Integer to Short
            val argInt = args(i).asInstanceOf[Integer]
            if (argInt > Short.MaxValue || argInt < Short.MinValue) {
              throw new Exception("Unable to cast integer to Short: out of range.")
            }
            args(i) = new java.lang.Short(argInt.toShort)
          } else if (parameterTypes(i) == classOf[Long] &&
              args(i) != null && args(i).isInstanceOf[Double]) {
            // Try to convert Double to Long
            val argDouble = args(i).asInstanceOf[Double]
            if (argDouble > Long.MaxValue || argDouble < Long.MinValue) {
              throw new Exception("Unable to cast numeric to Long: out of range.")
            }
            args(i) = new java.lang.Long(argDouble.toLong)
          } else if (parameterTypes(i) == classOf[Long] &&
              args(i) != null && args(i).isInstanceOf[Integer]) {
            args(i) = new java.lang.Long(args(i).asInstanceOf[Integer].toLong)
          }
        }
        return Some(index)
      }
    }
  }
  logger.logWarning(s"Failed to match" + mismatches.mkString(", ") + ".")
  None
}
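To run the copied method standalone, a logger has to be supplied; a minimal stand-in (my own stub, not sparklyr's Logger class) and the adapted call from the toy example could look like this:

// Trivial stand-in so the pasted method compiles outside sparklyr.
class Logger {
  def logWarning(msg: String): Unit = println(msg)
}

// Re-run the lookup from the toy example with the logger supplied explicitly.
val methods = classOf[C].getMethods.filter(_.getName == "f")
val index = findMatchedSignature(methods.map(_.getParameterTypes), Array(Map()), new Logger)
// When getMethods lists f(args: Array[_]) first, this invocation reproduces
// the failure described above.
methods(index.get).invoke(new C, Map())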

@edgararuiz (Collaborator)

OK, thanks. Is there some R code we can run to test this? I don't see an error that we can recreate.

@joveyuan-db (Contributor, Author)

Yes, apologies. I reproduced this on Databricks with Spark 3.5 and sparklyr 1.8.2 using the query below. The third line exercises an implementation detail of sparklyr (a code path eventually reached by sdf_len, for example) but isolates the core issue.

library(sparklyr)
sc <- spark_connect(method = "databricks")
DBI::dbSendQuery(sc, "SELECT 0L as `id`")
[screenshot of the resulting error]

I haven't gotten the chance to verify this outside of Databricks, but I believe any environment with Spark 3.5 will be sufficient.
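The overload ambiguity itself can be seen in any plain Spark 3.5 spark-shell (a quick check of mine, not from the original report), without Databricks:

// Assumes the spark-shell's SparkSession is bound to `spark`.
val twoArgSql = spark.getClass.getMethods
  .filter(m => m.getName == "sql" && m.getParameterCount == 2)
twoArgSql.foreach(m => println(m.getParameterTypes.mkString(", ")))
// Expected to include, among others:
//   class java.lang.String, class java.lang.Object                    <- sql(sqlText, args: Array[_])
//   class java.lang.String, interface scala.collection.immutable.Map  <- sql(sqlText, args: Map[String, Any])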

@edgararuiz (Collaborator)

Hi @joveyuan-db, are you sure you're on Spark 3.5? The latest GA release is 3.4.1. Also, on Databricks, Spark 3.4 (ML 13) and above do not currently work with CRAN sparklyr. Here is the tracking issue: #3334
We have an article on how to use the new solution: https://spark.rstudio.com/deployment/databricks-spark-connect

@edgararuiz added the "awaiting response" and "databricks" (Issues related to Databricks connection mode) labels on Aug 8, 2023
@joveyuan-db (Contributor, Author)

Hi Edgar, yeah, I'm testing a preview build internally at Databricks. I'm using sparklyr on the Databricks cluster itself, though, so it doesn't require Databricks Connect.

The root cause (bug in findMatchedSignature) is not specific to Spark 3.5 by the way, but 3.5 surfaces it.
