
Modify Spark on Ray to support Pex and other virtualenvs + direct scripts launch in databricks runtime #45354

Open
wants to merge 2 commits into master

Conversation

GabeChurch

Modify Spark on Ray to support Pex and other virtualenvs + direct scripts launch in databricks runtime

@WeichenXu123
@jjyao
@anyscalesam

Why are these changes needed?

PEX is a very powerful Python dependency distribution tool, and it especially shines when used in conjunction with PySpark. It is currently the only dependency distribution utility that works with any Spark cluster, allowing users to "bootstrap" Python dependencies isolated per-SparkSession. In its existing form, ray-on-spark cannot be deployed and executed from a .pex file on a Spark cluster. This PR changes that and makes ray-on-spark fully portable with .pex, which has numerous benefits: multiple Ray versions supported on a single Spark cluster for complex dependency chains, lighter-weight and more generic Spark images, easier per-job Python dependency resolution, and more.

These changes primarily alter how start_ray_node.py operates within Ray's cluster_init, launching the node via a lighter-weight fork of the current process instead of the existing subprocess method. They also add the ability to enable debug logging with much more detailed information about start_ray_node.
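Roughly, the forked launch follows this pattern; this is a simplified illustrative sketch (the function name and argument handling are placeholders), not the exact code in the PR:

import os
import sys


def launch_ray_node_forked(ray_cli_args):
    # Simplified sketch: fork the current interpreter so the child inherits
    # the live environment (including a PEX-activated sys.path), then run the
    # Ray CLI entry point directly instead of exec'ing a fresh interpreter.
    pid = os.fork()
    if pid == 0:
        # Child process: behave like `ray start <args>` in-process.
        from ray.scripts.scripts import main as ray_cli_main

        sys.argv = ["ray"] + list(ray_cli_args)
        try:
            ray_cli_main()
        finally:
            # A real implementation would propagate the CLI's exit status.
            os._exit(0)
    # Parent process: return the child's pid so it can be monitored or terminated.
    return pid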

On the user-facing side, no changes are required. Users can now distribute their Python dependencies with ray-on-spark via a local path to their pex file, such as "./ray-pex-deps.pex". The file can be distributed to the Spark workers using spark.files (an S3 location or a local path). Users can also optionally change the logging level at runtime by setting ray._private.ray_constants.LOGGER_LEVEL = 'debug' (or another valid logging level).
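For example, the spark.files entry can point at an object store instead of a local copy; the bucket path below is hypothetical and assumes the cluster's Hadoop configuration can read it:

import os

from pyspark import SparkConf

conf = SparkConf()
# Hypothetical S3 location; Spark ships the file into each executor's working directory.
conf.set("spark.files", "s3://my-bucket/ray-pex-deps.pex")
# Executors then invoke the shipped pex as their Python interpreter.
os.environ["PYSPARK_PYTHON"] = "./ray-pex-deps.pex"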

Example

Generate a Ray PEX file (the pinned Ray version can differ from the version installed on the Spark cluster)

/opt/conda/bin/pex ray[default,rllib,tune]==2.10.1 -o $(pwd)/ray-pex-deps.pex --inherit-path=fallback

Initialize the PySpark SparkSession
(There are lots of options here: spark-submit, the Google spark-submit operator for Kubernetes, a Python kernel in Jupyter, etc.)

For Jupyter, run the following in individual notebook cells -- note this requires the pants-jupyter-plugin

%reload_ext pants_jupyter_plugin
%pex_load ray-pex-deps.pex

In a Python script or notebook cell

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os

pyspark_deps_name = 'ray-pex-deps.pex'
os.environ['PYSPARK_PYTHON'] = f"./{pyspark_deps_name}"

conf = SparkConf().setMaster("spark://spark-master:7077")  # or .setMaster("local[*]") for local mode
hostname = os.popen("hostname -i").read().split("\n")[0]
conf.set("spark.submit.deployMode", "client")
conf.set("spark.driver.host", hostname)
# Ship the pex dependencies from a local path or S3; the pex file must exist locally for most use cases.
conf.set("spark.files", pyspark_deps_name)
# Required for Ray on Spark to utilize the pex file for subprocesses.

sc = pyspark.SparkContext(conf=conf)
sc.setLogLevel("warn")
spark = SparkSession(sc)

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
from ray._private import ray_constants

ray_constants.LOGGER_LEVEL = 'debug'
conn_str = setup_ray_cluster(max_worker_nodes=2,
                             min_worker_nodes=0)

import ray
ray.init(conn_str[0])

#Ray code below
#End ray code
shutdown_ray_cluster()
spark.stop()

You could also launch this via a Python script on the Spark cluster, e.g. ./ray-pex-deps.pex my_example_python.py
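A minimal hypothetical my_example_python.py for that launch style might look like the following; it simply condenses the notebook example above (the master URL, file names, and worker counts are the same assumptions used there):

import os

import ray
from pyspark.sql import SparkSession
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

# Executors run the pex shipped via spark.files as their Python interpreter.
os.environ["PYSPARK_PYTHON"] = "./ray-pex-deps.pex"

spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .config("spark.files", "ray-pex-deps.pex")
    .getOrCreate()
)

conn_str = setup_ray_cluster(max_worker_nodes=2, min_worker_nodes=0)
ray.init(conn_str[0])

print(ray.cluster_resources())  # placeholder for real Ray work

shutdown_ray_cluster()
spark.stop()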

I'm still planning on posting some material and examples once this is merged, happy to share some links.

Related issue number

Continuation of #44411 based on discussion.

Checks

  • I've signed off every commit (using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Modify Spark on Ray to support Pex and other virtualenvs + direct scripts launch in databricks runtime

Signed-off-by: fgchurch <gchurch@abyss.net>

Review thread on python/ray/util/spark/start_ray_node.py (diff context lines):

if __name__ == "__main__":
def try_clean_temp_dir_at_exit(process, lock_fd, collect_log_to_path, temp_dir):
Contributor
I might lose some context. Do we need to modify so many lines in the python/ray/util/spark/start_ray_node.py file?

If I understand correctly, to support PEX we just need to add a config that allows changing the Python executable path when starting the ray-on-spark child process.

Author

Fair question(s). I modified start_ray_node.py for a number of reasons:

  1. Direct calling of ray.scripts.scripts.main(), per your feedback on my previous PR (Add Pex support for ray-on-spark, this allows users to 'ship' any ver… #44411 (review)). Based on that requirement and (from my understanding of) the general Python community consensus on when and why to use subprocess, this is possible and most efficiently done by forking the existing runtime initialized in cluster_init.py, without the overhead of os.fork + os.execvp, path scrubbing and reapplying, and stdin/stdout redirection. I understand the convenience of subprocess.Popen from a code perspective, but it is bulky when all we really need is a light clone of the existing runtime. I specifically saw some overhead/slowness with the existing implementation (comparatively) at higher logging levels and with heavier/more complex model implementations.

  2. For line 69 specifically, I noticed that the try_clean_temp_dir_at_exit() function relied on local constants when it could simply accept them as parameters. In keeping with the overall Ray codebase design and Python community guidelines, I parameterized those constants so that the function is more clearly and independently defined.

  3. For code clarity, and to stay aligned with the rest of the Ray codebase, I created a main() function for the start_ray_node module, which makes it look like I changed slightly more of the codebase than I actually did. The "how it works" architecture and functionality are the same.

  4. I changed line 46 to implement ray-on-spark logging the way I have seen it implemented elsewhere in the Ray codebase, via ray_constants.LOGGER_LEVEL.

  5. I added logger.debug-level information for users. I think this is especially important for non-Databricks users who may be running their own Spark clusters on their own underlying filesystem types (as they likely would on Kubernetes). One place this can be problematic is file locks; I added logging that explains the need to change the Ray temp dir/log dir or filesystem in that situation.

  6. is_fd_closed() and list_fds() are used more as a safety net than as a hard requirement: they help mitigate any possible hanging file descriptors when a Ray cluster is shut down in an existing Python/Spark/notebook context and a new Ray cluster is then started within the same context. This is an edge case, but one I did not want to overlook (see the illustrative sketch after this list).

  7. The changes in try_clean_temp_dir_at_exit() reflect using fork instead of subprocess and handle exceptions more explicitly, with fairly clear debug logging.

I'll add another comment showing a side-by-side with process logging, along with some of my unit tests that kill Spark, call ray shutdown(), etc., while monitoring system processes and file descriptors.
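For reference, here is a rough sketch of what helpers like is_fd_closed() and list_fds() can look like on Linux; this is illustrative only (using /proc and os.fstat), not the exact implementation in this PR:

import os


def list_fds():
    # Enumerate the file descriptors currently open in this process
    # (Linux-only, via the /proc filesystem).
    return sorted(int(fd) for fd in os.listdir("/proc/self/fd"))


def is_fd_closed(fd):
    # If fstat() raises OSError, the descriptor is no longer valid in this process.
    try:
        os.fstat(fd)
        return False
    except OSError:
        return True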

Contributor

Hmm, this is a big change; my concern is that it might introduce new bugs that unit tests can't cover. Previously I have fixed several bugs, and the current code has been fully tested.

Can we split the work into
(1) setting a Python bin environment variable for switching the virtual Python env,

and (2) refactoring start_ray_node.py to avoid launching a child process?

(2) is a big change that needs full testing. Do you have a Databricks environment? Before merging (2) we need to fully test the change in Databricks; otherwise it might break our customers' workloads. Personally, I don't like making such a change unless we have a critical bug or a serious performance issue.

@anyscalesam
Collaborator

@jjyao FYI this is the new PR to shepherd through.

anyscalesam added the P1 (Issue that should be fixed within a few weeks) and core (Issues that should be addressed in Ray Core) labels on May 29, 2024
@jjyao
Contributor

jjyao commented May 30, 2024

@WeichenXu123 do you have time to finish the review?

anyscalesam added and then removed the core (Issues that should be addressed in Ray Core) label on May 30, 2024