Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

casting issues for nightlies #310

Merged
merged 6 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions smartsim/_core/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Job:
def __init__(
self,
job_name: str,
job_id: str,
job_id: t.Optional[str],
entity: t.Union[SmartSimEntity, EntityList],
launcher: str,
is_task: bool,
Expand Down Expand Up @@ -107,7 +107,7 @@ def record_history(self) -> None:
"""Record the launching history of a job."""
self.history.record(self.jid, self.status, self.returncode, self.elapsed)

def reset(self, new_job_name: str, new_job_id: str, is_task: bool) -> None:
def reset(self, new_job_name: str, new_job_id: t.Optional[str], is_task: bool) -> None:
"""Reset the job in order to be able to restart it.

:param new_job_name: name of the new job step
Expand Down Expand Up @@ -174,12 +174,12 @@ def __init__(self, runs: int = 0) -> None:
:type runs: int, optional
"""
self.runs = runs
self.jids: t.Dict[int, str] = dict()
self.jids: t.Dict[int, t.Optional[str]] = dict()
self.statuses: t.Dict[int, str] = dict()
self.returns: t.Dict[int, t.Optional[int]] = dict()
self.job_times: t.Dict[int, float] = dict()

def record(self, job_id: str, status: str, returncode: t.Optional[int], job_time: float) -> None:
def record(self, job_id: t.Optional[str], status: str, returncode: t.Optional[int], job_time: float) -> None:
"""record the history of a job"""
self.jids[self.runs] = job_id
self.statuses[self.runs] = status
Expand Down
4 changes: 2 additions & 2 deletions smartsim/_core/control/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __call__(self) -> t.Dict[str, Job]:
all_jobs = {**self.jobs, **self.db_jobs}
return all_jobs

def add_job(self, job_name: str, job_id: str, entity: t.Union[SmartSimEntity, EntityList], is_task: bool = True) -> None:
def add_job(self, job_name: str, job_id: t.Optional[str], entity: t.Union[SmartSimEntity, EntityList], is_task: bool = True) -> None:
"""Add a job to the job manager which holds specific jobs by type.

:param job_name: name of the job step
Expand Down Expand Up @@ -278,7 +278,7 @@ def query_restart(self, entity_name: str) -> bool:
return True
return False

def restart_job(self, job_name: str, job_id: str, entity_name: str, is_task: bool = True) -> None:
def restart_job(self, job_name: str, job_id: t.Optional[str], entity_name: str, is_task: bool = True) -> None:
"""Function to reset a job to record history and be
ready to launch again.

Expand Down
4 changes: 1 addition & 3 deletions smartsim/_core/launcher/cobalt/cobaltLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def supported_rs(self) -> t.Dict[t.Type[SettingsBase], t.Type[Step]]:
}


def run(self, step: Step) -> str:
def run(self, step: Step) -> t.Optional[str]:
"""Run a job step through Cobalt

:param step: a job step instance
Expand Down Expand Up @@ -120,8 +120,6 @@ def run(self, step: Step) -> str:
# if batch submission did not successfully retrieve job ID
if not step_id and step.managed:
step_id = self._get_cobalt_step_id(step)
if not step_id:
raise ValueError("Unable to get step id for job step")

self.step_mapping.add(step.name, step_id, task_id, step.managed)
return step_id
Expand Down
4 changes: 2 additions & 2 deletions smartsim/_core/launcher/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def get_step_nodes(self, step_names: t.List[str]) -> t.List[t.List[str]]:
raise NotImplementedError

@abc.abstractmethod
def run(self, step: Step) -> str:
def run(self, step: Step) -> t.Optional[str]:
raise NotImplementedError

@abc.abstractmethod
Expand Down Expand Up @@ -117,7 +117,7 @@ def create_step(self, name: str, cwd: str, step_settings: SettingsBase) -> Step:
def get_step_nodes(self, step_names: t.List[str]) -> t.List[t.List[str]]: # pragma: no cover
raise SSUnsupportedError("Node acquisition not supported for this launcher")

def run(self, step: Step) -> str: # pragma: no cover
def run(self, step: Step) -> t.Optional[str]: # pragma: no cover
raise NotImplementedError

def stop(self, step_name: str) -> StepInfo: # pragma: no cover
Expand Down
4 changes: 1 addition & 3 deletions smartsim/_core/launcher/lsf/lsfLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def supported_rs(self) -> t.Dict[t.Type[SettingsBase], t.Type[Step]]:
RunSettings: LocalStep,
}

def run(self, step: Step) -> str:
def run(self, step: Step) -> t.Optional[str]:
"""Run a job step through LSF

:param step: a job step instance
Expand Down Expand Up @@ -116,8 +116,6 @@ def run(self, step: Step) -> str:
task_id = self.task_manager.start_task(
cmd_list, step.cwd, out=output.fileno(), err=error.fileno()
)
if not step_id:
raise ValueError("Unable to get step id for job step")

self.step_mapping.add(step.name, step_id, task_id, step.managed)
return step_id
Expand Down
5 changes: 1 addition & 4 deletions smartsim/_core/launcher/pbs/pbsLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def supported_rs(self) -> t.Dict[t.Type[SettingsBase], t.Type[Step]]:
PalsMpiexecSettings: MpiexecStep,
}

def run(self, step: Step) -> str:
def run(self, step: Step) -> t.Optional[str]:
"""Run a job step through PBSPro

:param step: a job step instance
Expand Down Expand Up @@ -112,9 +112,6 @@ def run(self, step: Step) -> str:
if not step_id and step.managed:
step_id = self._get_pbs_step_id(step)

if not step_id:
raise ValueError("Unable to get step id for job step")

self.step_mapping.add(step.name, step_id, task_id, step.managed)

return step_id
Expand Down
7 changes: 2 additions & 5 deletions smartsim/_core/launcher/slurm/slurmLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def get_step_nodes(self, step_names: t.List[str]) -> t.List[t.List[str]]:
raise LauncherError("Failed to retrieve nodelist from stat")
return node_lists

def run(self, step: Step) -> str:
def run(self, step: Step) -> t.Optional[str]:
"""Run a job step through Slurm

:param step: a job step instance
Expand Down Expand Up @@ -156,9 +156,6 @@ def run(self, step: Step) -> str:
if not step_id and step.managed:
step_id = self._get_slurm_step_id(step)

if not step_id:
raise ValueError("Unable to get step id for job step")

self.step_mapping.add(step.name, step_id, task_id, step.managed)

# give slurm a rest
Expand Down Expand Up @@ -243,7 +240,7 @@ def _get_managed_step_update(self, step_ids: t.List[str]) -> t.Optional[t.List[S
_rc = int(stat_tuple[1]) if stat_tuple[1] else None
info = SlurmStepInfo(stat_tuple[0], _rc)

task_id = self.step_mapping.get_task_id(int(step_id))
task_id = self.step_mapping.get_task_id(step_id)
if task_id:
# we still check the task manager for jobs that didn't ever
# become a fully managed job (e.g. error in slurm arguments)
Expand Down
6 changes: 3 additions & 3 deletions smartsim/_core/launcher/stepMapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
class StepMap:
def __init__(self,
step_id: t.Optional[str] = None,
task_id: t.Optional[int] = None,
task_id: t.Optional[str] = None,
managed: t.Optional[bool] = None) -> None:
self.step_id = step_id
self.task_id = task_id
Expand All @@ -60,13 +60,13 @@ def add(
managed: bool = True,
) -> None:
try:
n_task_id = int(task_id) if task_id else None
n_task_id = str(task_id) if task_id else None
self.mapping[step_name] = StepMap(step_id, n_task_id, managed)
except Exception as e:
msg = f"Could not add step {step_name} to mapping: {e}"
logger.exception(msg)

def get_task_id(self, step_id: int) -> t.Optional[int]:
def get_task_id(self, step_id: str) -> t.Optional[str]:
"""Get the task id from the step id"""
task_id = None
for stepmap in self.mapping.values():
Expand Down
2 changes: 1 addition & 1 deletion smartsim/entity/dbnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def _parse_db_host(self, filepath: t.Optional[str] = None) -> str:
try:
ip = self._parse_ips(filepath, 1)[0]
# suppress error
except FileNotFoundError:
except (FileNotFoundError, IndexError):
pass

logger.debug("Waiting for Redis output files to populate...")
Expand Down