-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworkflow-libs.sh
executable file
·276 lines (236 loc) · 10.5 KB
/
workflow-libs.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/bin/bash
# These are utility bash functions intended to make workflow building easier. They can
# be imported by any bash workflow using source /swift-pw-bin/utils/workflow-libs.sh
# All these functions run as part of the workflow inside the user workspace.
# Default SSH command used by the helpers below to reach the remote resource.
# NOTE(review): assumes resource_publicIp is already exported by the caller
# (presumably by input_form_resource_wrapper.py) -- confirm before sourcing.
# sshcmd is deliberately expanded unquoted at call sites so its options word-split.
if [ -z "$sshcmd" ]; then
export sshcmd="ssh -o StrictHostKeyChecking=no ${resource_publicIp}"
fi
single_cluster_rsync() {
    # Rsync one resource directory from the user workspace to the job directory
    # on the remote resource.
    # Arguments:
    #   $1 - path to the ./resources/<resource-label>/ directory
    # Globals (read after sourcing inputs.sh): resource_publicIp, resource_jobdir
    # Globals (written, read by callers such as cluster_rsync): resource_dir, resource_label
    # Returns: rsync's exit status.
    resource_dir=$1
    resource_label=$(basename "${resource_dir}")
    # Load resource inputs and record the resource dir for remote-side scripts
    echo "export PW_RESOURCE_DIR=${PWD}/${resource_dir}" >> "${resource_dir}/inputs.sh"
    source "${resource_dir}/inputs.sh"
    # Copy the file containing this function so it is available on the resource
    cp "${BASH_SOURCE[0]}" "${resource_dir}"
    # Rsync resource directory in user space to job directory in the resource
    # (trailing slash: copy the directory's contents, not the directory itself)
    origin=${resource_dir}/
    destination=${resource_publicIp}:${resource_jobdir}/${resource_label}/
    # Escape the inner quotes so the logged command matches what is executed
    # (the old unescaped quotes terminated the echo string and mangled the log)
    echo "rsync -avzq --rsync-path=\"mkdir -p ${resource_jobdir} && rsync \" ${origin} ${destination}"
    # --rsync-path creates the remote job directory before transferring
    rsync -avzq --rsync-path="mkdir -p ${resource_jobdir} && rsync " "${origin}" "${destination}"
}
cluster_rsync() {
    # DESCRIPTION:
    # Copies each ./resources/<resource-label>/ directory to the job directory
    # on the corresponding remote resource. On the first failure it runs that
    # resource's remote cancel.sh and exits the workflow.
    # PREREQUISITES:
    # Run python3 /swift-pw-bin/utils/input_form_resource_wrapper.py before this function
    for resource_dir in resources/*/; do
        single_cluster_rsync "${PWD}/${resource_dir}"
        return_code=$?
        if [ ${return_code} -ne 0 ]; then
            # resource_label and resource_jobdir are global variables set by
            # single_cluster_rsync for the resource that just failed
            ${sshcmd} "${resource_jobdir}/${resource_label}/cancel.sh"
            exit 1
        fi
    done
}
single_cluster_rsync_exec() {
    # Rsync one resource directory to the remote resource and execute its
    # cluster_rsync_exec.sh script there.
    # Arguments:
    #   $1 - path to a cluster_rsync_exec.sh script under ./resources/
    # Globals (read after sourcing inputs.sh): resource_publicIp, resource_jobdir
    # Globals (written, read by callers such as cluster_rsync_exec): resource_dir, resource_label
    # Returns: 1 if the remote script fails, 0 otherwise.
    path_to_rsync_exec_sh=$1
    chmod +x "${path_to_rsync_exec_sh}"
    resource_dir=$(dirname "${path_to_rsync_exec_sh}")
    resource_label=$(basename "${resource_dir}")
    # Load resource inputs and record the resource dir for remote-side scripts
    echo "export PW_RESOURCE_DIR=${PWD}/${resource_dir}" >> "${resource_dir}/inputs.sh"
    source "${resource_dir}/inputs.sh"
    echo; echo "Running ${path_to_rsync_exec_sh} in ${resource_publicIp}"
    # Copy the file containing this function so it is available on the resource
    cp "${BASH_SOURCE[0]}" "${resource_dir}"
    # Rsync resource directory in user space to job directory in the resource
    origin=${resource_dir}/
    destination=${resource_publicIp}:${resource_jobdir}/${resource_label}/
    # Escape the inner quotes so the logged command matches what is executed
    # (the old unescaped quotes terminated the echo string and mangled the log)
    echo "rsync -avzq --rsync-path=\"mkdir -p ${resource_jobdir} && rsync \" ${origin} ${destination}"
    rsync -avzq --rsync-path="mkdir -p ${resource_jobdir} && rsync " "${origin}" "${destination}"
    # Execute the script on the remote resource
    echo "${sshcmd} ${resource_jobdir}/${resource_label}/cluster_rsync_exec.sh"
    ${sshcmd} "${resource_jobdir}/${resource_label}/cluster_rsync_exec.sh"
    # Check if the SSH command failed
    if [ $? -ne 0 ]; then
        echo "single_cluster_rsync_exec ${path_to_rsync_exec_sh} failed"
        return 1
    fi
}
cluster_rsync_exec() {
    # DESCRIPTION:
    # 1. Looks for every script named cluster_rsync_exec.sh under the ./resources directory
    # 2. Copies the ./resources/<resource-label>/ directory to the job directory in the remote resource
    # 3. Executes the script ./resources/<resource-label>/cluster_rsync_exec.sh in the remote resource
    # PREREQUISITES:
    # Run python3 /swift-pw-bin/utils/input_form_resource_wrapper.py before this function
    #
    # Process substitution (rather than `for f in $(find ...)`) keeps the loop
    # in the current shell so "exit 1" aborts the whole workflow, and read -r
    # copes with paths containing whitespace.
    while IFS= read -r path_to_rsync_exec_sh; do
        single_cluster_rsync_exec "${path_to_rsync_exec_sh}"
        return_code=$?
        if [ ${return_code} -ne 0 ]; then
            # resource_label and resource_jobdir are global variables set by
            # single_cluster_rsync_exec for the resource that just failed
            ${sshcmd} "${resource_jobdir}/${resource_label}/cancel.sh"
            exit 1
        fi
    done < <(find resources -name cluster_rsync_exec.sh)
}
cancel_jobs_by_name() {
    # Cancels the jobs submitted by the cluster_rsync_exec function using the job's name
    # (SLURM: squeue -n <name>; PBS: qselect -N <name>).
    # NOTE(review): cancel_cmd is not defined anywhere in this file -- presumably
    # it is exported by each resource's inputs.sh; confirm before relying on it.
    while IFS= read -r resource_inputs_sh; do
        resource_dir=$(dirname "${resource_inputs_sh}")
        resource_label=$(basename "${resource_dir}")
        source "${resource_inputs_sh}"
        # Only batch schedulers submit named jobs that can be cancelled here
        if [ "${jobschedulertype}" != "PBS" ] && [ "${jobschedulertype}" != "SLURM" ]; then
            continue
        fi
        echo; echo "Canceling jobs in ${resource_name} - ${resource_publicIp}"
        # Prepare cancel script
        if [[ ${jobschedulertype} == "SLURM" ]]; then
            # FIXME: Add job_name to input_form_resource_wrapper
            # Header line looks like "#SBATCH --job-name=<name>"
            job_name=$(grep -e '--job-name' "${resource_dir}/batch_header.sh" | cut -d'=' -f2)
            job_ids=$(ssh -o StrictHostKeyChecking=no ${resource_publicIp} squeue -h -o "%i" -n ${job_name})
            if [ -z "${job_ids}" ]; then
                echo "No jobs found in ${resource_name} - ${resource_publicIp}"
                continue
            fi
        elif [[ ${jobschedulertype} == "PBS" ]]; then
            # FIXME: Add job_name to input_form_resource_wrapper
            # Header line looks like "#PBS -N <name>": the name is the third
            # whitespace-separated field. The old `cut -d'=' -f2` assumed an
            # "=" delimiter and returned the whole line, so qselect never
            # matched the job.
            job_name=$(grep -e '#PBS -N' "${resource_dir}/batch_header.sh" | awk '{print $3}')
            job_ids=$(ssh -o StrictHostKeyChecking=no ${resource_publicIp} qselect -N ${job_name})
            if [ -z "${job_ids}" ]; then
                echo "No jobs found in ${resource_name} - ${resource_publicIp}"
                continue
            fi
        fi
        # Overwrite (not append) so reruns do not accumulate stale job ids
        echo "${cancel_cmd} ${job_ids}" | tr '\n' ' ' > "${resource_dir}/cancel_job.sh"
        echo "${resource_dir}/cancel_job.sh:"
        cat "${resource_dir}/cancel_job.sh"
        # Run cancel script on the remote resource
        echo "ssh -o StrictHostKeyChecking=no ${resource_publicIp} 'bash -s' < ${resource_dir}/cancel_job.sh"
        ssh -o StrictHostKeyChecking=no ${resource_publicIp} 'bash -s' < "${resource_dir}/cancel_job.sh"
    done < <(find resources -name inputs.sh)
}
cancel_jobs_by_script() {
    # Runs every cancel.sh script located on the remote resource directory
    while IFS= read -r resource_inputs_sh; do
        resource_dir=$(dirname "${resource_inputs_sh}")
        resource_label=$(basename "${resource_dir}")
        source "${resource_inputs_sh}"
        cancel_script="${resource_jobdir}/${resource_label}/cancel.sh"
        # Use the same -o StrictHostKeyChecking=no as every other ssh call in
        # this file so the existence check cannot hang on a host-key prompt
        cancel_script_exists=$(ssh -o StrictHostKeyChecking=no ${resource_publicIp} "[ -f \"${cancel_script}\" ]" && echo "True" || echo "False")
        if [[ ${cancel_script_exists} == "True" ]]; then
            echo; echo "Running Canceling script ${cancel_script} in ${resource_name} - ${resource_publicIp}"
            ssh -o StrictHostKeyChecking=no ${resource_publicIp} "${cancel_script}"
        fi
    done < <(find resources -name inputs.sh)
}
get_slurm_job_status() {
    # Poll the remote SLURM scheduler and export the state of job $jobid.
    #
    # The first line of the squeue-style output (the header) is cached in
    # SQUEUE_HEADER so repeated polls reuse it; the state column is located as
    # the first header field whose name starts with "S" (e.g. "ST").
    #
    # Globals read:     sshcmd, status_cmd, jobid
    # Globals exported: SQUEUE_HEADER, job_status
    # Also prints the header line and the job's row for logging.
    if [ -z "${SQUEUE_HEADER}" ]; then
        export SQUEUE_HEADER="$(eval "$sshcmd ${status_cmd}" | head -n 1)"
    fi
    status_column=$(awk '{ for (f = 1; f <= NF; f++) if ($f ~ /^S/) { print f; exit } }' <<< "${SQUEUE_HEADER}")
    status_response=$(eval $sshcmd ${status_cmd} | awk -v jobid="${jobid}" '$1 == jobid')
    echo "${SQUEUE_HEADER}"
    echo "${status_response}"
    export job_status=$(awk -v col="${status_column}" '{ print $col }' <<< "${status_response}")
}
get_pbs_job_status() {
    # Poll the remote PBS scheduler and export the state of job $jobid.
    #
    # The first line of the qstat output (the header) is cached in QSTAT_HEADER
    # for logging; the actual state is read from the job_state field of
    # "qstat -f <jobid>".
    #
    # Globals read:     sshcmd, status_cmd, jobid
    # Globals exported: QSTAT_HEADER, job_status
    # Also prints the header line and the job's row for logging.
    if [ -z "${QSTAT_HEADER}" ]; then
        export QSTAT_HEADER="$(eval "$sshcmd ${status_cmd}" | head -n 1)"
    fi
    status_response=$(eval $sshcmd ${status_cmd} 2>/dev/null | grep -w "${jobid}")
    echo "${QSTAT_HEADER}"
    echo "${status_response}"
    # Remote line looks like "    job_state = R"; strip spaces around the value
    export job_status="$(eval $sshcmd ${status_cmd} -f ${jobid} 2>/dev/null | awk -F'=' '/job_state/ { gsub(/ /, "", $2); print $2 }')"
}
wait_job() {
    # Block until job $jobid leaves the remote scheduler's queue, polling every
    # 15 seconds.
    # squeue only reports jobs that are running or waiting to run, so an empty
    # SLURM status means the job finished; its terminal state is then fetched
    # with sacct. qstat keeps reporting recent jobs, so PBS is done when the
    # state is "C" or the job is no longer known.
    # Globals read: jobschedulertype, sshcmd, jobid
    # Globals written: job_status
    while :; do
        sleep 15
        case "${jobschedulertype}" in
            SLURM)
                get_slurm_job_status
                if [ -z "${job_status}" ]; then
                    # Job left the queue: recover its final state from sacct
                    job_status=$($sshcmd sacct -j ${jobid} --format=state | tail -n1)
                    break
                fi
                ;;
            PBS)
                get_pbs_job_status
                if [[ "${job_status}" == "C" ]] || [ -z "${job_status}" ]; then
                    break
                fi
                ;;
        esac
    done
}
install_miniconda() {
    # Download and silently install the latest Miniconda3 (Linux x86_64) into
    # the given directory, replacing any existing installation there.
    # Arguments: $1 - target installation directory
    install_dir=$1
    echo "Installing Miniconda3-latest-Linux-x86_64.sh"
    conda_repo="https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh"
    ID=$(date +%s)-${RANDOM} # This script may run at the same time!
    # "> file 2>&1" (not "2>&1 > file"): the old order redirected stderr to the
    # terminal instead of capturing it in the log file
    nohup wget --no-check-certificate ${conda_repo} -O /tmp/miniconda-${ID}.sh > /tmp/miniconda_wget-${ID}.out 2>&1
    # ${install_dir:?} aborts instead of running "rm -rf" with an empty argument
    rm -rf "${install_dir:?}"
    mkdir -p "$(dirname "${install_dir}")"
    nohup bash /tmp/miniconda-${ID}.sh -b -p "${install_dir}" > /tmp/miniconda_sh-${ID}.out 2>&1
}
create_conda_env_from_yaml() {
    # Ensure a conda environment exists and is activated, creating/updating it
    # from a YAML spec when activation fails.
    # Arguments:
    #   $1 - conda installation directory (installed via install_miniconda if missing)
    #   $2 - conda environment name
    #   $3 - path to the environment YAML file (modified in place!)
    # Exits the workflow (exit 1) if the environment cannot be activated even
    # after updating it from the YAML file.
    CONDA_DIR=$1
    CONDA_ENV=$2
    CONDA_YAML=$3
    CONDA_SH="${CONDA_DIR}/etc/profile.d/conda.sh"
    # conda env export
    # Drop the top-level "name:" and "prefix:" keys (so -n below controls the
    # env name/location) and remove empty lines. Patterns are anchored to the
    # start of the line: the old unanchored "s/name.*$//" also truncated any
    # dependency line merely containing the substring "name".
    sed -i -e '/^name:/d' -e '/^prefix:/d' -e '/^$/d' "${CONDA_YAML}"
    if [ ! -d "${CONDA_DIR}" ]; then
        echo "Conda directory <${CONDA_DIR}> not found. Installing conda..."
        install_miniconda ${CONDA_DIR}
    fi
    echo "Sourcing Conda SH <${CONDA_SH}>"
    source ${CONDA_SH}
    echo "Activating Conda Environment <${CONDA_ENV}>"
    {
        conda activate ${CONDA_ENV}
    } || {
        # Activation failed: build/update the environment from the YAML spec
        echo "Conda environment <${CONDA_ENV}> not found. Installing conda environment from YAML file <${CONDA_YAML}>"
        conda env update -n ${CONDA_ENV} -q -f ${CONDA_YAML} #--prune
        {
            echo "Activating Conda Environment <${CONDA_ENV}> again"
            conda activate ${CONDA_ENV}
        } || {
            echo "ERROR: Conda environment <${CONDA_ENV}> not found. Exiting workflow"
            exit 1
        }
    }
}
findAvailablePort() {
    # Print a free TCP port in [6000, 9000], chosen at random.
    # A marker file /tmp/<port>.port.used is created to reserve the port so
    # that concurrent callers do not pick the same one (markers are not removed
    # by this function).
    # Globals written: minPort, maxPort, port, out, portFile, availablePort
    minPort=6000
    maxPort=9000
    for port in $(seq ${minPort} ${maxPort} | shuf); do
        out=$(netstat -aln | grep LISTEN | grep ${port})
        # Skip ports that already appear in a LISTEN socket line
        [ -n "${out}" ] && continue
        # To prevent multiple users from using the same available port --> Write file to reserve it
        portFile=/tmp/${port}.port.used
        if [ ! -f "${portFile}" ]; then
            touch ${portFile}
            availablePort=${port}
            echo ${port}
            break
        fi
    done
}
# Function to print the SLURM logs
print_slurm_logs() {
    # Print each remote log file by cat-ing it over ssh.
    # Arguments: $1 - whitespace-separated list of remote log file paths
    # Globals read: sshcmd
    # Fixed: the original "echo; echo; echo; \"SLURM LOGS\"" had a stray
    # semicolon, which made bash execute "SLURM LOGS" as a command instead of
    # printing the banner.
    echo; echo; echo "SLURM LOGS"
    local log_file_paths=$1
    for log_file in ${log_file_paths}; do
        echo "${sshcmd} cat ${log_file}"
        ${sshcmd} cat ${log_file}
        echo
    done
}