Source code for schrodinger.job.launchparams

import enum
import os
import re
import warnings
from typing import Union

from schrodinger.utils import mmutil


[docs]@enum.unique class ProjectDisposition(enum.Enum): NONE = "None" # Unspecified APPEND = "append" # append new entries APPEND_FIT = "append:fit" # force a workspace fitting after incorporation IGNORE = "ignore" # don't incorporate structures CUSTOM = "custom" # Incorporate the job with custom incorporation only, # as set by maestro.job_incorporation_function_add # You probably don't want to use this option, think # carefully before doing so. ADDTOENTRY = "addtoentry" # To use this value, you must set by a string to # a value like addtoentry_{entry_id} since the # entry_id belongs to the current project. # The following status MAY be supported by maestro, if you use one of # them, please add a comment saying WHAT the purpose of it is. Otherwise, # try not to use them. APPENDINPLACE = "appendinplace" APPENDINPLACE_FIT = "appendinplace:fit" # force a workspace fitting after incorporation APPENDUNGROUPED = "appendungrouped" REPLACE = "replace" REPLACE_FIT = "replace:fit" # force a workspace fitting after incorporation WORKSPACE = "workspace"
class _MaestroParameters: def __init__(self): self._project_name = None self._project_disposition = ProjectDisposition.NONE self._project_dispostion_add_to_entry = None self._viewname = None
[docs]class LaunchParameters: """ These are parameters that can change with each job launch. """
[docs] def __init__(self): self._hostname = None self._maestro_params = _MaestroParameters() # Initialize to none, since there is a different meaning in # -HOST localhost:1 and -HOST localhost, sometimes we have # none as "as many as possible" self._nsubjobs = None self._nprocessors_many_nodes = 1 self._nprocessors_one_node = 1 self._launch_directory = None self._jobname = None self._save_output = False self._save_subjob_output = False self._debug_level = 0 self._delete_launch_dir = False self._oplsdir = None self._original_hosts = None self._driverhost = None self._subhost = None self._tpp = None
[docs] def setDebugLevel(self, debug_level): """ :param save: if True, copy back full output directory as zip :type save: bool """ self._debug_level = debug_level
[docs] def getDebugLevel(self): """ Returns debug level (integer) """ return self._debug_level
[docs] def setDeleteAfterIncorporation(self, delete): """ :param delete: if True, the directory the job was launched from is deleted after incporation :type delete: bool """ self._delete_launch_dir = delete
[docs] def setSaveAllJobOutput(self, save): """ :param save: if True, copy back full output directory as zip :type save: bool """ self._save_output = save
[docs] def setSaveAllSubJobOutput(self, save): """ :param save: if True, copy back full output directories of subjobs as zip :type save: bool """ self._save_subjob_output = save
[docs] def setLaunchDirectory(self, launch_directory): """ Sets cwd for job launch. This avoids a chdir command before calling launch potentially allowing for more than one launch from maestro. """ self._launch_directory = launch_directory
[docs] def getLaunchDirectory(self): """ Sets cwd for job launch. This avoids a chdir command before calling launch potentially allowing for more than one launch from maestro. """ return self._launch_directory
def _setOriginalHosts(self, cmdline_str): """ Sets the original string from the cmdline for the -HOST entry since it may include multiple entries. """ self._original_hosts = cmdline_str def _setDriverHostname(self, hostname): """ Sets a DRIVERHOST hostname. This is an artifact of when we supported remote and local drivers. NOTE: This option is ONLY utilized by toplevel.py since it only is used as an environment variable """ if re.search('[ :,].*', hostname): raise RuntimeError(f"{hostname} must be a single hostname") self._driverhost = hostname def _setSubHost(self, hostname): """ -HOST argument typically refers to the HOST that will run the driver. This host is used to run subjobs. NOTE: This option is ONLY utilized by toplevel.py since it only is used as an environment variable """ self._subhost = hostname
[docs] def setHostname(self, hostname): """ Hostname is the name of the host to run the job on. This name needs to correspond to something in schrodinger.hosts file or DNS-resolveable. :param hostname: name of host :type hostname: str """ self._hostname = hostname
[docs] def getHostname(self): """ Returns hostname is the name of the host to run the job on. This name needs to correspond to something in schrodinger.hosts file or DNS-resolveable. """ if self._driverhost: return self._driverhost return self._hostname
[docs] def getSubHostName(self) -> Union[str, None]: """ Returns None if -SUBHOST isn't set and the hostname if -SUBHOST is set. """ if self._subhost: return self._subhost
[docs] def getJobname(self): """ Return the jobname. """ return self._jobname
[docs] def setJobname(self, jobname): """ Jobname is a non mandatory variable that can be used to indicate name of job in job record. :param jobname: name of job :type jobname: str """ self._jobname = jobname
[docs] def getMaestroViewname(self): """ Viewname is used by a maestro panel to identify a job as belonging to a panel. :returns: viewname name of panel :rtype: str """ return self._maestro_params._viewname
[docs] def setMaestroViewname(self, viewname): """ Viewname is used by a maestro panel to identify a job as belonging to a panel. :param viewname: name of panel :type viewname: str """ # Should we assert this is a str or could be coerced into one? # Nearly all python objects can be coerced into strings, but we have # a bug if you point this to a instance value because it will change # everytime self._maestro_params._viewname = viewname
[docs] def getMaestroProjectDisposition(self): """ :returns ProjectDisposition """ return self._maestro_params._project_disposition
[docs] def getMaestroProjectDispositionString(self): """ :returns str matching project disposition, appropriate for cmdline """ disp = self.getMaestroProjectDisposition() if disp is not ProjectDisposition.ADDTOENTRY: return disp.value return "{disp}_{entry_id}".format( disp=disp.value, entry_id=self._maestro_params._project_disposition_entry_id)
[docs] def setMaestroProjectDisposition(self, disp): """ Mark the state of project incorporation. :param disp: should we incorporate? :type disp: ProjectDisposition """ if not isinstance(disp, ProjectDisposition): try: disp = ProjectDisposition[disp.upper()] except KeyError: addtoentry_prefix = ProjectDisposition.ADDTOENTRY.value.upper( ) + "_" # We can't use ":" in a enum name, so # we special case this if disp.upper() == "APPEND:FIT": disp = ProjectDisposition.APPEND_FIT elif disp.upper() == "APPENDINPLACE:FIT": disp = ProjectDisposition.APPENDINPLACE_FIT elif disp.upper() == "REPLACE:FIT": disp = ProjectDisposition.REPLACE_FIT elif disp.upper().startswith(addtoentry_prefix): entry_id = int(disp[len(addtoentry_prefix):]) disp = ProjectDisposition.ADDTOENTRY self._maestro_params._project_disposition_entry_id = entry_id else: raise # Should we assert this is the enum, or the enum's values? self._maestro_params._project_disposition = disp
[docs] def getMaestroProjectName(self): """ Returns associated project for a job, marking which project should do the incorporation of results. :returns: path to project :rtype: str """ return self._maestro_params._project_name
[docs] def setMaestroProjectName(self, project): """ Set associated project for a job, marking which project should do the incorporation of results. :param project: path to project :type project: str """ # Should we assert this path exists? self._maestro_params._project_name = project
[docs] def setNumberOfProcessorsManyNodes(self, nprocessors): """ Set number of processors to allocate to a job. These may be across multiple nodes. Used by MPI jobs to indicate how many processors the queuing system needs to allocate. :param nprocessors: number of processors requested for this job :type nprocessors: int greater than 0 """ self._nprocessors_many_nodes = int(nprocessors)
[docs] def setNumberOfSubjobs(self, nsubjobs): """ Set number of subjobs that a job could run. This is used by workflow jobs to indicate that a driver job might run and use subjobs. This value is NOT used by the queueing system, but can be accessed from inside the job. :param nsubjobs: number of subjobs :type nprocesses: int greater than 0 """ self._nsubjobs = int(nsubjobs)
[docs] def getNumberOfProcessorsManyNodes(self): """ Get number of processors to allocate to a job. Used by MPI jobs. For example 4 processors = 4 nodes allocated by single jobs. """ return self._nprocessors_many_nodes
[docs] def getNumberOfSubjobs(self): """ Get number of subjobs that a job may run. This value is used by product backend (not jobcontrol). In a workflow job, 4 process = 4 jobs running simultaneously. """ return self._nsubjobs
[docs] def setNumberOfProcessorsOneNode(self, nprocessors): """ Set number of processors on one node. Useful for OpenMP-type jobs, indicates to the queueing system to allocate the requisite number of processors on the same node.. :param nprocessors: number of processors requested for this job :type nprocessors: int greater than 0 """ self._nprocessors_one_node = int(nprocessors)
[docs] def getNumberOfProcessorsOneNode(self): """ Return number of processors on one node. Useful for OpenMP-type jobs, indicates to the queueing system to allocate the requisite number of processors on the same node. :returns: number of processors requested for this job :rtype: int greater than 0 """ return self._nprocessors_one_node
[docs] def getTPP(self): """ Return the TPP option set by toplevel.py only. """ return self._tpp
[docs] def setTPP(self, tpp): """ Sets the TPP option on the cmdline. This will not be used by the parent job unless jobUsesTPP is set in the job specification. """ self._tpp = tpp
[docs] def setOPLSDir(self, oplsdir): self._oplsdir = oplsdir
[docs] def getNumberOfQueueSlots(self): """ Return the number of slots we need to request for the queueing system. This value current maps to NPROC. If we don't know how many, return None. """ return self._nprocessors_many_nodes * self._nprocessors_one_node
[docs] def verify(self): disp = self._maestro_params._project_disposition if disp != ProjectDisposition.NONE and not self._maestro_params._project_name: if disp == ProjectDisposition.IGNORE: warnings.warn(( "No project was specified, but a disposition of {} was given. " + "You should not be setting disposition without a project." ).format(disp), DeprecationWarning) else: raise RuntimeError( "Setting a project disposition {} without a project name is invalid" .format(disp)) if self._subhost: if not self._hostname: hostarg = self._driverhost or "localhost" warnings.warn(( "No HOST was specified, but a SUBHOST was given. A HOST of {} is being used for this job, " + "but this usage will result in an error in future releases." ).format(hostarg), DeprecationWarning) self._hostname = hostarg if self._original_hosts: first_host = self._original_hosts.split()[0].split(',')[0] if first_host != self._original_hosts: warnings.warn(( "HOST arguments containing multiple entries (\"{}\") are discouraged. " + "They may not be supported in a future release").format( self._original_hosts), UserWarning) if self._subhost: first_subhost = self._subhost.split()[0].split(',')[0] if first_subhost != self._subhost: warnings.warn(( "SUBHOST arguments containing multiple entries (\"{}\") are discouraged. " + "They may not be supported in a future release").format( self._subhost), UserWarning)
[docs] def constructHostArg(self): if self._hostname: host = self._hostname if self._nsubjobs: host += f":{self._nsubjobs}" return host
[docs] def convertToJLaunchOptions(self): """ Validate state of Launch Parameters (may throw RuntimeError) and return list of cmdline parameters. :returns: list of str """ self.verify() cmdline = [] if self.getHostname(): cmdline.extend(["-HOST", self.getHostname()]) env = self.convertToEnv() cmdline.extend( ["-schrodinger_nodelist", env["SCHRODINGER_NODELIST"]]) if self._maestro_params._project_name: cmdline.extend(["-PROJ", self._maestro_params._project_name]) if self._maestro_params._viewname: cmdline.extend(["-VIEWNAME", self._maestro_params._viewname]) if self.getMaestroProjectDisposition() != ProjectDisposition.NONE: cmdline.extend(["-DISP", self.getMaestroProjectDispositionString()]) if self._nprocessors_one_node > 1: cmdline.extend(["-TPP", f"{self._nprocessors_one_node}"]) if self._oplsdir: cmdline.extend(["-OPLSDIR", self._oplsdir]) if self._save_output: cmdline.append("-SAVE") if self._save_subjob_output and mmutil.feature_flag_is_enabled( mmutil.JOB_SERVER): # Check the feature flag, to make sure the new argument doesn't get passed # to legacy jobcontrol cmdline.append("-SAVESUBJOBS") if self._debug_level: # -DDEBUG is only valid to toplevel.py, it gets stripped # to -DEBUG for jlaunch cmdline.append("-DEBUG") if self._jobname: cmdline.extend(["-name", self._jobname]) if self._delete_launch_dir: cmdline.append("-tmplaunchdir") return cmdline
[docs] def convertToEnv(self): """ Converts LaunchParameters to environment. Not all parameters are supported. This option should be limited to use by toplevel.py if possible. """ self.verify() # Should we raise if there's a an argument that's not suitable? # NJOBS, OPLSDIR ? env = {} if self._save_output: env["SCHRODINGER_SAVE_JOBDIR"] = "yes" if self._save_subjob_output: env["SCHRODINGER_SAVE_SUBJOBDIR"] = "yes" if self._debug_level: env["SCHRODINGER_JOB_DEBUG"] = f"{self._debug_level}" if self._maestro_params._project_name: env["SCHRODINGER_PROJECT"] = self._maestro_params._project_name if self._maestro_params._viewname: env["SCHRODINGER_VIEWNAME"] = self._maestro_params._viewname if self.getMaestroProjectDisposition() != ProjectDisposition.NONE: env["SCHRODINGER_JOB_DISPOSITION"] = self.getMaestroProjectDispositionString( ) if self._nprocessors_one_node > 1: env["SCHRODINGER_TPP"] = f"{self._nprocessors_one_node}" if self._jobname: env["SCHRODINGER_JOBNAME"] = self._jobname if self._delete_launch_dir: env["SCHRODINGER_TMP_LAUNCHDIR"] = "yes" if self._oplsdir: env["OPLS_DIR"] = self._oplsdir if self.getHostname(): toplevel_host_args = [] original_hosts = self._original_hosts or self.constructHostArg() env["JOBHOST"] = self.getHostname() if self._driverhost: toplevel_host_args.extend(["-DRIVERHOST", self._driverhost]) if self._subhost: env["SCHRODINGER_NODELIST"] = self._subhost toplevel_host_args.extend(["-SUBHOST", self._subhost]) else: env["SCHRODINGER_NODELIST"] = original_hosts or "" if original_hosts: toplevel_host_args.extend(["-HOST", original_hosts]) env['TOPLEVEL_HOST_ARGS'] = " ".join(toplevel_host_args) env['HOST_ARGS'] = "-HOST {}".format(env["JOBHOST"]) env['ORIG_HOST_ARGS'] = '-HOST "{}"'.format( env["SCHRODINGER_NODELIST"]) return env
[docs] def consumeCommandLine(self, cmdline): """ This consumes arguments from cmdline. The primary purpose is to consume arguments from toplevel $SCHRODINGER/run. :returns cmdline with options filtered out. """ output_cmdline = [] skip_arg = False for i, arg in enumerate(cmdline): if skip_arg: skip_arg = False elif arg in ("-DETACHED", "-ATTACHED"): continue elif arg == "-SAVE": self.setSaveAllJobOutput(True) elif arg == "-SAVESUBJOBS": self.setSaveAllSubJobOutput(True) # Debugging output elif arg in ['-D', '-DEBUG']: self.setDebugLevel(1) output_cmdline.append("-DEBUG") # Debugging output elif arg in ['-DD', '-DDEBUG']: self.setDebugLevel(2) output_cmdline.append("-DEBUG") elif arg == "-JOBNAME": _require_argument(arg, cmdline, i) self.setJobname(cmdline[i + 1]) skip_arg = True elif arg == "-OPLSDIR": _require_argument(arg, cmdline, i) oplsdir = cmdline[i + 1] if oplsdir == 'custom': # keep this rare import here so launchparams doesn't # require an mmlibs license in the general case. from schrodinger.utils import fileutils local_appdata_dir = fileutils.get_local_appdata_dir() oplsdir = os.path.join(local_appdata_dir, 'opls_dir') self.setOPLSDir(oplsdir) skip_arg = True elif arg == '-PROJ': _require_argument(arg, cmdline, i) self.setMaestroProjectName(cmdline[i + 1]) skip_arg = True elif arg == '-DISP': _require_argument(arg, cmdline, i) self.setMaestroProjectDisposition(cmdline[i + 1]) skip_arg = True elif arg == '-VIEWNAME': _require_argument(arg, cmdline, i) self.setMaestroViewname(cmdline[i + 1]) skip_arg = True # Send back an archive of the job directory elif arg == '-TMPLAUNCHDIR': self.setDeleteAfterIncorporation(True) # Host args elif arg == "-HOST": _require_argument(arg, cmdline, i) host_arg = cmdline[i + 1].strip("\"'") self._setOriginalHosts(host_arg) if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER): if "," in host_arg or " " in host_arg.strip(): raise SyntaxError( f"Only a single -HOST <host> argument can be used, the currently supplied argument is {host_arg}" ) # Apparently, these are valid # -HOST myhost:2,myotherhost # -HOST myhost:2 myotherhost # -HOST myhost,myotherhost first_host = host_arg.split()[0].split(',')[0] if ":" in first_host: first_host, nproc = first_host.split(":") self.setNumberOfSubjobs(nproc) self.setHostname(first_host) skip_arg = True elif arg == "-DRIVERHOST": _require_argument(arg, cmdline, i) self._setDriverHostname(cmdline[i + 1]) skip_arg = True elif arg == "-SUBHOST": _require_argument(arg, cmdline, i) self._setSubHost(cmdline[i + 1]) skip_arg = True elif arg == "-MPICORES": _require_argument(arg, cmdline, i) self.setNumberOfProcessorsManyNodes(cmdline[i + 1]) skip_arg = True else: if arg == "-TPP": _require_argument(arg, cmdline, i) self.setTPP(cmdline[i + 1]) output_cmdline.append(arg) return output_cmdline
[docs]class ArgumentParserError(Exception): pass
def _require_argument(arg_name, cmdline, index): if index + 1 >= len(cmdline): raise ArgumentParserError(f"Argument {arg_name} requires an argument")