Source code for schrodinger.application.job_monitor.job_monitor_models

import time
import typing
from enum import Enum

import schrodinger
from schrodinger.application.job_monitor import util
from schrodinger.infra import jobhub
from schrodinger.job.jobcontrol import DisplayStatus
from schrodinger.models import parameters
from schrodinger.Qt import QtCore

# Handle returned by schrodinger.get_maestro(). Its truthiness is used as the
# default of JobMonitorPanelModel.current_project_jobs_only.
# NOTE(review): presumably falsy when running outside of Maestro - confirm.
maestro = schrodinger.get_maestro()

# TODO: Will maybe eventually have some sort of "Unavailable" status once there
#  is an API for that (PANEL-15988)
# Display statuses grouped as "active": running or waiting.
ACTIVE_STATUSES = (DisplayStatus.RUNNING, DisplayStatus.WAITING)

class RequestedStatus(Enum):
    """
    Values for `JobModel.requested_status`. `NONE` is the default.
    """
    NONE = 1
    STOP = 2
    CANCEL = 3


class DownloadStatus(Enum):
    """
    Values for `JobModel.download_status`. `NONE` is the default.
    """
    NONE = 1
    READY_TO_DOWNLOAD = 2
    DOWNLOADING = 3
    DOWNLOADED = 4


# strptime pattern used to parse job time strings, e.g. 2020-04-01-11:48:15
timestamp_format = "%Y-%m-%d-%H:%M:%S"


class JobModel(parameters.CompoundParam):
    """
    Per-job model holding every job detail the panel needs.
    """
    job_name: str
    job_id: str
    parent_job_id: str = None
    sub_job_ids: typing.List[str]
    program: str  # name of the program, e.g. Glide
    host: str  # name of the host, e.g. nyc-desk-w45.schrodinger.com
    directory: str  # directory holding all of the job's files
    job_started: float  # job start time as seconds since the epoch,
    # e.g. 1582603481.0
    project_name: str  # project the job was started from
    max_progress: float  # total number of progress steps
    files: typing.List[str]  # every file registered for the job
    logfiles: typing.List[str]  # the job's log file(s)
    current_progress: float  # progress steps completed so far
    last_updated: int  # seconds since epoch of the last update from server
    status: DisplayStatus = None  # status of the job
    relative_id: int
    job_index: int  # position of the job in job_table
    launch_time: str  # launch time of the job, e.g. 2020-04-01-11:48:15
    requested_status: RequestedStatus = RequestedStatus.NONE
    download_status: DownloadStatus = DownloadStatus.NONE

    @property
    def is_top_level_job(self):
        """
        Whether this job was started by the user rather than spawned by
        another job, i.e. it has no parent and is not the null job.
        """
        return not (self.parent_job_id is not None or self.is_null_job)

    @property
    def is_active(self):
        """
        Whether the job is active. A job without a status is not active.
        """
        current_status = self.status
        if current_status:
            return jobhub.is_active_job(current_status.value)
        return False

    @property
    def is_null_job(self):
        """
        Whether this is a null job, i.e. a model whose job id is None.
        """
        return self.job_id is None

    @staticmethod
    def fromJobObject(job):
        """
        Build a JobModel from a backend Job object, copying over the parts
        of the job record that the GUI needs.

        :param job: The backend job object
        :type job: schrodinger.job.jobcontrol.Job

        :return: A job model populated from the job record
        :rtype: JobModel
        """

        def to_epoch_seconds(stamp):
            # Job record times are strings in `timestamp_format`
            return time.mktime(time.strptime(stamp, timestamp_format))

        job_id = job.JobId
        model = JobModel()
        model.job_name = job.Name
        model.job_id = job_id
        model.program = job.Program
        model.project_name = jobhub.get_job_manager().getJobProject(
            job_id, job.Project)
        model.host = job.JobHost
        model.directory = job.Dir

        start_stamp = job.StartTime
        if start_stamp:
            model.job_started = to_epoch_seconds(start_stamp)

        log_files = job.LogFiles
        model.files = log_files + job.OutputFiles + job.InputFiles
        model.logfiles = log_files

        # Default to "now"; a job that is no longer active reports the
        # StatusTime of its record instead
        model.last_updated = util.get_current_utc_timestamp()
        if not jobhub.is_active_job(job.DisplayStatus.value):
            model.last_updated = to_epoch_seconds(job.StatusTime)

        model.parent_job_id = job.ParentJobId
        steps_done, steps_total = job.getProgressAsSteps()
        model.current_progress = steps_done
        model.max_progress = steps_total
        model.status = job.DisplayStatus
        model.launch_time = job.LaunchTime
        if job.isDownloaded():
            model.download_status = DownloadStatus.DOWNLOADED

        for sub_job_entry in job.SubJobs:
            # legacy job control reports subjobs as "<jobid> <host> status",
            # so only the text before the first space is the job id
            model.sub_job_ids.append(sub_job_entry.partition(" ")[0])
        return model

    def __repr__(self):
        return ("null_job"
                if self.is_null_job else f"{self.job_id}: {self.job_name}")

    @property
    def is_downloaded(self):
        """
        Whether the download status of this job is DOWNLOADED.
        """
        return self.download_status == DownloadStatus.DOWNLOADED
def _on_null_job_changed():
    """
    Slot connected to the `valueChanged` signal of null jobs.

    :raise RuntimeError: Always; a null job is not supposed to be modified.
    """
    message = 'null_job was modified'
    raise RuntimeError(message)
def get_null_job():
    """
    Create a null job model, i.e. a JobModel whose job_id is None. The
    parent job of every top-level job is a null job.

    A null job is meant to stay untouched: its `valueChanged` signal is
    wired to a slot that raises an exception.

    :return: A newly created null job
    :rtype: JobModel
    """
    placeholder = JobModel(job_id=None)
    # Fail loudly if anything ever modifies the placeholder
    placeholder.valueChanged.connect(_on_null_job_changed)
    return placeholder
class JobMonitorPanelModel(parameters.CompoundParam):
    """
    Model for the job monitor panel: the jobs shown in the top level and
    subjob tables, the job currently being viewed, and the job filter
    settings. All known job models are additionally tracked by job id in
    `_job_dict`.
    """
    # PLP for top level job table
    top_level_jobs: typing.List[JobModel]
    # when there is a top level job, show top level job bar
    current_top_level_job: JobModel = get_null_job()
    # when current job changes, update JobDetailsPane
    current_job: JobModel = get_null_job()
    # PLP for sub job table
    subjobs: typing.List[JobModel]
    # Job Filtering
    current_project_name: str = None
    is_curr_project_scratch: bool
    active_jobs_only: bool = True
    current_project_jobs_only: bool = bool(maestro)
    status: str

    # Emitted at the end of `updateJobs`
    filtersInvalidated = QtCore.pyqtSignal()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Maps job id -> JobModel. The None key holds a null job, so looking
        # up the parent of a job without a parent id yields a null job.
        self._job_dict = {None: get_null_job()}

    def updateJobs(self, job_models):
        """
        Updates the panel model with the given job models. If the JobModel is
        not already in _job_dict, add it. Emits `filtersInvalidated` once all
        models have been processed.

        :param job_models: The job models to add to the panel
        :type job_models: List[JobModel]
        """
        # Add any job models that haven't already been added
        self._addJobs(
            [jm for jm in job_models if jm.job_id not in self._job_dict])
        for job_model in job_models:
            self._updateJob(job_model)
        self.filtersInvalidated.emit()

    def deleteJobs(self, job_ids):
        """
        Remove the deleted jobs from the model: from the top level job list,
        the subjob list, the current job's subjob ids and `_job_dict`.

        :param job_ids: Deleted jobs' ids
        :type job_ids: list[str]
        """
        # Build the lookup once; every filter below tests each entry
        # against it.
        deleted_ids = set(job_ids)
        self.top_level_jobs = [
            job_model for job_model in self.top_level_jobs
            if job_model.job_id not in deleted_ids
        ]
        self.subjobs = [
            job_model for job_model in self.subjobs
            if job_model.job_id not in deleted_ids
        ]
        self.current_job.sub_job_ids = [
            job_id for job_id in self.current_job.sub_job_ids
            if job_id not in deleted_ids
        ]
        for job_id in deleted_ids:
            # Ids that are not tracked are silently ignored
            self._job_dict.pop(job_id, None)

    def setCurrentTopLevelJob(self, job_model):
        """
        Set the top level job. Called when you double click on a job/row in
        the job table on the Jobs List pane. Used to populate the top level
        job bar.

        :param job_model: The new top level job model
        :type job_model: JobModel
        """
        self.current_top_level_job = job_model

    def setCurrentJob(self, job_model):
        """
        Set the current job being viewed in the Job Detail pane and add its
        subjobs to the subjobs PLP. Used to populate job info widget and
        subjobs table. Subjob ids that are not tracked in `_job_dict` are
        skipped.

        :param job_model: The new current job model
        :type job_model: JobModel
        """
        self.current_job.setValue(job_model)
        self.subjobs = [
            self._job_dict[subjob_id]
            for subjob_id in job_model.sub_job_ids
            if subjob_id in self._job_dict
        ]
        self.updateRelativeID(self.subjobs)

    def setTopLevelJobs(self, jobs):
        """
        Set the current top level jobs to be viewed and update the relative
        IDs of these jobs.

        :param jobs: the list of top level jobs
        :type jobs: list[JobModel]
        """
        self.top_level_jobs = jobs
        self.updateRelativeID(self.top_level_jobs)

    def updateRelativeID(self, jobs_to_update):
        """
        Update the model with the relative IDs for the jobs being viewed in
        the current state. The given list is sorted in place by launch time
        and each job's `relative_id` is set to its 1-based position.

        :param jobs_to_update: the list of jobs to be updated with the
            relative id
        :type jobs_to_update: list[JobModel]
        """
        jobs_to_update.sort(key=lambda job_model: job_model.launch_time)
        for idx, job_model in enumerate(jobs_to_update):
            job_model.relative_id = idx + 1

    def _addJobs(self, job_models):
        """
        Add jobs to the panel. Every model is tracked in `_job_dict`; the top
        level ones are also appended to `top_level_jobs`.

        :param job_models: The job models to add to the panel
        :type job_models: List[JobModel]
        """
        for job_model in job_models:
            job_id = job_model.job_id
            self._job_dict[job_id] = job_model
        self.top_level_jobs.extend(
            jm for jm in job_models if jm.is_top_level_job)

    def _updateJob(self, job_model):
        """
        Update the given job model in the panel's model dict. The stored
        model is updated in place with the values of `job_model`.

        :param job_model: The new, updated job model
        :type job_model: JobModel
        """
        job_id = job_model.job_id
        old_job_model = self.getJob(job_id)

        # No job to update since the job is no longer in job database
        if old_job_model is None:
            return

        # Don't update widgets if there's no change in job's attributes
        if old_job_model == job_model:
            return

        is_active_job = jobhub.is_active_job(job_model.status.value)
        # Preserve requested_status info if the job is active
        if is_active_job:
            job_model.requested_status = old_job_model.requested_status
        # When the job is completed, change the download_status from the
        # default value to READY_TO_DOWNLOAD
        elif old_job_model.download_status is DownloadStatus.NONE:
            job_model.download_status = DownloadStatus.READY_TO_DOWNLOAD
        # When the job is updated - other than when completed - preserve the
        # download_status.
        elif job_model.download_status is DownloadStatus.NONE:
            job_model.download_status = old_job_model.download_status

        old_job_model.setValue(job_model)

        # let parent know its child changed
        # NOTE(review): the parent model comes from _job_dict, so this call
        # compares the stored model with itself and appears to return at the
        # equality check above - confirm whether more was intended.
        parent_job_model = self.getParentJob(job_model)
        if parent_job_model and not parent_job_model.is_null_job:
            self._updateJob(parent_job_model)

        # update widgets if job_model was being viewed
        if self.current_top_level_job.job_id == job_id:
            self.setCurrentTopLevelJob(job_model)
        if self.current_job.job_id == job_id:
            self.setCurrentJob(job_model)

    def getJob(self, job_id):
        """
        Get job by job id

        :param job_id: Job id of requested job
        :type job_id: str

        :return: The job model with the given job id, or None if no job with
            that id is tracked
        :rtype: JobModel or NoneType
        """
        return self._job_dict.get(job_id)

    def getParentJob(self, job_model):
        """
        Get the parent job of the given job model

        :param job_model: The job model to get the parent of
        :type job_model: JobModel

        :return: The model stored in `_job_dict` under the job's parent id,
            or None if nothing is stored under that id. For a job without a
            parent id this is the entry stored under the None key.
        :rtype: JobModel or NoneType
        """
        return self._job_dict.get(job_model.parent_job_id)

    def getAllTopLevelJobs(self):
        """
        Get a list of all top level jobs tracked by the panel, i.e. every
        tracked job model whose `is_top_level_job` is true (no parent id
        and not a null job).

        :return: All tracked top level jobs
        :rtype: list[JobModel]
        """
        return [
            job_model for job_model in self._job_dict.values()
            if job_model.is_top_level_job
        ]

    @property
    def on_jobs_list(self):
        """
        Whether the GUI is on the Jobs List Pane, which is the case when
        current_top_level_job is a null job.
        """
        return self.current_top_level_job.is_null_job

    def showJobsList(self):
        """
        Reset both the current top level job and the current job to null
        jobs, which puts the GUI on the Jobs List Pane (see `on_jobs_list`).
        """
        self.setCurrentTopLevelJob(get_null_job())
        self.setCurrentJob(get_null_job())

    def __repr__(self):
        d = self.toDict()
        # Show the current jobs via their own short repr
        d["current_top_level_job"] = repr(self.current_top_level_job)
        d["current_job"] = repr(self.current_job)
        return str(d)

    def setJobDownloadStatus(self, job_id, status):
        """
        Set the 'download_status' of the job model with the given job id.
        Does nothing if no job with that id is tracked.

        :param job_id: The job id.
        :type job_id: str

        :param status: Download status.
        :type status: member of the `DownloadStatus` enum.
        """
        job_model = self.getJob(job_id)
        if job_model is not None:
            job_model.download_status = status
            # NOTE(review): job_model is the stored model itself, so
            # _updateJob appears to return at its equality check - confirm.
            self._updateJob(job_model)