From 259033a99e6e2f4dabe2555b37dabfb87aeae236 Mon Sep 17 00:00:00 2001 From: Curtis Vogt Date: Thu, 14 Sep 2023 15:23:56 -0500 Subject: [PATCH] Runtime environment can specify Julia executable and arguments (#8) * Support specifying runtime env executable * Support specifying runtime env args * Support specifying executable/args via runtime env * Avoid quoting default * Switch to using command in RuntimeEnvContext * Use separate command for Julia * Add TODO about switching to plugin Co-authored-by: Dave Kleinschmidt Signed-off-by: Curtis Vogt --------- Signed-off-by: Curtis Vogt Co-authored-by: Dave Kleinschmidt --- .../runtime_env/agent/runtime_env_agent.py | 6 +++- python/ray/_private/runtime_env/context.py | 34 ++++++++----------- python/ray/runtime_env/runtime_env.py | 8 +++++ 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/python/ray/_private/runtime_env/agent/runtime_env_agent.py b/python/ray/_private/runtime_env/agent/runtime_env_agent.py index 2d5052a633a7..5ed8a56182e9 100644 --- a/python/ray/_private/runtime_env/agent/runtime_env_agent.py +++ b/python/ray/_private/runtime_env/agent/runtime_env_agent.py @@ -298,7 +298,11 @@ async def _setup_runtime_env( # TODO(chenk008): Add log about allocated_resource to # avoid lint error. That will be moved to cgroup plugin. per_job_logger.debug(f"Worker has resource :" f"{allocated_resource}") - context = RuntimeEnvContext(env_vars=runtime_env.env_vars()) + context = RuntimeEnvContext( + env_vars=runtime_env.env_vars(), + # TODO: use plugin instead of special casing + julia_command=runtime_env.julia_command(), + ) await self._container_manager.setup( runtime_env, context, logger=per_job_logger ) diff --git a/python/ray/_private/runtime_env/context.py b/python/ray/_private/runtime_env/context.py index b65390efa5d1..210a954916ac 100644 --- a/python/ray/_private/runtime_env/context.py +++ b/python/ray/_private/runtime_env/context.py @@ -1,6 +1,7 @@ import json import logging import os +import shlex import subprocess import sys from typing import Any, Dict, List, Optional @@ -25,6 +26,7 @@ def __init__( resources_dir: Optional[str] = None, container: Dict[str, Any] = None, java_jars: List[str] = None, + julia_command: List[str] = None, ): self.command_prefix = command_prefix or [] self.env_vars = env_vars or {} @@ -36,6 +38,7 @@ def __init__( self.resources_dir: str = resources_dir self.container = container or {} self.java_jars = java_jars or [] + self.julia_command = julia_command or [] def serialize(self) -> str: return json.dumps(self.__dict__) @@ -47,16 +50,16 @@ def deserialize(json_string): return RuntimeEnvContext(**json.loads(json_string)) def exec_worker(self, passthrough_args: List[str], language: Language): - # TODO(Beacon): remove these when we're done + # TODO(Beacon): remove these when we're done logger.debug(f"Worker context env: {self.env_vars}") update_envs(self.env_vars) if language == Language.PYTHON and sys.platform == "win32": - executable = self.py_executable + command = [self.py_executable] elif language == Language.PYTHON: - executable = f"exec {self.py_executable}" + command = ["exec", self.py_executable] elif language == Language.JAVA: - executable = "java" + command = ["java"] ray_jars = os.path.join(get_ray_jars_dir(), "*") local_java_jars = [] @@ -67,23 +70,15 @@ def exec_worker(self, passthrough_args: List[str], language: Language): class_path_args = ["-cp", ray_jars + ":" + str(":".join(local_java_jars))] passthrough_args = class_path_args + passthrough_args elif language == Language.JULIA: - executable = "julia" - args = [ - "-e", "'using Ray; start_worker()'", - "--" - ] - # TODO(omus): required to avoid escaping the Julia code. Ideally - # this information would be passed in via the serialized runtime - # context. - executable = " ".join([executable] + args) + command = self.julia_command or ["julia", "-e", "using Ray; start_worker()"] + command += ["--"] elif sys.platform == "win32": - executable = "" + command = [""] else: - executable = "exec " + command = ["exec"] - passthrough_args = [s.replace(" ", r"\ ") for s in passthrough_args] - exec_command = " ".join([f"{executable}"] + passthrough_args) - command_str = " ".join(self.command_prefix + [exec_command]) + command = self.command_prefix + command + passthrough_args + command_str = shlex.join(command) # TODO(SongGuyang): We add this env to command for macOS because it doesn't # work for the C++ process of `os.execvp`. We should find a better way to # fix it. @@ -98,8 +93,7 @@ def exec_worker(self, passthrough_args: List[str], language: Language): ) logger.debug(f"Exec'ing worker with command: {command_str}") if sys.platform == "win32": - cmd = [*self.command_prefix, executable, *passthrough_args] - subprocess.Popen(cmd, shell=True).wait() + subprocess.Popen(command, shell=True).wait() else: # PyCharm will monkey patch the os.execvp at # .pycharm_helpers/pydev/_pydev_bundle/pydev_monkey.py diff --git a/python/ray/runtime_env/runtime_env.py b/python/ray/runtime_env/runtime_env.py index ecba04a260b1..757eb8012ce2 100644 --- a/python/ray/runtime_env/runtime_env.py +++ b/python/ray/runtime_env/runtime_env.py @@ -254,6 +254,7 @@ class MyClass: "container", "excludes", "env_vars", + "julia_command", "_ray_release", "_ray_commit", "_inject_current_ray", @@ -308,6 +309,8 @@ def __init__( if runtime_env.get("java_jars"): runtime_env["java_jars"] = runtime_env.get("java_jars") + if runtime_env.get("julia_command"): + runtime_env["julia_command"] = runtime_env.get("julia_command") self.update(runtime_env) @@ -442,6 +445,11 @@ def java_jars(self) -> List[str]: def env_vars(self) -> Dict: return self.get("env_vars", {}) + def julia_command(self) -> List[str]: + if "julia_command" in self: + return list(self["julia_command"]) + return [] + def has_conda(self) -> str: if self.get("conda"): return True