can we have the option to use localCheckpoint instead of checkpoint? #429

Open
skanderboudawara opened this issue Mar 21, 2023 · 1 comment


@skanderboudawara

Hello,

Can we have the option to use localCheckpoint instead of checkpoint?

In our project we cannot call setCheckpointDir due to access restrictions, but localCheckpoint works fine.

It would be good to have this feature available.
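To make the request concrete: Spark's reliable checkpoint() requires sc.setCheckpointDir(...) and writes to that (usually HDFS/S3) path, while localCheckpoint() stores blocks on the executors and needs no directory. A toy sketch of the failure mode, using hypothetical stand-in classes (ToyContext, ToyDataFrame are illustrations, not Spark APIs):

```python
class ToyContext:
    """Hypothetical stand-in for SparkContext's checkpoint-directory state."""
    def __init__(self):
        self.checkpoint_dir = None

    def set_checkpoint_dir(self, path):
        self.checkpoint_dir = path


class ToyDataFrame:
    """Hypothetical stand-in for a DataFrame, modelling only checkpointing."""
    def __init__(self, ctx):
        self.ctx = ctx

    def checkpoint(self):
        # Mirrors Spark's behaviour: reliable checkpointing fails fast
        # when no checkpoint directory has been configured.
        if self.ctx.checkpoint_dir is None:
            raise IOError("Checkpoint directory is not set. "
                          "Please set it first using sc.setCheckpointDir().")
        return self

    def local_checkpoint(self):
        # Local checkpointing uses executor-local storage; no directory needed.
        return self


ctx = ToyContext()
df = ToyDataFrame(ctx)
df.local_checkpoint()          # works without any configuration
try:
    df.checkpoint()            # fails: no checkpoint directory set
except IOError as e:
    print("reliable checkpoint failed:", e)
```

This is exactly the situation in the issue: without permission to set a checkpoint directory, only the local variant is usable.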

@TinoSM

TinoSM commented Jan 31, 2024

+1 on this. We swapped to the GraphX algorithm because we don't want to checkpoint to S3, and Spark on Kubernetes typically runs without HDFS.

Not sure about GraphX vs. GraphFrames performance; we didn't see an impact in mock tests (but our graphs usually don't have very long "paths"). It would be nice if someone knows of a benchmark.

Btw, we had to revert to GraphFrames because GraphX is buggy in our setup (i.e. it produces wrong results). For those interested, here is a guide on how we added localCheckpoint. (We may push this to this repo eventually, but it is still under testing, and since we are not contributors here we would need time to adapt the test we delete, which should be fairly easy.) It passes all other tests.

I'm still testing as of now, but honestly this should be even safer than the default mechanism, since S3 has some consistency issues...

1. I just went to https://github.com/graphframes/graphframes/tree/master and downloaded the tag I wanted
2. Remove the "python" folder
3. In "ConnectedComponentsSuite.scala" just remove the test named "checkpoint interval"
4. In "ConnectedComponents.scala"
    1. Declare a previous_ee var just before the loop:
    ```
    // holds the previous local checkpoint so it can be released later
    var previous_ee: DataFrame = null
    while (!converged)
    ```
    2. Change this loop (and the next line) to:
    ```
      if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
        ee = ee.localCheckpoint()
        // remove previous checkpoint
        if (previous_ee != null) {
          previous_ee.unpersist()
        }
        previous_ee = ee

        System.gc() // hint Spark to clean shuffle directories
      }
      
      // ee.persist(intermediateStorageLevel)
    ```
    3. Remove this checkpoint-directory if/else block:
    ```
    val checkpointDir: Option[String] = if (shouldCheckpoint) {
      val dir = sc.getCheckpointDir.map { d =>
        new Path(d, s"$CHECKPOINT_NAME_PREFIX-$runId").toString
      }.getOrElse {
        throw new IOException(
          "Checkpoint directory is not set. Please set it first using sc.setCheckpointDir().")
      }
      logInfo(s"$logPrefix Using $dir for checkpointing with interval $checkpointInterval.")
      Some(dir)
    } else {
      logInfo(
        s"$logPrefix Checkpointing is disabled because checkpointInterval=$checkpointInterval.")
      None
    }
    ```
5. Run "build/sbt assembly -Dspark.version=3.4.1"; the assembled jar is in $PWD/target/scala-2.12/...
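The checkpoint-rotation bookkeeping in step 4.2 (keep a reference to the previous checkpoint and unpersist it only after the new one exists) can be sketched independently of Spark. Here FakeDataset is a hypothetical stand-in for a DataFrame, used only to illustrate the rotation logic, not a Spark API:

```python
class FakeDataset:
    """Hypothetical stand-in for a Spark DataFrame, tracking persistence only."""
    def __init__(self, name):
        self.name = name
        self.persisted = False

    def local_checkpoint(self):
        # In Spark, localCheckpoint() truncates lineage and persists the
        # data on the executors; here we just flag it as persisted.
        self.persisted = True
        return self

    def unpersist(self):
        self.persisted = False
        return self


def run_iterations(datasets, checkpoint_interval):
    """Mimics the modified loop: checkpoint every checkpoint_interval
    iterations, then release the previous checkpoint."""
    previous = None
    for iteration, ds in enumerate(datasets, start=1):
        if iteration % checkpoint_interval == 0:
            ds = ds.local_checkpoint()
            if previous is not None:
                previous.unpersist()  # release the older checkpoint
            previous = ds
    return previous


data = [FakeDataset(f"iter{i}") for i in range(1, 7)]
last = run_iterations(data, checkpoint_interval=2)
print(last.name, last.persisted)  # only the most recent checkpoint stays persisted
```

The design point is the ordering: the new checkpoint is materialized before the old one is released, so at no point does the iteration lose its only truncated lineage.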

PS: If anyone is willing, feel free to push this to the repo (i.e. create a pull request) without asking me.
