From 6fe7e589672c320a14ff07557cc10c1f0c76ea8f Mon Sep 17 00:00:00 2001 From: Joseph White Date: Fri, 27 May 2022 17:37:24 -0400 Subject: [PATCH] Add support for job arrays. --- background_scripts/supremm_arrayjobs.php | 195 ++++++++++++++++++ background_scripts/xdmod-supremm-admin | 5 +- bin/aggregate_supremm.sh | 2 + .../Query/SUPREMM/JobDataset.php | 34 +++ .../Query/SUPREMM/JobMetadata.php | 4 + 5 files changed, 238 insertions(+), 2 deletions(-) create mode 100644 background_scripts/supremm_arrayjobs.php diff --git a/background_scripts/supremm_arrayjobs.php b/background_scripts/supremm_arrayjobs.php new file mode 100644 index 00000000..08f7b04b --- /dev/null +++ b/background_scripts/supremm_arrayjobs.php @@ -0,0 +1,195 @@ + gethostname() . ': XDMOD: Data Warehouse: SUPReMM ETL Log', + ); + + $options = array( + 'h' => 'help', + 'q' => 'quiet', + 'v' => 'verbose', + 'r:' => 'resource:', + 'a' => 'all', + 's:' => 'start:', + 'e:' => 'end:', + 'd' => 'debug' + ); + + $conf['end'] = time(); + $conf['start'] = $conf['end'] - (60*60*24 * 10); + + $args = getopt(implode('', array_keys($options)), $options); + + foreach ($args as $arg => $value) + { + switch ($arg) + { + case 'h': + case 'help': + usage_and_exit(); + break; + case 'q': + case 'quiet': + $conf['consoleLogLevel'] = CCR\Log::ERR; + break; + case 'v': + case 'verbose': + $conf['consoleLogLevel'] = CCR\Log::INFO; + break; + case 'r': + case 'resource': + $conf['resource'] = $value; + break; + case 's': + case 'start': + $conf['start'] = strtotime($value); + break; + case 'e': + case 'end': + $conf['end'] = strtotime($value); + break; + case 'a': + case 'all': + $conf['start'] = 0; + $conf['end'] = time(); + break; + case 'd': + case 'debug': + $conf['consoleLogLevel'] = CCR\Log::DEBUG; + break; + default: + break; + } + } + return $conf; +} + +function usage_and_exit() +{ + global $argv; + + fwrite( + STDERR, + <<<"EOMSG" +Usage: {$argv[0]} + -h, --help Display this help + + Controlling which jobs are 
processed: + -r, --resource=RESOURCE Only process array jobs on the given resource. + (default all resources in the SUPREMM realm are + processed). The resource code or the resource_id + may be specified. + -s, --start=START_TIME Specify the start of the time window. (default + 10 days ago). + -e, --end=END_TIME Specify the end of the time window. (default + now). + -a, --all Do not restrict the jobs by time. + + Controlling log output: + -q, --quiet Quiet mode. Only print error messages to the + console + -v, --verbose Enable informational messages to the console + -d, --debug Enable debug messages to the console +EOMSG +); + exit(1); +} + + +/** + * Get the list of resource ids to process: the ids returned by SupremmDbInterface::getResources(), limited to the resource whose id or code equals $conf['resource'] when that option is set + */ +function get_resource_list($conf) +{ + $s = new \DataWarehouse\Query\SUPREMM\SupremmDbInterface(); + $resources = $s->getResources(); + + $stmt = 'SELECT id FROM `modw`.`resourcefact` WHERE id IN (' . implode(',', $resources) . ')'; + $args = array(); + + if (isset($conf['resource'])) { + $stmt .= ' AND (id = :resource OR code = :resource)'; + $args['resource'] = $conf['resource']; + } + + $db = DB::factory('datawarehouse'); + $query = $db->handle()->prepare($stmt); + $query->execute($args); + + return $query->fetchAll(PDO::FETCH_COLUMN, 0); +} + +/** + * Build a table with mapping between the different jobs in a job array (peer pairs are inserted into modw_supremm.job_array_peers via the scratch table modw_supremm.job_array_tmp) + * + * @param string $resource_id the resource id + * @param string $start the minimum date range to select (compared against job end_time_ts) + * @param string $end the maximum date range to select (compared against job end_time_ts) + * + * @return null + */ +function get_array_jobs($resource_id, $start, $end) +{ + global $logger; + $db = DB::factory('datawarehouse'); + + $logger->debug('Checking for array jobs on resource_id=' . $resource_id . ' between ' . $start . ' and ' . 
$end); + + $db->handle()->exec('DROP TABLE IF EXISTS `modw_supremm`.`job_array_tmp`'); + $createtmp = $db->handle()->prepare('CREATE TABLE `modw_supremm`.`job_array_tmp` (KEY (resource_id, local_jobid, submit_time_ts, _id)) SELECT s._id, j.resource_id, j.local_jobid, j.submit_time_ts FROM modw_supremm.job s, modw.job_tasks j WHERE s.tg_job_id = j.job_id AND j.local_job_array_index != -1 and j.local_job_id_raw is not null and s.end_time_ts BETWEEN :start AND :end AND s.resource_id = :resource_id'); + $createtmp->execute(array('start' => $start, 'end' => $end, 'resource_id' => $resource_id)); + + $logger->debug('Updating array job mapping table on resource_id=' . $resource_id . ' between ' . $start . ' and ' . $end); + + $db->handle()->exec('INSERT IGNORE INTO `modw_supremm`.`job_array_peers` SELECT DISTINCT s1._id as job_id, s2._id as other_job_id FROM `modw_supremm`.`job_array_tmp` s1, `modw_supremm`.`job_array_tmp` s2 WHERE s1.resource_id = s2.resource_id and s1.submit_time_ts = s2.submit_time_ts and s1.local_jobid = s2.local_jobid and s1._id != s2._id'); + + $db->handle()->exec('DROP TABLE IF EXISTS `modw_supremm`.`job_array_tmp`'); +} + +$conf = get_config(); + +$logger = CCR\Log::factory('SUPREMM', $conf); + +$cmd = implode(' ', array_map('escapeshellarg', $argv)); +$logger->info("Command: $cmd"); + +$logger->notice( + array( + 'message' => 'process array jobs start', + 'process_start_time' => date('Y-m-d H:i:s'), + ) +); + +try +{ + foreach (get_resource_list($conf) as $resource_id) { + get_array_jobs($resource_id, $conf['start'], $conf['end']); + } +} +catch (\Exception $e) { + + $msg = 'Caught exception while executing: ' . 
$e->getMessage(); + $logger->err( + array( + 'message' => $msg, + 'stacktrace' => $e->getTraceAsString() + ) + ); +} + +$logger->notice( + array( + 'message' => 'process array jobs end', + 'process_end_time' => date('Y-m-d H:i:s'), + ) +); diff --git a/background_scripts/xdmod-supremm-admin b/background_scripts/xdmod-supremm-admin index b777c496..cc3b4581 100755 --- a/background_scripts/xdmod-supremm-admin +++ b/background_scripts/xdmod-supremm-admin @@ -204,15 +204,16 @@ function truncateAction($config, $deleteSqlData) } $multiDel = <<execute($multiDel, array('resource_id' => $resource_id)); - $logger->notice('Deleted ' . $rows . ' rows from job_name and job_peers tables.'); + $logger->notice('Deleted ' . $rows . ' rows from job_name, job array peers and job_peers tables.'); } } diff --git a/bin/aggregate_supremm.sh b/bin/aggregate_supremm.sh index 9f43ebdc..2c3ff48a 100755 --- a/bin/aggregate_supremm.sh +++ b/bin/aggregate_supremm.sh @@ -52,6 +52,8 @@ shift $((OPTIND-1)) php ${XDMOD_LIB_PATH}/supremm_sharedjobs.php $FLAGS + php ${XDMOD_LIB_PATH}/supremm_arrayjobs.php $FLAGS + php ${XDMOD_LIB_PATH}/aggregate_supremm.php $FLAGS $AGG_FLAGS ${XDMOD_BIN_PATH}/xdmod-build-filter-lists --realm SUPREMM $FLAGS diff --git a/classes/DataWarehouse/Query/SUPREMM/JobDataset.php b/classes/DataWarehouse/Query/SUPREMM/JobDataset.php index 6fe15342..58e2fdb8 100644 --- a/classes/DataWarehouse/Query/SUPREMM/JobDataset.php +++ b/classes/DataWarehouse/Query/SUPREMM/JobDataset.php @@ -124,6 +124,11 @@ public function __construct( $this->addField(new TableField($rf, 'timezone')); $this->addField(new TableField($rf, 'code', 'resource')); + $jt = new Table(new Schema('modw'), 'job_tasks', 'jt'); + $this->addTable($jt); + $this->addWhereCondition(new WhereCondition(new TableField($dataTable, 'tg_job_id'), '=', new TableField($jt, 'job_id'))); + $this->addField(new TableField($jt, 'local_job_array_index', 'local_job_array_index')); + } elseif ($stat == "peers") { $jp = new Table(new 
Schema("modw_supremm"), "job_peers", "jp"); $this->joinTo($jp, "_id", "other_job_id", "jobid", "job_id"); @@ -145,6 +150,35 @@ public function __construct( $this->addWhereCondition(new WhereCondition(new TableField($jf, "person_id"), '=', new TableField($pt, "id"))); $this->addField(new TableField($pt, "long_name", "name")); + $this->addOrder( + new \DataWarehouse\Query\Model\OrderBy( + new TableField($jf, 'start_time_ts'), + 'asc', + 'start_time_ts' + ) + ); + } elseif ($stat == "array") { + + $jp = new Table(new Schema("modw_supremm"), "job_array_peers", "jp"); + $this->joinTo($jp, "_id", "other_job_id", "jobid", "job_id"); + + $jf = new Table(new Schema("modw_supremm"), "job", "jf1"); + $this->addTable($jf); + $this->addWhereCondition(new WhereCondition(new TableField($jp, "other_job_id"), '=', new TableField($jf, "_id"))); + $this->addField(new TableField($jf, "local_job_id")); + $this->addField(new TableField($jf, "start_time_ts")); + $this->addField(new TableField($jf, "end_time_ts")); + + $rt = new Table(new Schema("modw"), "resourcefact", "rf"); + $this->addTable($rt); + $this->addWhereCondition(new WhereCondition(new TableField($jf, "resource_id"), '=', new TableField($rt, "id"))); + $this->addField(new TableField($rt, "code", "resource")); + + $pt = new Table(new Schema('modw'), 'person', 'p'); + $this->addTable($pt); + $this->addWhereCondition(new WhereCondition(new TableField($jf, "person_id"), '=', new TableField($pt, "id"))); + $this->addField(new TableField($pt, "long_name", "name")); + $this->addOrder( new \DataWarehouse\Query\Model\OrderBy( new TableField($jf, 'start_time_ts'), diff --git a/classes/DataWarehouse/Query/SUPREMM/JobMetadata.php b/classes/DataWarehouse/Query/SUPREMM/JobMetadata.php index 0ceb8c4d..21ffcdcc 100644 --- a/classes/DataWarehouse/Query/SUPREMM/JobMetadata.php +++ b/classes/DataWarehouse/Query/SUPREMM/JobMetadata.php @@ -60,6 +60,10 @@ public function getJobMetadata(XDUser $user, $jobid) 
$available_data[\DataWarehouse\Query\RawQueryTypes::PEERS] = true; } + if ($job['local_job_array_index'] != -1) { + $available_data[\DataWarehouse\Query\RawQueryTypes::ARRAY_PEERS] = true; + } + // Always report that analytics are present; the data endpoint will // report the error reason for any that are missing. $available_data[\DataWarehouse\Query\RawQueryTypes::ANALYTICS] = true;