From 0fd28f9e5d9c44a678e38bc622253f79b3c3b3c9 Mon Sep 17 00:00:00 2001 From: lart Date: Fri, 1 Mar 2024 22:40:59 +0800 Subject: [PATCH 1/5] fixed the gpu assignment --- run_it.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run_it.py b/run_it.py index 50c5f3c..58e62fb 100644 --- a/run_it.py +++ b/run_it.py @@ -66,7 +66,7 @@ def get_used_mem(self, return_ratio=False): def _create_sub_proc(self, cmd=""): self.sub_proc = subprocess.Popen( - args=cmd, + args=f"CUDA_VISIBLE_DEVICES={self.gpu_id} {cmd}", stdin=self.stdin, stdout=self.stdout, stderr=self.stderr, From a7b1cac8b430b38f20b5cd8dfd29fd1283c05a74 Mon Sep 17 00:00:00 2001 From: lart Date: Sat, 13 Apr 2024 00:00:09 +0800 Subject: [PATCH 2/5] Refactoring based on process pools and cross-process communication managers. --- README.md | 46 +++++--- examples/config.txt | 5 - examples/config.yaml | 19 ++++ examples/demo.py | 14 ++- requirements.txt | 3 +- run_it.py | 263 +++++++++++++++---------------------------- 6 files changed, 156 insertions(+), 194 deletions(-) delete mode 100644 examples/config.txt create mode 100644 examples/config.yaml diff --git a/README.md b/README.md index 34fa794..56a12a1 100644 --- a/README.md +++ b/README.md @@ -14,40 +14,54 @@ Putting the machine into sleep is a disrespect for time. ```shell $ python run_it.py --help -usage: run_it.py [-h] [--verbose] [--gpu-pool GPU_POOL [GPU_POOL ...]] [--max-workers MAX_WORKERS] --cmd-pool CMD_POOL - [--max-used-ratio MAX_USED_RATIO] +usage: run_it.py [-h] [--gpu-pool GPU_POOL [GPU_POOL ...]] --max-workers MAX_WORKERS --cmd-pool CMD_POOL + [--interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU] [--interval-for-loop INTERVAL_FOR_LOOP] optional arguments: -h, --help show this help message and exit - --verbose Whether to print the output of the subprocess. --gpu-pool GPU_POOL [GPU_POOL ...] The pool containing all ids of your gpu devices. --max-workers MAX_WORKERS The max number of the workers. - --cmd-pool CMD_POOL The text file containing all your commands. It need to contain the launcher. - --max-used-ratio MAX_USED_RATIO - The max used ratio of the gpu. + --cmd-pool CMD_POOL The path of the yaml containing all cmds. + --interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU + In seconds, the interval for waiting for a GPU to be available. + --interval-for-loop INTERVAL_FOR_LOOP + In seconds, the interval for looping. ``` ## demo ```shell -$ python run_it.py --verbose --cmd-pool ./examples/config.txt # with the default `gpu-pool` and `max-workers` -$ python run_it.py --verbose --gpu-pool 0 1 --max-workers 2 --cmd-pool ./examples/config.txt +$ python run_it.py --cmd-pool ./examples/config.yaml # with the default `gpu-pool` and `max-workers` +$ python run_it.py --gpu-pool 0 2 3 --max-workers 3 --cmd-pool .\examples\config.yaml ```
-./examples/config.txt +./examples/config.yaml -```shell -$ cat ./examples/config.txt -python -m pip list -python --version -python ./examples/demo.py -python ./examples/demo.py -python ./examples/demo.py +```yaml +- name: job1 + command: "python ./examples/demo.py --value 1" + num_gpus: 1 + +- name: job2 + command: "python ./examples/demo.py --value 2" + num_gpus: 1 + +- name: job3 + command: "python ./examples/demo.py --value 3" + num_gpus: 1 + +- name: job4 + command: "python ./examples/demo.py --value 4" + num_gpus: 1 + +- name: job5 + command: "python ./examples/demo.py --value 5" + num_gpus: 2 ```
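The commit above swaps the old per-GPU memory polling for a process pool plus a cross-process queue of free GPU ids. The following is a rough, self-contained sketch of that pattern only, not part of this patch: the commands are placeholder `echo` calls, the GPU pool is assumed to be `0,1`, and the YAML parsing and per-job status tracking that `run_it.py` adds are omitted.

```python
import os
import subprocess
import time
from multiprocessing import Manager, Pool


def run_job(cmd, gpu_ids, gpu_queue):
    # Pin the subprocess to its assigned GPUs, then hand the ids back when it exits.
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = gpu_ids
    try:
        subprocess.run(cmd, shell=True, check=True, env=env)
    finally:
        for gpu in gpu_ids.split(","):
            gpu_queue.put(gpu)


if __name__ == "__main__":
    # (command, number of GPUs it needs) -- placeholders, not real training jobs.
    jobs = [("echo job-a", 1), ("echo job-b", 1), ("echo job-c", 2)]

    manager = Manager()
    gpu_queue = manager.Queue()  # shared pool of free GPU ids
    for gpu_id in (0, 1):
        gpu_queue.put(str(gpu_id))

    with Pool(processes=2) as pool:
        pending = list(jobs)
        results = []
        while pending:
            cmd, num_gpus = pending[0]
            if gpu_queue.qsize() >= num_gpus:
                gpus = ",".join(gpu_queue.get() for _ in range(num_gpus))
                results.append(pool.apply_async(run_job, (cmd, gpus, gpu_queue)))
                pending.pop(0)
            else:
                time.sleep(1)  # wait for a running job to release its GPUs
        for res in results:
            res.get()  # re-raise any worker failure

    print("all jobs finished")
```

Because only the parent process ever calls `get()` on the queue, checking `qsize()` before dequeuing is race-free here; workers only put ids back.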
diff --git a/examples/config.txt b/examples/config.txt deleted file mode 100644 index 71ffcab..0000000 --- a/examples/config.txt +++ /dev/null @@ -1,5 +0,0 @@ -python -m pip list -python --version -python ./examples/demo.py -python ./examples/demo.py -python ./examples/demo.py diff --git a/examples/config.yaml b/examples/config.yaml new file mode 100644 index 0000000..22f2a48 --- /dev/null +++ b/examples/config.yaml @@ -0,0 +1,19 @@ +- name: job1 + command: "python ./examples/demo.py --value 1" + num_gpus: 1 + +- name: job2 + command: "python ./examples/demo.py --value 2" + num_gpus: 1 + +- name: job3 + command: "python ./examples/demo.py --value 3" + num_gpus: 1 + +- name: job4 + command: "python ./examples/demo.py --value 4" + num_gpus: 1 + +- name: job5 + command: "python ./examples/demo.py --value 5" + num_gpus: 2 diff --git a/examples/demo.py b/examples/demo.py index 12d3639..7718ef8 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -1 +1,13 @@ -print('Hello!') +import argparse +import os +import time + +parser = argparse.ArgumentParser() +parser.add_argument("--value", type=int, default=0) +args = parser.parse_args() + + +GPU_IDS = os.environ["CUDA_VISIBLE_DEVICES"] +print(f"[GPUs: {GPU_IDS}] Start {args.value}") +time.sleep(args.value * 2) +print(f"[GPUs: {GPU_IDS}] End {args.value}") diff --git a/requirements.txt b/requirements.txt index b3c5cb6..52d2303 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ # Automatically generated by https://github.com/damnever/pigar. -# RunIt/run_it.py: 12 -nvidia_ml_py3 == 7.352.0 +PyYAML==6.0 diff --git a/run_it.py b/run_it.py index 58e62fb..b22343d 100644 --- a/run_it.py +++ b/run_it.py @@ -4,197 +4,120 @@ # @GitHub : https://github.com/lartpang import argparse +import os import subprocess import time from enum import Enum -from multiprocessing import Process +from multiprocessing import Manager, Pool, freeze_support +from queue import Queue -import pynvml +import yaml -pynvml.nvmlInit() - -# SOME CONSTANT class STATUS(Enum): - NORMAL = 0 - CMD_INVALID = 1 - GPU_BUSY = 2 - - -class MyProcess: - slot_idx = -1 - curr_task_id = 0 - - def __init__( - self, - gpu_id, - verbose=True, - num_cmds=None, - max_used_ratio=0.5, - *, - stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ): - super().__init__() - self.gpu_id = gpu_id - self.verbose = verbose - self.num_cmds = num_cmds - - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - - self.sub_proc = None - self.proc = None - - self.gpu_handler = pynvml.nvmlDeviceGetHandleByIndex(gpu_id) - self.max_used_ratio = max_used_ratio - MyProcess.slot_idx += 1 - - def __str__(self): - return f"[ID {self.slot_idx} INFO] NEW PROCESS SLOT ON GPU {self.gpu_id} IS CREATED!" 
- - def _used_ratio(self, used, total): - return used / total - - def get_used_mem(self, return_ratio=False): - meminfo = pynvml.nvmlDeviceGetMemoryInfo(self.gpu_handler) - if return_ratio: - return self._used_ratio(meminfo.used, meminfo.total) - return meminfo.used - - def _create_sub_proc(self, cmd=""): - self.sub_proc = subprocess.Popen( - args=f"CUDA_VISIBLE_DEVICES={self.gpu_id} {cmd}", - stdin=self.stdin, - stdout=self.stdout, - stderr=self.stderr, - shell=True, - # executable="bash", - # env=None, - close_fds=True, - bufsize=1, - text=True, - encoding="utf-8", - ) - print(f"[NEW TASK PID: {self.sub_proc.pid}] {self.sub_proc.args}") - - if self.verbose: - if self.stdout is not None and self.sub_proc is not None: - for l in self.sub_proc.stdout: - print(f"[ID: {self.curr_task_id}/{self.num_cmds} GPU: {self.gpu_id}] {l}", end="") - - def create_and_start_proc(self, cmd=None): - if (used_mem := self.get_used_mem(return_ratio=True)) > self.max_used_ratio: - # TODO: - # 当前的判定方式并不是太准确。最好的方式是由程序提供设置周期数的选项(`--num-epochs`), - # 首先按照num_epoch=1来进行初步的运行,并统计各个命令对应使用的显存。 - # 之后根据这些程序实际使用的显存来安排后续的操作。 - # 这可能需要程序对输出可以实现覆盖式(`--overwrite`)操作。 - self.status = STATUS.GPU_BUSY - print( - f"[ID {self.slot_idx} WARN] the memory usage of the GPU {self.gpu_id} is currently {used_mem}, " - f"which exceeds the maximum threshold {self.max_used_ratio}." - ) - return - - print(f"[ID {self.slot_idx} INFO] {cmd}") - MyProcess.curr_task_id += 1 - self.proc = Process(target=self._create_sub_proc, kwargs=dict(cmd=cmd)) - self.proc.start() - - # 只有成功创建并启动了进城后才改变状态 - self.status = STATUS.NORMAL - - def is_alive(self): - if self.status == STATUS.NORMAL: - return self.proc.is_alive() - return False - - -def read_cmds_from_txt(path): - with open(path, encoding="utf-8", mode="r") as f: - cmds = [] - for line in f: - line = line.strip() - if line: - cmds.append(line) - return cmds - - -# fmt: off + WAITING = 0 + RUNNING = 1 + DONE = 2 + FAILED = 3 + + +def worker(cmd: str, gpu_ids: str, queue: Queue, job_id: int, done_jobs: dict): + job_identifier = f"[Job-{job_id}:GPU-{gpu_ids}]" + + try: + print(f"{job_identifier} Executing {cmd}...") + + # 设置子程序环境变量 + env = os.environ.copy() + env["CUDA_VISIBLE_DEVICES"] = gpu_ids + subprocess.run(cmd, shell=True, check=True, env=env) + done_jobs[job_id] = STATUS.DONE + except subprocess.CalledProcessError as e: + print(f"{job_identifier} Command '{cmd}' failed: {e}") + + done_jobs[job_id] = STATUS.FAILED + + # 释放GPU资源回队列 + for gpu in gpu_ids.split(","): + queue.put(gpu) + print(f"{job_identifier} Release GPU {gpu_ids}...") + + def get_args(): + # fmt: off parser = argparse.ArgumentParser() - parser.add_argument("--verbose", action="store_true", help="Whether to print the output of the subprocess.") parser.add_argument("--gpu-pool", nargs="+", type=int, default=[0], help="The pool containing all ids of your gpu devices.") parser.add_argument("--max-workers", type=int, help="The max number of the workers.") - parser.add_argument("--cmd-pool",type=str, required=True, help="The text file containing all your commands. 
It need to contain the launcher.") - parser.add_argument("--max-used-ratio", type=float, default=0.5, help="The max used ratio of the gpu.") + parser.add_argument("--cmd-pool",type=str, required=True, help="The path of the yaml containing all cmds.") + parser.add_argument("--interval-for-waiting-gpu",type=int, default=3, help="In seconds, the interval for waiting for a GPU to be available.") + parser.add_argument("--interval-for-loop",type=int, default=1, help="In seconds, the interval for looping.") + # fmt: on + args = parser.parse_args() if args.max_workers is None: args.max_workers = len(args.gpu_pool) return args -# fmt: on def main(): args = get_args() print("[YOUR CONFIG]\n" + str(args)) - cmd_pool = read_cmds_from_txt(path=args.cmd_pool) - print("[YOUR CMDS]\n" + "\n".join(cmd_pool)) - - num_gpus = len(args.gpu_pool) - - print("[CREATE PROCESS OBJECTS]") - proc_slots = [] - for i in range(min(args.max_workers, len(cmd_pool))): # 确保slots数量小于等于命令数量 - gpu_id = i % num_gpus - proc = MyProcess( - gpu_id=args.gpu_pool[gpu_id], - verbose=args.verbose, - num_cmds=len(cmd_pool), - max_used_ratio=args.max_used_ratio, - ) - print(proc) - proc_slots.append(proc) - - for p in proc_slots: - if len(cmd_pool) == 0: # 确保出栈不会异常 - break - cmd = cmd_pool.pop() # 指令出栈 - p.create_and_start_proc(cmd=cmd) - if p.status == STATUS.GPU_BUSY: # 当前GPU显存不足,暂先跳过 - cmd_pool.append(cmd) # 指令未能顺利执行,重新入栈 - continue - - is_normal_ending = True - while proc_slots: - # the pool of the processes is not empty - for slot_idx, p in enumerate(proc_slots): # polling - if not p.is_alive(): - if len(cmd_pool) == 0: # 指令均在执行或者已被执行 - del proc_slots[slot_idx] - print("[NO MORE COMMANDS, DELETE THE PROCESS SLOT!]") - break - - cmd = cmd_pool.pop() - p.create_and_start_proc(cmd=cmd) - if p.status == STATUS.GPU_BUSY: # 当前GPU显存不足,暂先跳过 - cmd_pool.append(cmd) # 指令未能顺利执行,重新入栈 - continue - - if proc_slots and all([_p.status == STATUS.GPU_BUSY for _p in proc_slots]): - # 所有GPU都被外部程序占用,直接退出。因为如果我们的程序正常执行时,状态是NORMAL - print("[ALL GPUS ARE BUSY, EXIT THE LOOP!]") - proc_slots.clear() - is_normal_ending = False - break - time.sleep(1) - - if is_normal_ending: - print("[ALL COMMANDS HAVE BEEN COMPLETED!]") + + with open(args.cmd_pool, mode="r", encoding="utf-8") as f: + jobs = yaml.safe_load(f) + assert isinstance(jobs, (tuple, list)), jobs + print("[YOUR CMDS]\n" + "\n\t".join([str(job) for job in jobs])) + + manager = Manager() + # 创建一个跨进程共享的队列来统计空余的GPU资源 + available_gpus = manager.Queue() + for i in args.gpu_pool: + available_gpus.put(str(i)) + # 创建一个跨进程共享的dict来跟踪已完成的命令 + done_jobs = manager.dict() + for job_id, job_info in enumerate(jobs): + if job_info["num_gpus"] > len(args.gpu_pool): + raise ValueError(f"The number of gpus in job {job_id} is larger than the number of available gpus.") + done_jobs[job_id] = STATUS.WAITING + + # 创建进程池 + pool = Pool(processes=args.max_workers) + # 循环处理指令,直到所有指令都被处理 + while not all([status is STATUS.DONE for status in done_jobs.values()]): + for job_id, job_info in enumerate(jobs): + if done_jobs[job_id] in [STATUS.DONE, STATUS.RUNNING]: + continue + # else: STATUS.WAITING, STATUS.FAILED + + # job_name = job_info["name"] + command = job_info["command"] + num_gpus = job_info["num_gpus"] + + num_avaliable_gpus = available_gpus.qsize() + # 如果当前有足够的GPU资源,执行指令 + if num_gpus <= num_avaliable_gpus: + done_jobs[job_id] = STATUS.RUNNING + # 从队列中获取可用的GPU资源 + gpu_ids = ",".join([available_gpus.get() for _ in range(num_gpus)]) + # 执行给定的指令,并提供回调函数来更新完成的命令列表 + pool.apply_async(worker, args=(command, gpu_ids, 
available_gpus, job_id, done_jobs)) + else: + # 如果GPU资源不足,跳过当前指令,稍后重试 + print(f"Skipping '{command}', not enough GPUs available ({num_gpus} > {num_avaliable_gpus}).") + # 等待一段时间再次检查 + time.sleep(args.interval_for_waiting_gpu) + + # 等待一段时间再次检查 + time.sleep(args.interval_for_loop) + + # 关闭进程池并等待所有任务完成 + pool.close() + pool.join() + manager.shutdown() + + print("[ALL COMMANDS HAVE BEEN COMPLETED!]") if __name__ == "__main__": + freeze_support() main() From 8957feb01e718d45a1f45cc9d828a47bdc14c3e3 Mon Sep 17 00:00:00 2001 From: pang Date: Fri, 19 Apr 2024 15:57:31 +0800 Subject: [PATCH 3/5] Updated and fixed some bugs. --- README.md | 9 ++-- examples/config.yaml | 6 +-- examples/demo.py | 1 - run_it.py | 114 ++++++++++++++++++++++++------------------- 4 files changed, 70 insertions(+), 60 deletions(-) diff --git a/README.md b/README.md index 56a12a1..6049a62 100644 --- a/README.md +++ b/README.md @@ -14,8 +14,7 @@ Putting the machine into sleep is a disrespect for time. ```shell $ python run_it.py --help -usage: run_it.py [-h] [--gpu-pool GPU_POOL [GPU_POOL ...]] --max-workers MAX_WORKERS --cmd-pool CMD_POOL - [--interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU] [--interval-for-loop INTERVAL_FOR_LOOP] +usage: run_it.py [-h] [--gpu-pool GPU_POOL [GPU_POOL ...]] [--max-workers MAX_WORKERS] --cmd-pool CMD_POOL [--interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU] [--interval-for-loop INTERVAL_FOR_LOOP] optional arguments: -h, --help show this help message and exit @@ -46,22 +45,20 @@ $ python run_it.py --gpu-pool 0 2 3 --max-workers 3 --cmd-pool .\examples\config - name: job1 command: "python ./examples/demo.py --value 1" num_gpus: 1 - - name: job2 command: "python ./examples/demo.py --value 2" num_gpus: 1 - - name: job3 command: "python ./examples/demo.py --value 3" num_gpus: 1 - - name: job4 command: "python ./examples/demo.py --value 4" num_gpus: 1 - - name: job5 command: "python ./examples/demo.py --value 5" num_gpus: 2 +- { name: job6, command: "python ./examples/demo.py --value 5", num_gpus: 2 } +- { name: job7, command: "python ./examples/demo.py --value 5", num_gpus: 2 } ``` diff --git a/examples/config.yaml b/examples/config.yaml index 22f2a48..19a6479 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -1,19 +1,17 @@ - name: job1 command: "python ./examples/demo.py --value 1" num_gpus: 1 - - name: job2 command: "python ./examples/demo.py --value 2" num_gpus: 1 - - name: job3 command: "python ./examples/demo.py --value 3" num_gpus: 1 - - name: job4 command: "python ./examples/demo.py --value 4" num_gpus: 1 - - name: job5 command: "python ./examples/demo.py --value 5" num_gpus: 2 +- { name: job6, command: "python ./examples/demo.py --value 5", num_gpus: 2 } +- { name: job7, command: "python ./examples/demo.py --value 5", num_gpus: 2 } diff --git a/examples/demo.py b/examples/demo.py index 7718ef8..c44770e 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -6,7 +6,6 @@ parser.add_argument("--value", type=int, default=0) args = parser.parse_args() - GPU_IDS = os.environ["CUDA_VISIBLE_DEVICES"] print(f"[GPUs: {GPU_IDS}] Start {args.value}") time.sleep(args.value * 2) diff --git a/run_it.py b/run_it.py index b22343d..1416f56 100644 --- a/run_it.py +++ b/run_it.py @@ -5,6 +5,7 @@ import argparse import os +import signal import subprocess import time from enum import Enum @@ -24,18 +25,21 @@ class STATUS(Enum): def worker(cmd: str, gpu_ids: str, queue: Queue, job_id: int, done_jobs: dict): job_identifier = f"[Job-{job_id}:GPU-{gpu_ids}]" - try: - print(f"{job_identifier} 
Executing {cmd}...") - - # 设置子程序环境变量 - env = os.environ.copy() - env["CUDA_VISIBLE_DEVICES"] = gpu_ids - subprocess.run(cmd, shell=True, check=True, env=env) - done_jobs[job_id] = STATUS.DONE - except subprocess.CalledProcessError as e: - print(f"{job_identifier} Command '{cmd}' failed: {e}") - - done_jobs[job_id] = STATUS.FAILED + # 设置子程序环境变量 + env = os.environ.copy() + env["CUDA_VISIBLE_DEVICES"] = gpu_ids + + # subprocess.run(cmd, shell=True, check=True, env=env) + with subprocess.Popen(cmd, shell=True, env=env) as sub_proc: + # 使用subprocess.Popen代替subprocess.run + try: + print(f"{job_identifier} Executing {cmd}...") + sub_proc.wait() + done_jobs[job_id] = STATUS.DONE + except Exception as e: + print(f"{job_identifier} Command '{cmd}' failed: {e}") + sub_proc.terminate() + done_jobs[job_id] = STATUS.FAILED # 释放GPU资源回队列 for gpu in gpu_ids.split(","): @@ -48,9 +52,9 @@ def get_args(): parser = argparse.ArgumentParser() parser.add_argument("--gpu-pool", nargs="+", type=int, default=[0], help="The pool containing all ids of your gpu devices.") parser.add_argument("--max-workers", type=int, help="The max number of the workers.") - parser.add_argument("--cmd-pool",type=str, required=True, help="The path of the yaml containing all cmds.") - parser.add_argument("--interval-for-waiting-gpu",type=int, default=3, help="In seconds, the interval for waiting for a GPU to be available.") - parser.add_argument("--interval-for-loop",type=int, default=1, help="In seconds, the interval for looping.") + parser.add_argument("--cmd-pool", type=str, required=True, help="The path of the yaml containing all cmds.") + parser.add_argument("--interval-for-waiting-gpu", type=int, default=3, help="In seconds, the interval for waiting for a GPU to be available.") + parser.add_argument("--interval-for-loop", type=int, default=1, help="In seconds, the interval for looping.") # fmt: on args = parser.parse_args() @@ -59,6 +63,10 @@ def get_args(): return args +def init_worker(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def main(): args = get_args() print("[YOUR CONFIG]\n" + str(args)) @@ -80,41 +88,49 @@ def main(): raise ValueError(f"The number of gpus in job {job_id} is larger than the number of available gpus.") done_jobs[job_id] = STATUS.WAITING - # 创建进程池 - pool = Pool(processes=args.max_workers) - # 循环处理指令,直到所有指令都被处理 - while not all([status is STATUS.DONE for status in done_jobs.values()]): - for job_id, job_info in enumerate(jobs): - if done_jobs[job_id] in [STATUS.DONE, STATUS.RUNNING]: - continue - # else: STATUS.WAITING, STATUS.FAILED - - # job_name = job_info["name"] - command = job_info["command"] - num_gpus = job_info["num_gpus"] - - num_avaliable_gpus = available_gpus.qsize() - # 如果当前有足够的GPU资源,执行指令 - if num_gpus <= num_avaliable_gpus: - done_jobs[job_id] = STATUS.RUNNING - # 从队列中获取可用的GPU资源 - gpu_ids = ",".join([available_gpus.get() for _ in range(num_gpus)]) - # 执行给定的指令,并提供回调函数来更新完成的命令列表 - pool.apply_async(worker, args=(command, gpu_ids, available_gpus, job_id, done_jobs)) - else: - # 如果GPU资源不足,跳过当前指令,稍后重试 - print(f"Skipping '{command}', not enough GPUs available ({num_gpus} > {num_avaliable_gpus}).") - # 等待一段时间再次检查 - time.sleep(args.interval_for_waiting_gpu) - - # 等待一段时间再次检查 - time.sleep(args.interval_for_loop) - - # 关闭进程池并等待所有任务完成 - pool.close() - pool.join() - manager.shutdown() + # 在创建进程池之前注册信号处理器,以便在接收到中断信号时执行清理操作 + original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + pool = Pool(processes=args.max_workers, initializer=init_worker) + # 将原始的信号处理器恢复 + 
signal.signal(signal.SIGINT, original_sigint_handler) + try: + # 循环处理指令,直到所有指令都被处理 + while not all([status is STATUS.DONE for status in done_jobs.values()]): + for job_id, job_info in enumerate(jobs): + if done_jobs[job_id] in [STATUS.DONE, STATUS.RUNNING]: + continue + # else: STATUS.WAITING, STATUS.FAILED + + # job_name = job_info["name"] + command = job_info["command"] + num_gpus = job_info["num_gpus"] + + num_avaliable_gpus = available_gpus.qsize() + # 如果当前有足够的GPU资源,执行指令 + if num_gpus <= num_avaliable_gpus: + done_jobs[job_id] = STATUS.RUNNING + # 从队列中获取可用的GPU资源 + gpu_ids = ",".join([available_gpus.get() for _ in range(num_gpus)]) + # 执行给定的指令,并提供回调函数来更新完成的命令列表 + pool.apply_async(worker, args=(command, gpu_ids, available_gpus, job_id, done_jobs)) + else: + # 如果GPU资源不足,跳过当前指令,稍后重试 + print(f"Skipping '{command}', not enough GPUs available ({num_gpus} > {num_avaliable_gpus}).") + # 等待一段时间再次检查 + time.sleep(args.interval_for_waiting_gpu) + + # 等待一段时间再次检查 + time.sleep(args.interval_for_loop) + + # 关闭进程池并等待所有任务完成 + pool.close() + except KeyboardInterrupt: + print("[CAUGHT KEYBOARDINTERRUPT, TERMINATING WORKERS!]") + pool.terminate() + finally: + pool.join() + manager.shutdown() print("[ALL COMMANDS HAVE BEEN COMPLETED!]") From 64cb4fa1aefabbb60dfd0984e9a58547c50e267f Mon Sep 17 00:00:00 2001 From: lart Date: Sat, 20 Apr 2024 12:32:47 +0800 Subject: [PATCH 4/5] Update examples, readme.md. --- README.md | 26 +++++++++++++++++++++++++- examples/config.yaml | 6 ++++++ examples/demo.py | 6 ++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6049a62..e24aad6 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ Putting the machine into sleep is a disrespect for time. ```shell $ python run_it.py --help -usage: run_it.py [-h] [--gpu-pool GPU_POOL [GPU_POOL ...]] [--max-workers MAX_WORKERS] --cmd-pool CMD_POOL [--interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU] [--interval-for-loop INTERVAL_FOR_LOOP] +usage: run_it.py [-h] [--gpu-pool GPU_POOL [GPU_POOL ...]] [--max-workers MAX_WORKERS] --cmd-pool CMD_POOL + [--interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU] [--interval-for-loop INTERVAL_FOR_LOOP] optional arguments: -h, --help show this help message and exit @@ -63,8 +64,31 @@ $ python run_it.py --gpu-pool 0 2 3 --max-workers 3 --cmd-pool .\examples\config +```mermaid +graph TD + A[Start] --> B[Read Configuration and Command Pool] + B --> C[Initialize Shared Resources] + C --> |Maximum number of requirements met| D[Loop Until All Jobs Done] + D --> E[Check Available GPUs] + E -->|Enough GPUs| F[Run Job in Separate Process] + E -->|Not Enough GPUs| G[Wait and Retry] + F --> H[Job Completes] + F --> I[Job Fails] + H --> J[Update Job Status and Return GPUs] + I --> J + G --> D + J -->|All Jobs Done| K[End] + C -->|Maximum number of requirements not met| L[Terminate Workers] + L --> M[Shutdown Manager and Join Pool] + M --> K +``` + ## Thanks +[@BitCalSaul](https://github.com/BitCalSaul): Thanks for the positive feedbacks! 
+ - + - + - - https://www.jb51.net/article/142787.htm - https://docs.python.org/zh-cn/3/library/subprocess.html - https://stackoverflow.com/a/23616229 diff --git a/examples/config.yaml b/examples/config.yaml index 19a6479..6a21a12 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -1,6 +1,12 @@ - name: job1 command: "python ./examples/demo.py --value 1" num_gpus: 1 +- name: job02 + command: "python ./examples/demo.py --value 1 --exception" + num_gpus: 1 +- name: job03 + command: "python ./examples/demo.py --value 1 --exception" + num_gpus: 1 - name: job2 command: "python ./examples/demo.py --value 2" num_gpus: 1 diff --git a/examples/demo.py b/examples/demo.py index c44770e..a756f9d 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -4,9 +4,15 @@ parser = argparse.ArgumentParser() parser.add_argument("--value", type=int, default=0) +parser.add_argument("--exception", action="store_true", default=False) args = parser.parse_args() + GPU_IDS = os.environ["CUDA_VISIBLE_DEVICES"] print(f"[GPUs: {GPU_IDS}] Start {args.value}") + +if args.exception: + raise Exception(f"[GPUs: {GPU_IDS}] Internal Exception!") + time.sleep(args.value * 2) print(f"[GPUs: {GPU_IDS}] End {args.value}") From 3c4e18989bc1904d8b4ddaabf132b65cf43e1bd8 Mon Sep 17 00:00:00 2001 From: lart Date: Sat, 20 Apr 2024 12:34:09 +0800 Subject: [PATCH 5/5] Fixed typos. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e24aad6..53caede 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ graph TD ## Thanks -[@BitCalSaul](https://github.com/BitCalSaul): Thanks for the positive feedbacks! +- [@BitCalSaul](https://github.com/BitCalSaul): Thanks for the positive feedbacks! - - -