Source code for schrodinger.ui.qt.appframework2.jobs

"""
Job runners are subclasses of task runners. See the tasks module for more
general information.
"""

import copy
import enum
import os

import schrodinger
from schrodinger import project
from schrodinger.infra import jobhub
from schrodinger.job import jobcontrol
from schrodinger.job import jobwriter
from schrodinger.Qt import QtCore
from schrodinger.ui.qt.appframework2 import tasks
from schrodinger.ui.qt.appframework2.validation import validator
from schrodinger.utils import fileutils

maestro = schrodinger.get_maestro()

SCHRODINGER_VAR = '${SCHRODINGER}'
CHDIR_MUTEX = QtCore.QMutex()
CHDIR_LOCKED_TEXT = ('Another panel is submitting a job. Please wait a few '
                     'seconds and try again.')

RunMode = enum.Enum('RunMode', ('START', 'WRITE', 'STU'))


[docs]class JobOptions(object): """ A simple class for storing the options for a particular job runner. These options pertain to a runner, not to a specific job. For example, the host parameter only determines whether a host can be specified for this job type, not which host to use for any particular run. """
[docs] def __init__(self): self.incorporation = True self.default_disp = 'ignore' self.host = True self.cpus = False self.gpus = False self.create_job_dir = True
[docs] def toDict(self): settings = {} for name in self.__dict__: settings[name] = getattr(self, name) return settings
[docs]class JobConfig(object): """ Holds the standard configuration settings for a particular job run. The job options determine what settings are available. """
[docs] def __init__(self, job_options): """ :param job_options: the options for the associated job type. :type job_options: JobOptions """ self.disp = job_options.default_disp self.job_options = job_options self.jobname = '' self.host = 'localhost' self.cpus = 1 self.viewname = 'job'
[docs] def applySettings(self, settings): for alias, value in settings.items(): if not hasattr(self, alias): raise KeyError('Alias not found: %s' % alias) setattr(self, alias, value)
[docs] def hostFlag(self): if self.job_options.cpus: return ['-HOST', '%s:%s' % (self.host, self.cpus)] else: return ['-HOST', self.host]
[docs] def dispFlag(self): if not maestro: return [] return ['-DISP', self.disp]
[docs] def projFlag(self): if not maestro: return [] try: pt = maestro.project_table_get() except project.ProjectException: return [] return ['-PROJ', pt.project_name]
[docs] def viewnameFlag(self): if not maestro: return [] return ['-VIEWNAME', self.viewname]
[docs] def appendFlags(self, cmdlist, run_mode=RunMode.START): """ Takes a cmdlist and appends the standard configuration flags appropriate for the context. This will depend on whether the intent is to start or write the job and whether maestro is available. :param cmdlist: the original command list :type cmdlist: list :param run_mode: which action is being taken - start, write, or STU :type run_mode: RunMode """ # Note: no -HOST flag for STU jobs if self.job_options.host and run_mode != RunMode.STU: cmdlist.extend(self.hostFlag()) if run_mode != RunMode.START: return cmdlist cmdlist.extend(self.projFlag()) cmdlist.extend(self.viewnameFlag()) if self.job_options.incorporation: cmdlist.extend(self.dispFlag()) # Tell job control that launch directory should be removed as well # when removing all job files, PANEL-2164: if self.job_options.create_job_dir: cmdlist.append('-TMPLAUNCHDIR') return cmdlist
[docs] def summaryText(self): """ Generates the text to display in the status bar via the updateStatusText method. """ options = self.job_options text_items = [] if options.host: text = 'Host=' + self.host if options.cpus: text += ':' + str(self.cpus) text_items.append(text) if options.incorporation: text_items.append('Incorporate=' + self.disp) return ', '.join(text_items)
#=============================================================================== # Job Wrapper Classes #===============================================================================
[docs]class JobWrapper(tasks.AbstractTaskWrapper): """ Wraps jobcontrol.Job objects to present a common interface for af2.tasks. See tasks.AbstractTaskWrapper for more information. """ TASK_CLASS = jobcontrol.Job
[docs] def __init__(self, job, settings=None, name='', **kwargs): tasks.AbstractTaskWrapper.__init__(self, job, settings, name, **kwargs) self._status = 'None'
[docs] def isRunning(self): # Preemptive job ID try: job = jobhub.get_cached_job(self._task.job_id) except jobhub.StdException: # Job record is missing, so do not update self._task pass else: self._task = job return not self._task.isComplete()
[docs] def status(self): # Preemptive job ID try: job = jobhub.get_cached_job(self._task.job_id) except jobhub.StdException: # Job record is missing, so do not update self._task pass else: self._task = job if self._task.Status == 'completed': return self._task.ExitStatus return self._task.Status
[docs] def jobId(self): return self._task.job_id
[docs]class WrittenJobWrapper(tasks.AbstractTaskWrapper): """ This is basically an empty wrapper for representing a written job. """ # In this case the task is simply the filename of the .sh script TASK_CLASS = str
[docs] def isRunning(self): return False
[docs] def status(self): return 'written'
[docs] def jobId(self): return None
[docs] def filename(self): return self._task
#=============================================================================== # Runner classes #===============================================================================
[docs]class BaseJobRunner(tasks.AbstractTaskRunner): """ A job runner is a type of task runner that performs its task via launching a job under job control. """
[docs] def __init__(self, messaging_callback=None, settings_callback=None): tasks.AbstractTaskRunner.__init__(self, messaging_callback=messaging_callback, settings_callback=settings_callback) self.tasks_by_job_id = {} self.run_mode = None self.orig_dir = '' options = self.jobOptions() self.job_config = JobConfig(options) self.job_config.viewname = self.viewname() self.job_timer = QtCore.QTimer() self.job_timer.setInterval(1000) self.job_timer.timeout.connect(self.update) self.cached_namelist = os.listdir(".")
def _getTakenNames(self): """ Returns the cached contents of the cwd as names that are "taken" in the sense that using them as job names would result in an overwrite. See parent class for more information. """ return self.cached_namelist def _preFlight(self): """ In addition to the parent class _preFlight logic, this will also create the job directory (prompt for overwrite if necessary and offer to abort), and chdir into the job directory. The original directory will be stored as self.orig_dir. self.orig_dir having a value signifies that a chdir has been performed. It is the responsibility of the calling code to restore the orig_dir and set self.orig_dir back to an empty string. """ if tasks.AbstractTaskRunner._preFlight(self): if self.jobOptions().create_job_dir: if not CHDIR_MUTEX.tryLock(): self.warning(CHDIR_LOCKED_TEXT) return False self.orig_dir = os.getcwd() job_dir = self.createJobDir() if not job_dir: return False os.chdir(job_dir) return True return True return False
[docs] def start(self): # See parent class for details self.run_mode = RunMode.START try: success = tasks.AbstractTaskRunner.start(self) finally: self.resetState() if success: self.status('Job started')
[docs] def postProcess(self, task): """ Download job outputs after the job completes. """ super().postProcess(task) jobid = task.jobId() if jobid: job = jobhub.get_cached_job(jobid) job.download()
[docs] def resetState(self): self.run_mode = None if self.orig_dir: os.chdir(self.orig_dir) CHDIR_MUTEX.unlock() self.orig_dir = '' self.stateChanged.emit()
@validator() def validateJobName(self): if not fileutils.is_valid_jobname(self.nextName()): msg = fileutils.INVALID_JOBNAME_ERR % self.nextName() return False, msg return True #=========================================================================== # Monitoring #===========================================================================
[docs] def addTask(self, task): # See parent class for documentation tasks.AbstractTaskRunner.addTask(self, task) # Update the cached file list with the new job dir if self.orig_dir: self.cached_namelist = os.listdir(self.orig_dir) else: self.cached_namelist = os.listdir(".") jobid = task.jobId() if jobid: self.tasks_by_job_id[jobid] = task
[docs] def findJob(self, jobid): """ Finds a wrapped job by its job id. :param jobid: the job id :type jobid: str :return: the job :rtype: JobWrapper """ return self.tasks_by_job_id.get(jobid)
[docs] def viewname(self): return str(self)
[docs] def update(self): """ Slot method to update the state of the job runner. """ self._update() if not self.active_tasks: self.job_timer.stop() self.stateChanged.emit()
#=========================================================================== # Job writing #===========================================================================
[docs] def write(self): """ Call this to write out a job to be run later. """ self.startRequested.emit() self.run_mode = RunMode.WRITE job_dir = None try: if not self._preFlight(): self.startFailed.emit() return False job_dir = os.getcwd() cmdlist = self._makeCmdList() filename = self._writeCmd(cmdlist) if not filename: self.startFailed.emit() return False finally: self.resetState() if job_dir is None: return False task = WrittenJobWrapper(filename, settings=self.settings()) self.addTask(task) self.taskStarted.emit(task) self.stateChanged.emit() self.status('Job written to {}'.format(job_dir)) return True
def _writeCmd(self, cmdlist): """ Writes the given cmd to disk. :param cmdlist: :type cmdlist: """ filename = self.getSHFilename() cmd = cmdlist_to_cmd(cmdlist) with open(filename, 'w') as f: f.write(cmd) set_sh_file_flags(filename) return filename # TODO: make locale independent
[docs] def getSHFilename(self): """ Returns the .sh filename for the next job. """ return os.path.join(self.nextJobDir(), self.nextName() + '.sh')
[docs] def writeSTU(self): """ Writes out a STU test set. """ self.run_mode = RunMode.STU os.chdir(self.nextJobDir()) try: if not self._preFlight(): return False cmdlist = self._makeCmdList() self._writeSTU(cmdlist) finally: self.resetState()
def _writeSTU(self): pass #=========================================================================== # Job Directory #===========================================================================
[docs] def createJobDir(self): dirname = self.nextJobDir() if os.path.exists(dirname): qtext = ('The job directory, %s, already exists.\nWould you like ' 'to delete its contents and continue?' % dirname) overwrite = self.question(qtext, caption='Overwrite contents?') if not overwrite: return False fileutils.force_rmtree(dirname) os.mkdir(dirname) return dirname
[docs] def nextJobDir(self): if self.orig_dir: # We've chdir'ed into the jobdir already base_dir = self.orig_dir else: base_dir = os.getcwd() if self.jobOptions().create_job_dir: return os.path.join(base_dir, self.nextName()) return base_dir
#=========================================================================== # Forming the command #===========================================================================
[docs] def makeSchrodingerCmd(self, *args): """ Builds a $SCHRODINGER command string from all the args passed in. The resulting string is suitable for use in a cmdlist and formatted for use in starting or writing the job depending on self.run_mode. In START mode, the $SCHRODINGER environment variable will be expanded. In WRITE mode, it will stay as $SCHRODINGER and the path will be delimited with linux-style forward slashes. Example: self.makeSchrodingerCmd('utilities', 'my_utility') will return "${SCHRODINGER}/utilities/my_utility" in WRITE mode. """ if self.run_mode == RunMode.START: schrodinger_path = os.environ['SCHRODINGER'] return os.path.join(schrodinger_path, *args) else: return SCHRODINGER_VAR + '/' + '/'.join(args)
[docs] def getSchrodingerRun(self): """ Returns the correct version of the $SCHRODINGER/run string. This will depend on whether the intent is to start or to write the job. """ return self.makeSchrodingerCmd('run')
def _makeCmdList(self): """ Gets the cmdlist and appends the standard flags. This will also do an explicit str cast on every element of the cmdlist. """ cmdlist = self.makeCmdList() cmdlist = self.appendConfigFlags(cmdlist) return [ str(item) if isinstance(item, str) else item for item in cmdlist ]
[docs] def makeCmdList(self): """ Implement this to generate a cmdlist. This cmdlist will be used for write functionality. """ raise NotImplementedError()
#=========================================================================== # Job options - options for job runner #===========================================================================
[docs] def setupJobOptions(self, options): """ Override this to set the job options for this job. The options is passed in by the framework. Modify and return the options object. The options object will determine, for example, what the config dialog should look like. For example:: options.incorporation = False options.create_job_dir = False return options If this method is not overridden, default options will be used. :param options: the options object to be customized. :type options: JobOptions """ return options
[docs] def jobOptions(self): options = JobOptions() return self.setupJobOptions(options)
#=========================================================================== # Job config - options for launching the next job #===========================================================================
[docs] def setConfig(self, config): self.job_config = config if config.jobname != self.nextName(): self.setCustomName(config.jobname) self.updateStatusText()
[docs] def getNextConfig(self): config = self.job_config config.jobname = self.nextName() config.viewname = self.viewname() # Make a copy so that modifying the return value will not affect the # runner directly. return copy.deepcopy(config)
[docs] def appendConfigFlags(self, cmdlist): config = self.getNextConfig() return config.appendFlags(cmdlist, run_mode=self.run_mode)
[docs] def setCustomName(self, name): tasks.AbstractTaskRunner.setCustomName(self, name) self.job_config.jobname = self.nextName()
[docs] def updateStatusText(self): config = self.getNextConfig() self.status(config.summaryText(), timeout=0)
[docs]class CmdJobRunner(BaseJobRunner): """ This is the basic job runner for setting up and running a cmd line job. The job is launched using jobcontrol.launch_job(). """ def _launchCmd(self, cmd): return jobcontrol.launch_job(cmd) def _start(self): try: cmd = self._makeCmdList() job = self._launchCmd(cmd) except: self.startFailed.emit() raise self.job_timer.start() task = JobWrapper(job, settings=self.settings()) if maestro and job: maestro.job_started(job.JobId) return task
[docs] def makeCmdList(self): """ This is the main method that needs to be implemented to define a specific cmd job runner. It should just return a complete cmd list for the job to be launched. Standard job options should be left off. """ raise NotImplementedError()
#=============================================================================== # Job-related Utility Functions #===============================================================================
[docs]def cmdlist_to_cmd(cmdlist): """ Converts a command list to a command string. Don't do this if you can possibly avoid it. :param cmdlist: a list of commands :type cmdlist: list :return: str """ return jobwriter.cmdlist_to_cmd(cmdlist)
[docs]def set_sh_file_flags(filename): jobwriter.set_sh_file_flags(filename)
[docs]def get_first_hostname(host): """ Given a host string, get the corresponding hosts list from jobcontrol and return the first hostname from that list. :type host: string :param host: Hosts string which determine the string value of result server argument. These are values usually from configuration dialog and is of the form "galina" or "galina:1" or "galina,monica" or "galina:2,monica:3" or "galina monica" or "galina:2 monica:3". :rtype: str :return: Hostname based on the first of the first hosts in the jobcontrol list. """ # FIXME Multiple hosts are being deprecated per JOBCON-4605 # This function should be removed once that is implemented. hosts = jobcontrol.host_str_to_list(host) if len(hosts): return hosts[0][0] return None
[docs]def job_belongs_to_panel(jobid, viewname): """ Return True if jobid belongs to viewname of a panel. Used by incorporation callbacks to determine if the job belongs to us. :param jobid: jobid for a given job :type jobid: str :param viewname: viewname corresponding to panel :type viewname: str """ job = jobhub.get_cached_job(jobid) if job.Viewname: return viewname == job.Viewname return False