Modify Spark on Ray to support Pex and other virtualenvs + direct scr… #45354
base: master
Conversation
Modify Spark on Ray to support Pex and other virtualenvs + direct scripts launch in databricks runtime

Signed-off-by: fgchurch <gchurch@abyss.net>

Force-pushed from 764375d to e2324ba
if __name__ == "__main__":
def try_clean_temp_dir_at_exit(process, lock_fd, collect_log_to_path, temp_dir):
I might be losing some context, but do we need to modify so many lines in the python/ray/util/spark/start_ray_node.py file?
If I understand correctly, to support PEX we just need to add a config that allows changing the Python executable path when starting the ray-on-spark child process.
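A minimal sketch of the suggestion above: one configuration point that lets the caller override the Python executable used to spawn the child process. The environment variable name `RAY_ON_SPARK_PYTHON_EXECUTABLE` and the helper functions are hypothetical, not an existing Ray setting.

```python
import os
import sys

def get_python_executable():
    # Fall back to the current interpreter when no override is set.
    # A .pex file is itself executable, so its path can be supplied here
    # to run the child process inside the pex environment.
    return os.environ.get("RAY_ON_SPARK_PYTHON_EXECUTABLE", sys.executable)

def build_start_command(script_path, args):
    # Hypothetical helper: assemble the argv used to launch the
    # ray-on-spark child process with the chosen interpreter.
    return [get_python_executable(), script_path, *args]
```

With no override set, `build_start_command("start_ray_node.py", ["--head"])` would launch the script under the driver's own interpreter.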
Fair questions. I modified start_ray_node.py for a number of reasons:
- Direct calling of ray.scripts.scripts.main(), per your feedback on my previous PR ( Add Pex support for ray-on-spark, this allows users to 'ship' any ver… #44411 (review) ). Based on that requirement, and on my understanding of the general Python community consensus about when and why to use subprocess, this is possible and most efficiently done by forking the runtime already initialized in cluster_init.py, without the overhead of os.fork + os.execvp, path scrubbing and reapplying, and stdin/stdout redirection. I understand the convenience of subprocess.Popen from a code perspective, but it is bulky when all we really need is a light clone of the existing runtime. I specifically saw overhead and comparative slowness with the existing implementation at higher logging levels and with heavier, more complex model implementations.
- For line 69 specifically, I noticed that try_clean_temp_dir_at_exit() used local constants where it could simply accept them as parameters. In keeping with the overall Ray codebase design and Python community guidelines, I parameterized them so the function is more clearly and independently defined.
- For code clarity, and to stay aligned with the rest of the Ray codebase, I created a main() function for the start_ray_node module, which makes it look like I changed slightly more of the codebase than I actually did. All of the architecture and functionality works the same.
- I changed line 46 to implement ray-on-spark logging via ray_constants.LOGGER_LEVEL, as I have seen it implemented elsewhere in the Ray codebase.
- I added logger.debug-level information for users. I think this is especially important for non-Databricks users implementing their own Spark clusters with their own underlying filesystem types (as they likely would on Kubernetes). One place this can be problematic is file locks; I added logging that explains the need to change the Ray temp dir/log dir or filesystem in that situation.
- is_fd_closed() and list_fds() are used as more of a safety net than a hard requirement: they help mitigate hanging file descriptors when a Ray cluster is shut down in an existing Python/Spark/notebook context and a new cluster is started within the same context. This is an edge case, but one I did not want to overlook.
- The changes in try_clean_temp_dir_at_exit() reflect using fork instead of subprocess, handling exceptions more explicitly, and adding clear debug logging.

I'll add another comment showing a side-by-side with process logging, plus some of my unit tests killing Spark, calling ray shutdowns, etc., while monitoring system processes and file descriptors.
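The fork-based launch described above can be sketched roughly as follows. This is not the PR's actual code, just an illustration of forking the already-initialized driver process and calling Ray's CLI entry point directly in the child, instead of having subprocess.Popen re-spawn a fresh interpreter:

```python
import os
import sys

def launch_ray_node(ray_args):
    """Fork and run the ray CLI in the child; return the child pid.

    `ray_args` would be something like ["start", "--head", ...].
    Illustrative sketch only: requires ray installed, POSIX fork.
    """
    pid = os.fork()
    if pid == 0:
        # Child: reuses the parent's initialized interpreter state.
        # No execvp, so no PATH scrubbing/reapplying and no stdin/stdout
        # redirection is needed.
        sys.argv = ["ray"] + list(ray_args)
        from ray.scripts.scripts import main  # ray's CLI entry point
        sys.exit(main())
    return pid  # Parent: keep the child pid for cleanup/wait at exit
```

The trade-off the discussion raises: the child inherits open file descriptors and locks from the parent, which is why the surrounding cleanup and fd-tracking changes matter.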
Hmm, this is a big change, and my concern is that it might introduce new bugs that unit tests can't cover. I have previously fixed several bugs here, and the current code has been fully tested.
Can we split the work into:
(1) setting a Python binary environment variable for switching the virtual Python env, and
(2) refactoring start_ray_node.py to avoid launching a child process?
(2) is a big change that needs full testing. Do you have a Databricks environment? Before merging (2) we need to fully test the change in Databricks, otherwise it might break our customers' workloads. Personally, I would rather not make such a change unless we have a critical bug or a serious performance issue.
@jjyao FYI this is the new PR to shepherd through.
@WeichenXu123 do you have time to finish the review?
Modify Spark on Ray to support Pex and other virtualenvs + direct scripts launch in databricks runtime
@WeichenXu123
@jjyao
@anyscalesam
Why are these changes needed?
PEX is a very powerful Python dependency distribution tool. It especially shines when used in conjunction with PySpark: it is currently the only dependency distribution utility that works with any Spark cluster, allowing users to "bootstrap" Python dependencies isolated per SparkSession. In its existing form, ray-on-spark cannot be deployed and executed from a .pex file on a Spark cluster. This PR changes that and makes ray-on-spark fully portable with .pex, which has numerous benefits: multiple Ray versions supported on a single Spark cluster for complex dependency chains, lighter-weight and more generic Spark images, easier per-job Python dependency resolution, and more.
These changes primarily alter how start_ray_node.py operates within Ray cluster_init, via a lighter-weight fork than the existing subprocess method. They also add the ability to enable debug logging with much more detailed information about start_ray_node.
From the user-facing side, no change is required. Users can now distribute their Python dependencies with ray-on-spark via a local path to their pex file, like "./ray-pex-deps.pex"; it can be distributed to the Spark workers using spark.files (an S3 location or a local location). Users can also optionally change the logging level at runtime by setting ray._private.ray_constants.LOGGER_LEVEL = 'debug' (or another valid logging level).
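The logging-level tweak mentioned above might look roughly like this. Note that ray._private is an internal module, so this relies on implementation details; the helper below is a hypothetical stand-in that mirrors the level change onto a standard logger:

```python
import logging

def set_ray_on_spark_log_level(level_name):
    # Hypothetical helper: translate a level name like 'debug' into the
    # numeric level and apply it to the ray.util.spark logger, similar to
    # how the PR wires ray_constants.LOGGER_LEVEL into start_ray_node.
    logger = logging.getLogger("ray.util.spark")
    logger.setLevel(getattr(logging, level_name.upper()))
    return logger.level
```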
Example
Generate Ray Pex File using future version of ray
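A minimal sketch of building such a pex with the pex CLI (package names and pins are illustrative, and this requires `pip install pex` first; the original example's exact command is not shown here):

```shell
# Bundle ray and pyspark into one executable .pex file; the output name
# matches the example pex path used elsewhere in this PR.
pex "ray[default]" pyspark -o ray-pex-deps.pex
```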
Initialize PySpark SparkSession
(Lots of options here, could use spark-submit, google-sparksubmit-operator for kubernetes, python kernel in Jupyter, etc)
For Jupyter you would run the steps below individually in notebook cells (note: this requires pants-jupyter-plugin).
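A sketch of the SparkSession setup under the common PEX-on-Spark pattern: ship the pex via spark.files and point the executors' Python at it. The config keys are standard Spark properties, but the paths and app name are illustrative assumptions, not the PR's exact settings:

```python
# Illustrative settings for running a PEX-packaged ray-on-spark job.
pex_file = "ray-pex-deps.pex"
spark_conf = {
    "spark.files": pex_file,  # ship the pex to every executor
    # Use the pex itself as the worker Python interpreter (it is a
    # self-contained executable carrying ray + pyspark).
    "spark.executorEnv.PYSPARK_PYTHON": f"./{pex_file}",
}

def build_spark_session(conf):
    """Apply the conf to a SparkSession builder.

    Requires pyspark and a Java runtime, so it is illustrative only.
    """
    from pyspark.sql import SparkSession
    builder = SparkSession.builder.appName("ray-on-spark-pex")
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```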
In a python script or notebook cell
You could also launch it via a Python script on the Spark cluster, like: ray-pex-deps.pex my_example_python.py
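A sketch of what such a driver script might contain (my_example_python.py is a hypothetical name, and the worker count is an example; this needs ray[spark] and a live Spark cluster, so it is illustrative only):

```python
# Run inside the pex, e.g.:  ./ray-pex-deps.pex my_example_python.py
def main():
    import ray
    from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

    # Start a Ray cluster on the existing Spark cluster (example sizing).
    setup_ray_cluster(num_worker_nodes=2)
    ray.init()
    try:
        print(ray.cluster_resources())
    finally:
        shutdown_ray_cluster()
```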
I'm still planning on posting some material and examples once this is merged, happy to share some links.
Related issue number
Continuation of #44411 based on discussion.
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If this change adds a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.