Source code for schrodinger.application.matsci.jobdirdlg

"""
A dialog for obtaining a job directory of a previously submitted job

Copyright Schrodinger, LLC. All rights reserved.
"""

import os.path
import time

from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import mswidgets
from schrodinger.infra import jobhub
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtWidgets
from schrodinger.Qt.QtCore import Qt
from schrodinger.ui.qt import filedialog
from schrodinger.ui.qt import swidgets
from schrodinger.ui.qt import utils as qtutils
from schrodinger.utils import fileutils


[docs]def start_job_manager(): """ Start the job manager so that it begins an asynchronous update of the job list in the background. It is best to do this as soon as possible in a new process because obtaining the job list may take some time after the first access of the jobhub job manager. In Maestro, this call doesn't have any affect because Maestro starts the job manager at startup. But this call does have an effect for panels run from the command line. This call cannot be done on import of this module because a QApplication needs to be started first, which often isn't done at the point this module is imported. """ JobManager.getManager()
[docs]class JobManager(QtCore.QObject): """ A class for managing job information and alerts when new jobs start """ _singleton = None TIME_FORMAT = '%a %b %d %I:%M:%S %p' UPDATED_TEXT = 'Last updated: {utime}' NEW_DATA_TEXT = '<font color=green>New jobs available</font>' UPDATING_TEXT = 'Updating job information...' new_data_available = QtCore.pyqtSignal()
[docs] def __init__(self): """ Create a JobManager instance """ super().__init__() self.jobhub_manager = jobhub.get_job_manager() # Check to see if the job manager knows about jobs yet # status_text has two states: # UPDATING_TEXT: If no job information is available yet # UPDATED_TEXT: The last time new job information was updated jobs = self.getAllJobs() if jobs: self.update_time = self.getTime() self.status_text = self.UPDATED_TEXT.format(utime=self.update_time) else: self.update_time = None self.status_text = self.UPDATING_TEXT self.prev_num_jobs = len(jobs)
[docs] def connectToJobHub(self): """ Connect to the job hub manager's periodic callback """ # This connection is done in a method rather than directly so that it is # easy to mock in a session fixture. We must prevent actually connecting # to this signal in unit tests otherwise the number of tests that use # this combined with the short periodicity of the callback causes # problems with Windows builds # # The session fixture (curretly in # schrodinger.test.pytest.sessionfixture) mocks this method out # automatically for the entire test framework so that # unit tests don't # have to do it individually for every panel that uses this class. self.jobhub_manager.jobsChanged.connect(self.updateStatus)
[docs] @staticmethod def getManager(*args, **kwargs): """ Get JobManager singleton Calling this method the first time starts the jobhub manager loading job information """ if not JobManager._singleton: JobManager._singleton = JobManager(*args, **kwargs) return JobManager._singleton
[docs] def getTime(self): """ Get the current time in user-format :rtype: str :return: The current time """ return time.strftime(self.TIME_FORMAT)
[docs] def updateStatus(self, jobids): """ Callback for the jobhub jobsChanged signal. This gets called periodically whether any job changed or not. :param list jobids: The job ids for jobs changing state """ if not jobids: # This function is called as a periodic callback (despite the name # of the calling signal, it is *not* called only when jobs change. # Do nothing if no jobids have changed. return # Check if there are more jobs available than before self.update_time = self.getTime() self.status_text = self.UPDATED_TEXT.format(utime=self.update_time) num_jobs = len(self.getAllJobs()) if num_jobs > self.prev_num_jobs: self.new_data_available.emit() self.prev_num_jobs = num_jobs
[docs] def getAllJobs(self): """ Get all jobs in the database :rtype: view :return: Each item in the view is a job object for a job in the database """ # Note that getJobs can return an empty list of jobs if called early in # a session before the jobhub manager has updated the jobs list. After # discussions with the jobcontrol team (Python GChat room 12/7/20) it # looks like this is just something we'll have to live with. return self.jobhub_manager.getJobs(jobhub.JobOption.ALL_JOBS).values()
[docs] def getRunningJobIDs(self): """ Get all running jobs :rtype: view :return: Each item in the view is a job ID for a running job """ return self.jobhub_manager.getJobs(jobhub.JobOption.ACTIVE_JOBS).keys()
[docs]class NewJobDirFrame(swidgets.SFrame): """ A collection of widgets that reads in the JobDB and puts jobs in a table that can be selected. It also allows the user to specify a job directory manually. """
[docs] def __init__(self, master, layout=None, dclick_callback=None, show_subjobs=False): """ Create a NewJobDirFrame instance :type master: QWidget :param master: The master panel for this frame, should have a warning method :type layout: QBoxLayout :param layout: The layout to place this frame into :type dclick_callback: callable :param dclick_callback: The function to call when the user double-clicks on a table cell. The function is called with 2 arguments, the first is the cell row, the second is the cell column. :param bool show_subjobs: If False, only show top-level jobs in the table. Note that if programs are specified (see setAllowedPrograms), subjobs of that program type will be shown regardless of this setting. """ self.show_subjobs = show_subjobs self.manager = JobManager.getManager() self.manager.new_data_available.connect(self.newDataAlert) self.master = master self.programs = set() swidgets.SFrame.__init__(self, layout=layout) layout = self.mylayout self.table_rb = swidgets.SRadioButton('Load from job database', command=self.loadToggled, layout=layout, checked=True) # Status update ulayout = swidgets.SHBoxLayout(layout=layout) self.update_button = swidgets.SPushButton( 'Refresh', layout=ulayout, command=self.loadJobsIntoTable) self.update_label = swidgets.SLabel(self.manager.status_text, layout=ulayout) ulayout.addStretch() # Job database table tlayout = swidgets.SHBoxLayout(layout=layout, indent=True) self.table = QtWidgets.QTableWidget() table_headers = [ 'Job Name', 'Launched', 'Status', 'Program', 'Directory' ] self.table.setColumnCount(len(table_headers)) self.table.setHorizontalHeaderLabels(table_headers) self.table.setSelectionBehavior(self.table.SelectRows) self.table.setSelectionMode(self.table.SingleSelection) self.table.setSortingEnabled(True) if dclick_callback: self.table.cellDoubleClicked.connect(dclick_callback) self.loadJobsIntoTable() tlayout.addWidget(self.table) # Manual directory browsing self.browse_rb = swidgets.SRadioButton('Manually locate job directory', command=self.loadToggled, layout=layout) blayout = swidgets.SHBoxLayout(layout=layout, indent=True) self.browse_le = swidgets.SLabeledEdit('Path:', stretch=False, layout=blayout) self.browse_button = swidgets.SPushButton('Browse...', command=self.browseDirectory, layout=blayout) self.loadToggled()
[docs] def newDataAlert(self): """ Set the text label to alert user there are new jobs available """ self.update_label.setText(self.manager.NEW_DATA_TEXT)
[docs] def reset(self, load_jobs=True): """ Reset the entire frame :type load_jobs: bool :param load_jobs: Whether the job database should be loaded back into the table after reset """ self.programs = set() self.resetTable() self.table_rb.reset() self.browse_rb.reset() self.browse_le.reset() self.loadToggled() if load_jobs: self.loadJobsIntoTable()
[docs] def resetTable(self): """ Remove all rows from the table """ self.table.setRowCount(0)
[docs] def setAllowedPrograms(self, programs): """ Set the programs whose jobs are allowed to show up in the table :type programs: set :param programs: The strings that show up in the job.Program field for programs whose jobs should show in the dialog. Use None to show all active jobs. """ self.programs = programs
[docs] def readJobsFromDatabase(self): """ Read the jobs from the JobDB database :rtype: list, list :return: Two lists. The first contains the completed jobs, the second currently running jobs. All list items are `schrodinger.job.jobcontrol.Job` objects and the lists are sorted so that the newest jobs appear first """ # Read in all the jobs in the job database and find the jobs of # interest - parse them into running and completed running = [] completed = [] all_jobs = self.manager.getAllJobs() all_running_job_ids = set(self.manager.getRunningJobIDs()) self.update_label.setText(self.manager.status_text) if self.programs: jobs = [] for job in all_jobs: try: if job.Program in self.programs: jobs.append(job) except AttributeError: # Some jobs may not have a program set (MATSCI-1713) pass else: if self.show_subjobs: jobs = all_jobs else: jobs = [x for x in all_jobs if not x.ParentJobId] for ajob in jobs: try: exists = os.path.exists(ajob.Dir) except AttributeError: # It is possible for jobs to not have a Dir attribute if they # are not fully fleshed out yet exists = False if not exists: continue if ajob.JobId in all_running_job_ids: running.append(ajob) else: completed.append(ajob) # Sort all the jobs by reversed Launch Time running.sort(reverse=True, key=lambda x: x.LaunchTime) completed.sort(reverse=True, key=lambda x: x.LaunchTime) return running, completed
[docs] @qtutils.wait_cursor def loadJobsIntoTable(self): """ Load all the desired jobs from the job database into the table """ # Turn off sorting because adding data to a table with sorting on will # mess up the rows self.table.setSortingEnabled(False) self.resetTable() # Standard item for the table - used to clone new items base_item = QtWidgets.QTableWidgetItem() base_item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled) def set_table_text(row, column, text, jobid=None): """ Put new text in the table in row, column :type row: int :param row: The row of the table cell :type column: int :param column: the column of the table cell :type text: str :param text: The text to insert into row, column :type jobid: str :param jobid: The job id of the job for this row """ item = QtWidgets.QTableWidgetItem(base_item) item.setText(text) if jobid: item.setData(Qt.UserRole, jobid) self.table.setItem(row, column, item) running, completed = self.readJobsFromDatabase() # Put all the jobs in the table, running first, then completed for row, ajob in enumerate(running + completed): self.table.insertRow(row) set_table_text(row, 0, ajob.Name, jobid=ajob.job_id) set_table_text(row, 1, ajob.LaunchTime) set_table_text(row, 2, ajob.Status) try: set_table_text(row, 3, ajob.Program) except AttributeError: set_table_text(row, 3, "") set_table_text(row, 4, ajob.Dir) header = self.table.horizontalHeader() header.resizeSections(header.ResizeToContents) self.table.setSortingEnabled(True)
[docs] def loadToggled(self): """ Whether to load from the job table or a manual directory has been toggled - react to that """ browsing = self.browse_rb.isChecked() if browsing: sranges = self.table.selectedRanges() for srange in sranges: self.table.setRangeSelected(srange, False) self.table.setEnabled(not browsing) self.browse_le.setEnabled(browsing) self.browse_button.setEnabled(browsing)
[docs] def getCurrentJobDir(self): """ Get the currently selected job directory and the associated job if applicable :rtype: (str, `schrodinger.job.jobcontrol.Job` or (str, None) or (None, None) :return: The path to the selected job directory and if possible, the associated Job object. If the user specified a job directory manually, the Job object will be None. If no job directory has been specified, the return value is (None, None) """ if self.browse_rb.isChecked(): # Manual directory path = str(self.browse_le.text()) job = None if not path: raise RuntimeError('A directory must be specified manually.') else: # Read a directory from the job database path = "" for item in self.table.selectedItems(): if item.column() == 0: jobid = str(item.data(Qt.UserRole)) job = self.manager.jobhub_manager.getJob(jobid) path = job.Dir break if not path: raise RuntimeError('Please select a job in the table to load.') return path, job
[docs] def browseDirectory(self): """ Allow the user to browse to a new directory via file dialog """ job_dir = filedialog.get_existing_directory(id='jobdirdlg') if not job_dir: return self.browse_le.setText(job_dir)
[docs] def setWaitCursor(self, app_wide=True): """ Set the cursor to the wait cursor. This will be an hourglass, clock or similar. Call restoreCursor() to return to the default cursor. If 'app_wide' is True then it will apply to the entire application (including Maestro if running there). If it's False then it will apply only to this panel. Added for the wait_cursor decorator """ if app_wide: QtWidgets.QApplication.instance().setOverrideCursor( QtGui.QCursor(QtCore.Qt.WaitCursor)) else: self.setCursor(QtGui.QCursor(QtCore.Qt.WaitCursor))
[docs] def restoreCursor(self, app_wide=True): """ Restore the application level cursor to the default. If 'app_wide' is True then if will be restored for the entire application, if it's False, it will be just for this panel. Added for the wait_cursor decorator """ if app_wide: QtWidgets.QApplication.instance().restoreOverrideCursor() else: self.setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
[docs]class NewJobDirDialog(swidgets.SDialog): """ SDialog to allow the user to read in information for a new job. Should be created with the user_accept_function parameter set to a function that accepts (path, job=job) api, where path is the path to the job directory and job is a Job object if available. """
[docs] def __init__(self, *args, show_subjobs=False, **kwargs): """ Create a NewJobDirDialog instance :param bool show_subjobs: If False, only show top-level jobs in the table. Note that if programs are specified (see setAllowedPrograms), subjobs of that program type will be shown regardless of this setting. All other arguments are passed to the parent class """ self.show_subjobs = show_subjobs super().__init__(*args, **kwargs)
def __getattr__(self, attribute): """ Pass on any requests for unknown attributes to the job loader. If the attribute is still unknown, raise an AttributeError for this class """ try: return getattr(self.jdir_loader, attribute) except AttributeError: raise AttributeError('%s has no attribute %s' % (self.__class__.__name__, attribute))
[docs] def layOut(self): """ Lay out the widgets for the dialog We use a wait cursor because searching the job database may take a few seconds (or more) """ layout = self.mylayout self.jdir_loader = NewJobDirFrame(self, layout, dclick_callback=self.accept, show_subjobs=self.show_subjobs) size = QtCore.QSize(800, 400) self.resize(size)
[docs] def accept(self): """ The user has pressed the Accept button. Call the user_accept_function with the chosen job information. """ try: path, job = self.jdir_loader.getCurrentJobDir() except RuntimeError as msg: self.warning(str(msg)) return if not path: return self.user_accept_function(path, job=job) return swidgets.SDialog.accept(self)
[docs]class InteractiveFileDownloader(jobutils.FileDownloader): """ Check for files on the server and download them while allowing the user to cancel the processes via a procgress dialog. See parent class for additional information. """ DLG_TITLE = 'Contacting Job Server'
[docs] def __init__(self, head, *args, **kwargs): """ Create an InteractiveFileDownloader instance :param `QtWidgets.QWidget` head: The parent widget for dialogs posted by this class """ self.head = head super().__init__(*args, **kwargs) self.dialog = None
[docs] def listAvailableFiles(self, *args, **kwargs): """ Find all files available on the server for the given job. Posts a dialog during the server query to allow the user to cancel the process. If the user cancels the process, and empty list is returned. See the parent method for additional information """ kwargs['wait'] = False super().listAvailableFiles(*args, **kwargs) # Post a dialog to allow the user to cancel text = 'Getting available file information from server' dialog = mswidgets.ProcessBusyDialog(text, self.process, self.head, title=self.DLG_TITLE) code = dialog.activate() # Check for cancellation/errors if code == dialog.KILLED: return self.available_filenames elif code != dialog.SUCCESS: self.raiseProcessError() return self.parseAvailableFiles()
[docs] def downloadFilesToDir(self, job, filenames, dirname): """ Download a series of files from the server. The dialog will remain open during the entire duration but will update text to inform the user which file is currently downloading. Use of this function versus looping over downloadJobFileToTemp avoids the dialog flickering and constantly taking focus. :param `jobconrol.Job` job: The job the files belong to :param list filenames: The list of file names to download :param str dirname: The directory to download the files to :rtype: list or None :return: filenames is returned if everything completes successfully, otherwise None is returned. :raise `jobutils.FileDownloadError`: If an error occurs while downloading """ total = len(filenames) for index, name in enumerate(filenames, 1): text = f'Downloading file {index} of {total} from server...' temp_path = os.path.join(dirname, name) persistent = index != total full_path = self.downloadJobFileToTemp(text, job, name, persistent=persistent, temp_path=temp_path) if full_path is None: # User killed self.cleanFiles() return None return filenames
[docs] def downloadJobFileToTemp(self, text, *args, persistent=False, **kwargs): """ Download the requested file from the server. Posts a dialog during the download to allow the user to cancel the process. If the user cancels the process, None is returned. :param bool persistent: If False, the dialog is closed when the download completes. If True, the dialog remains open for additional use - see downloadManyFilesToDirectory for instance. It is up to the caller to handle closing the dialog by eventually calling self.dialog.accept or calling this function with persistent=False. See the parent method for additional information """ kwargs['wait'] = False super().downloadJobFileToTemp(*args, **kwargs) # Post a dialog to allow the user to cancel if self.dialog: dialog = self.dialog dialog.setNewData(text, self.process) else: dialog = mswidgets.ProcessBusyDialog(text, self.process, self.head, title=self.DLG_TITLE) code = dialog.activate(persistent=persistent) success = code == dialog.SUCCESS # Keep a handle to the dialog if we will be re-using it if persistent and success: self.dialog = dialog else: if persistent: dialog.accept() self.dialog = None # Check for cancellation/errors if not success: fileutils.force_remove(self.out_filename) self.out_filename = None if code != dialog.KILLED: self.raiseProcessError() return self.out_filename
[docs]class DownloadingMonitorMixin: """ A mixin class for panels that monitor running jobs and have to download files from running job server jobs in order to function """ DOWNLOAD_FILE_ENDINGS = [] ALWAYS_REDOWNLOAD = []
[docs] def __init__(self, *args, **kwargs): """ Create a Downloading Monitor Mixin object """ super().__init__(*args, **kwargs) # Start the job manager so it gets a list of jobs asap start_job_manager() self.downloader = InteractiveFileDownloader(self) self.downloaded_files = set() self.current_path = None
[docs] def downloadJobFiles(self, job): """ Download available files from a running Job Server job :param `jobcontrol.Job` job: The job to download files for :rtype: str or None :return: The path to the directory where files were downloaded, or None if the process failed or was killed by the user :raise `jobutils.FileDownloadError` if a process fails """ # Make a directory to hold the files if this is the first download for # this job if self.current_path is None: with fileutils.tempfilename() as dirname: # Get a temp directory name pass os.makedirs(dirname) self.downloader.temp_directories.append(dirname) else: dirname = self.current_path if not self.DOWNLOAD_FILE_ENDINGS: # Nothing will ever qualify as downloadable return dirname # Find the list of all files that can be downloaded available_path_data = self.downloader.listAvailableFiles(job) if not available_path_data: # User may have killed the job after some files downloaded self.downloader.cleanFiles() return None # Filter the list down to just those we need for the viewer and haven't # downloaded already to_download = [] for pdata in available_path_data: for ending in self.DOWNLOAD_FILE_ENDINGS: if pdata.match(ending): if (any(pdata.match(x) for x in self.ALWAYS_REDOWNLOAD) or pdata.name not in self.downloaded_files): to_download.append(pdata.name) downloaded = self.downloader.downloadFilesToDir(job, to_download, dirname) if downloaded is not None: self.downloaded_files.update(to_download) return dirname else: return None
[docs] def downloadJobFilesIfNecessary(self, job, path): """ Download required files if this is a running Job Server job :param `jobcontrol.Job` job: The job to check and download files for :param str path: The current path to the job directory :rtype: str :return: The path to the directory the files were downloaded into """ if not job or not jobutils.is_downloadable_job_server_job(job): return path try: path = self.downloadJobFiles(job) except jobutils.FileDownloadError as err: self.error(f'Unable to download files: {str(err)}') return return path
[docs] def myResetMethod(self): """ Reset data """ self.downloader.cleanFiles() self.downloaded_files = set() self.current_path = None