Add Spark 3.5.0 support (#436)
* Add Spark 3.5.0 support

* Use Scala 2.12.12 in CI to make scoverage work

* Disable AQE for test 'intermediate storage level' in ConnectedComponentsSuite
EnricoMi committed Sep 30, 2023
1 parent 8ede065 · commit e54f249
Showing 5 changed files with 37 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/python-ci.yml
@@ -6,6 +6,9 @@ jobs:
       fail-fast: false
       matrix:
         include:
+          - spark-version: 3.5.0
+            scala-version: 2.12.18
+            python-version: 3.9
           - spark-version: 3.4.1
             scala-version: 2.12.17
             python-version: 3.9
8 changes: 6 additions & 2 deletions .github/workflows/scala-ci.yml
@@ -6,6 +6,10 @@ jobs:
       fail-fast: false
       matrix:
         include:
+          - spark-version: 3.5.0
+            scala-version: 2.13.8
+          - spark-version: 3.5.0
+            scala-version: 2.12.12
           - spark-version: 3.4.1
             scala-version: 2.13.8
           - spark-version: 3.4.1
@@ -14,10 +18,10 @@ jobs:
             scala-version: 2.13.8
           - spark-version: 3.3.3
             scala-version: 2.12.12
-          - spark-version: 3.2.4
-            scala-version: 2.12.12
           - spark-version: 3.2.4
             scala-version: 2.13.5
+          - spark-version: 3.2.4
+            scala-version: 2.12.12
     runs-on: ubuntu-22.04
     env:
       # fixing this error after tests success: sbt.ForkMain failed with exit code 134
3 changes: 2 additions & 1 deletion build.sbt
@@ -5,9 +5,10 @@ import ReleaseTransformations._
 
 resolvers += "Spark snapshot repository" at "https://repository.apache.org/snapshots/"
 
-val sparkVer = sys.props.getOrElse("spark.version", "3.4.1")
+val sparkVer = sys.props.getOrElse("spark.version", "3.5.0")
 val sparkBranch = sparkVer.substring(0, 3)
 val defaultScalaVer = sparkBranch match {
+  case "3.5" => "2.12.18"
   case "3.4" => "2.12.17"
   case "3.3" => "2.12.15"
   case "3.2" => "2.12.15"
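The default Scala version is keyed off the first three characters of the Spark version string. A minimal standalone sketch of that selection logic, with the new 3.5 case included (the resolveScalaVersion helper and its fallback error are illustrative, not part of build.sbt):

// Standalone sketch of the default-Scala-version selection in build.sbt.
// resolveScalaVersion and the error fallback are illustrative only.
def resolveScalaVersion(sparkVer: String): String = {
  val sparkBranch = sparkVer.substring(0, 3) // "3.5.0" -> "3.5"
  sparkBranch match {
    case "3.5" => "2.12.18"
    case "3.4" => "2.12.17"
    case "3.3" => "2.12.15"
    case "3.2" => "2.12.15"
    case _     => sys.error(s"Unsupported Spark version: $sparkVer")
  }
}

assert(resolveScalaVersion("3.5.0") == "2.12.18")

Note that substring(0, 3) assumes a single-digit minor version; a hypothetical Spark 3.10.x would map to branch "3.1".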
2 changes: 1 addition & 1 deletion dev/release.py
@@ -39,7 +39,7 @@ def verify(prompt, interactive):
 @click.option("--publish-docs", type=bool, default=PUBLISH_DOCS_DEFAULT, show_default=True,
               help="Publish docs to github-pages.")
 @click.option("--spark-version", multiple=True, show_default=True,
-              default=["3.2.4", "3.3.3", "3.4.1"])
+              default=["3.2.4", "3.3.3", "3.4.1", "3.5.0"])
 def main(release_version, next_version, publish_to, no_prompt, git_remote, publish_docs,
          spark_version):
     interactive = not no_prompt
37 changes: 25 additions & 12 deletions src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala
@@ -215,18 +215,31 @@ class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkContext
   }
 
   test("intermediate storage level") {
-    val friends = Graphs.friends
-    val expected = Set(Set("a", "b", "c", "d", "e", "f"), Set("g"))
-
-    val cc = friends.connectedComponents
-    assert(cc.getIntermediateStorageLevel === StorageLevel.MEMORY_AND_DISK)
-
-    for (storageLevel <- Seq(StorageLevel.DISK_ONLY, StorageLevel.MEMORY_ONLY, StorageLevel.NONE)) {
-      // TODO: it is not trivial to confirm the actual storage level used
-      val components = cc
-        .setIntermediateStorageLevel(storageLevel)
-        .run()
-      assertComponents(components, expected)
+    // disabling adaptive query execution helps assertComponents
+    val enabled = spark.conf.getOption("spark.sql.adaptive.enabled")
+    try {
+      spark.conf.set("spark.sql.adaptive.enabled", value = false)
+
+      val friends = Graphs.friends
+      val expected = Set(Set("a", "b", "c", "d", "e", "f"), Set("g"))
+
+      val cc = friends.connectedComponents
+      assert(cc.getIntermediateStorageLevel === StorageLevel.MEMORY_AND_DISK)
+
+      for (storageLevel <- Seq(StorageLevel.DISK_ONLY, StorageLevel.MEMORY_ONLY, StorageLevel.NONE)) {
+        // TODO: it is not trivial to confirm the actual storage level used
+        val components = cc
+          .setIntermediateStorageLevel(storageLevel)
+          .run()
+        assertComponents(components, expected)
+      }
+    } finally {
+      // restoring earlier conf
+      if (enabled.isDefined) {
+        spark.conf.set("spark.sql.adaptive.enabled", value = enabled.get)
+      } else {
+        spark.conf.unset("spark.sql.adaptive.enabled")
+      }
     }
   }
 
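The rewritten test follows a capture/override/restore pattern: the current value of spark.sql.adaptive.enabled is captured, overridden for the duration of the test body, and restored in a finally block so other tests see an unchanged session even if an assertion throws. A minimal standalone sketch of the same pattern, assuming an existing SparkSession (the withSQLConf helper name is illustrative, not a GraphFrames API):

import org.apache.spark.sql.SparkSession

// Sketch: temporarily override one SQL conf, then restore the caller's setting.
// withSQLConf is an illustrative helper, not part of GraphFrames.
def withSQLConf[T](spark: SparkSession, key: String, value: String)(body: => T): T = {
  val previous = spark.conf.getOption(key) // remember the existing setting, if any
  spark.conf.set(key, value)
  try {
    body
  } finally {
    previous match {
      case Some(v) => spark.conf.set(key, v) // restore the earlier value
      case None    => spark.conf.unset(key)  // it was unset before, so unset again
    }
  }
}

// Usage mirroring the test above (graph is an assumed GraphFrame):
// withSQLConf(spark, "spark.sql.adaptive.enabled", "false") {
//   graph.connectedComponents.setIntermediateStorageLevel(StorageLevel.MEMORY_ONLY).run()
// }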
