Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merging cwl #16

Merged
merged 11 commits into from
Aug 9, 2021
67 changes: 36 additions & 31 deletions src/classes/localex/seed-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ module.exports = async (job: any) => {
id: opid,
name: opfilename,
url: opfileurl,
role: output.role,
time_period: time_period ?? {},
spatial_coverage: spatial_coverage
} as DataResource
Expand All @@ -125,7 +126,9 @@ module.exports = async (job: any) => {
let cwl_file = comp.rundir + "/run.cwl";
console.log(cwl_file)
let cwl_outputs: any = {}

if (fs.existsSync(cwl_file)) {
console.log("Running cwl:" )
if (! fs.existsSync(tempdir))
fs.mkdirSync(tempdir)
let cwl_values_file = write_cwl_values(comp, seed, inputdir, tempdir, outputdir, plainargs)
Expand Down Expand Up @@ -162,6 +165,7 @@ module.exports = async (job: any) => {
}
}
else if (softwareImage != null) {
console.log("Running as a Docker Image:" )
logstream = fs.createWriteStream(logstdout, { 'flags': 'a' });

// Run command in docker image
Expand All @@ -176,6 +180,7 @@ module.exports = async (job: any) => {
await container.remove({force: true});
}
else {
console.log("Running as a Singularity Information")
let pegasus_jobprops_file = comp.rundir + "/__pegasus-job.properties";
if (fs.existsSync(pegasus_jobprops_file)) {
let jobprops = fs.readFileSync(pegasus_jobprops_file);
Expand Down Expand Up @@ -208,24 +213,35 @@ module.exports = async (job: any) => {
}
statusCode = spawnResult.status;
}

// Check for Errors
if(statusCode != 0) {
error = "Execution returned with non-zero status code";
}
else if (fs.existsSync(cwl_file)) {
Object.values(results).map((result: DataResource) => {
let tmpfile = cwl_outputs[result.id]["path"]
let extension = path.extname(tmpfile)
let opfilepath = outputdir + "/" + result.name + extension;
result.url = result.url + extension
if (fs.existsSync(tmpfile)) {
fs.copyFileSync(tmpfile, opfilepath);
}
else {
//console.log(`${tmpfile} not found!`)
error = `${tmpfile} not found!`;
}
Object.values(results).map((result: any) => {
result.name = result.role
if (result.role in cwl_outputs){
//TODO: this assumes one resource per output FIXIT
let outputs = cwl_outputs[result.role].map((file: any) => {
let output_suffix_cwl = Md5.hashAsciiStr(seed.execution.modelid + plainargs.join());
let output_directory = outputdir + '/' + output_suffix_cwl;
if (!fs.existsSync(output_directory)){
fs.mkdirSync(output_directory)
}
let output_file = output_directory + '/' + file['basename'];
let tmpfile = file['path']

console.log("the temporal file " + tmpfile )
if (fs.existsSync(tmpfile)) {
fs.copyFileSync(tmpfile, output_file);
console.log("copy the outputs " + output_file )
}
let url = output_file.replace(localex.datadir, localex.dataurl);
console.log("the url is going to be" + url)
result.url = url;
return url
})
}
});
// Set the results
seed.execution.results = results;
Expand Down Expand Up @@ -262,8 +278,7 @@ module.exports = async (job: any) => {
}

// Remove temporary directory
fs.remove(tempdir);

fs.remove(tempdir)
// Update execution status and results in backend
if(!DEVMODE) {
updateExecutionStatusAndResults(seed.execution);
Expand Down Expand Up @@ -291,38 +306,28 @@ const write_cwl_values = (comp: Component, seed: any, inputdir: string,
location: string
}
let data : Record<string, string | CwlValueFile> = {}

comp.inputs.map((input: ComponentArgument) => {
comp.inputs.map((input: any) => {
if (input.isParam) {
//let paramtype = seed.paramtypes[input.role];
let paramvalue = seed.parameters[input.role];
let paramvalue = seed.parameters[input.id];
if (!paramvalue)
paramvalue = input.paramDefaultValue;
data[input.role] = paramvalue
}
else {
let datasets = seed.datasets[input.role];
let datasets = seed.datasets[input.id];
datasets.map((ds: string) => {
// Copy input files to tempdir
let ifile = inputdir + "/" + ds;
let newifile = tempdir + "/" + ds;
//fs.symlinkSync(ifile, newifile);
fs.copyFileSync(ifile, newifile);
data[input.role] = {"class": "File", "location": newifile}
data[input.role] = {"class": "File", "location": ds["url"]}
});
console.log(datasets)
}
})

// Set the output file arguments for the command
// Create the output file suffix based on a hash of inputs
let opsuffix = Md5.hashAsciiStr(seed.ensemble.modelid + plainargs.join());
let opsuffix = Md5.hashAsciiStr(seed.execution.modelid + plainargs.join());
let results: any = {};
comp.outputs.map((output: any) => {
let opfilename = output.role + "-" + opsuffix;
let opfilepath = outputdir + "/" + opfilename;
data[output.role] = {"class": "File", "location": opfilename}
});

let valuesFile = execution_dir + "/values.yml";
let ymlStr = yaml.safeDump(data);
fs.writeFileSync(valuesFile, ymlStr, 'utf8')
Expand Down