# Source code for schrodinger.job.launchapi

"""
Provide an API that allows a definition of a job.

A job is composed of two pieces:

  1) A task specification that defines a generic command-line invocation
     and defines what input files are needed and what output files are
     produced.
  2) Runtime parameters that include specific input and output filenames.

To launch a job, you must also provide a set of launch parameters, which
specify things like the job host, number of threads per process, etc.

To construct a job specification for a script that you want to run under job
control, use the JobSpecificationArgsBuilder. This creates a
JobSpecification from a list of command-line arguments. You must indicate
the specific input and output files in the command-line arguments so they
can be transferred and retrieved.

"""

# A short time after writing this, it feels too complicated.  But, we think
# this module will go away with a shift to protobuf-based JobSpecifications
# and so refactoring is likely to be a waste.
#
# In case this doesn't happen, some refactoring notes that may or may not
# make sense to others:
#
#   1) TaskSpecificationBuilder and JobSpecificationBuilder are unused
#      outside of tests, which makes them good candidates for elimination.
#   2) The only point of the conversion to/from JSON is to make sure that
#      the state is serializable; i.e. an attempt to ensure separation of
#      job specification/state from launching behavior. If we can guarantee
#      this some other way, the JSON can go away. We could possibly instead
#      add an introspective test that the specification class doesn't gain
#      any unwanted behavior.
#   3) _RuntimeParameters is unnecessary if the JobSpecification stops
#      trying to be a template for jobs and instead becomes an instance of a
#      specific job. Job templates are currently unused!

import errno
import json
import os
import pathlib
import re
import urllib
from typing import Optional
from urllib.parse import urljoin

import schrodinger.job.jobcontrol as jobcontrol
from schrodinger.infra import licensing
from schrodinger.utils import fileutils
from schrodinger.utils.env import prepend_sys_path

# "toplevel" lives alongside the mmshare executables, so it must be imported
# with that directory temporarily prepended to sys.path.
with prepend_sys_path(os.environ['MMSHARE_EXEC']):
    import toplevel

# Regex for valid input/output key names, and for locating "<KEY>" template
# variables inside command-line arguments and runtime paths.
_KEY_RE = re.compile("[a-zA-Z][a-zA-Z_0-9]+")
_VAR_RE = re.compile("<(%s)>" % _KEY_RE.pattern)

# Template keys with built-in meaning; user keys may not use these names.
_JOBNAME_KEY = "JOBNAME"
_STDOUT_KEY = "STDOUT"
_STDERR_KEY = "STDERR"

_RESERVED_KEYS = {_JOBNAME_KEY, _STDOUT_KEY, _STDERR_KEY}

# Default redirection targets used when no explicit stdout/stderr path is set.
_DEFAULT_STDERR = "stderr"
_DEFAULT_STDOUT = "stdout"

_RESERVED_OUTPUT_FILENAMES = (_DEFAULT_STDERR, _DEFAULT_STDOUT)

# Keys used in the JSON serialization.

_COMMAND_LINE_ARGUMENTS = "command_line_arguments"
_FILENAME_SOURCE = "filename_source"
_HOST = "host"
_INCORPORATE = "incorporate"
_INPUT = "input"
_INPUT_FILE = "input_file"
_INPUT_FILENAME = "input_filename"
_JOB_USES_TPP = "job_uses_tpp"
_JOBNAME = "jobname"
_LICENSES = "licenses"
_OUTPUT = "output"
_OUTPUT_FILE = "output_file"
_OUTPUT_FILENAME = "output_filename"
_DRIVER_RESERVES_CORES = "processes_consumed_by_driver"
_PRODUCT = "product"
_PROGRAM_NAME = "progname"
_RUNTIME_PARAMETERS = "runtime_parameters"
_RUNTIME_PATH = "runtime_path"
_SCHRODINGER = "schrodinger"
_SOURCE = "source"
_STREAM = "stream"
_TASK_SPECIFICATION = "task_specification"
_TYPE = "type"
_USE_RUN = "use_run"
_VALUE = "value"
_JOB_STOPPABLE = "stoppable"

# Input "type" values allowed in a task specification vs. in runtime
# parameters (runtime inputs carry a concrete source file instead of a name).
_TASK_INPUT_TYPES = {_INPUT_FILENAME, _OUTPUT_FILENAME}
_RUNTIME_INPUT_TYPES = {_INPUT_FILE, _OUTPUT_FILENAME}


class TaskSpecificationError(RuntimeError):
    """ Raised when a task specification is invalid or inconsistent. """
    pass
class RuntimeParametersError(RuntimeError):
    """ Raised when runtime parameters are missing, unrecognized, or invalid. """
    pass
class SpecificationKeyError(KeyError):
    """
    An error thrown when a key is missing from a serialized specification.
    """

    # Human-readable name of the specification kind; subclasses override this
    # to produce more specific error messages.
    spec_description = "specification"

    def __init__(self, key, context=None):
        """
        :param key: The missing key name.
        :type key: str

        :param context: Optional description of where the key was expected;
            defaults to "the <spec_description>".
        :type context: str or None
        """
        where = ("the " + self.spec_description) if context is None else context
        super().__init__(f"No '{key}' key present in {where}.")
class TaskSpecificationKeyError(SpecificationKeyError):
    # A key is missing from a serialized task specification.
    spec_description = "task specification"
class RuntimeParametersKeyError(SpecificationKeyError):
    # A key is missing from serialized runtime parameters.
    spec_description = "runtime parameters"
class JobSpecificationKeyError(SpecificationKeyError):
    # A key is missing from a serialized job specification.
    spec_description = "job specification"
def check_valid_key(key):
    """
    Make sure the provided key is valid. Raise a RuntimeError if it's not.

    A valid key starts with a letter, contains only letters, numbers, or
    underscores, and does not collide with a reserved key name.

    :param key: The candidate input/output key name.
    :type key: str

    :raise RuntimeError: If the key is reserved or malformed.
    """
    if key in _RESERVED_KEYS:
        raise RuntimeError("%s is a reserved key name." % key)
    # fullmatch anchors the pattern at both ends; a bare match() would accept
    # any string with a valid prefix (e.g. "abc!"), contradicting the error
    # message below.
    elif not _KEY_RE.fullmatch(key):
        raise RuntimeError(
            "'%s' is a bad key name. It must start with a-z (upper or "
            "lowercase) and contain only letters, numbers, or "
            "underscores." % key)
class TaskSpecification:
    """
    A class that holds the information necessary to run a task on a compute
    resource. It holds a templated shell command along with info about
    required input, output created, and information about how to generate a
    jobname.

    It doesn't hold any information specific to a job run - e.g. hostname,
    input filenames, output filenames.

    This class has no methods that modify instance state. Use a builder
    class (TaskSpecificationBuilder or JobSpecificationBuilder) to set up a
    TaskSpecification.
    """

    def __init__(self):
        # Templated argv list; variables appear as "<KEY>".
        self._command_line_arguments = []
        # Maps input key -> {"type": ...}.
        self._input = {}
        # The input key from which the jobname is derived, if any.
        self._jobname_source = None
        self._program_name = None
        # Maps output key -> {type, runtime_path, stream, incorporate}.
        self._output = {}
        # $SCHRODINGER/run settings ("use_run", "product").
        self._schrodinger = {}
        # List of (license_name, num_tokens) tuples.
        self._licenses = []
        self._driver_reserves_cores = False
        self._job_uses_tpp = False
        self._stoppable = False

    def validate(self):
        """
        Make sure that the variables used in the command-line template are
        defined with input or output specifications. Raise a
        TaskSpecificationError if not.
        """
        commandline_keys = set()
        for arg in self.getCommandTemplate():
            m = _VAR_RE.search(arg)
            if m:
                key = m.group(1)
                # _JOBNAME_KEY is special and not in input/output files.
                if key != _JOBNAME_KEY:
                    commandline_keys.add(m.group(1))
        input_and_output_keys = set()
        input_and_output_keys.update(self.getInputKeys())
        input_and_output_keys.update(self.getOutputKeys())
        if not commandline_keys.issubset(input_and_output_keys):
            raise TaskSpecificationError(
                "The variables used in the command-line template don't match "
                "up with the input and output keys. Unspecified files: %s" %
                ",".join(commandline_keys - input_and_output_keys))

    @staticmethod
    def fromJSON(json_spec):
        # Delegate to the builder, which validates the deserialized data.
        data = json.loads(json_spec)
        return TaskSpecificationBuilder(data).getTaskSpec()

    def asJSON(self, **kwargs):
        # Serialize only non-default state so the JSON stays minimal;
        # kwargs are passed straight through to json.dumps.
        self.validate()
        data = {}
        data[_COMMAND_LINE_ARGUMENTS] = self._command_line_arguments
        data[_INPUT] = self._input
        if self._jobname_source:
            data[_JOBNAME] = {_FILENAME_SOURCE: self._jobname_source}
        if self._program_name:
            data[_PROGRAM_NAME] = self._program_name
        if self._output:
            data[_OUTPUT] = self._output
        data[_SCHRODINGER] = self._schrodinger
        if self._licenses:
            data[_LICENSES] = self._licenses
        if self._driver_reserves_cores:
            data[_DRIVER_RESERVES_CORES] = self._driver_reserves_cores
        if self._job_uses_tpp:
            data[_JOB_USES_TPP] = self._job_uses_tpp
        if self._stoppable:
            data[_JOB_STOPPABLE] = self._stoppable
        return json.dumps(data, **kwargs)

    def getCommandTemplate(self):
        """
        Return the command-line as a list of arguments with keys in the form
        of <KEY> that will be replaced with runtime parameters.
        """
        return self._command_line_arguments

    def needsExplicitJobname(self):
        """
        Return True if a jobname must be specified as part of the runtime
        parameters.
        """
        if self._jobname_source:
            return False
        # Without a jobname source, any use of <JOBNAME> in outputs or the
        # command line requires the jobname to be supplied explicitly.
        for output_file in self._output.values():
            if _filename_uses_jobname(output_file[_RUNTIME_PATH]):
                return True
        for arg in self.getCommandTemplate():
            if _filename_uses_jobname(arg):
                return True
        return False

    def getInputKeys(self):
        """
        Return a list of keys that define inputs.
        """
        return list(self._input)

    def getJobnameSource(self):
        """
        Return the input key that should be used to derive the jobname.
        """
        return self._jobname_source

    def getProgramName(self):
        return self._program_name

    def isStoppable(self):
        """
        If True, a job is stoppable by sending 'halt' message to the
        backend. And transfer output files if any. Otherwise, job supervisor
        kills the backend and transfer output files.
        """
        return self._stoppable

    def getLicenses(self):
        """
        Returns licenses as a list of tuples of (license_name, num_tokens)
        """
        return self._licenses

    def driverReservesCores(self):
        """
        If True, the driver will allocate N slots matching -HOST hostname: N
        processes. This is useful when the driver allocates parallel slots
        on one node so it can resubmit jobs to localhost. If no N is
        specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue if
        we know we are running a set of parallel jobs with the same CPU
        requirements.
        """
        return self._driver_reserves_cores

    def jobUsesTPP(self):
        """
        If True, this job will allocate N slots on one node matching -TPP N.
        If False, this job only passes TPP on as an option for subjobs.

        TPP stands for Threads Per Process.
        """
        return self._job_uses_tpp

    def getOutputKeys(self):
        """
        Return a list of keys that define outputs.
        """
        return list(self._output)

    def useSchrodingerRun(self):
        """
        Return whether to run the command under $SCHRODINGER/run.
        """
        return self._schrodinger.get(_USE_RUN)

    def useSchrodingerProduct(self):
        """
        Return the product directory (the -FROM argument) to search for the
        script/executable, or None if no product was specified.
        """
        return self._schrodinger.get(_PRODUCT)
def var(string):
    """
    Format the provided string to be used as a variable in the
    TaskSpecification command-line template.

    :param string: The key name to wrap.
    :type string: str

    :return: The key wrapped in angle brackets, e.g. "<JOBNAME>".
    :rtype: str
    """
    return f"<{string}>"
def _filename_uses_jobname(filename):
    """
    Check whether the file depends on the JOBNAME key.

    :param filename: A (possibly templated) file name or argument.
    :type filename: str

    :return: True if the "<JOBNAME>" variable occurs in the string.
    :rtype: bool
    """
    jobname_variable = var(_JOBNAME_KEY)
    return jobname_variable in filename
class TaskSpecificationBuilder:
    """
    A class for building up a TaskSpecification from a specific list of
    command-line arguments.
    """

    def __init__(self, data=None):
        # data, if given, is a deserialized JSON task specification.
        self._task_spec = TaskSpecification()
        if data:
            self._fromData(data)

    def getTaskSpec(self):
        """
        Return a TaskSpecification from this builder.
        """
        self._task_spec.validate()
        return self._task_spec

    def setCommandLine(self,
                       args,
                       use_schrodinger_run=True,
                       schrodinger_product=None):
        """
        Set the command line as provided. Add a SCHRODINGER/run if requested.

        :param args: The command-line script and arguments.
        :type args: list of str

        :param use_schrodinger_run: If True, run the command under
            $SCHRODINGER/run.
        :type use_schrodinger_run: bool

        :param schrodinger_product: A product directory to search for the
            script/executable. This should be the name of a directory under
            SCHRODINGER without the trailing version (i.e. the "-v*" part).
        :type schrodinger_product: str
        """
        # Copy args so later caller-side mutation can't affect the spec.
        self._task_spec._command_line_arguments = args[:]
        self._task_spec._schrodinger[_USE_RUN] = use_schrodinger_run
        if schrodinger_product:
            if not use_schrodinger_run:
                raise RuntimeError(
                    "Specifying a Schrodinger product without using "
                    "'$SCHRODINGER/run' is inconsistent")
            self._task_spec._schrodinger[_PRODUCT] = schrodinger_product

    def setInputFile(self, key, jobname_source=False):
        """
        Set an input file for the task specification.
        """
        check_valid_key(key)
        if key in self._task_spec._input:
            raise RuntimeError("Key '%s' has already been used." % key)
        if jobname_source:
            # Only one input may serve as the jobname source.
            if self._task_spec._jobname_source:
                raise RuntimeError(
                    "The file <%s> has already been set as the jobname "
                    "source." % self._task_spec._jobname_source)
            self._task_spec._jobname_source = key
        self._task_spec._input[key] = dict(type=_INPUT_FILENAME)

    def addLicense(self, license_name, license_tokens):
        """
        :param license_name: Name of a license token that is checked out.
        :type license_name: schrodinger.infra.licensing.MMLIC3*

        :param license_tokens: The number of tokens required by this job
            type.
        :type license_tokens: str
        """
        license_string = licensing.getFeatureName(license_name)
        self._task_spec._licenses.append((license_string, license_tokens))

    def setOutputFile(self, key, runtime_path, stream=False,
                      incorporate=False):
        """
        :param runtime_path: The path of an output file that will be
            generated. May contain input keys in the form <KEY> or <JOBNAME>
            (if a jobname source has been specified).
        :type runtime_path: str

        :param key: A key to be used to reference this output file.
        :type key: str

        :param stream: If True and if possible, the output file will be
            streamed back while the job is running. Defaults to False.
            Optional.
        :type stream: bool

        :param incorporate: If True, mark this output file for
            incorporation. Only one output file may be incorporated.
        :type incorporate: bool
        """
        check_valid_key(key)
        if runtime_path in _RESERVED_OUTPUT_FILENAMES:
            raise TaskSpecificationError("'%s' is a reserved output filename."
                                         % runtime_path)
        # NOTE(review): _validate_output_in_subdir is not defined in this
        # view; presumably it rejects paths outside the job directory.
        _validate_output_in_subdir(runtime_path)
        # Don't allow the same output destination except for stderr/stdout.
        stdout_stderr = {_STDOUT_KEY, _STDERR_KEY}
        for (output_key, value) in self._task_spec._output.items():
            if (value[_RUNTIME_PATH] == runtime_path and
                    {output_key, key} != stdout_stderr):
                raise TaskSpecificationError(
                    "The output path of '%s' has already "
                    "been added for %s." % (runtime_path, output_key))
            if value.get(_INCORPORATE) and incorporate:
                raise TaskSpecificationError(
                    "The output file of '{}' already has incorporate "
                    "status and only one file can be incorporated".format(
                        output_key))
        self._task_spec._output[key] = {
            _TYPE: _OUTPUT_FILE,
            _RUNTIME_PATH: runtime_path,
            _STREAM: stream,
            _INCORPORATE: incorporate,
        }

    def _setStdStreamLocation(self, key, runtime_path, stream=False):
        """
        Specify a file for stderr or stdout to be redirected to.

        :param key: _STDERR_KEY or _STDOUT_KEY
        :type key: str

        :param runtime_path: A runtime_path with input keys in the form
            <KEY>.
        :type runtime_path: str

        :param stream: If True and if possible, the file will be streamed
            back while the job is running. Defaults to False. Optional.
        :type stream: bool
        """
        redirect_keys = {_STDERR_KEY: "stderr", _STDOUT_KEY: "stdout"}
        if key not in redirect_keys:
            raise RuntimeError("Redirection of %s is unknown." % key)
        # Don't allow redirection to a file listed as a non-redirect output
        # file.
        for (output_key, value) in self._task_spec._output.items():
            if (value[_RUNTIME_PATH] == runtime_path and
                    output_key != key and output_key not in redirect_keys):
                raise RuntimeError(
                    "The output path of '%s' has already been used for %s." %
                    (runtime_path, output_key))
        if not _filename_uses_jobname(runtime_path):
            # If stderr/stdout don't use a <JOBNAME> template they must use
            # a hardwired pathname.
            if _VAR_RE.search(runtime_path):
                raise RuntimeError(
                    "A %s specification must be based on JOBNAME or "
                    "a hardwired value." % redirect_keys[key])
        self._task_spec._output[key] = {
            _TYPE: _OUTPUT_FILE,
            _RUNTIME_PATH: runtime_path,
            _STREAM: stream,
            _INCORPORATE: False,
        }

    def setStderr(self, runtime_path, stream=False):
        """
        :param runtime_path: A runtime_path template with input keys in the
            form <KEY>.
        :type runtime_path: str

        :param stream: If True and if possible, the file will be streamed
            back while the job is running. Defaults to False. Optional.
        :type stream: bool
        """
        return self._setStdStreamLocation(_STDERR_KEY, runtime_path, stream)

    def setStdout(self, runtime_path, stream=False):
        """
        :param runtime_path: A runtime_path template with input keys in the
            form <KEY>.
        :type runtime_path: str

        :param stream: If True and if possible, the file will be streamed
            back while the job is running. Defaults to False. Optional.
        :type stream: bool
        """
        return self._setStdStreamLocation(_STDOUT_KEY, runtime_path, stream)

    def setStoppable(self, stoppable):
        """
        If passed True, the job supervisor sends the 'halt' message to the
        backend to stop. And transfer output files if any. Otherwise, the
        supervisor kills the backend and transfer output files.
        """
        self._task_spec._stoppable = stoppable

    def setProgramName(self, program_name):
        if not program_name:
            raise ValueError("The value of program name cannot be empty.")
        if self._task_spec._program_name:
            raise RuntimeError(
                "The value <%s> has already been set as the program name." % \
                self._task_spec._program_name)
        self._task_spec._program_name = program_name
        # Programs registered as stoppable are marked stoppable by default.
        if program_name in jobcontrol.STOPPABLE_PROGRAMS:
            self.setStoppable(True)

    def setDriverReservesCores(self, reserved):
        """
        If passed True, the driver will allocate N slots matching -HOST
        hostname: N processes. This is useful when the driver allocates
        parallel slots on one node so it can resubmit jobs to localhost. If
        no N is specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue if
        we know we are running a set of parallel jobs with the same CPU
        requirements.
        """
        self._task_spec._driver_reserves_cores = reserved

    def setJobUsesTPP(self, uses_tpp):
        """
        If passed True, this job will reserve the number of cores specified
        from -TPP on the cmdline.

        Unfortunately, we use -TPP N to mean two things 1) this job will
        reserve N processors on one node directly 2) this job takes -TPP as
        a cmdline argument, but only to pass along to subjobs

        In case 1, the uses_tpp should be set to True.
        """
        self._task_spec._job_uses_tpp = uses_tpp

    def _fromData(self, data):
        # Populate the task spec from deserialized JSON data.
        self._loadCommandLine(data)
        self._loadInputSpec(data)
        self._loadJobname(data)
        self._loadOutputSpec(data)
        if _PROGRAM_NAME in data:
            self._task_spec._program_name = data[_PROGRAM_NAME]
        if _LICENSES in data:
            self._task_spec._licenses = data[_LICENSES]
        if _DRIVER_RESERVES_CORES in data:
            self._task_spec._driver_reserves_cores = data[
                _DRIVER_RESERVES_CORES]
        if _JOB_USES_TPP in data:
            self._task_spec._job_uses_tpp = data[_JOB_USES_TPP]
        if _JOB_STOPPABLE in data:
            self._task_spec._stoppable = data[_JOB_STOPPABLE]

    def _loadCommandLine(self, data):
        """
        Verify and load the command-line specification from the JSON data.
        """
        key = _COMMAND_LINE_ARGUMENTS
        if key not in data:
            raise TaskSpecificationKeyError(key)
        try:
            # Duck-type check that the value is a list of strings: both
            # concatenations raise TypeError for the wrong types.
            data[key] + []
            [arg + "" for arg in data[key]]
        except TypeError:
            raise TaskSpecificationError(
                "Value for '%s' is not a list of strings." % key)
        if _SCHRODINGER in data:
            if _USE_RUN in data[_SCHRODINGER]:
                use_run = data[_SCHRODINGER][_USE_RUN]
            else:
                use_run = False
            if _PRODUCT in data[_SCHRODINGER]:
                schrodinger_product = data[_SCHRODINGER][_PRODUCT]
            else:
                schrodinger_product = None
            # NOTE(review): asJSON always writes a 'schrodinger' section, so
            # serialized specs always reach this call; hand-built data
            # lacking that section would leave the command line unset —
            # confirm that is intended.
            self.setCommandLine(data[key],
                                use_schrodinger_run=use_run,
                                schrodinger_product=schrodinger_product)

    def _loadInputSpec(self, data):
        # Optional: validate each input entry's type before adopting it.
        key = _INPUT
        if key not in data:
            return
        input_ = data[key]
        type_key = _TYPE
        for (key, value) in input_.items():
            if type_key not in value:
                raise TaskSpecificationKeyError(
                    type_key, "the '%s' input value." % key)
            if value[type_key] not in _TASK_INPUT_TYPES:
                raise TaskSpecificationError(
                    "Unrecognized input type '%s' for '%s' input value." %
                    (value[type_key], key))
        self._task_spec._input = input_

    def _loadJobname(self, data):
        """
        Load the optional jobname spec.
        """
        key = _JOBNAME
        if key not in data:
            return
        jobname = data[key]
        if _FILENAME_SOURCE not in jobname:
            raise TaskSpecificationKeyError(_FILENAME_SOURCE,
                                            "the jobname spec")
        jobname_source = jobname[_FILENAME_SOURCE]
        self._task_spec._jobname_source = jobname_source
        # The jobname source must refer to a declared input file name.
        if jobname_source not in self._task_spec._input:
            raise TaskSpecificationError(
                "The jobname source file is not in the input specification.")
        elif self._task_spec._input[jobname_source][_TYPE] != \
                _INPUT_FILENAME:
            raise TaskSpecificationError(
                "The jobname source is not an input file name.")

    def _loadOutputSpec(self, data):
        """
        Load the optional output spec.
        """
        key = _OUTPUT
        if key not in data:
            return
        output = data[key]
        for (key, value) in output.items():
            if value[_TYPE] != _OUTPUT_FILE:
                raise TaskSpecificationError(
                    "Unrecognized output type '{}' for {}.".format(
                        value[_TYPE], key))
        self._task_spec._output = output
class _RuntimeParameters:
    """
    A class to hold job-specific input and output filenames.
    """

    def __init__(self):
        # Maps input key -> {type, source, runtime_path}.
        self._input = {}
        # Explicitly-set jobname, if any.
        self._jobname_val = None

    def asJSON(self, **kwargs):
        data = {}
        data[_INPUT] = self._input
        if self._jobname_val:
            data[_JOBNAME] = {_VALUE: self._jobname_val}
        return json.dumps(data, **kwargs)

    @staticmethod
    def fromJSON(json_params):
        data = json.loads(json_params)
        return _RuntimeParametersBuilder(data).getRuntimeParams()

    def getInputKeys(self):
        return list(self._input)

    def getInputFiles(self):
        # Only true input files (type "input_file") carry a source URL;
        # runtime-named outputs in self._input are skipped.
        input_files = []
        for value in self._input.values():
            if value[_TYPE] == _INPUT_FILE:
                input_files.append((value[_SOURCE], value[_RUNTIME_PATH]))
        return input_files

    def getRuntimePath(self, key):
        input_file_spec = self._input[key]
        if input_file_spec[_TYPE] not in _RUNTIME_INPUT_TYPES:
            raise RuntimeError("'%s' has an unrecognized input type of '%s'."
                               % (key, input_file_spec[_TYPE]))
        return input_file_spec[_RUNTIME_PATH]

    def getJobnameFromSource(self, jobname_source):
        # Derive the jobname from the runtime path of the source input.
        if jobname_source not in self._input:
            raise RuntimeError(
                "The jobname can't be determined because the '%s' input "
                "key is not specified in the runtime parameters." %
                jobname_source)
        return fileutils.get_jobname(
            self._input[jobname_source][_RUNTIME_PATH])

    def getJobname(self):
        return self._jobname_val
def get_file_url(path):
    """
    Get a file:// URL for a file path.

    :param path: A relative or absolute file path.
    :type path: str

    :return: The absolute path expressed as a file: URL.
    :rtype: str
    """
    absolute = os.path.abspath(path)
    return urljoin("file:", urllib.request.pathname2url(absolute))
class _RuntimeParametersBuilder:
    # Builds _RuntimeParameters, either programmatically or from
    # deserialized JSON data.

    def __init__(self, data=None):
        self._runtime_params = _RuntimeParameters()
        # The SCHRODINGER_JOBNAME environment variable (if set) seeds the
        # jobname before any explicit value is applied.
        self._runtime_params._jobname_val = \
            os.getenv("SCHRODINGER_JOBNAME") or \
            self._runtime_params._jobname_val
        if data:
            self._fromData(data)

    def _fromData(self, data):
        self._loadInput(data)
        if _JOBNAME in data and _VALUE in data[_JOBNAME]:
            self._runtime_params._jobname_val = data[_JOBNAME][_VALUE]

    def _loadInput(self, data):
        key = _INPUT
        if key not in data:
            raise RuntimeParametersKeyError(key)
        else:
            self._runtime_params._input = data[key]

    def getRuntimeParams(self):
        return self._runtime_params

    def setJobname(self, jobname):
        if not jobname:
            raise ValueError("The value of jobname cannot be empty.")
        if self._runtime_params._jobname_val:
            raise RuntimeError(
                "The value <%s> has already been set as the jobname." % \
                self._runtime_params._jobname_val)
        self._runtime_params._jobname_val = jobname

    def getJobname(self):
        """
        Get the currently set job name

        :rtype: str or None
        :return: The current set job name, or None if no name is set
        """
        return self._runtime_params.getJobname()

    def setInputFile(self, key, source_file, runtime_path=None):
        """
        Specify the input file used.

        :param key: The key that identifies the input file name in the task
            command-line template.
        :type key: str

        :param source_file: Path to an existing file to transfer.
        :type source_file: str

        :param runtime_path: Optional explicit runtime path; defaults to a
            path derived from source_file.
        :type runtime_path: str
        """
        if not os.path.exists(source_file):
            raise RuntimeError("Missing input file: %s" % source_file)
        check_valid_key(key)
        input_spec = {
            _TYPE: _INPUT_FILE,
            _SOURCE: get_file_url(source_file),
        }
        if runtime_path:
            input_spec[_RUNTIME_PATH] = runtime_path
        else:
            input_spec[_RUNTIME_PATH] = _get_default_runtime_path(source_file)
        self._runtime_params._input[key] = input_spec

    def setOutputFile(self, key, runtime_path):
        """
        Define an output file whose name is specified at runtime.

        :param key: The key that identifies the outputfile name in the task
            command-line template.
        :type key: str
        """
        # Output files that don't rely on jobname need to be included in the
        # runtime input section because the filenames must be specified at
        # runtime.
        if not _filename_uses_jobname(runtime_path):
            self._runtime_params._input[key] = {
                _TYPE: _OUTPUT_FILENAME,
                _RUNTIME_PATH: runtime_path,
            }
class JobSpecification:
    """
    This class provides a serializable job definition and consists of a
    task specification along with runtime parameters.

    Currently unsupported features:

      - Runtime requirements (e.g. GPU, memory requirements).

    Features that will probably never be supported:

      - The equivalent of job control's '-LOCAL' option.
      - Full shell commands. (Arguments must be provided as a list of
        strings. Chaining commands together is not supported. Redirection
        is provided only through APIs.)
    """

    def __init__(self, task_spec, runtime_params):
        self._task_spec = task_spec
        self._runtime_params = runtime_params
        # Fail fast if the runtime parameters don't satisfy the task spec.
        self._validate()

    def asJSON(self, **kwargs):
        # Parse the components' JSON so the nested documents are embedded as
        # objects rather than strings in the combined document.
        task_spec = json.loads(self._task_spec.asJSON(**kwargs))
        runtime_params = json.loads(self._runtime_params.asJSON(**kwargs))
        return json.dumps(
            {
                _TASK_SPECIFICATION: task_spec,
                _RUNTIME_PARAMETERS: runtime_params
            }, **kwargs)

    @staticmethod
    def fromJSON(json_job_spec):
        """
        Return a JobSpecification instance created from the provided JSON.
        """
        data = json.loads(json_job_spec)
        if _TASK_SPECIFICATION not in data:
            raise JobSpecificationKeyError(_TASK_SPECIFICATION)
        if _RUNTIME_PARAMETERS not in data:
            raise JobSpecificationKeyError(_RUNTIME_PARAMETERS)
        task_spec = TaskSpecificationBuilder(
            data[_TASK_SPECIFICATION]).getTaskSpec()
        runtime_params = _RuntimeParametersBuilder(
            data[_RUNTIME_PARAMETERS]).getRuntimeParams()
        return JobSpecification(task_spec, runtime_params)

    def _validate(self):
        """
        Validate that all the necessary inputs are present in the runtime
        parameters. Raise RuntimeParametersError if not.
        """
        runtime_input_keys = set(self._runtime_params.getInputKeys())
        missing_keys = []
        for input_key in self._task_spec.getInputKeys():
            if input_key not in runtime_input_keys:
                missing_keys.append(input_key)
            else:
                runtime_input_keys.remove(input_key)
        if missing_keys:
            if len(missing_keys) > 1:
                raise RuntimeParametersError(
                    "The following input keys are missing from the runtime "
                    "parameters: %s" % ", ".join(missing_keys))
            else:
                raise RuntimeParametersError(
                    "The '%s' input key is missing from the runtime "
                    "parameters." % missing_keys[0])
        # Any keys left over were not declared by the task specification.
        if runtime_input_keys:
            if len(runtime_input_keys) > 1:
                raise RuntimeParametersError(
                    "The following input keys are not recognized by the task "
                    "specification: %s" %
                    ", ".join(sorted(runtime_input_keys)))
            else:
                raise RuntimeParametersError(
                    "The '%s' input key is not recognized by the task "
                    "specification." % runtime_input_keys.pop())

    def validate(self):
        """
        A validation method that makes sure the JobSpecification has all the
        data it needs and is self-consistent.
        """
        self._validate()
        self._validateOutputFilenames()
        if (self._task_spec.needsExplicitJobname() and
                not self._runtime_params.getJobname()):
            raise RuntimeParametersError(
                "A jobname is needed but none was specified.")

    def _validateOutputFilenames(self):
        """
        Check for runtime name collisions.
        """
        # Resolve every output's runtime name, then look for overlap between
        # jobname-derived names and hardwired names.
        non_jobname_output_files = {}
        jobname_output_files = {}
        for key in self._task_spec.getOutputKeys():
            if _filename_uses_jobname(
                    self._task_spec._output[key][_RUNTIME_PATH]):
                if not self._task_spec._jobname_source and \
                        not self._runtime_params._jobname_val:
                    raise RuntimeParametersError(
                        "A jobname source hasn't been specified.")
                jobname_output_files[self.getOutputFile(key)] = key
            else:
                non_jobname_output_files[self.getOutputFile(key)] = key
        collisions = set(jobname_output_files).intersection(
            non_jobname_output_files)
        if collisions:
            raise RuntimeParametersError(
                "The jobname '%s' conflicts with the hardwired output "
                "file(s) %s." % (self.getJobname(), ", ".join(collisions)))

    def _getKeyValue(self, match):
        """
        Return the value of the key in the provided match object.

        :param match: A match of a template key.
        :type match: re.MatchObject
        """
        key = match.group(1)
        if key == _JOBNAME_KEY:
            return self.getJobname()
        else:
            return self._runtime_params.getRuntimePath(key)

    def _subInputOrJobname(self, string):
        """
        Replace template keys with the appropriate values.
        """
        return _VAR_RE.sub(self._getKeyValue, string)

    def getCommand(self):
        """
        Return the shell command that will run the job.

        :return: list of str
        """
        if self._task_spec.useSchrodingerRun():
            cmd = [
                # Always use / here, not os.path.join.
                # To be expanded by jlaunch/jmonitor
                "%SCHRODINGER%/run"
            ]
            product = self._task_spec.useSchrodingerProduct()
            if product:
                cmd.extend(["-FROM", product])
        else:
            cmd = []
        for arg in self._task_spec.getCommandTemplate():
            cmd.append(self._subInputOrJobname(arg))
        return cmd

    def getOutputFile(self, key, stream=None, incorporate=None):
        """
        Get the output file corresponding to the given key from the job
        specification.

        :param stream: If stream=True, the file is returned only if it is a
            log file. If stream=False, the file is returned only if it is an
            output file. If stream=None, the file is returned.

        :param incorporate: Analogous filter on the file's incorporate flag;
            None means no filtering.
        """
        output_spec = self._task_spec._output[key]
        streamval = output_spec.get(_STREAM, False)
        incorporateval = output_spec.get(_INCORPORATE, False)
        if (stream is None or streamval == stream) and \
                (incorporate is None or incorporateval == incorporate):
            return self._subInputOrJobname(output_spec[_RUNTIME_PATH])
        else:
            return None

    def getOutputFiles(self, stream=None, incorporate=None):
        """
        Get all the output files from the job specification.

        :param stream: If stream=None, both output and log files are
            returned. If stream=True, only log files are returned. If
            stream=False, only output files are returned. Defaults to None.
            Optional.
        :type stream: True, False or None(Default)

        :param incorporate: If incorporate=None, incorporatable and other
            files are returned. If incorporate=True, only files to be
            incorporated are returned. If incorporate=False, only output
            files not incorporated are returned.
        :type incorporate: True, False or None(Default)
        """
        # Use a set here because stderr/stdout can be the same.
        output_files = set()
        for key in self._task_spec.getOutputKeys():
            outfile = self.getOutputFile(key, stream, incorporate)
            if outfile is not None:
                output_files.add(outfile)
        return list(output_files)

    def _getRedirect(self, key):
        # Return the resolved stdout/stderr path, or None if not redirected.
        if key in self._task_spec._output:
            return self.getOutputFile(key)
        else:
            return None

    def getStderr(self):
        return self._getRedirect(_STDERR_KEY)

    def getStdout(self):
        return self._getRedirect(_STDOUT_KEY)

    def getDefaultStderr(self):
        return _DEFAULT_STDERR

    def getDefaultStdout(self):
        return _DEFAULT_STDOUT

    def useSchrodingerRun(self):
        return self._task_spec.useSchrodingerRun()

    def getInputFiles(self):
        return self._runtime_params.getInputFiles()

    def getHost(self):
        # NOTE(review): _RuntimeParameters as visible in this module defines
        # no getHost() — confirm it is provided elsewhere.
        return self._runtime_params.getHost()

    def debugEnabled(self):
        # NOTE(review): _RuntimeParameters as visible in this module defines
        # no debugEnabled() — confirm it is provided elsewhere.
        return self._runtime_params.debugEnabled()

    def getJobname(self):
        # Prefer an explicit jobname; fall back to deriving it from the
        # jobname-source input file, if one was declared.
        jobname_value = self._runtime_params._jobname_val
        if jobname_value:
            return jobname_value
        jobname_source = self._task_spec.getJobnameSource()
        if not jobname_source:
            return None
        else:
            return self._runtime_params.getJobnameFromSource(jobname_source)

    def setJobname(self, jobname):
        self._runtime_params._jobname_val = jobname

    def getProgramName(self):
        return self._task_spec.getProgramName()

    def isStoppable(self):
        """
        If True, a job is stoppable by sending 'halt' message to the
        backend. And transfer output files if any. Otherwise, job supervisor
        kills the backend and transfer output files.
        """
        return self._task_spec.isStoppable()

    def driverReservesCores(self):
        """
        If True, the driver will allocate N slots matching -HOST hostname: N
        processes. This is useful when the driver allocates parallel slots
        on one node so it can resubmit jobs to localhost. If no N is
        specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue if
        we know we are running a set of parallel jobs with the same CPU
        requirements.
        """
        return self._task_spec.driverReservesCores()

    def jobUsesTPP(self):
        """
        If True, this job will allocate N slots on one node matching -TPP N.
        If False, this job uses TPP as an option to pass along to subjobs
        """
        return self._task_spec.jobUsesTPP()

    def addLicense(self, license_name, license_tokens):
        """
        Add license to the task spec.

        :param license_name: Name of a license token that is checked out.
        :type license_name: schrodinger.infra.licensing.MMLIC3*

        :param license_tokens: The number of tokens required by this job
            type.
        :type license_tokens: str
        """
        license_string = licensing.getFeatureName(license_name)
        self._task_spec._licenses.append((license_string, license_tokens))

    def getLicenses(self):
        """
        Returns licenses as a list of tuples of (license_name, num_tokens)
        """
        return self._task_spec.getLicenses()
class JobSpecificationBuilder:
    """
    A helper class to create a JobSpecification from an existing
    TaskSpecification.
    """

    def __init__(self, task_spec):
        self._task_spec = task_spec
        self._runtime_params_builder = _RuntimeParametersBuilder()

    def _getTaskSpec(self):
        return self._task_spec

    def _getRuntimeParams(self):
        return self._runtime_params_builder.getRuntimeParams()

    def getJobSpec(self):
        # JobSpecification's constructor validates the combination.
        return JobSpecification(self._getTaskSpec(), self._getRuntimeParams())

    def setJobname(self, jobname):
        self._runtime_params_builder.setJobname(jobname)

    def setInputFiles(self, **input_files):
        """
        Indicate the filenames to be used for the input keys.

        :param input_files: Keyword arguments with the key as INPUT_KEY and
            the value as the associated filename.
        """
        in_keys = set(self._task_spec.getInputKeys())
        for k, v in input_files.items():
            if k not in in_keys:
                raise KeyError("'%s' is not a valid input file key." % k)
            if os.path.isdir(v):
                raise RuntimeParametersError(
                    "'%s' is a directory. Only regular files are valid input files."
                    % v)
            self._runtime_params_builder.setInputFile(k, v)

    def setOutputFiles(self, **output_files):
        """
        Indicate the filenames to be used for the output keys.

        :param output_files: Keyword arguments with the key as OUTPUT_KEY
            and the value as the associated filename.
        """
        out_keys = set(self._task_spec.getOutputKeys())
        for k, v in output_files.items():
            if k not in out_keys:
                raise KeyError("'%s' is not a valid output file key." % k)
            self._runtime_params_builder.setOutputFile(k, v)
def _get_default_runtime_path(path): """ Absolute paths and relative paths not in a subdirectory get a runtime path of the file basename. Relative paths in subdirs are used as is. """ # Determining whether something is in a subdir is not foolproof with this # algorithm (e.g. you are in "dir" but specify a file in "subdir" as # "../dir/subdir/filename"). In this case, you'll get runtime path of # basename. if os.path.isabs(path) or os.path.relpath(path).startswith(os.pardir): return os.path.basename(path) else: return path
class JobSpecificationArgsBuilder:
    """
    A helper class to create a JobSpecification from a specific (i.e.
    non-generic) set of command-line arguments.

    Input and output files must be registered explicitly (setInputFile,
    setOutputFile, ...) so that they can be transferred to and from the
    job host; registered paths in the command line are replaced by
    template keys (e.g. ``<INPUT_0>``) that job control expands at
    runtime.
    """

    def __init__(self,
                 args,
                 use_schrodinger_run=True,
                 schrodinger_product=None,
                 program_name=None,
                 default_jobname=None,
                 use_jobname_log=False):
        """
        See TaskSpecificationBuilder.setCommandLine for argument
        descriptions.

        :param use_jobname_log: If True, set the STDOUT, STDERR as
            <JOBNAME>.log and stream it. Default is False.
        :type use_jobname_log: bool

        :raises RuntimeError: if args[0] is not findable by
            $SCHRODINGER/run
        """
        self._task_spec_builder = TaskSpecificationBuilder()
        self._task_spec_builder.setCommandLine(
            args, use_schrodinger_run, schrodinger_product=schrodinger_product)
        self._runtime_params_builder = _RuntimeParametersBuilder()
        # A script that's not findable by toplevel.find_startup won't work
        # in a remote execution environment and is not supported.
        script = args[0]
        if schrodinger_product:
            search_product = schrodinger_product
        else:
            search_product = "mmshare"
        if not toplevel.find_startup(script, search_product, os.environ):
            raise RuntimeError(f"Could not find '{script}'. "
                               "Scripts must be findable by $SCHRODINGER/run.")
        if program_name:
            self.setProgramName(program_name)
        if default_jobname:
            # runtime_params_builder may already have the jobname set
            # (via the SCHRODINGER_JOBNAME environment variable);
            # if it does, don't attempt to override it
            if not self.getJobname():
                self.setJobname(default_jobname)
        if use_jobname_log:
            # Merge stdout and stderr into a single streamed <JOBNAME>.log.
            self._task_spec_builder.setStdout("<JOBNAME>.log", stream=True)
            self._task_spec_builder.setStderr("<JOBNAME>.log", stream=True)

    def _getTaskSpec(self):
        # Finalized TaskSpecification assembled by the builder.
        return self._task_spec_builder.getTaskSpec()

    def _getRuntimeParams(self):
        # Finalized runtime parameters assembled by the builder.
        return self._runtime_params_builder.getRuntimeParams()

    def getJobSpec(self):
        """Return a JobSpecification assembled from the builder state."""
        return JobSpecification(self._getTaskSpec(), self._getRuntimeParams())

    def setJobname(self, jobname):
        """Explicitly set the job name for this job."""
        self._runtime_params_builder.setJobname(jobname)

    def getJobname(self):
        """
        Get the job name set from runtime parameters if there is one

        This will return any jobname set via command line or the setJobname
        function. It will not return any jobname set using
        jobname_source=True when setting an input/output file.

        :rtype: str or None
        :return: The current set job name, or None if no name is set
        """
        return self._runtime_params_builder.getJobname()

    def setProgramName(self, program_name):
        """Set the program name recorded in the task specification."""
        self._task_spec_builder.setProgramName(program_name)

    def setStoppable(self, stoppable):
        """
        Mark the job as stoppable, meaning the backend has been designed to
        listen for and respond to graceful stop requests through the
        jobcontrol API. Jobs that are not marked stoppable will be sent a
        SIGKILL and immediately terminated upon a user stop request.

        Please be aware that jobs that are marked stoppable but do not
        actually implement graceful stops are bad actors and will never be
        stopped by a user request.

        :param stoppable: whether the backend honors graceful stops
        :type stoppable: bool
        """
        self._task_spec_builder.setStoppable(stoppable)

    def _updateCommandLineArgs(self, filename, replacement):
        """
        Update the input paths in the command-line arguments specified by
        the user-level job invocation to create a command-line that will
        work in the environment set up by job control to run the backend.
        For example, absolute paths are turned into the proper relative
        paths that will be available from the job directory.
        """
        task_spec = self._task_spec_builder._task_spec
        new_args = []
        # 'modified' counts replacements; it is not read anywhere in this
        # method (kept as-is for bookkeeping).
        modified = 0
        for arg in task_spec._command_line_arguments:
            if arg == filename:
                # Exact match: the argument is the path itself.
                new_args.append(replacement)
                modified += 1
            elif arg.endswith("=" + filename):
                # "--option=path" style: keep the option, swap the path.
                new_args.append(arg[:-len(filename)] + replacement)
                modified += 1
            else:
                new_args.append(arg)
        task_spec._command_line_arguments = new_args

    def setInputDirectory(self, path, runtime_path=None):
        """
        Register a directory as input for the job. All files found under it
        recursively are registered as input. Please avoid using this method
        unless you know you want all the files in the directory.

        If the path given is absolute or a relative path not in a
        subdirectory, the default runtime path will be in the job directory.
        If the path is relative and in a subdirectory, it will be used as is.

        :param path: an existing directory to transfer with the job
        :param runtime_path: runtime location of the directory; defaults
            per the rules above

        :raises ValueError: if path is not a directory
        """
        if not os.path.isdir(path):
            raise ValueError("'%s' is not a directory." % path)
        if runtime_path is None:
            runtime_path = _get_default_runtime_path(path)
        # Replace input directories with the actual runtime path instead of
        # a template variable. Templates are currently unused, should
        # probably be dropped, and would need to be updated to support
        # directories.
        self._updateCommandLineArgs(path, runtime_path)
        for root, dirs, files in os.walk(path):
            for f in files:
                # root[len(path) + 1:] is the walk path relative to the
                # registered directory; re-root it under runtime_path.
                self.setInputFile(os.path.join(root, f),
                                  runtime_path=os.path.join(
                                      runtime_path, root[len(path) + 1:], f))

    def setInputFile(self,
                     path: str,
                     key: Optional[str] = None,
                     runtime_path: Optional[str] = None) -> None:
        """
        Register a file as input for the job.

        :param path: the local path of the input file
        :param key: template key to register the file under; defaults to
            a generated "INPUT_<n>" key
        :param runtime_path: The path that will be used for this input file
            at runtime. If not specified, the behavior is as follows:

            - If the path given is absolute or is a relative path not in a
              subdirectory, the default runtime_path will always be the
              basename of the file. Absolute paths always default to the
              basename, even if they refer to a path in a subdirectory.
              For example:

              - path="/scr/input_dir/input.txt" will have a runtime_path of
                "input.txt".
              - path="../input_dir/input.txt" will have a runtime_path of
                "input.txt"

            - If the path given is relative and within a subdirectory, the
              default runtime_path will use the path as is. For example:

              - path="input_dir/input.txt" will have a runtime_path of
                "input_dir/input.txt"
        """
        if key is None:
            # Auto-generate a key from the current count of input files.
            key = "INPUT_{}".format(
                len(self._task_spec_builder._task_spec._input))
        check_valid_key(key)
        self._task_spec_builder.setInputFile(key)
        # Replace paths with template variables.
        self._updateCommandLineArgs(path, "<%s>" % key)
        self._runtime_params_builder.setInputFile(key,
                                                  path,
                                                  runtime_path=runtime_path)

    def setStderr(self, runtime_path, stream=False):
        """Set the stderr file; if stream, it is streamed back live."""
        self._task_spec_builder.setStderr(runtime_path, stream)

    def setStdout(self, runtime_path, stream=False):
        """Set the stdout file; if stream, it is streamed back live."""
        self._task_spec_builder.setStdout(runtime_path, stream)

    def setOutputFile(self,
                      runtime_path,
                      key=None,
                      stream=False,
                      incorporate=False):
        """
        :param runtime_path: The path of an output file that will be created
            by the job.
        :type runtime_path: str

        :param stream: If True and if possible, the output file will be
            streamed back while the job is running. Defaults to False.
            Optional.
        :type stream: bool

        :param incorporate: If True, mark this file for incorporation in
            maestro. NOTE: Only one file can be declared incorporatable.
            Defaults to False. Optional.
        :type incorporate: bool
        """
        task_spec = self._task_spec_builder._task_spec
        if key is None:
            # Auto-generate a key from the current count of output files.
            key = "OUTPUT_{}".format(len(task_spec._output))
        self._task_spec_builder.setOutputFile(key, runtime_path, stream,
                                              incorporate)
        if not _filename_uses_jobname(runtime_path):
            # Output files not using jobname are specified on the
            # command-line.
            self._updateCommandLineArgs(runtime_path, "<%s>" % key)
            # NOTE(review): the output filename is also registered in
            # task_spec._input — presumably so the runtime filename is
            # substituted into the command-line template; confirm against
            # the TaskSpecification implementation.
            task_spec._input[key] = {_TYPE: _OUTPUT_FILENAME}
        self._runtime_params_builder.setOutputFile(key, runtime_path)

    def addLicense(self, license_string, license_tokens):
        """Record a (license_string, license_tokens) requirement."""
        self._task_spec_builder.addLicense(license_string, license_tokens)

    def setDriverReservesCores(self, reserved):
        """
        If passed True, the driver will allocate N slots matching
        -HOST hostname: N processes. This is useful when the driver
        allocates parallel slots on one node so it can resubmit jobs to
        localhost. If no N is specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue if
        we know we are running a set of parallel jobs with the same CPU
        requirements.
        """
        self._task_spec_builder.setDriverReservesCores(reserved)

    def setJobUsesTPP(self, uses_tpp):
        """
        If uses_tpp is True, this job will reserve the number of cores
        specified from -TPP (Threads Per Process) on the command line.

        Currently we use -TPP N to mean two different things:

        1) Reserve N processors for the job being launched.
        2) Reserve N processors for each subjob of the job being launched.

        Case 1 is the case that needs uses_tpp=True.
        """
        self._task_spec_builder.setJobUsesTPP(uses_tpp)
def is_file_not_found_exception(exc: OSError) -> bool:
    """
    Return True if the exception indicates that a file does not exist.
    """
    # ENOENT covers POSIX; on Windows, winerror 123 (ERROR_INVALID_NAME)
    # also signals a path that cannot exist. On non-Windows builds OSError
    # has no 'winerror' attribute, so getattr defaults to None.
    if exc.errno == errno.ENOENT:
        return True
    return getattr(exc, 'winerror', None) == 123
def are_same_file(path1: pathlib.Path, path2: pathlib.Path) -> bool:
    """
    Compare if two file paths are the same, by examining the filesystem.

    This does not raise an exception if either path does not exist.
    """
    try:
        same = path1.samefile(path2)
    except OSError as e:
        # A missing file simply means "not the same"; re-raise anything else.
        if is_file_not_found_exception(e):
            return False
        raise
    return same
def resolve_to_abspath(path: pathlib.Path) -> pathlib.Path:
    """
    Attempt resolution to absolute path. If the provided path does not
    exist, return original path.
    """
    try:
        resolved = path.resolve()
    except OSError as e:
        if is_file_not_found_exception(e):
            # Nonexistent paths fall back to the caller's original value.
            return path
        raise
    return resolved
def _validate_output_in_subdir(output_filename: str):
    """
    Raise a TaskSpecificationError if output_filename would be created
    outside the cwd.

    :param output_filename: output path to validate (absolute or relative)

    :raises TaskSpecificationError: if no ancestor of the path is the
        launch directory
    """
    cwd = pathlib.Path.cwd()
    # Walk up the ancestor chain; if the launch directory appears as an
    # ancestor, the file lands inside (a subdirectory of) cwd.
    # NOTE: the previous version had an "if not ancestor: break" guard in
    # this loop; pathlib.Path objects are always truthy (Path("") is
    # Path(".")), so it was dead code and has been removed.
    ancestor = pathlib.Path(output_filename)
    while True:
        ancestor = resolve_to_abspath(ancestor)
        if are_same_file(ancestor, cwd):
            return
        if ancestor == ancestor.parent:
            # Reached the filesystem root without hitting cwd.
            break
        ancestor = ancestor.parent
    raise TaskSpecificationError(
        f"Output filename {output_filename} cannot be registered outside "
        f"of the launch directory {cwd}")