-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutilities.py
executable file
·68 lines (53 loc) · 2.04 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import itertools
import logging
import multiprocessing
import os
import shutil
import sys
import time
def check_dependencies(commands=("hmmsearch", "featureCounts", "prodigal", "samtools")):
    """Check that every required external command can be located.

    Each command is handed to ``add_to_path``; any exception raised there
    (for example for a command that cannot be found) propagates to the
    caller unchanged.

    Args:
        commands: Iterable of command names to check. Defaults to the
            tools this module has always required. A single string is
            treated as one command name rather than as a sequence of
            characters.
    """
    if isinstance(commands, str):
        # Guard against check_dependencies("samtools") iterating letters.
        commands = (commands,)
    for command in commands:
        add_to_path(command)
# Resolve a command name to the full path of its executable
def get_path(command):
    """Return the path ``shutil.which`` finds for *command*, or ``None``."""
    resolved = shutil.which(command)
    return resolved
def add_to_path(command):
    """Make sure the directory containing *command* is listed in ``PATH``.

    The command is located with ``get_path``. If its directory is not
    already one of the ``PATH`` entries, the directory is prepended to
    ``os.environ['PATH']``.

    Args:
        command: Name of the executable to locate.

    Raises:
        ValueError: If ``get_path`` does not return a path for *command*.
    """
    command_path = get_path(command)
    if not command_path:
        raise ValueError(f"ERROR: {command} command not found")

    # Directory that holds the executable
    command_dir = os.path.dirname(command_path)

    # PATH may be unset or empty while the lookup above still succeeds
    # (via the platform's default search path), so never index os.environ
    # directly here.
    current_value = os.environ.get("PATH", "")
    current_entries = current_value.split(os.pathsep) if current_value else []

    # Only prepend the directory when it is not already listed
    if command_dir not in current_entries:
        if current_value:
            os.environ["PATH"] = command_dir + os.pathsep + current_value
        else:
            # Avoid a trailing separator, which would add an empty entry.
            os.environ["PATH"] = command_dir
# Report the CPU count as seen by the multiprocessing module
def get_logical_cores():
    """Return the value of ``multiprocessing.cpu_count()``."""
    core_count = multiprocessing.cpu_count()
    return core_count
# Decide how many cores to use for a command: the logical core count when
# the command can be located, otherwise a single core
def run_in_parallel(command):
    """Return the core count to use for *command*.

    Returns the result of ``get_logical_cores()`` when ``get_path`` finds
    the command and that result is an ``int``. Returns 1 in every other
    case, including when any exception is raised along the way.
    """
    single_core = 1
    try:
        if get_path(command) is None:
            return single_core
        cores = get_logical_cores()
    except Exception:
        # Best effort: any failure falls back to a single core.
        return single_core
    if isinstance(cores, int):
        return cores
    return single_core
def spinning_cursor():
    """Yield the spinner frames ``|``, ``/``, ``-``, ``\\`` in an endless loop.

    This generator never terminates on its own; callers pull frames with
    ``next()`` for as long as they need them.
    """
    # itertools.cycle replaces the hand-written "while True / for" loop;
    # "yield from" keeps this a generator, as before.
    yield from itertools.cycle('|/-\\')
def spinning_cursor_task(task_done, program):
    """Animate a spinner on stdout until *task_done* reports completion.

    Args:
        task_done: Object whose ``is_set()`` method is polled before each
            frame; the animation stops once it returns a true value.
        program: Name inserted into the start and finish log messages.
    """
    frames = spinning_cursor()
    logging.info(f"Starting {program} in the background. This may take a while...")
    while True:
        if task_done.is_set():
            break
        # Draw one frame, push it to the terminal, then queue a backspace
        # so the next frame overwrites it.
        sys.stdout.write(next(frames))
        sys.stdout.flush()
        sys.stdout.write('\b')
        time.sleep(0.1)
    logging.info(f"{program} finished running.")