diff --git a/README.md b/README.md
index 34fa794..53caede 100644
--- a/README.md
+++ b/README.md
@@ -14,46 +14,81 @@ Putting the machine into sleep is a disrespect for time.
```shell
$ python run_it.py --help
-usage: run_it.py [-h] [--verbose] [--gpu-pool GPU_POOL [GPU_POOL ...]] [--max-workers MAX_WORKERS] --cmd-pool CMD_POOL
- [--max-used-ratio MAX_USED_RATIO]
+usage: run_it.py [-h] [--gpu-pool GPU_POOL [GPU_POOL ...]] [--max-workers MAX_WORKERS] --cmd-pool CMD_POOL
+ [--interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU] [--interval-for-loop INTERVAL_FOR_LOOP]
optional arguments:
-h, --help show this help message and exit
- --verbose Whether to print the output of the subprocess.
--gpu-pool GPU_POOL [GPU_POOL ...]
The pool containing all ids of your gpu devices.
--max-workers MAX_WORKERS
The max number of the workers.
- --cmd-pool CMD_POOL The text file containing all your commands. It need to contain the launcher.
- --max-used-ratio MAX_USED_RATIO
- The max used ratio of the gpu.
+  --cmd-pool CMD_POOL  The path of the YAML file containing all commands.
+ --interval-for-waiting-gpu INTERVAL_FOR_WAITING_GPU
+ In seconds, the interval for waiting for a GPU to be available.
+ --interval-for-loop INTERVAL_FOR_LOOP
+                        In seconds, the interval between scheduling loop iterations.
```
## demo
```shell
-$ python run_it.py --verbose --cmd-pool ./examples/config.txt # with the default `gpu-pool` and `max-workers`
-$ python run_it.py --verbose --gpu-pool 0 1 --max-workers 2 --cmd-pool ./examples/config.txt
+$ python run_it.py --cmd-pool ./examples/config.yaml # with the default `gpu-pool` and `max-workers`
+$ python run_it.py --gpu-pool 0 2 3 --max-workers 3 --cmd-pool ./examples/config.yaml
```
-./examples/config.txt
+./examples/config.yaml (abridged; the shipped file also contains two jobs that raise an exception on purpose):
-```shell
-$ cat ./examples/config.txt
-python -m pip list
-python --version
-python ./examples/demo.py
-python ./examples/demo.py
-python ./examples/demo.py
+```yaml
+- name: job1
+ command: "python ./examples/demo.py --value 1"
+ num_gpus: 1
+- name: job2
+ command: "python ./examples/demo.py --value 2"
+ num_gpus: 1
+- name: job3
+ command: "python ./examples/demo.py --value 3"
+ num_gpus: 1
+- name: job4
+ command: "python ./examples/demo.py --value 4"
+ num_gpus: 1
+- name: job5
+ command: "python ./examples/demo.py --value 5"
+ num_gpus: 2
+- { name: job6, command: "python ./examples/demo.py --value 5", num_gpus: 2 }
+- { name: job7, command: "python ./examples/demo.py --value 5", num_gpus: 2 }
```
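+
+Each entry gives the job a `name`, the shell `command` to run, and `num_gpus`, the number of devices reserved for it. The scheduler exposes the reserved ids to the command through `CUDA_VISIBLE_DEVICES`, so each job only sees its own GPUs (e.g. `0,2` for a `num_gpus: 2` job). A minimal sketch of what a launched command can do with this, much like `examples/demo.py`:
+
+```python
+import os
+
+# run_it.py sets CUDA_VISIBLE_DEVICES to the ids reserved for this job
+gpu_ids = os.environ.get("CUDA_VISIBLE_DEVICES", "")
+print(f"[GPUs: {gpu_ids}] my work starts here")
+```
+
+The overall scheduling flow:
+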
+```mermaid
+graph TD
+ A[Start] --> B[Read Configuration and Command Pool]
+ B --> C[Initialize Shared Resources]
+    C -->|GPU requirements of all jobs fit the pool| D[Loop Until All Jobs Done]
+ D --> E[Check Available GPUs]
+ E -->|Enough GPUs| F[Run Job in Separate Process]
+ E -->|Not Enough GPUs| G[Wait and Retry]
+ F --> H[Job Completes]
+ F --> I[Job Fails]
+ H --> J[Update Job Status and Return GPUs]
+ I --> J
+ G --> D
+ J -->|All Jobs Done| K[End]
+    C -->|A job requires more GPUs than the pool provides| L[Terminate Workers]
+ L --> M[Shutdown Manager and Join Pool]
+ M --> K
+```
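+
+The scheduling core is small: the ids of free GPUs live in a queue created by a `multiprocessing.Manager` and shared with the worker processes; a job is dispatched only when it can take `num_gpus` ids off that queue, and the worker puts the ids back once its command exits. A toy, runnable illustration of that idea (not the real scheduler; see `run_it.py` for the full logic with status tracking and signal handling):
+
+```python
+from multiprocessing import Manager
+
+if __name__ == "__main__":
+    manager = Manager()
+    available_gpus = manager.Queue()
+    for gpu_id in (0, 1):  # pretend --gpu-pool 0 1
+        available_gpus.put(str(gpu_id))
+
+    num_gpus = 2  # the job's num_gpus
+    if num_gpus <= available_gpus.qsize():
+        # reserve the GPUs for the job ...
+        gpu_ids = ",".join(available_gpus.get() for _ in range(num_gpus))
+        print(f"the job would run with CUDA_VISIBLE_DEVICES={gpu_ids}")
+        # ... and return them once the command exits
+        for gpu in gpu_ids.split(","):
+            available_gpus.put(gpu)
+```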
+
## Thanks
+- [@BitCalSaul](https://github.com/BitCalSaul): Thanks for the positive feedback!
- https://www.jb51.net/article/142787.htm
- https://docs.python.org/zh-cn/3/library/subprocess.html
- https://stackoverflow.com/a/23616229
diff --git a/examples/config.txt b/examples/config.txt
deleted file mode 100644
index 71ffcab..0000000
--- a/examples/config.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-python -m pip list
-python --version
-python ./examples/demo.py
-python ./examples/demo.py
-python ./examples/demo.py
diff --git a/examples/config.yaml b/examples/config.yaml
new file mode 100644
index 0000000..6a21a12
--- /dev/null
+++ b/examples/config.yaml
@@ -0,0 +1,23 @@
+- name: job1
+ command: "python ./examples/demo.py --value 1"
+ num_gpus: 1
+- name: job02
+ command: "python ./examples/demo.py --value 1 --exception"
+ num_gpus: 1
+- name: job03
+ command: "python ./examples/demo.py --value 1 --exception"
+ num_gpus: 1
+- name: job2
+ command: "python ./examples/demo.py --value 2"
+ num_gpus: 1
+- name: job3
+ command: "python ./examples/demo.py --value 3"
+ num_gpus: 1
+- name: job4
+ command: "python ./examples/demo.py --value 4"
+ num_gpus: 1
+- name: job5
+ command: "python ./examples/demo.py --value 5"
+ num_gpus: 2
+- { name: job6, command: "python ./examples/demo.py --value 5", num_gpus: 2 }
+- { name: job7, command: "python ./examples/demo.py --value 5", num_gpus: 2 }
diff --git a/examples/demo.py b/examples/demo.py
index 12d3639..a756f9d 100644
--- a/examples/demo.py
+++ b/examples/demo.py
@@ -1 +1,18 @@
-print('Hello!')
+import argparse
+import os
+import time
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--value", type=int, default=0)
+parser.add_argument("--exception", action="store_true", default=False)
+args = parser.parse_args()
+
+
+# Set by run_it.py for each job; fall back when the demo is run directly
+GPU_IDS = os.environ.get("CUDA_VISIBLE_DEVICES", "unset")
+print(f"[GPUs: {GPU_IDS}] Start {args.value}")
+
+if args.exception:
+ raise Exception(f"[GPUs: {GPU_IDS}] Internal Exception!")
+
+time.sleep(args.value * 2)
+print(f"[GPUs: {GPU_IDS}] End {args.value}")
diff --git a/requirements.txt b/requirements.txt
index b3c5cb6..52d2303 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
# Automatically generated by https://github.com/damnever/pigar.
-# RunIt/run_it.py: 12
-nvidia_ml_py3 == 7.352.0
+PyYAML==6.0
diff --git a/run_it.py b/run_it.py
index 50c5f3c..1416f56 100644
--- a/run_it.py
+++ b/run_it.py
@@ -4,197 +4,136 @@
# @GitHub : https://github.com/lartpang
import argparse
+import os
+import signal
import subprocess
import time
from enum import Enum
-from multiprocessing import Process
+from multiprocessing import Manager, Pool, freeze_support
+from queue import Queue
-import pynvml
+import yaml
-pynvml.nvmlInit()
-
-# SOME CONSTANT
class STATUS(Enum):
- NORMAL = 0
- CMD_INVALID = 1
- GPU_BUSY = 2
-
-
-class MyProcess:
- slot_idx = -1
- curr_task_id = 0
-
- def __init__(
- self,
- gpu_id,
- verbose=True,
- num_cmds=None,
- max_used_ratio=0.5,
- *,
- stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- ):
- super().__init__()
- self.gpu_id = gpu_id
- self.verbose = verbose
- self.num_cmds = num_cmds
-
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
-
- self.sub_proc = None
- self.proc = None
-
- self.gpu_handler = pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
- self.max_used_ratio = max_used_ratio
- MyProcess.slot_idx += 1
-
- def __str__(self):
- return f"[ID {self.slot_idx} INFO] NEW PROCESS SLOT ON GPU {self.gpu_id} IS CREATED!"
-
- def _used_ratio(self, used, total):
- return used / total
-
- def get_used_mem(self, return_ratio=False):
- meminfo = pynvml.nvmlDeviceGetMemoryInfo(self.gpu_handler)
- if return_ratio:
- return self._used_ratio(meminfo.used, meminfo.total)
- return meminfo.used
-
- def _create_sub_proc(self, cmd=""):
- self.sub_proc = subprocess.Popen(
- args=cmd,
- stdin=self.stdin,
- stdout=self.stdout,
- stderr=self.stderr,
- shell=True,
- # executable="bash",
- # env=None,
- close_fds=True,
- bufsize=1,
- text=True,
- encoding="utf-8",
- )
- print(f"[NEW TASK PID: {self.sub_proc.pid}] {self.sub_proc.args}")
-
- if self.verbose:
- if self.stdout is not None and self.sub_proc is not None:
- for l in self.sub_proc.stdout:
- print(f"[ID: {self.curr_task_id}/{self.num_cmds} GPU: {self.gpu_id}] {l}", end="")
-
- def create_and_start_proc(self, cmd=None):
- if (used_mem := self.get_used_mem(return_ratio=True)) > self.max_used_ratio:
- # TODO:
- # 当前的判定方式并不是太准确。最好的方式是由程序提供设置周期数的选项(`--num-epochs`),
- # 首先按照num_epoch=1来进行初步的运行,并统计各个命令对应使用的显存。
- # 之后根据这些程序实际使用的显存来安排后续的操作。
- # 这可能需要程序对输出可以实现覆盖式(`--overwrite`)操作。
- self.status = STATUS.GPU_BUSY
- print(
- f"[ID {self.slot_idx} WARN] the memory usage of the GPU {self.gpu_id} is currently {used_mem}, "
- f"which exceeds the maximum threshold {self.max_used_ratio}."
- )
- return
-
- print(f"[ID {self.slot_idx} INFO] {cmd}")
- MyProcess.curr_task_id += 1
- self.proc = Process(target=self._create_sub_proc, kwargs=dict(cmd=cmd))
- self.proc.start()
-
- # 只有成功创建并启动了进城后才改变状态
- self.status = STATUS.NORMAL
-
- def is_alive(self):
- if self.status == STATUS.NORMAL:
- return self.proc.is_alive()
- return False
-
-
-def read_cmds_from_txt(path):
- with open(path, encoding="utf-8", mode="r") as f:
- cmds = []
- for line in f:
- line = line.strip()
- if line:
- cmds.append(line)
- return cmds
-
-
-# fmt: off
+ WAITING = 0
+ RUNNING = 1
+ DONE = 2
+ FAILED = 3
+
+
+def worker(cmd: str, gpu_ids: str, queue: Queue, job_id: int, done_jobs: dict):
+ job_identifier = f"[Job-{job_id}:GPU-{gpu_ids}]"
+
+    # Restrict the subprocess to its assigned GPUs via the environment
+ env = os.environ.copy()
+ env["CUDA_VISIBLE_DEVICES"] = gpu_ids
+
+ # subprocess.run(cmd, shell=True, check=True, env=env)
+ with subprocess.Popen(cmd, shell=True, env=env) as sub_proc:
+        # Use subprocess.Popen instead of subprocess.run so the process can be terminated on failure
+ try:
+ print(f"{job_identifier} Executing {cmd}...")
+ sub_proc.wait()
+ done_jobs[job_id] = STATUS.DONE
+ except Exception as e:
+ print(f"{job_identifier} Command '{cmd}' failed: {e}")
+ sub_proc.terminate()
+ done_jobs[job_id] = STATUS.FAILED
+
+    # Return the GPU resources to the queue
+    for gpu in gpu_ids.split(","):
+        queue.put(gpu)
+    print(f"{job_identifier} Released GPUs: {gpu_ids}")
+
+
def get_args():
+ # fmt: off
parser = argparse.ArgumentParser()
- parser.add_argument("--verbose", action="store_true", help="Whether to print the output of the subprocess.")
parser.add_argument("--gpu-pool", nargs="+", type=int, default=[0], help="The pool containing all ids of your gpu devices.")
parser.add_argument("--max-workers", type=int, help="The max number of the workers.")
- parser.add_argument("--cmd-pool",type=str, required=True, help="The text file containing all your commands. It need to contain the launcher.")
- parser.add_argument("--max-used-ratio", type=float, default=0.5, help="The max used ratio of the gpu.")
+ parser.add_argument("--cmd-pool", type=str, required=True, help="The path of the yaml containing all cmds.")
+ parser.add_argument("--interval-for-waiting-gpu", type=int, default=3, help="In seconds, the interval for waiting for a GPU to be available.")
+ parser.add_argument("--interval-for-loop", type=int, default=1, help="In seconds, the interval for looping.")
+ # fmt: on
+
args = parser.parse_args()
if args.max_workers is None:
args.max_workers = len(args.gpu_pool)
return args
-# fmt: on
+
+
+def init_worker():
+    # Worker processes ignore SIGINT so that Ctrl+C is handled only by the main process
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
def main():
args = get_args()
print("[YOUR CONFIG]\n" + str(args))
- cmd_pool = read_cmds_from_txt(path=args.cmd_pool)
- print("[YOUR CMDS]\n" + "\n".join(cmd_pool))
-
- num_gpus = len(args.gpu_pool)
-
- print("[CREATE PROCESS OBJECTS]")
- proc_slots = []
- for i in range(min(args.max_workers, len(cmd_pool))): # 确保slots数量小于等于命令数量
- gpu_id = i % num_gpus
- proc = MyProcess(
- gpu_id=args.gpu_pool[gpu_id],
- verbose=args.verbose,
- num_cmds=len(cmd_pool),
- max_used_ratio=args.max_used_ratio,
- )
- print(proc)
- proc_slots.append(proc)
-
- for p in proc_slots:
- if len(cmd_pool) == 0: # 确保出栈不会异常
- break
- cmd = cmd_pool.pop() # 指令出栈
- p.create_and_start_proc(cmd=cmd)
- if p.status == STATUS.GPU_BUSY: # 当前GPU显存不足,暂先跳过
- cmd_pool.append(cmd) # 指令未能顺利执行,重新入栈
- continue
-
- is_normal_ending = True
- while proc_slots:
- # the pool of the processes is not empty
- for slot_idx, p in enumerate(proc_slots): # polling
- if not p.is_alive():
- if len(cmd_pool) == 0: # 指令均在执行或者已被执行
- del proc_slots[slot_idx]
- print("[NO MORE COMMANDS, DELETE THE PROCESS SLOT!]")
- break
-
- cmd = cmd_pool.pop()
- p.create_and_start_proc(cmd=cmd)
- if p.status == STATUS.GPU_BUSY: # 当前GPU显存不足,暂先跳过
- cmd_pool.append(cmd) # 指令未能顺利执行,重新入栈
- continue
-
- if proc_slots and all([_p.status == STATUS.GPU_BUSY for _p in proc_slots]):
- # 所有GPU都被外部程序占用,直接退出。因为如果我们的程序正常执行时,状态是NORMAL
- print("[ALL GPUS ARE BUSY, EXIT THE LOOP!]")
- proc_slots.clear()
- is_normal_ending = False
- break
- time.sleep(1)
- if is_normal_ending:
- print("[ALL COMMANDS HAVE BEEN COMPLETED!]")
+ with open(args.cmd_pool, mode="r", encoding="utf-8") as f:
+ jobs = yaml.safe_load(f)
+ assert isinstance(jobs, (tuple, list)), jobs
+ print("[YOUR CMDS]\n" + "\n\t".join([str(job) for job in jobs]))
+
+ manager = Manager()
+    # A cross-process shared queue holding the ids of currently available GPUs
+ available_gpus = manager.Queue()
+ for i in args.gpu_pool:
+ available_gpus.put(str(i))
+    # A cross-process shared dict tracking the status of each job
+ done_jobs = manager.dict()
+ for job_id, job_info in enumerate(jobs):
+ if job_info["num_gpus"] > len(args.gpu_pool):
+            raise ValueError(f"Job {job_id} requests more GPUs than the GPU pool provides.")
+ done_jobs[job_id] = STATUS.WAITING
+
+    # Ignore SIGINT before creating the pool so the workers inherit it and interrupts are handled by the main process
+ original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
+ pool = Pool(processes=args.max_workers, initializer=init_worker)
+    # Restore the original SIGINT handler in the main process
+ signal.signal(signal.SIGINT, original_sigint_handler)
+
+ try:
+        # Keep scheduling until every job is done
+ while not all([status is STATUS.DONE for status in done_jobs.values()]):
+ for job_id, job_info in enumerate(jobs):
+ if done_jobs[job_id] in [STATUS.DONE, STATUS.RUNNING]:
+ continue
+ # else: STATUS.WAITING, STATUS.FAILED
+
+ # job_name = job_info["name"]
+ command = job_info["command"]
+ num_gpus = job_info["num_gpus"]
+
+            num_available_gpus = available_gpus.qsize()
+            # If enough GPUs are free, dispatch the job
+            if num_gpus <= num_available_gpus:
+ done_jobs[job_id] = STATUS.RUNNING
+                # Take the required GPU ids from the shared queue
+ gpu_ids = ",".join([available_gpus.get() for _ in range(num_gpus)])
+                # Run the command asynchronously; the worker updates done_jobs and returns the GPUs when it exits
+ pool.apply_async(worker, args=(command, gpu_ids, available_gpus, job_id, done_jobs))
+ else:
+                # Not enough GPUs for this job; skip it and retry later
+                print(f"Skipping '{command}', not enough GPUs available ({num_gpus} > {num_available_gpus}).")
+                # Wait a while before checking again
+ time.sleep(args.interval_for_waiting_gpu)
+
+            # Wait a while before the next scheduling pass
+ time.sleep(args.interval_for_loop)
+
+        # Stop accepting new tasks; pool.join() in the finally block waits for the running ones
+ pool.close()
+ except KeyboardInterrupt:
+ print("[CAUGHT KEYBOARDINTERRUPT, TERMINATING WORKERS!]")
+ pool.terminate()
+ finally:
+ pool.join()
+ manager.shutdown()
+ print("[ALL COMMANDS HAVE BEEN COMPLETED!]")
if __name__ == "__main__":
+ freeze_support()
main()