Source code for schrodinger.application.job_monitor.job_monitor_models
import time
import typing
from enum import Enum
import schrodinger
from schrodinger.application.job_monitor import util
from schrodinger.infra import jobhub
from schrodinger.job.jobcontrol import DisplayStatus
from schrodinger.models import parameters
from schrodinger.Qt import QtCore
maestro = schrodinger.get_maestro()
# TODO: Will maybe eventually have some sort of "Unavailable" status once there
# is an API for that (PANEL-15988)
ACTIVE_STATUSES = (DisplayStatus.RUNNING, DisplayStatus.WAITING)
RequestedStatus = Enum('RequestedStatus', 'NONE STOP CANCEL')
DownloadStatus = Enum('DownloadStatus',
'NONE READY_TO_DOWNLOAD DOWNLOADING DOWNLOADED')
timestamp_format = "%Y-%m-%d-%H:%M:%S"
[docs]class JobModel(parameters.CompoundParam):
"""
Model class containing all job details that are relevant to the panel
"""
job_name: str
job_id: str
parent_job_id: str = None
sub_job_ids: typing.List[str]
program: str # program name, e.g. Glide
host: str # host name, e.g. nyc-desk-w45.schrodinger.com
directory: str # directory that all job files are found in
job_started: float # time that job was started, in seconds since the
# epoch, e.g. 1582603481.0
project_name: str # name of project that job was started from
max_progress: float # max number of progress steps
files: typing.List[str] # list of all files registered for a job
logfiles: typing.List[str] # list of log file(s)
current_progress: float # current progress steps
last_updated: int # time job was updated from server in seconds since epoch
status: DisplayStatus = None # job status
relative_id: int
job_index: int # index of the job in job_table
launch_time: str # time that the job was launched, e.g. 2020-04-01-11:48:15
requested_status: RequestedStatus = RequestedStatus.NONE
download_status: DownloadStatus = DownloadStatus.NONE
@property
def is_top_level_job(self):
"""
Top level jobs are jobs which are started by user as opposed to being
spawned by another job
"""
return self.parent_job_id is None and not self.is_null_job
@property
def is_active(self):
"""
Return whether the job is active.
"""
if not self.status:
return False
return jobhub.is_active_job(self.status.value)
@property
def is_null_job(self):
return self.job_id is None
[docs] @staticmethod
def fromJobObject(job):
"""
Convert a Job object into a JobModel, with the necessary info taken from
the job record
:param job: The backend job object
:type job: schrodinger.job.jobcontrol.Job
:return: A Job model with the info needed by GUI
:rtype: JobModel
"""
model = JobModel()
model.job_name = job.Name
model.job_id = job.JobId
model.program = job.Program
model.project_name = jobhub.get_job_manager().getJobProject(
job.JobId, job.Project)
model.host = job.JobHost
model.directory = job.Dir
if job.StartTime:
model.job_started = time.mktime(
time.strptime(job.StartTime, timestamp_format))
model.files = job.LogFiles + job.OutputFiles + job.InputFiles
model.logfiles = job.LogFiles
model.last_updated = util.get_current_utc_timestamp()
# Update the last updated time as the StatusTime of the job
# if the job is no longer active
if not jobhub.is_active_job(job.DisplayStatus.value):
model.last_updated = time.mktime(
time.strptime(job.StatusTime, timestamp_format))
model.parent_job_id = job.ParentJobId
model.current_progress, model.max_progress = job.getProgressAsSteps()
model.status = job.DisplayStatus
model.launch_time = job.LaunchTime
if job.isDownloaded():
model.download_status = DownloadStatus.DOWNLOADED
for sub_job in job.SubJobs:
# legacy job control reports subjobs as "<jobid> <host> status"
# so we must split and grab just jobid
sub_job_id = sub_job.split(" ")[0]
model.sub_job_ids.append(sub_job_id)
return model
def __repr__(self):
if self.is_null_job:
return "null_job"
else:
return f"{self.job_id}: {self.job_name}"
@property
def is_downloaded(self):
return self.download_status == DownloadStatus.DOWNLOADED
def _on_null_job_changed():
raise RuntimeError('null_job was modified')
[docs]def get_null_job():
"""
Get a null job model. The null job model's job_id is None. The parent job
of all top-level jobs is a null job. Null jobs should not be changed,
and will raise an exception if they are.
:return: A new null job object
:rtype: JobModel
"""
null_job = JobModel(job_id=None)
null_job.valueChanged.connect(_on_null_job_changed)
return null_job
[docs]class JobMonitorPanelModel(parameters.CompoundParam):
# PLP for top level job table
top_level_jobs: typing.List[JobModel]
# when theres a top level job, show top level job bar
current_top_level_job: JobModel = get_null_job()
# when current job changes, update JobDetailsPane
current_job: JobModel = get_null_job()
# PLP for sub job table
subjobs: typing.List[JobModel]
# Job Filtering
current_project_name: str = None
is_curr_project_scratch: bool
active_jobs_only: bool = True
current_project_jobs_only: bool = bool(maestro)
status: str
filtersInvalidated = QtCore.pyqtSignal()
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._job_dict = {None: get_null_job()}
[docs] def updateJobs(self, job_models):
"""
Updates the panel model with the given job models. If the JobModel is
not already in _job_dict, add it.
:param job_models: The job models to add to the panel
:type job_models: List[JobModel]
"""
# Add any job models that haven't already been added
self._addJobs(
[jm for jm in job_models if jm.job_id not in self._job_dict])
for job_model in job_models:
self._updateJob(job_model)
self.filtersInvalidated.emit()
[docs] def deleteJobs(self, job_ids):
"""
Remove the deleted jobs from the model
:param job_ids: Delete jobs' ids
:type job_ids: list[str]
"""
self.top_level_jobs = [
job_model for job_model in self.top_level_jobs
if job_model.job_id not in job_ids
]
self.subjobs = [
job_model for job_model in self.subjobs
if job_model.job_id not in job_ids
]
self.current_job.sub_job_ids = [
job_id for job_id in self.current_job.sub_job_ids
if job_id not in job_ids
]
for job_id in job_ids:
if job_id in self._job_dict:
self._job_dict.pop(job_id)
[docs] def setCurrentTopLevelJob(self, job_model):
"""
Set the top level job. Called when you double click on a job/row in
the job table on the Jobs List pane. Used to populate to top level
job bar.
:param job_model: The new top level job model
:type job_model: JobModel
"""
self.current_top_level_job = job_model
[docs] def setCurrentJob(self, job_model):
"""
Set the current job being viewed in the Job Detail pane and add its
subjobs to the subjobs PLP. Used to populate job info widget and
subjobs table
:param job_model: The new current job model
:type job_model: JobModel
"""
self.current_job.setValue(job_model)
subjobs = []
for subjob_id in job_model.sub_job_ids:
if subjob_id in self._job_dict:
subjobs.append(self.getJob(subjob_id))
self.subjobs = subjobs
self.updateRelativeID(self.subjobs)
[docs] def setTopLevelJobs(self, jobs):
"""
Set the current top level jobs to be viewed and update the
relative IDs of these jobs.
:type jobs: list[schrodinger.job.jobcontrol.Job]
:param jobs: the list of top level jobs
"""
self.top_level_jobs = jobs
self.updateRelativeID(self.top_level_jobs)
[docs] def updateRelativeID(self, jobs_to_update):
"""
Update the model with the relative IDs for the jobs being viewed
in the current state.
:type jobs_to_update: list[schrodinger.job.jobcontrol.Job]
:param jobs_to_update: the list of jobs to be updated with the relative id
"""
jobs_to_update.sort(key=lambda job_model: job_model.launch_time)
for idx, job_model in enumerate(jobs_to_update):
job_model.relative_id = idx + 1
def _addJobs(self, job_models):
"""
Add jobs to the panel.
:param job_models: The job models to add to the panel
:type job_models: List[JobModel]
"""
for job_model in job_models:
job_id = job_model.job_id
self._job_dict[job_id] = job_model
self.top_level_jobs.extend(
jm for jm in job_models if jm.is_top_level_job)
def _updateJob(self, job_model):
"""
Update the given job model in the panel's model dict
:param job_model: The new, updated job model
:type job_model: JobModel
"""
job_id = job_model.job_id
old_job_model = self.getJob(job_id)
# No job to update since the job is no longer in job database
if old_job_model is None:
return
# Don't update widgets if there's no change in job's attributes
if old_job_model == job_model:
return
is_active_job = jobhub.is_active_job(job_model.status.value)
# Preserve requested_status info if the job is active
if is_active_job:
job_model.requested_status = old_job_model.requested_status
# When the job is completed, change the download_status from the
# default value to READY_TO_DOWNLOAD
elif old_job_model.download_status is DownloadStatus.NONE:
job_model.download_status = DownloadStatus.READY_TO_DOWNLOAD
# When the job is updated - other than when completed- preserve the
# download_status.
elif job_model.download_status is DownloadStatus.NONE:
job_model.download_status = old_job_model.download_status
old_job_model.setValue(job_model)
# let parent know its child changed
parent_job_model = self.getParentJob(job_model)
if parent_job_model and not parent_job_model.is_null_job:
self._updateJob(parent_job_model)
# update widgets if job_model was being viewed
if self.current_top_level_job.job_id == job_id:
self.setCurrentTopLevelJob(job_model)
if self.current_job.job_id == job_id:
self.setCurrentJob(job_model)
[docs] def getJob(self, job_id):
"""
Get job by job id
:param job_id: Job id of reqested job
:type job_id: str
:return: The job model with the given job id
:rtype: JobModel or NoneType
"""
return self._job_dict.get(job_id)
[docs] def getParentJob(self, job_model):
"""
Get the parent job of the given job model
:param job_model: The job model to get the parent of
:type job_model: JobModel
:return: The parent's job model, or None if the job has no parent (
i.e. it's a top level job)
:rtype: JobModel or NoneType
"""
parent_id = job_model.parent_job_id
parent_job_model = self._job_dict.get(parent_id)
return parent_job_model
[docs] def getAllTopLevelJobs(self):
"""
Get a list of all top level jobs tracked by the panel. A job is
considered top level if its parent id is None
:return: All top level jobs, i.e. all jobs
:rtype: list[JobModel]
"""
top_level_jobs = []
for job_model in self._job_dict.values():
if job_model.is_top_level_job:
top_level_jobs.append(job_model)
return top_level_jobs
@property
def on_jobs_list(self):
"""
The GUI is on the Jobs List Pane when current_job or
current_top_level_job is a null job
"""
return self.current_top_level_job.is_null_job
[docs] def showJobsList(self):
"""
The GUI is on the Jobs List Pane when current_job or
current_top_level_job is a null job
"""
self.setCurrentTopLevelJob(get_null_job())
self.setCurrentJob(get_null_job())
def __repr__(self):
d = self.toDict()
d["current_top_level_job"] = repr(self.current_top_level_job)
d["current_job"] = repr(self.current_job)
return str(d)
[docs] def setJobDownloadStatus(self, job_id, status):
"""
Set the 'download_status' to job_model of the given job.
:param job_id: The job id.
:type job_id: str
:param status: Download status.
:type status: member of'DownloadStatus enum.
"""
job_model = self.getJob(job_id)
if job_model is not None:
job_model.download_status = status
self._updateJob(job_model)