Source code for schrodinger.tasks.jobtasks

r"""
=================
Job incorporation
=================

If a CmdJobTask's job registers an incorporation file with job control, the file
can be accessed after the task is done via output.incorporation_file.

ComboJobTasks support the option to specify an incorporation file in the backend
of the task using JobOutput. Example::

    class MyJobTask(jobtasks.ComboJobTask):
        output: jobtasks.JobOutput

        def mainFunction(self):
            self.output.incorporation_file = 'foo.maegz'  # In backend

Specifying an incorporation file in the frontend of the task will have no
effect and thus should never be done.
"""
import enum
import os
import typing
from typing import List

import schrodinger
from schrodinger.job import jobcontrol
from schrodinger.job import jobhandler
from schrodinger.job import jobwriter
from schrodinger.job import launchapi
from schrodinger.job import launchparams
from schrodinger.models import json
from schrodinger.models import jsonable
from schrodinger.models import parameters
from schrodinger.tasks import _filepaths
from schrodinger.tasks import cmdline
from schrodinger.tasks import hosts
from schrodinger.tasks import tasks
from schrodinger.tasks.hosts import strip_gpu_from_localhost
# Imported so other modules can access tasks features here.
from schrodinger.tasks.tasks import AFTER_TASKDIR  # noqa: F401
from schrodinger.tasks.tasks import AUTO_TASKDIR  # noqa: F401
from schrodinger.tasks.tasks import BEFORE_TASKDIR
from schrodinger.tasks.tasks import FINISHED_STATUSES  # noqa: F401
from schrodinger.tasks.tasks import TEMP_TASKDIR  # noqa: F401
from schrodinger.tasks.tasks import TaskFailure  # noqa: F401
from schrodinger.tasks.tasks import TaskFile
from schrodinger.tasks.tasks import TaskFolder  # noqa: F401
from schrodinger.tasks.tasks import TaskKilled
from schrodinger.tasks.tasks import postprocessor  # noqa: F401
from schrodinger.tasks.tasks import preprocessor
from schrodinger.utils import fileutils
from schrodinger.utils import mmutil

maestro = schrodinger.get_maestro()


def is_jobtask(task):
    """
    Utility function to check if an object is a jobtask.
    """
    return isinstance(task, _AbstractJobMixin)


#===============================================================================
# Job Config - Hosts
#===============================================================================


class AllowedHostTypes(jsonable.JsonableEnum):
    CPU_ONLY = enum.auto()
    GPU_ONLY = enum.auto()
    CPU_AND_GPU = enum.auto()


class HostParam(parameters.Param):
    DataClass = hosts.Host

    def __init__(self,
                 default_value='localhost',
                 allowed_types=AllowedHostTypes.CPU_ONLY,
                 *args,
                 **kwargs):
        self.allowed_types = allowed_types
        super().__init__(default_value, *args, **kwargs)


def get_hosts():
    return hosts.get_hosts(excludeGPGPUs=False)


def get_default_host(allowed_host_types=AllowedHostTypes.CPU_AND_GPU):
    """
    Gets the default host for a job to run on, which will be the jobhost if
    this function is called from within a jobcontrol backend, or the
    localhost otherwise.

    If the specified host type is GPU_ONLY, and the localhost and jobhost
    don't have GPUs, then the returned host will be the first gpu-enabled
    host returned from `get_hosts()`.
    """
    default_host = hosts.get_host_by_name('localhost')
    if allowed_host_types is AllowedHostTypes.GPU_ONLY and not default_host.num_gpus:
        default_host = next((host for host in get_hosts() if host.num_gpus),
                            None)
    return default_host
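

# A minimal usage sketch (not from the original module) of the host-selection
# helper above; `_example_pick_gpu_host` is a hypothetical name, and the
# sketch assumes the user's hosts file may define a GPU-capable entry.
def _example_pick_gpu_host():
    # Falls back to the first GPU-enabled host when localhost has no GPUs;
    # get_default_host() returns None if no such host is configured.
    host = get_default_host(allowed_host_types=AllowedHostTypes.GPU_ONLY)
    if host is None:
        raise RuntimeError('No GPU-enabled host is configured')
    return host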
""" NOT_SUPPORTED is a sentinel that can be used to signal that a host or subjob are not supported for a particular host settings. For example:: class MyJobConfig(JobConfig): host_settings = HostSettings(allowed_host_types=NOT_SUPPORTED) or: class MyJobConfig(JobConfig): host_settings = HostSettings(num_subjobs=NOT_SUPPORTED) """ NOT_SUPPORTED = None


class HostSettings(parameters.CompoundParam):
    """
    :cvar HOST_PLACEHOLDER_ARGS: Placeholders to pass into the -HOST argument
        when no host is available based on the currently allowed host types.
    """
    host = HostParam()
    num_subjobs: int = None
    allowed_host_types: AllowedHostTypes = AllowedHostTypes.CPU_ONLY

    CPU_PLACEHOLDER_ARG = '<CPU-host-placeholder>'
    GPU_PLACEHOLDER_ARG = '<GPU-host-placeholder>'
    CPU_AND_GPU_PLACEHOLDER_ARG = '<host-placeholder>'
    HOST_PLACEHOLDER_ARGS = {
        AllowedHostTypes.CPU_ONLY: CPU_PLACEHOLDER_ARG,
        AllowedHostTypes.GPU_ONLY: GPU_PLACEHOLDER_ARG,
        AllowedHostTypes.CPU_AND_GPU: CPU_AND_GPU_PLACEHOLDER_ARG
    }

    def toCmdArg(self):
        if self.host is None:
            return self.getHostPlaceholderArg()
        host_arg = self.host.name
        if self.num_subjobs is not None:
            host_arg += ':' + str(self.num_subjobs)
        return strip_gpu_from_localhost(host_arg)

    def initializeValue(self):
        if self.allowed_host_types is not None:
            self.host = get_default_host(self.allowed_host_types)

    @json.adapter(version=55039)
    def adapter55040(self, json_dict):
        if json_dict['host'] is not None:
            json_dict['host'] = json_dict['host']['name']
        return json_dict

    def toJsonImplementation(self):
        json_dict = super().toJsonImplementation()
        if self.host is not None:
            json_dict['host'] = self.host.name
        return json_dict

    @classmethod
    def fromJsonImplementation(cls, json_dict):
        host_name = json_dict.pop('host')
        host_settings = super().fromJsonImplementation(json_dict)
        if host_name:
            host_settings.host = hosts.get_host_by_name(host_name)
        else:
            host_settings.host = None
        return host_settings

    def getHostPlaceholderArg(self) -> str:
        """
        Return the host placeholder argument for the currently allowed host
        types.
        """
        return self.HOST_PLACEHOLDER_ARGS.get(self.allowed_host_types, '')
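

# A brief sketch (hypothetical values) of how HostSettings renders the -HOST
# argument; with the default localhost host and num_subjobs set, toCmdArg()
# is expected to yield a 'host:n' string such as 'localhost:4'.
def _example_host_settings_arg():
    settings = HostSettings()
    settings.num_subjobs = 4
    # If no host were set, this would instead return the placeholder for the
    # currently allowed host types (see HOST_PLACEHOLDER_ARGS).
    return settings.toCmdArg()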


#===============================================================================
# Job Config - Incorporation
#===============================================================================


class IncorporationMode(jsonable.JsonableEnum):
    APPEND = 'append'
    APPENDINPLACE = 'appendinplace'
    REPLACE = 'replace'
    IGNORE = 'ignore'


class IncorporationParam(parameters.EnumParam):
    DEFAULT_ALLOWED_MODES = (
        IncorporationMode.APPEND,
        IncorporationMode.APPENDINPLACE,
        IncorporationMode.IGNORE,
    )

    def __init__(self, *args, allowed_modes=DEFAULT_ALLOWED_MODES, **kwargs):
        super().__init__(IncorporationMode, *args, **kwargs)
        self.allowed_modes = allowed_modes


INCORPORATION_MODE_MAP = {
    IncorporationMode.APPEND: launchparams.ProjectDisposition.APPEND,
    IncorporationMode.APPENDINPLACE:
        launchparams.ProjectDisposition.APPENDINPLACE,
    IncorporationMode.IGNORE: launchparams.ProjectDisposition.IGNORE,
}

#===============================================================================
# Job Config
#===============================================================================


class JobConfig(parameters.CompoundParam):
    """
    Subclass JobConfig to customize what job settings are available for a
    given jobtask. To disable an option, set an ordinary (non-param) class
    variable with value None for that option.

    Subclasses may add any arbitrary options as desired; it is the
    responsibility of the task to handle those options.
    """
    viewname: str = None  # A default value will be assigned by the task
    jobname: str
    host_settings: HostSettings
    incorporation = None  # Override with an IncorporationParam to enable

    @json.adapter(version=52085)
    def _adaptHost(self, json_dict):
        """
        In 52085, we switched from defining host on the toplevel of jobconfig
        to inside a HostSettings param.
        """
        json_dict['host_settings'] = {'host': json_dict.pop('host')}
        return json_dict
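

# A minimal sketch (hypothetical subclass): incorporation is disabled on
# JobConfig by default, so a subclass opts in by overriding the class
# variable with an IncorporationParam, as noted above.
class _ExampleIncorporatingConfig(JobConfig):
    incorporation = IncorporationParam(IncorporationMode.APPEND)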


def job_config_factory(allowed_host_types=AllowedHostTypes.CPU_ONLY,
                       default_incorp_mode=None,
                       supports_subjobs=False,
                       viewname=None):
    """
    Generate JobConfig objects with typical options.

    :param allowed_host_types: Whether this job accepts cpu hosts, gpu hosts,
        or both. Pass None to disable remote hosts (always run on localhost)
    :type allowed_host_types: AllowedHostTypes or None

    :param default_incorp_mode: The default disposition. Pass None for jobs
        that do not incorporate at all.
    :type default_incorp_mode: IncorporationMode or None

    :param supports_subjobs: whether this job can be split into subjobs
    :type supports_subjobs: bool

    :param viewname: what viewname should be used for this type of job
    :type viewname: str or None
    """
    # Have to give `viewname` a different variable name before assigning it
    # inside of `NewJobConfig` otherwise you get a weird NameError.
    default_viewname = viewname
    default_allowed_host_types = allowed_host_types

    class NewJobConfig(JobConfig):
        if default_incorp_mode is not None:
            incorporation = IncorporationParam(default_incorp_mode)
        viewname: str = default_viewname
        host_settings = HostSettings(
            allowed_host_types=default_allowed_host_types)

        def initializeValue(self):
            super().initializeValue()
            if not allowed_host_types:
                self.host_settings.host = None
            if supports_subjobs:
                self.host_settings.num_subjobs = 0

    return NewJobConfig()
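

# A short usage sketch for job_config_factory (hypothetical helper name):
# the returned instance is typically assigned as a jobtask's `job_config`.
def _example_make_job_config():
    return job_config_factory(
        allowed_host_types=AllowedHostTypes.CPU_AND_GPU,
        default_incorp_mode=IncorporationMode.APPEND,
        supports_subjobs=True)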


#===============================================================================
# Job Task Execution Mixins
#===============================================================================


class JobOutput(parameters.CompoundParam):
    """
    Base class for jobtask output.
    """
    incorporation_file: TaskFile = None


class _CmdJobTaskOutput(JobOutput):
    """
    Base class for CmdJobTask output. All params defined thus far are
    intended to be read-only, as CmdJobTask sets them during postprocessing.
    Any new params defined by subclasses are safe to use as normal.
    """
    output_files: List[TaskFile]
    log_file: TaskFile = None


class SetJobRuntimeError(RuntimeError):
    """
    An error raised while trying to set a job on a task.
    """


class _AbstractJobMixin(parameters.CompoundParamMixin):
    """
    Base class for running tasks via job control. Child classes must be mixed
    in with `AbstractCmdTask`.
    """
    # If PROGRAM_NAME is not overridden, it will default to the class name.
    # Used for specifying the program name field of the job visible in the
    # job monitor.
    PROGRAM_NAME = None
    job_config: JobConfig
    input: parameters.CompoundParam
    job_id = parameters.NonParamAttribute()
    _use_async_jobhandler: bool = False

    @json.adapter(50002)
    def configToJobConfigAdapter(cls, json_dict):
        json_dict['job_config'] = json_dict.pop('config')
        return json_dict

    @classmethod
    def configureParam(cls):
        """
        @overrides: parameters.CompoundParam
        """
        super().configureParam()
        cls.setReference(cls.name, cls.job_config.jobname)

    def initializeValue(self):
        """
        @overrides: parameters.CompoundParam
        """
        super().initializeValue()
        self._write_mode = False
        if self.job_config.viewname is None:
            self.job_config.viewname = type(self).__name__

    def inWriteMode(self):
        return self._write_mode

    @classmethod
    def runFromCmdLine(cls):
        """
        @overrides: tasks.AbstractTask
        """
        return cmdline.run_jobtask_from_cmdline(cls)

    @classmethod
    def _populateClassParams(cls):
        """
        @overrides: tasks.AbstractTask
        """
        cls._convertNestedClassToDescriptor('JobConfig', 'job_config')
        super()._populateClassParams()

    def runCmd(self, cmd):
        """
        @overrides: tasks.AbstractCmdTask
        """
        wrapped_cmd = self._wrapCmd(cmd)
        self._launchCmd(wrapped_cmd)

    def runToCmd(self, skip_preprocessing=False):
        """
        Does the same thing as start except it doesn't actually launch the
        job. Instead it just returns the final job cmd.

        Intended to be used for running jobtasks on JobDJ, which requires a
        job cmd rather than a task.
        """
        if not skip_preprocessing:
            self.runPreprocessing()
        return self._wrapCmd(self.makeCmd())

    def write(self, skip_preprocessing=False):
        self._write_mode = True
        try:
            cmd = self.runToCmd(skip_preprocessing=skip_preprocessing)
            sh_fname = self.getTaskFilename(self.name + '.sh')
            jobwriter.write_job_cmd(cmd, sh_fname, self.getTaskDir())
        finally:
            self._write_mode = False

    def replicate(self):
        """
        @overrides: tasks.AbstractTask
        """
        old_task = self
        new_task = super().replicate()
        new_task.job_config.setValue(old_task.job_config)
        return new_task

    @property
    def PROGRAM_NAME(self):
        return type(self).__name__

    def _launchCmd(self, cmd):
        if self._use_async_jobhandler:
            JobHandlerClass = jobhandler.AsyncJobHandler
        else:
            JobHandlerClass = jobhandler.JobHandler
        jhandler = JobHandlerClass(cmd,
                                   launch_dir=self.getTaskDir(),
                                   viewname=self.job_config.viewname)
        # Need to keep a reference so that the jobCompleted signal gets
        # emitted:
        self._jhandler = jhandler
        jhandler.jobDownloadFailed.connect(self._onJobDownloadFailed)
        jhandler.jobCompleted.connect(self.__onJobCompletion)
        jhandler.jobProgressChanged.connect(self._onJobProgressChanged)
        if self._use_async_jobhandler:
            jhandler.jobStarted.connect(self._onJobStarted)
            jhandler.jobLaunchFailed.connect(self._onJobLaunchFailed)
        jhandler.launchJob()
        if not self._use_async_jobhandler:
            self._onJobStarted(jhandler.job)

    def _onJobStarted(self, job):
        self.printDebug('job launched:', job.job_id)
        self.job_id = job.job_id

    def _getWrappedCmd(self):
        cmd = self.makeCmd()
        return self._wrapCmd(cmd)

    def _wrapCmd(self, cmd):
        return cmd

    def _onJobProgressChanged(self, job, steps, total_steps, message):
        self.progress = steps
        self.max_progress = total_steps
        self.progress_string = message

    def writeStuZipFile(self):
        # TODO
        raise NotImplementedError

    def _onJobDownloadFailed(self, job, error: str):
        """
        Mark the task as failed if the job fails to download
        """
        self.printDebug('job download failed')
        self._recordFailure(RuntimeError(error))

    @typing.final
    def __onJobCompletion(self):
        self.printDebug('job completed')
        with self.guard():
            self._onJobCompletion()
        self._finish()

    def _onJobCompletion(self):
        """
        Hook for subclasses to customize behavior on job completion
        """
        pass

    def _onJobLaunchFailed(self, exception):
        with self.guard():
            raise exception
        if self.failure_info is not None:
            self.status = self.FAILED

    def kill(self):
        """
        @overrides: tasks.AbstractTask
        """
        if self.status is not self.RUNNING:
            raise RuntimeError("Can't kill a task that's not running.")
        # Need to record failure before calling job.cancel(), because it can
        # cause jobmanager.jobCompleted to be emitted
        self._recordFailure(TaskKilled())
        try:
            job = self._jhandler.job
            job.cancel()
            if schrodinger.get_maestro():
                # Non-GUI-blocking wait for jobCompleted signal
                self.wait()
            else:
                job.wait()
        finally:
            self._finish()

    def stop(self):
        if self.status is not self.RUNNING:
            raise RuntimeError("Can't stop a task that's not running.")
        # Need to record failure before calling job.stop(), because it can
        # cause jobmanager.jobCompleted to be emitted
        self._recordFailure(TaskKilled())
        try:
            job = self._jhandler.job
            if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
                job.stop()
            else:
                job.kill()
            if schrodinger.get_maestro():
                # Non-GUI-blocking wait for jobCompleted signal
                self.wait()
            else:
                job.wait()
        finally:
            self._finish()

    def setJob(self, job: jobcontrol.Job):
        """
        Use the given `jobcontrol.Job` to incorporate job results into the
        task and run postprocessors. Example::

            task = FooTask()
            cmd = task.runToCmd()
            job = jobcontrol.launch_job(cmd)
            job.wait()
            task.setJob(job)

        If the job has not been downloaded, the task will be set to FAILED
        with a SetJobRuntimeError.

        :param job: `jobcontrol.Job` with results to incorporate into the
            task.
        """
        self._jhandler = jobhandler.JobHandler([])
        self._jhandler.job = job
        self.specifyTaskDir(job.Dir)
        self.name = job.Name
        with self.guard():
            if not job.isDownloaded():
                err_msg = ("Job has not been downloaded. Check job for "
                           f"further info: {self._jhandler.job.JobId}")
                raise SetJobRuntimeError(err_msg)
        self.__onJobCompletion()
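

# A minimal sketch of the runToCmd() workflow described above: running a
# jobtask under JobDJ, which consumes job commands rather than task objects.
# `task` is assumed to be an instance of some concrete jobtask subclass.
def _example_run_on_jobdj(task):
    from schrodinger.job import queue
    cmd = task.runToCmd()
    job_dj = queue.JobDJ()
    job_dj.addJob(cmd)
    job_dj.run()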


class JobBackendCmdMixin(_AbstractJobMixin):
    """
    Base class for running backends that already support job control. Combine
    with an AbstractCmdTask.

    To use, override `makeCmd`.
    """

    def __init__(self, *args, cmd_list=None, **kwargs):
        super().__init__(*args, **kwargs)
        if cmd_list is not None:
            # If this turns out to be too restrictive, feel free to remove it.
            assert self.__class__ is CmdJobTask
        self._cmd_list = cmd_list

    def makeCmd(self):
        """
        @overrides: tasks.AbstractCmdTask

        Child classes must override.
        """
        if self._cmd_list is not None:
            return self._cmd_list
        else:
            return []

    def _wrapCmd(self, cmd):
        """
        @overrides: _AbstractJobMixin
        """
        cmd.extend(['-JOBNAME', self.name])
        if self.job_config.host_settings.host is not None:
            cmd.extend(['-HOST', self.job_config.host_settings.toCmdArg()])
        if self.job_config.incorporation is not None and not self.inWriteMode():
            cmd.extend(['-DISP', self.job_config.incorporation.value])
        return cmd

    def _onJobCompletion(self):
        """
        @overrides: _AbstractJobMixin
        """
        if not self._jhandler.job.succeeded():
            self._recordFailure(
                RuntimeError("Job returned nonzero exit code."))
        super()._onJobCompletion()


class _LaunchAPIMixin(_AbstractJobMixin):
    """
    Base class for running python code under job control by wrapping with
    launchapi. Combine with an AbstractCmdTask.

    To use, override `makeCmd`.
    """
    _input_files: list
    _output_files: list

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._license_reservations = {}

    def runCmd(self, cmd):
        """
        @overrides: _AbstractJobMixin

        Override runCmd to set SCHRODINGER_COMMANDLINE which is ultimately
        used to set the job.Commandline attribute. Without setting
        SCHRODINGER_COMMANDLINE ourselves, jobcontrol reports an incorrect
        command.

        .. WARNING:: There's a potential race condition when using
            asynchronous job launching which may result in subjobs with
            incorrect Commandlines.
        """
        old_SCHRODINGER_COMMANDLINE = os.environ.get('SCHRODINGER_COMMANDLINE',
                                                     None)
        os.environ['SCHRODINGER_COMMANDLINE'] = ' '.join(cmd)
        try:
            return super().runCmd(cmd)
        finally:
            os.environ.pop('SCHRODINGER_COMMANDLINE')
            if old_SCHRODINGER_COMMANDLINE is not None:
                os.environ[
                    'SCHRODINGER_COMMANDLINE'] = old_SCHRODINGER_COMMANDLINE

    def addLicenseReservation(self, license, num_tokens=1):
        """
        Add a license reservation for this job. This information is used by
        job control to ensure the job is only started once the required
        licenses become available.

        In a preprocessor (i.e. before launching the backend), a reservation
        should be added for each license that will be checked out directly by
        that backend. Example::

            class GlideTask(ComboJobTask):

                @preprocessor
                def _reserveGlideLicense(self):
                    # Reserve a Glide license.
                    self.addLicenseReservation(license.GLIDE_MAIN)

                def mainFunction(self):
                    # Check out the Glide license
                    lic = license.License(license.GLIDE_MAIN)
                    # ... Do computations requiring Glide ...
                    lic.checkin()

        Licenses that will be checked out by subjobs of this job do not need
        reservations added here; subjobs are responsible for their own
        license reservations.

        :param license: a license that will be used by the backend
        :type license: module-constant from schrodinger.utils.license (e.g.
            license.AUTODESIGNER)

        :param num_tokens: number of tokens for this license reservation
        :type num_tokens: int
        """
        self._license_reservations[license] = num_tokens

    def _addInputFile(self, filename):
        """
        Register the given file with job control as an input file, so that it
        gets copied to the job directory when the task starts.

        Note: Only meant to be used within `jobtasks.py`

        :param filename: Input file path.
        :type filename: str
        """
        self._input_files.append(filename)

    def _addOutputFile(self, filename):
        """
        Register the given file with job control as an output file, so that
        it gets copied to the launch directory after the task completes.

        Note: Only meant to be used within `jobtasks.py`

        :param filename: Output file path.
        :type filename: str
        """
        self._output_files.append(filename)

    def initConcrete(self):
        super().initConcrete()
        self._input_dirs = []

    def addInputDirectory(self, directory):
        """
        Add an input directory to be copied over with the job.
        """
        self._input_dirs.append(directory)

    def wrapCmdInLaunchApi(self, cmd):
        job_spec = self.makeJobSpecFromCmd(cmd)
        launch_parameters = self.makeLaunchParams()
        if self._write_mode:
            # This is a temporary fix to achieve portability of written jobs
            # and can be removed after PANEL-19244
            with fileutils.chdir(self.getTaskDir()):
                full_cmd = jobcontrol._get_job_spec_launch_command(
                    job_spec, launch_parameters, write_output=True)
        else:
            full_cmd = jobcontrol._get_job_spec_launch_command(
                job_spec, launch_parameters)
        return full_cmd

    def makeJobSpecFromCmd(self, cmd):
        cmd = list(map(str, cmd))
        job_builder = launchapi.JobSpecificationArgsBuilder(cmd)
        for license, num_tokens in self._license_reservations.items():
            job_builder.addLicense(license, num_tokens)
        logfilename = self._getLogFilename()
        job_builder.setStderr(logfilename, stream=True)
        job_builder.setStdout(logfilename, stream=True)
        for filename in self._input_files:
            job_builder.setInputFile(filename)
        for dir in self._input_dirs:
            job_builder.setInputDirectory(dir)
        self._output_files = list(set(self._output_files))
        for filename in self._output_files:
            job_builder.setOutputFile(os.path.basename(filename))
        job_builder.setProgramName(self.PROGRAM_NAME)
        return job_builder.getJobSpec()

    def makeLaunchParams(self):
        launch_parameters = launchparams.LaunchParameters()
        if self.job_config.host_settings.host is not None:
            launch_parameters.setHostname(
                self.job_config.host_settings.host.name)
        launch_parameters.setJobname(self.name)
        if self.job_config.host_settings.num_subjobs is not None:
            launch_parameters.setNumberOfSubjobs(
                self.job_config.host_settings.num_subjobs)
        if (maestro and self.job_config.incorporation is not None and
                not self.inWriteMode()):
            proj_disp = INCORPORATION_MODE_MAP[self.job_config.incorporation]
            launch_parameters.setMaestroProjectDisposition(proj_disp)
            pt = maestro.project_table_get()
            launch_parameters.setMaestroProjectName(pt.project_name)
        return launch_parameters

    def _wrapCmd(self, cmd):
        """
        @overrides: _AbstractJobMixin
        """
        return self.wrapCmdInLaunchApi(cmd)

    def _getLogFilename(self):
        return self.name + '.log'

    def getLogAsString(self) -> str:
        log_fn = self.getTaskFilename(self._getLogFilename())
        if not os.path.isfile(log_fn):
            return f'Log file not found: {log_fn}'
        with open(log_fn) as log_file:
            return log_file.read()

    @json.adapter(version=55013)
    def adapter55013(self, json_dict):
        json_dict['_input_files'] = json_dict.pop('input_files')
        json_dict['_output_files'] = json_dict.pop('output_files')
        return json_dict

    @json.adapter(version=56046)
    def _adaptIncorporationFile(self, json_dict):
        """
        In 56046, we switched from defining incorporation_file as a top-level
        param to only allowing it inside of `output`.
        """
        if 'incorporation_file' in json_dict.keys():
            json_dict['output'] = {
                'incorporation_file': json_dict.pop('incorporation_file')
            }
        return json_dict


class ComboJobMixin(_LaunchAPIMixin):
    """
    Base class for running python code using the "combo" task pattern.
    Combine with AbstractComboTask.

    To use, define:

        mainFunction (or, equivalently, backendMain): the python function
        that will be executed in the backend process under job control.
    """
    # Jobtask-specific params to serialize in frontend/backend conversions
    _JOBTASK_SERIALIZATION_PARAMS = [
        'job_config', '_use_async_jobhandler', '_input_files', '_output_files'
    ]

    def setJob(self, job: jobcontrol.Job):
        for filename in job.OutputFiles:
            if filename.endswith('_backend.json') and filename.startswith('.'):
                _, self._combo_id, _ = filename.rsplit('_', maxsplit=2)
        super().setJob(job)

    def write(self, skip_preprocessing=False):
        try:
            super().write(skip_preprocessing)
        finally:
            self._regenerateComboId()

    def _writeFrontendJsonFile(self):
        """
        @overrides: AbstractComboTask
        """
        self._addInputFile(_filepaths.get_launch_path(self.json_filename))
        self._addOutputFile(os.path.basename(self.json_out_filename))

        def register_input(path, launchdir):
            if os.path.isdir(path):
                dummy_file = os.path.join(path, ".dummy_file")
                with open(dummy_file, 'w'):
                    pass
                self.addInputDirectory(path)
            else:
                self._addInputFile(path)
            return path

        # Need to do this in preprocessing for now. In PANEL-19244 we will
        # register files in the launchapi layer so we can remove this.
        self._processTaskFilesForFrontendWrite()
        self._assertTaskFileExistence(self.input)
        self._processTaskFiles(self.input, process_func=register_input)
        super()._writeFrontendJsonFile()

    def _copyScriptToBackend(self):
        script_filename = super()._copyScriptToBackend()
        self._addInputFile(script_filename)
        return script_filename

    def makeCmd(self):
        """
        @overrides: tasks.AbstractCmdTask

        Child classes must not override this method.
        """
        cmd = super().makeCmd()
        assert cmd[0] == 'run'
        cmd.pop(0)
        return cmd

    def _getFrontEndJsonArg(self):
        arg = super()._getFrontEndJsonArg()
        return _filepaths.get_launch_path(arg)

    def isBackendMode(self):
        is_backend_mode = self._run_as_backend
        return is_backend_mode

    def getTaskDir(self):
        taskdir = super().getTaskDir()
        if jobcontrol.get_backend():
            # workaround for JOBCON-6136
            return os.path.relpath(taskdir)
        return taskdir

    def getTaskFilename(self, fname):
        task_fname = super().getTaskFilename(fname)
        if jobcontrol.get_backend():
            # workaround for JOBCON-6136
            return os.path.relpath(task_fname)
        return task_fname

    def _onJobCompletion(self):
        """
        @overrides: _AbstractJobMixin
        """
        self._processBackend()
        super()._onJobCompletion()

    def runBackend(self):
        """
        @overrides: AbstractComboTask
        """
        super().runBackend()
        self._registerBackendOutputFiles()

    def _onBackendProgressChanged(self):
        backend = jobcontrol.get_backend()
        backend.setJobProgress(self.progress,
                               self.max_progress,
                               description=self.progress_string)

    def _registerBackendOutputFiles(self):
        """
        Called by the backend to make sure any dynamically added output files
        get registered with job control to be copied back to the launch dir.
        """
        backend = jobcontrol.get_backend()
        if backend is None:
            # Running in-process, usually as a test or debugging.
            return
        for file in self._output_files:
            if isinstance(self.output,
                          JobOutput) and file == self.output.incorporation_file:
                backend.setStructureOutputFile(file)
            else:
                backend.addOutputFile(file)

    def _getSerializationParamNames(self) -> List[str]:
        """
        @overrides: AbstractComboTask

        Include jobtask-specific param names in the list of param names to
        serialize.
        """
        return (super()._getSerializationParamNames() +
                self._JOBTASK_SERIALIZATION_PARAMS)


#===============================================================================
# Prepackaged Job Task Classes
#===============================================================================


class CmdJobTask(JobBackendCmdMixin, tasks.AbstractCmdTask):
    """
    Class for running backends that already support jobcontrol.

    CmdJobTask can either be subclassed to implement custom input/output
    params and other behavior, or can be instantiated and run directly by
    supplying the optional cmd_list constructor argument. For example::

        task = jobtasks.CmdJobTask(cmd_list=['testapp', '-t', '1'])
        task.start()

    CmdJobTask has a standard output that auto-populates with output files
    from its respective job. Custom output classes must inherit
    CmdJobTask.Output.

    Note that specifying cmd_list will bypass some custom functionality and
    should not be used with CmdJobTask subclasses.
    """
    Output = _CmdJobTaskOutput

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not isinstance(self.output, CmdJobTask.Output):
            msg = (f'Expected {self.__class__.__name__}.Output to inherit'
                   f' CmdJobTask.Output.')
            raise TypeError(msg)

    def _onJobCompletion(self):
        self._populateOutput()
        super()._onJobCompletion()

    def _populateOutput(self):
        """
        Populate output with the files available from the completed or failed
        job.
        """
        try:
            job = self._jhandler.job
        except AttributeError:
            # _jhandler may not exist when running just postprocessors
            return
        struct_file = self.getTaskFilename(job.StructureOutputFile)
        if struct_file and os.path.exists(struct_file):
            self.output.incorporation_file = struct_file
        if job.LogFiles:
            log_file = self.getTaskFilename(job.LogFiles[0])
            self.output.log_file = log_file if os.path.exists(
                log_file) else None
        output_files = [
            self.getTaskFilename(fname) for fname in job.OutputFiles
        ]
        self.output.output_files = [
            fname for fname in output_files if os.path.exists(fname)
        ]
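

# A minimal CmdJobTask subclass sketch; `testapp` and the `n_seconds` input
# param are hypothetical, and the nested Input class follows the tasks
# framework convention for declaring the `input` param.
class _ExampleCmdJobTask(CmdJobTask):

    class Input(parameters.CompoundParam):
        n_seconds: int = 1

    def makeCmd(self):
        # Job control flags (-JOBNAME, -HOST, -DISP) are appended later by
        # _wrapCmd; only the bare backend command is returned here.
        return ['testapp', '-t', str(self.input.n_seconds)]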


WINDOWS_SEP = '\\'
POSIX_SEP = '/'


class ComboJobTask(ComboJobMixin, tasks.AbstractComboTask):

    #===========================================================================
    # TaskFile Processing
    #===========================================================================

    def _processTaskFilesForFrontendWrite(self):
        # Override parent class behavior to make all paths relative to the
        # CWD instead of the launch dir. This will be fixed in PANEL-19244
        # where the launchdir will be changed to the taskdir.
        self._assertTaskFileExistence(self.input)
        self._processTaskFiles(self.input,
                               process_func=_filepaths.get_launch_path,
                               dir=os.getcwd())

    def _processTaskFilesForBackendExecution(self):
        self._processTaskFiles(self.input,
                               process_func=_filepaths.get_job_backend_path)
        self._assertTaskFileExistence(self.input)

    def _processTaskFilesForBackendWrite(self):
        # Extends parent class behavior to add a dummy file to all
        # directories because jobserver will not copy empty directories to
        # the backend.
        super()._processTaskFilesForBackendWrite()

        def process_output(path, launchdir):
            if os.path.isdir(path):
                dummy_file = os.path.join(path, ".dummy_file")
                with open(dummy_file, 'w'):
                    pass
            self._addOutputFile(path)
            return path

        self._assertTaskFileExistence(self.output)
        self._processTaskFiles(self.output, process_func=process_output)

    def _processTaskFilesForBackendRehydration(self):
        self._processTaskFiles(self.output,
                               process_func=_filepaths.get_job_output_path)
        self._assertTaskFileExistence(self.output)

    def runBackend(self):
        # Specify the task dir as the cwd since we've already chdir'd into
        # the directory with all the task files
        self.specifyTaskDir(None)
        return super().runBackend()

    def getTaskDir(self):
        if self.isBackendMode():
            return ''
        return super().getTaskDir()

    def _updateFromBackend(self, rehydrated_backend):
        super()._updateFromBackend(rehydrated_backend)
        self._output_files = rehydrated_backend._output_files

    @preprocessor(BEFORE_TASKDIR - 1000)
    def _clearInputAndOutputFiles(self):
        self._input_files.clear()
        self._output_files.clear()
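

# A minimal ComboJobTask sketch tying the pieces together: mainFunction runs
# in the backend process under job control, and the incorporation file is
# registered by assigning to output.incorporation_file (see the module
# docstring). The filename is illustrative; a real backend would write the
# structure file before registering it.
class _ExampleComboJobTask(ComboJobTask):
    output: JobOutput

    def mainFunction(self):
        self.output.incorporation_file = 'foo.maegz'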