Source code for schrodinger.job.jobcontrol

"""
Core job control for python.

There are currently four major sections of this module - "Job database,"
"Job launching," "Job backend," and "Job hosts." The job database section
deals with getting info about existing Jobs, the job launching section
deals with starting up a subjob, the job backend section provides
utilities for a python script running as a job, and the job hosts section
deals with host entries from the hosts file (schrodinger.hosts).

Copyright Schrodinger, LLC. All rights reserved.
"""

import base64
import collections
import contextlib
import enum
import functools
import inspect
import logging
import os
import re
import shlex
import subprocess
import sys
import tempfile
import time
import warnings
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from urllib.parse import urlparse
from urllib.request import url2pathname

import backoff
import more_itertools

import schrodinger.infra.mm as mm
import schrodinger.infra.mmjob as mmjob
import schrodinger.utils.fileutils as fileutils
import schrodinger.utils.log
import schrodinger.utils.mmutil as mmutil
from schrodinger import get_maestro
from schrodinger import get_mmshare_version
from schrodinger import get_release_name
from schrodinger import gpgpu
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtCore
from schrodinger.utils import qt_utils
from schrodinger.utils import subprocess as subprocess_utils
from schrodinger.utils import qapplication

from . import resource

# Revision string of this module (kept for reference only).
_version = "$Revision: 1.105 $"

# Path to the $SCHRODINGER/run wrapper. The ${SCHRODINGER} reference is left
# unexpanded here; fix_cmd expands it when called with expandvars=True.
LOCAL_RUN = os.path.join('${SCHRODINGER}', 'run')

logger = logging.getLogger("schrodinger.jobcontrol")
schrodinger.utils.log.default_logging_config()
# If JobControl debugging is on, also turn on Python debugging for this module.
if os.environ.get('SCHRODINGER_JOB_DEBUG'):
    logger.setLevel(logging.DEBUG)
# Enables the "@@" timing output of timestamp().
# NOTE(review): os.getenv returns a string when the variable is set, so
# JC_PROFILING=0 still counts as enabled; only an unset or empty variable
# disables profiling.
profiling = os.getenv("JC_PROFILING", 0)

# Initialize the mmjob library at import time with the default mm error
# handler.
mmjob.mmjob_initialize(mm.MMERR_DEFAULT_HANDLER, "")

# Name of an environment variable; presumably carries the top-level -HOST
# arguments (its readers are not in this section - TODO confirm).
TOPLEVEL_HOST_ARGS_ENV = 'TOPLEVEL_HOST_ARGS'

# Host entry name presumably reserved for the local machine (TODO confirm
# against the hosts-file code).
LOCALHOST_ENTRY_NAME = "localhost"


class DisplayStatus(enum.Enum):
    """
    User-focused job states, as reported by `Job.DisplayStatus`.

    Members are looked up by value from the string returned by the mmjob
    job record's getDisplayStatus(), so each value must match that string
    exactly.
    """
    WAITING = "Waiting"
    RUNNING = "Running"
    CANCELED = "Canceled"
    STOPPED = "Stopped"
    FAILED = "Failed"
    COMPLETED = "Completed"
def timestamp(msg):
    """
    Print a "@@ HH:MM:SS.ffffff - msg" profiling line to stdout.

    Does nothing unless the module-level ``profiling`` flag is truthy.

    :param msg: Text describing the step being timed.
    """
    if not profiling:
        return
    now = datetime.now().strftime("%H:%M:%S.%f")
    print("@@", now, "-", msg)
##
# A regular expression to grab a JobId, e.g. out of a launch command's
# stdout.
#
jobid_re = re.compile(r"^JobId:\s+(\S+)", re.IGNORECASE | re.MULTILINE)

##
# The timestamp format used for the Job database in the LaunchTime and
# StartTime fields (Job.getDuration also parses StopTime with it).
#
timestamp_format = "%Y-%m-%d-%H:%M:%S"

##
# The order of host-entry fields, in which they will be stored
# in the _lines attribute of a Host object
entry_fields = [
    "name", "base", "host", "nodelist", "user", "queue", "qargs",
    "schrodinger", "proxyhost", "proxyport", "proxyexec", "tmpdir",
    "shareddir", "env", "processors", "processors_per_node", "parallel",
    "ngpu", "gpgpu", "cuda_cores", "maestrocontrols", "walltime", "memory",
    "accountcodes"
]
# Position of each host-entry field within entry_fields, usable as a sort key.
field_sortkey = {
    field: position for position, field in enumerate(entry_fields)
}

# Default installation root; macOS and Linux share the same location.
INSTALL_ROOT = ("C:\\Program Files\\Schrodinger"
                if sys.platform == "win32" else "/opt/schrodinger")

HOSTS_FILE = "schrodinger.hosts"

# Characters that don't require escaping on command-line of any OS:
SAFE_COMMAND_CHARS = set(
    'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.')

# Programs which are stoppable by sending 'halt' message to the backend.
STOPPABLE_PROGRAMS = {
    "Desmond", "WaterMap", "multisim", "Quantum ESPRESSO executable",
    "Periodic DFT"
}

# A regular expression to check if the argument of the command list
# is a shell redirection operator (e.g. "<", ">>", "2>&1").
shell_redirect_re = re.compile(r'^\d*(<{1,2}|>{1,2})&?\d*$')

# Module-wide QCoreApplication, created lazily by _launch_qapp().
qapp = None
class JobcontrolException(Exception):
    """Common base class for the job control exceptions defined in this module."""
class JobLaunchFailure(JobcontrolException, RuntimeError):
    """
    Raised by launch_job when a job could not be started.

    Also derives from RuntimeError so callers that catch the RuntimeError
    documented by launch_job keep working.
    """
class MissingHostsFileException(JobcontrolException):
    """
    Presumably raised when no hosts file (HOSTS_FILE) can be located; the
    raising code is outside this section - TODO confirm.
    """
class UnreadableHostsFileException(JobcontrolException):
    """
    Presumably raised when a hosts file exists but cannot be read or parsed;
    the raising code is outside this section - TODO confirm.
    """
## # Job database stuff #
class Job:
    """
    A Job instance is always a snapshot of the job state at a specific point
    in time. It is only updated when the `readAgain` method is explicitly
    invoked.
    """

    def __init__(self, job_id: str, cpp_job: mmjob.Job = None):
        """
        Initialize a read-only Job object.

        :param job_id: Unique identifier for a job
        :param cpp_job: provide a c++ job object in memory, used for
            constructing objects in wrapper objects from c++ APIs, rather
            than direct construction. When given, `job_id` is ignored and the
            id is read from `cpp_job` instead.
        """
        _launch_qapp()
        self._case_insensitive_properties = self._get_case_insensitive_lookup(
        )
        if cpp_job:
            self._cpp_job = cpp_job
            self.job_id = self.JobId
        else:
            self.job_id = job_id
            self.readAgain()

    def _get_case_insensitive_lookup(self) -> Dict[str, str]:
        """
        Construct a map of lowercase properties to valid attribute members,
        e.g. "jobid" -> "JobId". Used by `get` and `__getattr__` to point
        callers at the canonical spelling.
        """
        return {
            name.lower(): name
            for name, _ in inspect.getmembers(Job, inspect.isdatadescriptor)
            if not name.startswith("_")
        }

    def readAgain(self):
        """
        Reread the database. Calling this routine is necessary to get fresh
        values.

        If this job is the job of the current backend, the record returned by
        mmjob.get_backend_job() is used instead of constructing a new
        mmjob.Job.
        """
        backend = get_backend()
        if backend and backend.job_id == self.job_id:
            self._cpp_job = mmjob.get_backend_job()
            return
        self._cpp_job = mmjob.Job(self.job_id)

    def isComplete(self) -> bool:
        """
        Returns True if the job is complete. Note that this does not
        necessarily mean the output files have been downloaded.
        """
        return self._cpp_job.isComplete()

    def isQueued(self) -> bool:
        """
        Returns True if the job runs on an HPC queueing system.
        """
        return self._cpp_job.isQueueJob()

    def succeeded(self) -> bool:
        """
        Returns False if the job was killed, died or fizzled. Returns True if
        ExitStatus is finished.

        :raises RuntimeError: if the job isn't completed, so use isComplete()
            before calling.
        """
        # Check ExitStatus before isComplete. There is a race condition in
        # legacy jobcontrol where ExitStatus may be "finished" before
        # isComplete is true, but the ExitStatus property will block if this
        # is the case.
        if self.ExitStatus == "finished":
            return True
        if not self.isComplete():
            raise RuntimeError(
                f"Job '{self.job_id}' is not complete. Current status is: {self.Status}"
            )
        return False

    def wait_before_kill(self):
        """
        Sleep until at least 5 seconds have passed since LaunchTime.
        """
        # Experiments show that attempting to kill a (non-job-server) job too
        # quickly fails, so wait at least 5 seconds.
        seconds_since_start = time.time() - time.mktime(
            time.strptime(self.LaunchTime, timestamp_format))
        if seconds_since_start < 5:
            time.sleep(5 - seconds_since_start)

    def stop(self):
        """
        Kill the job while collecting output files.
        """
        self._cpp_job.stopJob()

    def kill(self):
        """
        Kill the job if it is running. This cancels a running job and does not
        return output files.
        """
        self._cpp_job.killJob()

    def cancel(self):
        """
        Cancel a running job and do not return output files. This method is
        intended to eventually replace job.kill.
        """
        self._cpp_job.killJob()

    def kill_for_smart_distribution(self):
        """
        Kill the job for smart distribution if it is running.
        """
        if self.isComplete():
            return
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            self.wait_before_kill()
        self._cpp_job.killJobForSmartDistribution()

    def _wait(self, max_interval: int):
        """
        Wait for the job to complete.

        :param max_interval: maximum interval to sleep between checking for
            completion.
        """
        if mmjob.mmjob_is_job_server_job(self.job_id):
            self.download()
            return

        # The python implementation differs from the job server C++
        # implementation in that it reads the job record again if it fails.
        interval = 2
        while True:
            self.readAgain()
            if self.isComplete():
                return
            time.sleep(interval)
            # exponential fallback, capped at max_interval
            interval = min(interval * 2, max_interval)

    def wait(self, max_interval: int = 60, throw_on_failure: bool = False):
        """
        Wait for the job to complete; sleeping up to 'max_interval' seconds
        between each database check. (Interval increase gradually from 2 sec
        up to the maximum.)

        NOTE: Do not use if your program is running in Maestro, as this will
        make Maestro unresponsive while the job is running.

        :param throw_on_failure: whether to raise an exception if not succeeded
        :type throw_on_failure: bool

        :raises RuntimeError: if the job did not succeed. The error message
            will contain the last 20 lines of the job's logfile (if
            available).
        """
        self._wait(max_interval)
        if throw_on_failure:
            if not self.succeeded():
                msg = f"Job '{self.job_id}' did not succeed."
                if not self.LogFiles or not os.path.exists(self.LogFiles[0]):
                    msg += "\nNo log file available."
                else:
                    logfile = self.LogFiles[0]
                    num_lines = 20
                    # Replace undecodable bytes: a garbled log line must not
                    # turn this RuntimeError into a UnicodeDecodeError.
                    with open(logfile, errors="replace") as fh:
                        last_lines = "".join(more_itertools.tail(num_lines, fh))
                    msg += (f"\nLast {num_lines} lines of {logfile}:\n"
                            f"{schrodinger.utils.log.SINGLE_LINE}\n"
                            f"{last_lines}")
                raise RuntimeError(msg)

    def download(self):
        """
        Download the output of the job into job's launch directory.

        No-op in legacy jobcontrol.
        """
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            return
        mmjob.mmjob_wait_and_download(self.job_id)
        self.readAgain()

    def __repr__(self):
        """
        Returns the formal string representation of the Job object.
        """
        return "Job(\"%s\")" % self.job_id

    def get(self, attr, default=None):
        """
        This function will always raise an error, but is provided to guide
        users to a new syntax.

        :raises TypeError: always; the message names the canonical attribute
            when `attr` matches one case-insensitively.
        """
        if attr.lower() not in self._case_insensitive_properties:
            raise TypeError(f"{attr} is not an attribute of Job")
        canonical_name = self._case_insensitive_properties[attr.lower()]
        raise TypeError(
            f'Update syntax: Job.{canonical_name} replaces Job.get("{attr}") '
            'to reflect that these attributes are always available')

    def __getattr__(self, name):
        """
        Resolve wrongly-cased property names (e.g. job.jobid) to their
        canonical spelling, emitting a SyntaxWarning.

        Only called when normal attribute lookup has already failed.
        """
        # Read the lookup table straight from the instance dict. Accessing
        # self._case_insensitive_properties would re-enter __getattr__ (and
        # recurse forever) on instances created without __init__, e.g. by
        # copy.copy() or unpickling.
        props = self.__dict__.get("_case_insensitive_properties")
        if (not props or name in props.values() or
                name.lower() not in props):
            return super().__getattribute__(name)
        canonical_name = props[name.lower()]
        warnings.warn(
            f"Job.{canonical_name} is the appropriate way to call Job.{name}",
            SyntaxWarning,
            stacklevel=2)
        return getattr(self, canonical_name)

    def summary(self) -> str:
        """
        Return a string summarizing all current Job attributes.
        """
        return self._cpp_job.getSummary()

    def getDuration(self) -> Optional[int]:
        """
        Returns the wallclock running time of the job if it is complete.
        This does not include time in submission status. Returns time in
        seconds. If the job is not complete, returns None.
        """
        if not self.StartTime or not self.StopTime:
            return None
        start = time.mktime(time.strptime(self.StartTime, timestamp_format))
        stop = time.mktime(time.strptime(self.StopTime, timestamp_format))
        return int(stop - start)

    def isDownloaded(self):
        """
        Check if output files were downloaded. For legacy job control,
        identical to `isComplete()`.

        :return: Whether the job files were downloaded.
        :rtype: bool
        """
        is_complete = self.isComplete()
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            # With JOB_SERVER off, job gets downloaded when its completed.
            return is_complete
        return is_complete and not self._cpp_job.hasDownloadableFiles()

    @property
    def BatchId(self) -> Optional[str]:
        """
        Return the batch id, if running on an HPC queueing system. Otherwise
        return None.
        """
        batchid = self._cpp_job.getBatchId()
        if not batchid:
            return None
        return batchid

    @property
    def Dir(self) -> str:
        """
        Return the absolute path of the launch directory.
        """
        return self._cpp_job.getLaunchDir()

    @property
    def ExitCode(self) -> Union[int, str]:
        """
        Returns the exit code of a process. If the job is still running, or it
        was killed without collecting the exit code, return a string
        indicating unknown status.
        """
        try:
            return self._cpp_job.getExitCode()
        except RuntimeError:
            return "Exit code unknown"

    @property
    def Host(self) -> str:
        """
        Return the hostname of the host which launched this job.
        """
        return self._cpp_job.getString("Host")

    @property
    def HostEntry(self) -> str:
        """
        Return the name of the host entry this job was launched to.
        """
        return self._cpp_job.getHostEntry()

    @property
    def LaunchTime(self) -> str:
        """
        Return a string timestamp for the time that the job was launched.
        This is set before the job starts running, as soon as it is
        registered with jobcontrol as a job to be run.
        """
        return self._cpp_job.getLaunchTime()

    @property
    def JobId(self) -> str:
        """
        Return an identifier for a job.
        """
        return self._cpp_job.getJobId()

    @property
    def Name(self) -> str:
        """
        Returns a string representing -JOBNAME that was specified on launch.
        This may be an empty string.
        """
        return self._cpp_job.getJobName()

    @property
    def ParentJobId(self) -> Optional[str]:
        """
        Return the jobid of a parent job. If the job does not have a parent,
        return None.
        """
        parent_jobid = self._cpp_job.getParentJobId()
        if not parent_jobid:
            return None
        return parent_jobid

    @property
    def Processors(self) -> int:
        """
        For a batch job, returns the number of queue slots attached to this
        job. For a local job, return the number of CPU cores allowed to be
        used.
        """
        return self._cpp_job.getProcessorCount()

    @property
    def Program(self) -> str:
        """
        Return descriptive text for the name of the program running this job,
        e.g. Jaguar. This field is optional and may return an empty string.
        """
        return self._cpp_job.getProgram()

    @property
    def Version(self) -> str:
        """
        Return the build number, formatted as "<release> build <NNN>".

        NOTE: this is computed from the currently running installation
        (get_release_name/get_mmshare_version), not read from the job record.
        """
        mmshare_version = str(get_mmshare_version())
        return "{release} build {version}".format(release=get_release_name(),
                                                  version=mmshare_version[-3:])

    @property
    def Project(self) -> str:
        """
        Return the job's project name field. This will be an empty string if
        no project is set.
        """
        return self._cpp_job.getProject()

    @property
    def QueueHost(self) -> str:
        """
        Return the hostname of the submission node of an HPC queueing system.
        If not an HPC host, this will be an empty string.
        """
        return self._cpp_job.getQueueHost()

    @property
    def StructureOutputFile(self) -> str:
        """
        Return the name of the file returned by the job that will get
        incorporated into a project of Maestro. Returns an empty string if no
        file is specified.
        """
        return self._cpp_job.getStructureOutputFile()

    @property
    def DisplayStatus(self) -> Optional[DisplayStatus]:
        """
        Return a user-focused status that indicates the current state of the
        job.

        Returns None in the case of non JOB_SERVER jobs.
        """
        # The opposite of "user-focused" in this case is
        # implementation-dependent.
        if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            return DisplayStatus(self._cpp_job.getDisplayStatus())
        else:
            return None

    @property
    def StatusChangeReason(self) -> str:
        """
        Returns a human-readable reason that a job entered its current state,
        such as "job canceled by the user." If the reason was not recorded or
        is not particularly interesting (e.g. normal transition from waiting
        to running) it may be the empty string.
        """
        if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            return self._cpp_job.getStatusChangeReason()
        else:
            return ""

    @property
    def Status(self) -> str:
        """
        Get the Status of the job. This is used by legacy jobcontrol API, but
        is superseded by DisplayStatus for JOB_SERVER jobs.
        """
        return self._cpp_job.getString("Status")

    @property
    def StartTime(self) -> str:
        """
        Return a string for the starting time of the job. Returns an empty
        string if the job is not yet started, for example, enqueued in an
        HPC environment.
        """
        return self._cpp_job.getStartTime()

    @property
    def StopTime(self) -> str:
        """
        Return a string for the completion time of the job. Returns an empty
        string if the job is not yet completed.
        """
        return self._cpp_job.getStopTime()

    @property
    def StatusTime(self) -> str:
        """
        Return a string for the time when the job was last updated.
        """
        return self._cpp_job.getStatusTime()

    @property
    def Viewname(self) -> str:
        """
        Return a representation of name used to filter jobs in maestro. May
        be empty.
        """
        return self._cpp_job.getViewName()

    def _wait_for_complete(self, interval: int = 1, max_time: int = 30):
        """
        Wait for a job to transition from finished to completed. This is only
        meaningful with legacy jobcontrol.

        :param interval: seconds to wait between reread of job record
        :param max_time: maximum time (seconds) to wait for job to be
            completed; gives up silently afterwards.
        """

        @backoff.on_predicate(backoff.constant,
                              lambda job: not job.isComplete(),
                              interval=interval,
                              max_time=max_time)
        def _reread():
            self.readAgain()
            return self

        _reread()

    @property
    def ExitStatus(self) -> str:
        """
        Get the ExitStatus of the job. This is a string representation of a
        job. Consider using DisplayStatus instead.

        :raises: RuntimeError if the job is not yet complete.
        """

        def get_status():
            return self._cpp_job.getString("ExitStatus")

        exitstatus = get_status()
        if exitstatus:
            if self.isComplete():
                return exitstatus
            elif not mmjob.mmjob_is_job_server_job(
                    self.job_id) and exitstatus == "finished":
                # There sometimes seems to be a wait between "finished" and
                # "completed", especially in jobdj. Do a 30s retry to wait for
                # the specific finished->completed status change. This comes
                # from the fact that a message can happen before job record
                # is updated.
                self._wait_for_complete()
                exitstatus = get_status()
                if self.isComplete():
                    return exitstatus
        raise RuntimeError(
            f"ExitStatus is not valid until the job {self.JobId} has "
            f"completed; it is currently '{self.Status}:{exitstatus}'. "
            f"Full job record: {self.summary()}")

    @property
    def JobDir(self) -> str:
        """
        Return the directory where the job is run. This will be an empty
        string if the job has not yet started.
        """
        return self._cpp_job.getJobDir()

    @property
    def JobHost(self) -> str:
        """
        Return the hostname where the job is run. This will be an empty
        string if the job has not yet started.
        """
        return self._cpp_job.getJobHost()

    @property
    def JobSchrodinger(self) -> str:
        """
        Return the directory of Schrodinger installation where the job is
        running. Falls back to three directory levels above JobMMshareExec
        when the JobSchrodinger field is empty.
        """
        job_schrodinger = self._cpp_job.getString("JobSchrodinger")
        if job_schrodinger:
            return job_schrodinger
        jobMMshareExec = self._cpp_job.getString("JobMMshareExec")
        return os.path.dirname(os.path.dirname(os.path.dirname(jobMMshareExec)))

    @property
    def Envs(self) -> List[str]:
        """
        Return a list of environment variables that are set by job, in
        addition to a default environment on a machine. The format is
        ["SPECIAL_VAR=0", "SPECIAL_VAR2=yes"]
        """
        return list(self._cpp_job.getEnvironment())

    @property
    def Errors(self) -> List[str]:
        """
        Return possible error messages associated with a job. This will only
        return values in legacy jobcontrol.
        """
        return list(self._cpp_job.getErrors())

    @property
    def LogFiles(self) -> List[str]:
        """
        Get list of log files associated with a job. May be an empty list.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_LOG_FILES))

    @property
    def SubJobs(self) -> List[str]:
        """
        Return list of subjob job ids.
        """
        return list(self._cpp_job.getSubjobIds())

    @property
    def Commandline(self) -> str:
        """
        Return the command used to launch the job. Note that this may not be
        accurate when the job is called directly from a jobspec. In that
        case it will instead return the commandline of the parent process.
        """
        return self._cpp_job.getString("Commandline")

    @property
    def User(self) -> str:
        """
        Return the username of user who launched the job.
        """
        return self._cpp_job.getUser()

    def getApplicationHeaderFields(self, default=None) -> Dict[str, str]:
        """
        Returns a dictionary of commonly used jobcontrol keyword:value pairs
        used to standardize application log files.

        :param default: Value assigned to a keyword if the corresponding
            attribute is not defined.
        :type default: any
        """
        # NOTE: Job defines no 'Queue' attribute, so that key always gets
        # `default`; getApplicationHeaderString skips it in that case.
        keywords = [
            'JobId', 'Name', 'Program', 'Version', 'Host', 'Dir', 'HostEntry',
            'Queue', 'JobHost', 'JobDir', 'JobSchrodinger', 'Commandline',
            'StartTime'
        ]
        return {item: getattr(self, item, default) for item in keywords}

    def getApplicationHeaderString(self, field_sep: str = ' : ') -> str:
        """
        Returns a formatted string, suitable for printing at the top of a
        log file printing helpful information about the state of the job.

        :param field_sep: String that delimits the keyword and value.

        Example::

            backend = schrodinger.job.jobcontrol.get_backend()
            if backend:
                print(backend.getJob().getApplicationHeaderString())
        """
        undefined_field = None
        fields = self.getApplicationHeaderFields(default=undefined_field)
        max_length = max(len(key) for key in fields)
        kw_args = []
        for key in fields:
            # Only show Queue field if it is defined.
            if key == 'Queue' and fields[key] == undefined_field:
                continue
            kw_args.append(
                field_sep.join([key.ljust(max_length),
                                str(fields[key])]))
        section_delim = 80 * '-'
        kw_args.insert(0, section_delim)
        kw_args.append(section_delim)
        header = '\n'.join(kw_args)
        return header

    def getInputFiles(self) -> List[str]:
        """
        Equivalent to the `InputFiles` property.
        """
        return self.InputFiles

    @property
    def InputFiles(self) -> List[str]:
        """
        Return list of files that will be transferred to the job directory
        on launch.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_INPUT_FILES))

    @property
    def JobDB(self) -> str:
        """
        Path to the Job Database in legacy jobcontrol. This is an empty str
        for JOB_SERVER jobs.
        """
        return self._cpp_job.getString("JobDB")

    @property
    def OrigLaunchDir(self) -> str:
        """
        Return the launch directory of the oldest ancestor of this job.
        """
        return self._cpp_job.getOriginalLaunchDir()

    @property
    def OrigLaunchHost(self) -> str:
        """
        Return the hostname of the oldest ancestor of this job.
        """
        return self._cpp_job.getOriginalLaunchHost()

    def getOutputFiles(self) -> List[str]:
        """
        Equivalent to the `OutputFiles` property.
        """
        return self.OutputFiles

    @property
    def OutputFiles(self) -> List[str]:
        """
        Return a list of output filenames which will be copied back, if
        existing, at the end of a job.

        Note that this list can grow while the backend is running, since
        output files can be registered by the backend.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_OUTPUT_FILES))

    def getProgressAsPercentage(self) -> float:
        """
        Get the value of backend job progress in terms of percentage (values
        from 0.0 - 100.0)

        Return 0.0 when a job is not yet in running state.
        """
        return self._cpp_job.getProgress().percentage_completed

    def getProgressAsSteps(self) -> Tuple[int, int]:
        """
        Get the value of backend job progress in terms of steps and
        totalsteps.

        Return (0,1) when a job is not yet in 'running' state.
        """
        progress = self._cpp_job.getProgress()
        return (progress.completed_steps, progress.total_steps)

    def getProgressAsString(self) -> str:
        """
        Get the value of backend job progress in terms of descriptive text.

        Return "The job has not yet started." when a job is not yet in
        running state.
        """
        return self._cpp_job.getProgress().description

    def purgeRecord(self):
        """
        Purge the job record for the job from the database.
        """
        return self._cpp_job.removeRecord()
def _launch_qapp():
    """
    Launch QCoreApplication so mmjob can run async calls under the hood.

    If we are calling from maestro or other PyQt GUI, this should already be
    instantiated, so don't put this somewhere that will get called on import.
    """
    global qapp
    if not qapp:
        qapp = QtCore.QCoreApplication.instance()
    if not qapp:
        qapp = QtCore.QCoreApplication([])


#
# Job launching stuff
#


def _get_viewname_from_launch_cmd(cmd: List[str]) -> str:
    """
    Return the argument that follows the first "-VIEWNAME" in `cmd`.

    Returns an empty viewname if not found, including when "-VIEWNAME" is the
    last argument.
    """
    args = iter(cmd)
    for arg in args:
        if arg == "-VIEWNAME":
            return next(args, "")
    return ""


@qapplication.require_application(create=True, use_qtcore_app=True)
def launch_job(cmd: List[str],
               print_output: bool = False,
               expandvars: bool = True,
               launch_dir: Optional[str] = None,
               timeout: Optional[int] = None,
               env: Optional[Dict[str, str]] = None,
               show_failure_dialog: bool = True,
               _debug_delay=None) -> jobcontrol.Job:
    """
    Run a process under job control and return a Job object. For a process
    to be under job control, it must print a valid JobId: line to stdout. If
    such a line isn't printed, a RuntimeError will be raised.

    The cmd argument should be a list of command arguments (including the
    executable) as expected by the subprocess module.

    If the executable is present in $SCHRODINGER or $SCHRODINGER/utilities,
    an absolute path does not need to be specified.

    NOTE: UI events will be processed while the job is launching.

    :param print_output: Determines if the output from job launch is printed
        to the terminal or not. Output will be logged (to stderr by default)
        if Python or JobControl debugging is turned on or if there is a
        launch failure, even if 'print_output' is False.

    :param expandvars: If True, any environment variables of the form `$var`
        or `${var}` will be expanded with their values by the
        `os.path.expandvars` function.

    :param launch_dir: Launch the job from the specified directory. If
        unspecified, use the current working directory.

    :param timeout: Timeout (in seconds) to be applied while waiting for the
        job control launch process to start or finish. The launch process
        will be terminated after this time. If None, the launch process will
        run with a default timeout of 300s. Must be at least 1.

    :param env: This dictionary will replace the environment for the launch
        process. If env is None, use the current environment for the launch
        process. An empty dict is treated the same as None.

    :param show_failure_dialog: If True, show failure dialog if we detect we
        are using a graphical application and the job launch fails.

    :param _debug_delay: Internal/testing only. If set, keep processing events
        for this many seconds after the job has started before returning.

    :raise RuntimeError: If there is a problem launching the job (e.g., no
        JobId gets printed), or if `timeout` is less than 1. If running
        within Maestro, an error dialog will first be shown to the user.

    :raise FileNotFoundError: If launch_dir doesn't exist.
    """
    from schrodinger.infra import jobhub

    if timeout is not None and timeout < 1:
        # JobCommand only deals with timeouts of integer seconds
        raise RuntimeError(
            f"Timeout {timeout} will be truncated to the latest second and "
            "will fail if set to a number less than 1 sec")

    cmd = fix_cmd(cmd, expandvars)
    timestamp('execute perl jlaunch.pl')

    event_loop = QtCore.QEventLoop()
    # Filled in by the signal handlers below; exactly one is expected.
    job = None
    err_message = None

    job_cmd = jobhub.JobCommand(cmd, _get_viewname_from_launch_cmd(cmd),
                                launch_dir)
    # An empty dict is falsy and therefore leaves the current environment.
    if env:
        qprocess_env = QtCore.QProcessEnvironment()
        for key, value in env.items():
            qprocess_env.insert(key, value)
        job_cmd.setEnvironment(qprocess_env)
    if timeout is not None:
        job_cmd.setTimeout(int(timeout))
    elif "_SCHRODINGER_JOB_LAUNCHING" in os.environ:
        # NOTE: If this environment variable is set, it means a parent process
        # has already launched the current process with a defined timeout.
        # Assume this function is being called as a result of launchapi/launcher
        # so don't set our own timeout; instead, rely on the timeout defined
        # from the parent process.
        job_cmd.setTimeout(-1)  # unlimited

    @qt_utils.exit_event_loop_on_exception
    def job_started(launched_job: jobcontrol.Job, *, event_loop=None):
        nonlocal job
        job = launched_job
        if _debug_delay is None:
            event_loop.quit()
        else:
            QtCore.QTimer.singleShot(_debug_delay * 1000, event_loop.quit)

    @qt_utils.exit_event_loop_on_exception
    def job_launch_failed(launch_err: str, *, event_loop=None):
        nonlocal err_message
        err_message = launch_err
        event_loop.quit()

    job_launcher = jobhub.JobLauncher(job_cmd)
    job_launcher.jobStarted.connect(
        functools.partial(job_started, event_loop=event_loop))
    job_launcher.jobLaunchFailed.connect(
        functools.partial(job_launch_failed, event_loop=event_loop))
    job_launcher.launch()
    # Confirm that neither signal was emitted yet
    assert job is None
    assert err_message is None
    event_loop.exec()

    # Re-raise anything a signal handler raised inside the event loop.
    exception = qt_utils.get_last_exception()
    if exception:
        raise exception

    if not err_message and not job:
        err_message = "Job launch failed"

    if err_message:
        if show_failure_dialog and get_maestro():
            # Import QtWidgets only if running within Maestro:
            # Note similarities with utils.JobLaunchFailureDialog
            from schrodinger.Qt import QtWidgets
            dialog = QtWidgets.QMessageBox()
            dialog.setIcon(QtWidgets.QMessageBox.Warning)
            dialog.setText(f"Job launch failed: {cmd}")
            dialog.setDetailedText(err_message)
            dialog.exec()
        raise jobcontrol.JobLaunchFailure(err_message)

    if print_output:
        print(f"JobId: {job.JobId}")
    return job
def prepend_schrodinger_run(cmd: List[str]) -> List[str]:
    """
    Make Python scripts runnable by prefixing them with $SCHRODINGER/run.

    If the first element of `cmd` names a ``.py`` or ``.pyc`` file, a new
    list with LOCAL_RUN in front is returned; otherwise `cmd` itself is
    returned unchanged.

    :param cmd: Command to prepend $SCHRODINGER/run to.

    :raises ValueError: if `cmd` is empty.
    """
    if not cmd:
        raise ValueError('Empty command list specified')
    executable = cmd[0]
    if not executable.endswith(('.py', '.pyc')):
        return cmd
    return [LOCAL_RUN] + cmd
def fix_cmd(cmd: List[str], expandvars: bool = True) -> List[str]:
    """
    A function to clean up the command passed to launch_job.

    Prepends $SCHRODINGER/run for Python scripts, optionally expands
    environment variables in every argument, and resolves the executable
    via _fix_program_path.

    :param cmd: A list of strings for command line launching.

    :param expandvars: If True, any environment variables of the form `$var`
        or `${var}` will be expanded with their values by the
        `os.path.expandvars` function.

    :return: The command to be launched

    :raises TypeError: if `cmd` is not a list (e.g. a plain string).
    :raises ValueError: if `cmd` is an empty list.
    """
    # This function exists as a separate entity from launch_job mostly so it
    # can be tested.
    # Validate the type first: prepend_schrodinger_run indexes into and
    # concatenates with cmd, which gives misleading errors for a string or
    # tuple. Only lists support `+ []`.
    try:
        cmd + []
    except TypeError:
        raise TypeError("String commands are not accepted. "
                        "Please use lists of arguments.") from None

    cmd = prepend_schrodinger_run(cmd)
    logger.debug("launch_job command: %s", " ".join(cmd))
    if expandvars:
        cmd = [os.path.expandvars(arg) for arg in cmd]
        logger.debug(" expandvars? command: %s", " ".join(cmd))
    cmd = _fix_program_path(cmd[0]) + cmd[1:]
    logger.debug(" modified? command: %s", " ".join(cmd))
    return cmd
def _fix_program_path(prog: str) -> List[str]: """ If the program executable doesn't exist as a file and isn't an absolute path, prepend the SCHRODINGER directory if it exists there. Return a list of command line arguments so this function can be used from launch_job when 'cmd' is a list. """ # Shortcut the search if the program is python; this prevents an extra # layer of $SCHRODINGER/utilities/python for python scripts launched # from python. This is an issue because of toplevel.py. if prog == "python": if sys.platform == "win32": return ["python.exe"] else: return ["python"] prog = subprocess_utils.abs_schrodinger_path(prog) return [prog]
def list2jmonitorcmdline(cmdlist: List[str]) -> str:
    """
    Turn a command in list form to a single string that can be executed by
    jmonitor.

    Legacy (non-JOB_SERVER) mode quotes with subprocess.list2cmdline and then
    wraps any argument that is not already double-quoted, is not a shell
    redirect, and contains characters outside SAFE_COMMAND_CHARS in double
    quotes. JOB_SERVER mode applies shlex.quote to every argument except
    shell redirects.
    """
    # The feature flag does not change while one command line is built, so
    # query it once instead of once per argument.
    legacy_quoting = not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER)
    cmd = []
    for arg in cmdlist:
        if legacy_quoting:
            arg = subprocess.list2cmdline((arg,))
            # Quote all args since they end up in a string. Make an exception
            # for strings that contain only "safe" characters or shell
            # redirect.
            already_quoted = arg.startswith('"') and arg.endswith('"')
            if (not already_quoted and not shell_redirect_re.match(arg) and
                    not set(arg).issubset(SAFE_COMMAND_CHARS)):
                # If argument contains "unsafe" characters, quote it:
                arg = '"' + arg + '"'
        elif not shell_redirect_re.match(arg):
            arg = shlex.quote(arg)
        cmd.append(arg)
    return ' '.join(cmd)
def _get_file_path(url: str) -> str:
    """
    From a job spec url, return a filename.

    :param url: A file URI from the job specification.
    :return: The local path the URI refers to. A network location in the URI
        is turned into a UNC-style prefix (two backslashes plus the host).
    :raises RuntimeError: If the URI does not start with "file://".
    """
    if not url.startswith("file://"):
        raise RuntimeError(
            "File URI is %s but job_control_launch doesn't know how "
            "to deal with non-file URIs at this time." % url)
    parsed_url = urlparse(url)
    netloc = parsed_url.netloc
    # for UNC paths
    if netloc:
        netloc = r"\\" + netloc
    return url2pathname(netloc + parsed_url.path)


def _get_job_spec_launch_command(job_spec,
                                 launch_parameters,
                                 write_output=False):
    """
    Get the launch command for a job based on its specification.

    :param job_spec: Data defining the job.
    :type job_spec: schrodinger.job.launchapi.JobSpecification

    :param launch_parameters: launch parameters for the job
    :type launch_parameters: schrodinger.job.launchparams.LaunchParameters

    :param write_output: If true, construct a launch command that can be run
        from any SCHRODINGER. Used by appframework2 to write a shell script
        that can run the job later.
    :type write_output: bool

    :return: Command to execute as a list of strings.
    :raises RuntimeError: If the driver is asked to reserve cores while a
        queue-slot count other than 1 is set, or without an explicit subjob
        count.
    """
    # Set the launch_parameters jobname in the job_spec so it can sub out
    # any <JOBNAME> variables when calling getCommand().
    lp_jobname = launch_parameters.getJobname()
    if lp_jobname:
        job_spec.setJobname(lp_jobname)

    job_spec.validate()
    cmd = job_spec.getCommand()

    stdout_file = job_spec.getStdout()
    if not stdout_file:
        stdout_file = job_spec.getDefaultStdout()
    cmd = list2jmonitorcmdline(cmd)
    cmd += f' > "{stdout_file}" '

    stderr_file = job_spec.getStderr()
    if not stderr_file:
        stderr_file = job_spec.getDefaultStderr()
    cmd = f" {cmd} "
    if stdout_file == stderr_file:
        cmd += "2>&1 "
    else:
        # Quote the stderr target exactly like the stdout target above so
        # that paths containing spaces are redirected correctly.
        cmd += f'2> "{stderr_file}"'

    if write_output:
        launch_command = ["$SCHRODINGER/run", "jlaunch.pl"]
    else:
        launch_command = [
            "perl",
            os.path.join(os.environ['MMSHARE_EXEC'], "jlaunch.pl")
        ]

    # Use the -usetmpdir option so that missing input files in the spec will
    # cause the job to fail.
    # NOTE(review): encode('ascii') raises UnicodeEncodeError if the command
    # contains non-ASCII characters (e.g. in file names).
    launch_command.extend([
        "-usetmpdir", "-cmd",
        "BASE64 " + base64.b64encode(cmd.encode('ascii')).decode('ascii')
    ])

    if job_spec.jobUsesTPP():
        tpp = launch_parameters.getTPP()
        if tpp:
            launch_parameters.setNumberOfProcessorsOneNode(tpp)

    launch_command.extend(launch_parameters.convertToJLaunchOptions())
    if not lp_jobname and job_spec.getJobname():
        launch_command.extend(["-name", job_spec.getJobname()])
    if job_spec.getProgramName():
        launch_command.extend(["-prog", job_spec.getProgramName()])
    if mmutil.feature_flag_is_enabled(
            mmutil.JOB_SERVER) and job_spec.isStoppable():
        launch_command.extend(["-stoppable"])

    # In general, the number of queue slots is dictated by the number of
    # threads needed (i.e. each job requests N queue slots; for some jobs that
    # mix parallelism, this is N * M). In the special case where the driver
    # job launches all the subjobs, we reserve by the number of processors
    # (only) as the number of queue slots.
    nprocs = launch_parameters.getNumberOfQueueSlots()
    if job_spec.driverReservesCores():
        if nprocs != 1:
            raise RuntimeError(
                f"Setting number of slots ({nprocs}) and processors consumed "
                "by driver are incompatible")
        nprocs = launch_parameters.getNumberOfSubjobs()
        if not nprocs:
            raise RuntimeError(
                "For the driver to reserve cores, please set the number of "
                "subjobs explicitly (:X) as -HOST host:X")
    if nprocs:
        launch_command.extend(["-NPROC", str(nprocs)])

    licenses = job_spec.getLicenses()
    if licenses:
        for license_name, token_count in licenses:
            launch_command.extend(["-lic", f"{license_name}:{token_count}"])

    input_file_args = input_file_arguments(job_spec, launch_parameters,
                                           write_output)
    cmdline_file_args = file_arguments_for_launch_command(input_file_args)
    launch_command.extend(cmdline_file_args)

    for file_ in job_spec.getOutputFiles(stream=False, incorporate=False):
        launch_command.extend(["-out", file_])
    for file_ in job_spec.getOutputFiles(stream=True, incorporate=False):
        launch_command.extend(["-log", file_])
    for file_ in job_spec.getOutputFiles(incorporate=True):
        launch_command.extend(['-structout', file_])

    # Building the indented JSON dump is not free, so only do it when debug
    # logging is actually enabled.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("launch_from_job_spec launch command %s", launch_command)
        logger.debug("launch_from_job_spec job spec %s",
                     job_spec.asJSON(indent=4, sort_keys=True))
        logger.debug("launch_from_job_spec task command %s", cmd)
    return launch_command
def input_file_arguments(job_spec, launch_parameters, write_output):
    """
    Return the file arguments (a list of (option, value) tuples) corresponding
    to the input files of a given job.

    Each value has the form "'<source>' > '<runtime path>'". When
    `write_output` is true, sources on the same drive as the launch directory
    are made relative to it, and on Windows backslashes are turned into
    forward slashes.

    :param job_spec: Data defining the job; its getInputFiles() yields
        (url, runtime_path) pairs.
    :param launch_parameters: Launch parameters; only getLaunchDirectory() is
        used, and only when `write_output` is true.
    :param write_output: Whether the command is being written out to be run
        later rather than launched now.
    :return: A list of ("-f", value) tuples.
    :raises RuntimeError: If any of the input files are missing.
    """
    file_args = []
    missing_input_files = []
    # The launch directory is the same for every input file; look it up once,
    # and only if it is actually needed.
    launch_dir = None
    for (url, runtime_path) in job_spec.getInputFiles():
        source = _get_file_path(url)
        if not os.path.exists(source):
            missing_input_files.append(source)
            continue
        if write_output:
            if launch_dir is None:
                launch_dir = launch_parameters.getLaunchDirectory(
                ) or os.getcwd()
            if fileutils.on_same_drive_letter(source, launch_dir):
                source = os.path.relpath(source, launch_dir)
            if sys.platform == "win32":
                source = source.replace("\\", "/")
        file_args.append(("-f", f"'{source}' > '{runtime_path}'"))

    if missing_input_files:
        s = "s" if len(missing_input_files) > 1 else ""
        raise RuntimeError("Input file%s could not be found: %s" %
                           (s, ", ".join(missing_input_files)))
    return file_args
def file_arguments_for_launch_command(file_args):
    """
    Convert "raw" file arguments into the arguments to put on the actual
    command line.

    Short argument sets are flattened into [option, value, option, value, ...].
    Long sets are written to an argfile and replaced by ['-argfile', path];
    the caller is responsible for removing that file after use.

    :param file_args: A list of (option, value) tuples.
    :return: A list of command-line argument strings.
    """
    # Command-line length is normally limited to about 8192 characters or
    # more; staying under 5000 for the file arguments keeps us clear of that.
    if total_file_arguments_length(file_args) >= 5000:
        return ['-argfile', write_argfile(file_args)]
    return [item for option, value in file_args for item in (option, value)]
def total_file_arguments_length(args):
    """
    Determine the total length of the given set of file arguments as they
    would be represented on the command line.

    :param args: A list of (option, value) tuples.
    :return: The combined length of every tuple element, plus two separator
        characters per tuple (one between option and value, one before the
        next pair).
    """
    text_length = sum(sum(len(part) for part in pair) for pair in args)
    return text_length + 2 * len(args)
def write_argfile(file_args):
    """
    Write file arguments to a new temporary "argfile" and return its path.

    Each (option, value) pair becomes one line of the form: option "value".
    The value is double-quoted to protect spaces in file names. The caller is
    responsible for removing the file.

    :param file_args: A list of (option, value) tuples
    :return: Path of the written argfile.
    """
    handle, path = tempfile.mkstemp(text=True)
    with os.fdopen(handle, 'w') as argfile:
        argfile.writelines(f'{option!s} "{value!s}"\n'
                           for (option, value) in file_args)
    return path
def launch_from_job_spec(job_spec,
                         launch_parameters,
                         display_commandline=None):
    """
    Launch a job described by a job specification.

    :param job_spec: Data defining the job.
    :type job_spec: schrodinger.job.launchapi.JobSpecification

    :param launch_parameters: Data defining how the job is run
    :type launch_parameters: schrodinger.job.launchparams.LaunchParameters

    :param display_commandline: Value exported as SCHRODINGER_COMMANDLINE,
        which becomes the commandline attribute of the resulting job. Most
        callers should pass it; it is optional only to ease refactoring it
        out later.
    :type display_commandline: str

    :return: A schrodinger.job.jobcontrol.Job object.
    """
    launch_command = _get_job_spec_launch_command(job_spec, launch_parameters)
    if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
        # Pass the user temp dir along so the Go launch process can find the
        # localhost jobserver directory; it is slow to compute there.
        launch_command += [
            "-user-tempdir",
            mmjob.get_job_server_parent_path()
        ]

    job_env = dict(os.environ)
    if display_commandline:
        job_env["SCHRODINGER_COMMANDLINE"] = display_commandline

    return launch_job(launch_command,
                      launch_dir=launch_parameters.getLaunchDirectory(),
                      env=job_env)
# # Job control back end stuff # _backend_singleton = None
def get_backend() -> Optional["_Backend"]:
    """
    Return the _Backend for this process if it runs under job control.

    A backend with a job id is cached module-wide so later calls return the
    same instance. When there is no job id, None is returned and nothing is
    cached.

    :return: The cached _Backend, or None when not running under job control.
    """
    global _backend_singleton
    if _backend_singleton is None:
        candidate = _Backend()
        if not candidate.job_id:
            return None
        _backend_singleton = candidate
    return _backend_singleton
def get_runtime_path(pathname: str) -> str:
    """
    Return the runtime path for the input file 'pathname'.

    If job control reports an MMJOBBE_ERROR for the path (for example, the
    file is of a type job control doesn't copy to the job directory, or no
    runtime file can be found), the original path name is returned.

    :param pathname: The input file path; must be a non-empty string.
    :return: The runtime path, or `pathname` itself as a fallback.
    :raises TypeError: If `pathname` is empty.
    """
    if not pathname:
        raise TypeError("Pathname has to be non empty string")
    try:
        return mmjob.mmjobbe_path_runtime(pathname)
    except mm.MmException as err:
        # Only MMJOBBE_ERROR maps to "use the original path"; any other
        # failure is propagated.
        if err.rc != mmjob.MMJOBBE_ERROR:
            raise
        return pathname
def under_job_control() -> bool:
    """
    Report whether this process is running under job control.

    :return: True under job control, False otherwise.
    """
    return True if mmjob.mmjobbe_has_jobcontrol() else False
class _Backend: """ An interface to mmjobbe. Because this class does nothing useful if not already running under jobcontrol, all methods should be no-ops if there is no job_id attribute. """ def __init__(self): """ Get the job_id and jobdir pathname of the current Job Control process (only meaningful if running under Job Control). Using the get_backend() function should be preferred to initializing a _Backend object explicitly. """ self.job_id = None self.job_dir = None # Store mmjobbe_terminate as an attribute so it will be available # even if the mm module is garbage collected before the _Backend # instance is. self.mmjobbe_terminate = mmjob.mmjobbe_terminate if not under_job_control(): return try: mmjob.mmjobbe_initialize(mm.error_handler) self.job_id = mmjob.mmjobbe_jobid() self.job_dir = mmjob.mmjobbe_job_directory() logger.debug("_Backend jobid is %s" % self.job_id) except mm.MmException as e: if e.rc != mmjob.MMJOBBE_NO_JOBID: logger.warn("_Backend.__init__ exception: %s", e) raise except Exception as e: logger.warn("_Backend.__init__ exception: %s", e) raise def getJob(self) -> Job: """ Retrieve a read-only instance of a job record (as an instance of the Job class). Changes made to the job record after this method is called (e.g. via setStructureOutputFile) will not be visible in the record returned. It should be called again to get any updates. """ return Job(self.job_id) def __getstate__(self): raise Exception("_Backend objects cannot be pickled.") def __del__(self): """ Clean up library initializations. """ if self.job_id: self.mmjobbe_terminate() self.job_id = None def setStructureOutputFile(self, filename: str): """ Set the path for a file that will be the output file which will be marked for incorporation by maestro. """ if self.job_id: mmjob.mmjobbe_set_structure_output_file(filename) def addOutputFile(self, filename: str): """ Add an output file or directory for job control to copy back to the launch host. 
Job control will silently skip the file or directory if it does not exist. """ if os.path.isabs(filename) and not mmutil.feature_flag_is_enabled( mmutil.JOB_SERVER): raise ValueError(f"Invalid output file path {filename}. Cannot " f"use absolute path to add output file when " f"JOB_SERVER is disabled") if self.job_id: mmjob.mmjobbe_add_outputfile(filename) def addRequiredOutputFile(self, filename: str): """ Add a required output file for job control to copy back to the launch host. If this file does not exist at the end of the job, then job control will mark the job as "died". """ if self.job_id: mmjob.mmjobbe_add_required_outputfile(filename) def addLogFile(self, filename: str): """ Add a log file for this job. A log file is continuously updated as the log file appends. Log files can only be append-only text data. """ if self.job_id: mmjob.mmjobbe_add_logfile(filename) def addMonitorFile(self, filename: str): """ This function has no effect. """ if self.job_id: mmjob.mmjobbe_add_monitorfile(filename) def setStructureMonitorFile(self, filename: str): """ This function has no effect. """ if self.job_id: mmjob.mmjobbe_set_structure_monitor_file(filename) def copyOutputFile(self, path: str): """ Copy a completed output file or directory back to the launch directory. :param path: The path to the file or directory that should be copied """ if self.job_id: mmjob.mmjobbe_copy_outputfile(path) def archiveFiles(self, filenames: List[str], archive_file: str): """ Archive the given files into a given archive file and add the latter as an output file. """ if self.job_id: mmjob.mmjobbe_archive_files(filenames, archive_file) def sendMessageToParent(self, message: str): """ Send message <message> (string) to the parent of this job. If this job does not have a parent, nothing will be done. Normally it should not be necessary to use this method. 
""" if self.job_id: mmjob.mmjobbe_send_parent(message) def addMessageName(self, msgname: str): """ Add message type to be queued for the backend to read using nextMessage(). Only messages whose first word is in this list will be returned by nextMessage(). """ if self.job_id: mmjob.mmjobbe_add_message_name(msgname) def nextMessage(self) -> Optional[str]: """ Return next unread message from the queue if there is one or None if there are no messages. The types of messages to return must first be specified via addMessageName(). """ if self.job_id: try: return mmjob.mmjobbe_next_message() except mm.MmException as e: if e.rc == mmjob.MMJOBBE_EMPTY_LIST: return None # No message else: raise # Any other exception return None def setJobProgress(self, steps: int = 0, totalsteps: int = 0, description: str = ""): """ Update the progress of a job. :param steps: number of steps completed :param totalsteps: total number of steps :param description: text description of progress """ mmjob.mmjobbe_set_jobprogress(steps, totalsteps, description) def deleteSubJob(self, jobid: str): """ Tell jmonitor of the backend to delete the subjob field from the parent job record. """ if self.job_id: mmjob.mmjobbe_delete_subjob(jobid) def haltRequested(self) -> bool: """ Check if a halt has been requested for the current job. """ if self.job_id and mmjob.mmjobbe_halt_requested(): return True else: return False # # Job hosts stuff #
class Host:
    """
    A class to encapsulate host info from the schrodinger.hosts file.

    Use the module level functions get_host or get_hosts to create Host
    instances.

    :ivar name: Label for the Host.
    :ivar user: Username by which to run jobs.
    :ivar processors: Number of processors for the host/cluster.
    :ivar tmpdir: Temporary/scratch directory to use for jobs. List
    :ivar schrodinger: $SCHRODINGER installation to use for jobs.
    :ivar env: Variables to set in the job environment. List.
    :ivar gpgpu: GPGPU entries. List.
    :ivar queue: Queue entries only. Queue type (e.g., SGE, PBS).
    :ivar qargs: Queue entries only. Optional arguments passed to the queue
        submission command.
    """

    def __init__(self, name: str):
        """
        Create a named Host object. The various host attributes must be set
        after object instantiation.

        Only host-entry fields can be public attributes of a Host object.
        Attributes introduced to capture other information about the entry
        must be private (named with a leading underscore.)

        :param name: name of the host entry.
        """
        self.name = name
        self.server_address = None
        self._host = None
        self.user = None
        self.processors = 1
        self.tmpdir = []
        self.schrodinger = None
        self.env = []
        self.gpgpu = []
        self.queue = None
        self.qargs = None
        self._is_queue = False
        # Host-entry lines as read from the hosts file (see __str__).
        self._lines = []

    def to_hostentry(self) -> str:
        """
        Return a string representation of the Host object suitable for
        including in a hosts file.

        Fields (in entry_fields order) that are unset, empty or the string
        "0" are omitted. List fields produce one line per item; gpgpu items
        are (index, description) pairs.
        """
        lines = []
        for key in entry_fields:
            value = getattr(self, key, None)
            if not value or value == "0":
                continue
            if isinstance(value, list):
                if key == "gpgpu":
                    lines.extend(f"{key}: {item[0]}, {item[1]}"
                                 for item in value)
                else:
                    lines.extend(f"{key}: {item!s}" for item in value)
            else:
                lines.append(f"{key}: {value!s}")
        return "\n".join(lines)

    def getHost(self) -> str:
        """
        Return the name of the host, which defaults to 'name' if a separate
        'host' attribute wasn't specified.
        """
        if self._host:
            return self._host
        else:
            return self.name

    def setHost(self, host: str):
        """
        Store host as _host to allow us to use a property for the 'host' attr.
        """
        self._host = host

    host = property(getHost, setHost)

    def isQueue(self) -> bool:
        """
        Check to see whether the host represents a batch queue.

        Returns True if the host is a HPC queueing system.
        """
        return self._is_queue

    def __str__(self):
        """
        Return the informal string representation of the Host -- a
        comma-separated list of "Key: value" attribute pairs.
        """
        return ", ".join(self._hostLines())

    def _hostLines(self):
        """
        Return an array of lines that if joined with newlines would create a
        schrodinger.hosts entry.
        """
        return self._lines

    def __repr__(self):
        """
        Return the formal string representation of the Host.
        """
        return "Host(%s)" % self.name

    def matchesRequirement(
            self,
            resource_requirement: "resource.ComputeRequirement") -> bool:
        """
        Return True if this host can meet the resource requirements provided.

        GPU requirements are met by hosts with gpgpu entries, or by the
        localhost entry when a GPU is available on this machine. All hosts
        meet CPU resource requirements.
        """
        if resource_requirement.compute_type == resource.ComputeType.GPU:
            if self.gpgpu:
                return True
            elif self.name == LOCALHOST_ENTRY_NAME:
                return gpgpu.is_any_gpu_available()
            return False
        # resource type CPU, all hosts allow cpu jobs at this time
        return True
def get_hostfile() -> str:
    """
    Return the path of the schrodinger.hosts file last used by get_hosts().

    The file is located through the standard search path
    ($SCHRODINGER_HOSTS, local dir, $HOME/.schrodinger, $SCHRODINGER).

    :return: Path to the hosts file, as reported by mmjob.
    """
    hostfile_path = mmjob.mmjob_hostfile_path()
    return hostfile_path
def hostfile_is_empty(host_filepath: str) -> bool:
    """
    Report whether the given hosts file is empty, meaning it contains only
    the localhost entry (or its hosts cannot be loaded).

    :param host_filepath: schrodinger.hosts file to use.
    :type host_filepath: str
    :return: True if no hosts could be loaded or the only host is localhost.
    :raises OSError: If `host_filepath` is not an existing file.
    """
    if not os.path.isfile(host_filepath):
        raise OSError("Host file not found in path: " + host_filepath)

    try:
        with _temp_hostfile(host_filepath) as loaded_hosts:
            hosts = loaded_hosts
    except (mm.MmException, MissingHostsFileException):
        hosts = []

    if not hosts:
        return True
    # Only the localhost entry present?
    return len(hosts) == 1 and hosts[0].name == LOCALHOST_ENTRY_NAME
def get_installed_hostfiles(root_dir="") -> List[str]:
    """
    Return the schrodinger.hosts files found in previous installation
    directories, newest release first.

    Subdirectories of the root whose names contain a release tag such as
    "2023-1" (quarterly) or "2023" (yearly, treated as quarter "0") and that
    contain a hosts file are considered.

    :param root_dir: Directory containing installations. Defaults to the
        standard install location (INSTALL_ROOT).
    :return: Hosts file paths sorted newest first; empty if `root_dir` is not
        a directory.
    """
    root_dir = root_dir or INSTALL_ROOT
    if not os.path.isdir(root_dir):
        return []

    quarterly_tag = re.compile(r"[a-z](20\d\d)-(\d)", re.IGNORECASE)
    yearly_tag = re.compile(r"[a-z](20\d\d)", re.IGNORECASE)
    found = []
    for entry in os.listdir(root_dir):
        candidate = os.path.join(root_dir, entry, HOSTS_FILE)
        if not os.path.exists(candidate):
            continue
        match = quarterly_tag.search(entry)
        if match:
            found.append((match.group(1), match.group(2), candidate))
            continue
        match = yearly_tag.search(entry)
        if match:
            found.append((match.group(1), "0", candidate))

    return [record[-1] for record in sorted(found, reverse=True)]
def get_hosts() -> List[Host]:
    """
    Return a list of all Hosts in the schrodinger.hosts file.

    After this is called, get_hostfile() will return the pathname for the
    schrodinger.hosts file that was used.

    NOTE: If the schrodinger.hosts file is present but empty, the list
    contains a single host: localhost.

    :raises MissingHostsFileException: If the hosts file cannot be loaded.
    :raises UnreadableHostsFileException: If the hosts file is invalid.
    """
    # PANEL-3412
    try:
        mmjob.mmjob_hosts_reload()
    except mm.MmException as e:
        if e.rc == mmjob.MMJOB_HOSTS_MISSING:
            raise MissingHostsFileException(
                "Hosts file could not be loaded. (It could be missing.)")
        # Other reload errors are not raised here; presumably the
        # _get_num_hosts() call below reports them.

    nhosts = _get_num_hosts()
    return [_get_host(mmjob.mmjob_host_name(i)) for i in range(1, nhosts + 1)]
@contextlib.contextmanager
def _temp_hostfile(fname: str):
    """
    Temporarily point SCHRODINGER_HOSTS at `fname` and yield the hosts loaded
    from it.

    On exit the previous SCHRODINGER_HOSTS value (or its absence) is restored
    and the hosts are reloaded, so a bad temporary file doesn't linger.

    :param fname: Path of the hosts file to load.
    :yield: The list of Host objects returned by get_hosts().
    """
    # Capture the original value before entering the try block, since the
    # finally clause depends on it.
    orig_hostfile = os.environ.get("SCHRODINGER_HOSTS", None)
    try:
        os.environ["SCHRODINGER_HOSTS"] = fname
        # The only way to check for validity is to actually set it and see
        # what happens
        hosts = get_hosts()
        yield hosts
    finally:
        # now we restore the original value
        if orig_hostfile is None:
            # pop() rather than del: if setting the variable above failed, it
            # may not exist, and a KeyError here would hide the real error.
            os.environ.pop("SCHRODINGER_HOSTS", None)
        else:
            os.environ["SCHRODINGER_HOSTS"] = orig_hostfile
        # clear the bad host after resetting the environmental variable
        get_hosts()
def hostfile_is_valid(fname: str) -> Tuple[bool, str]:
    """
    Check whether a hosts file can be loaded.

    :param fname: The full path of the host file to validate
    :return: a (bool, str) tuple: whether the host file is valid, and the
        error message from loading it (empty if none)
    """
    valid, message = False, ""
    try:
        with _temp_hostfile(fname):
            valid = True
    except (mm.MmException, MissingHostsFileException) as err:
        message = str(err)
    return (valid, message)
def get_host(name: str) -> "Host":
    """
    Return a Host object for the named host.

    If the host is not found, we return a Host object with the provided name
    and details that match localhost. This matches behavior that jobcontrol
    uses.

    :param name: Name of the hosts-file entry.
    :return: The Host object (previously mis-annotated as returning int).
    :raises UnreadableHostsFileException: If the hosts file is invalid.
    :raises MissingHostsFileException: If the hosts file cannot be loaded.
    """
    return _get_host(name)
def _get_num_hosts() -> int:
    """
    Returns the number of hosts in the schrodinger.hosts file.

    :raises UnreadableHostsFileException: If mmjob reports a missing include
        (the hosts file is invalid).
    :raises MissingHostsFileException: If the file yields zero hosts.
    """
    try:
        num_hosts = mmjob.mmjob_hosts_length()
    except mm.MmException as err:
        if err.rc == mmjob.MMJOB_INCLUDE_MISSING:
            raise UnreadableHostsFileException(
                "The Schrodinger hosts file is invalid.") from err
        raise
    if num_hosts == 0:
        raise MissingHostsFileException(
            "Hosts file could not be loaded. (It could be missing.)")
    return num_hosts


def _get_host(name) -> "Host":
    """
    Return a Host object for 'name', with all attributes read in from the
    corresponding entry in the schrodinger.hosts file.

    Requires that mmjob be initialized. An unknown name is given the details
    of the localhost entry, with 'name' kept as its 'host' attribute.

    :raises UnreadableHostsFileException: If the hosts file is invalid.
    :raises MissingHostsFileException: If the hosts file cannot be loaded.
    """
    original_host_name = None
    h = Host(name)
    try:
        num_keys = mmjob.mmjob_host_keys_length(name)
    except mm.MmException as e:
        # To verify validity of the hosts file. Will raise a more useful
        # exception.
        _get_num_hosts()
        if e.rc == mmjob.MMJOB_NO_SUCH_HOST:
            original_host_name = name
            name = LOCALHOST_ENTRY_NAME
            num_keys = mmjob.mmjob_host_keys_length(name)
        else:
            raise

    h.server_address = mmjob.mmjob_host_server(name)
    h._lines = ["name: %s" % h.name]
    # Go through each schrodinger.hosts entry for this host:
    for i in range(1, num_keys + 1):
        # Lowercase the names for consistency, since mmjob_host_get_int etc.
        # don't care about the case and our style guide prescribes
        # lowercase attribute names.
        attr = mmjob.mmjob_host_keys_item(name, i).lower()
        if mmjob.mmjob_host_is_list(name, attr) and attr != "gpgpu":
            if not hasattr(h, attr):
                setattr(h, attr, [])
            for j in range(1, mmjob.mmjob_host_list_length(name, attr) + 1):
                value = mmjob.mmjob_host_list_item(name, attr, j)
                getattr(h, attr).append(value)
                h._lines.append(f"{attr}: {value}")
        elif attr == "gpgpu":
            if not hasattr(h, attr):
                setattr(h, attr, [])
            for j in range(1, mmjob.mmjob_host_gpgpu_length(name) + 1):
                value = mmjob.mmjob_host_list_item(name, attr, j)
                (index, description) = get_gpgpu_params(value)
                getattr(h, attr).append((index, description))
                h._lines.append(f"{attr}: {index} {description}")
        elif attr == 'processors':
            value = mmjob.mmjob_host_get_int(name, attr)
            setattr(h, attr, value)
            if value > 1:
                h._lines.append("processors: %s" % h.processors)
        elif attr == 'processors_per_node':
            value = mmjob.mmjob_host_get_str(name, attr)
            # convert the string that mmjob gives us to an integer
            setattr(h, attr, int(value))
            if h.processors_per_node > 1:
                h._lines.append("processors_per_node: %s" %
                                h.processors_per_node)
        else:
            value = mmjob.mmjob_host_get_str(name, attr)
            setattr(h, attr, value)
            h._lines.append(f"{attr}: {value}")

    # sort lines in the predefined order of fields
    h._lines.sort(key=lambda line: field_sortkey.get(line.split(':')[0], 999))
    h._is_queue = bool(mmjob.mmjob_host_is_queue(name))
    if original_host_name:
        h._host = original_host_name
    return h
def get_gpgpu_params(gpgpu_str: str) -> Tuple[str, str]:
    """
    Convert a gpgpu string (ex. "0,V100") to a tuple (index, description).

    The index is separated from the description by the first run of commas
    and/or spaces; the rest of the string is the description.

    :param gpgpu_str: gpgpu line from schrodinger.hosts (ex. "0,V100")
    :type gpgpu_str: str
    :rtype: tuple(str, str)
    :raises: ValueError if the input is invalid
    """
    # maxsplit is passed by keyword: passing it positionally is deprecated
    # as of Python 3.13.
    fields = re.split("[, ]+", gpgpu_str, maxsplit=1)
    if len(fields) < 2:
        raise ValueError("ERROR: Invalid gpgpu value: '" + gpgpu_str + "'")
    (index, description) = fields
    return (index, description)
def host_str_to_list(hosts_str: str) -> List[Tuple[str, Optional[int]]]:
    """
    Convert a hosts string (Ex: "galina:1 monica:4") to a list of tuples.

    First value of each tuple is the host, second value is # of cpus, or None
    when no ":N" suffix was given. Entries may be separated by whitespace
    and/or commas.

    :raises ValueError: If a cpu count is not an integer.
    """
    # Implemented because of
    # EV 55179
    host_list = []
    # Use both white spaces and commas as host entry separators (Ev:56146):
    for spacesplit in hosts_str.split():
        for hostentry in spacesplit.split(','):
            if not hostentry:  # skip empty entries such as in "a,,b"
                continue
            s = hostentry.split(':')
            if len(s) == 1:
                ncpus = None
            else:
                try:
                    ncpus = int(s[1])
                except ValueError as err:  # ncpus is not an integer
                    raise ValueError('ERROR: Could not parse host string: '
                                     f"{hosts_str}") from err
            host_list.append((s[0], ncpus))
    return host_list
def host_list_to_str(host_list: List[Tuple[str, int]]) -> str:
    """
    Convert a hosts list such as [('host1', 1), ('host2', 10)] to a string.

    Output example: "host1:1,host2:10". A host whose cpu count is None
    (unlimited CPUs) is written without the ":N" suffix.
    """
    return ",".join(hostname if ncpus is None else "%s:%i" % (hostname, ncpus)
                    for hostname, ncpus in host_list)
def get_command_line_host_list() -> Optional[List[Tuple[str, int]]]:
    """
    Return a list of (host, ncpu) tuples corresponding to the host list that
    is specified on the command line.

    This function is meant to be called by scripts that are running under a
    toplevel job control script but are not running under jlaunch.

    The host list is determined from the following sources:
    1. SCHRODINGER_NODELIST
    2. JOBHOST (if only a single host is specified)
    3. "localhost" (if no host is specified)

    If no SCHRODINGER_NODELIST is present in the environment, None is
    returned.
    """
    nodelist_str = os.environ.get('SCHRODINGER_NODELIST')
    if nodelist_str is None:
        return None
    # Ev:63971 SCHRODINGER_NODELIST is "" for single host entries
    if nodelist_str == "":
        # Only one host specified via -HOST. A missing JOBHOST is treated like
        # an empty one (falls back to localhost) instead of raising KeyError.
        nodelist_str = os.environ.get('JOBHOST', "")
    if nodelist_str == "":
        # localhost
        nodelist_str = LOCALHOST_ENTRY_NAME
    return host_str_to_list(nodelist_str)
# Module-level cache of the (host, ncpu) list that get_backend_host_list()
# parses from SCHRODINGER_NODEFILE; it stays empty until a nodefile with
# entries has been parsed.
_backend_hosts = []
def get_backend_host_list() -> Optional[List[Tuple[str, int]]]:
    """
    Return a list of (host, ncpu) tuples corresponding to the host list as
    determined from the SCHRODINGER_NODEFILE.

    This function is meant to be called from scripts that are running under
    jlaunch (i.e. backend scripts). Lines of the nodefile are "host" or
    "host:N"; repeated hosts have their cpu counts summed, and host order of
    first appearance is kept.

    Every call returns a fresh copy, so callers may modify the result without
    affecting later calls.

    :return: The host list, or None if SCHRODINGER_NODEFILE is not present in
        the environment (and nothing has been cached yet).
    """
    global _backend_hosts

    # PYTHON-2012: when something uses the contents of SCHRODINGER_NODEFILE,
    # its value is not supposed to be passed on to subjobs. This is
    # accomplished by unsetting it here.
    #
    # I'm somewhat worried that people are calling this function more than
    # once in the same process, however, so am caching the results in the
    # global variable so that its behavior is consistent within a process at
    # least.
    if _backend_hosts:
        # Return a copy, exactly like the first call does, so callers cannot
        # mutate the cache.
        return _backend_hosts[:]

    if 'SCHRODINGER_NODEFILE' not in os.environ:
        return None

    _hosts = collections.OrderedDict()
    nodefilename = os.environ['SCHRODINGER_NODEFILE']
    with open(nodefilename) as fh:
        for line in fh:
            host = line.strip()
            if not host:
                continue
            hostpieces = host.split(":")
            ncpu = int(hostpieces[1]) if len(hostpieces) == 2 else 1
            _hosts[hostpieces[0]] = _hosts.get(hostpieces[0], 0) + ncpu
    del os.environ['SCHRODINGER_NODEFILE']
    _backend_hosts = list(_hosts.items())
    # Return a copy of the global variable to ensure the return value
    # remains consistent.
    return _backend_hosts[:]
def get_host_list() -> List[Tuple[str, int]]:
    """
    Return the host list for the current process.

    Under job control this is the backend host list; otherwise it is derived
    from the command line -HOST argument.

    :return: The job hosts from the backend or the command line. If the job
        hosts are undefined, the default return value is [("localhost", 1)].
    """
    if get_backend():
        found_hosts = get_backend_host_list()
    else:
        found_hosts = get_command_line_host_list()
    # Fall back to a single localhost CPU when no hosts were specified.
    return found_hosts or [(LOCALHOST_ENTRY_NAME, 1)]
def calculate_njobs(
        host_list: Optional[Union[str, List[Tuple[str, Optional[int]]]]] = None
) -> int:
    """
    Derive the number of jobs from the specified host list.

    This function is useful to determine number of subjobs if user didn't
    specify the '-NJOBS' option.

    :param host_list: String of hosts along with optional number of subjobs
        -HOST my_cluster:20 or list of tuples of hosts, typically one element
        [(my_cluster, 20)]. If host list is not specified then
        get_command_line_host_list() is used to determine njobs.
    :return: The sum of the cpu counts, counting a host without a count as 1.
        When no host list is given and none is available from the command line
        either, 1 (a single localhost job, matching get_host_list()) is
        returned instead of failing.
    """
    if host_list:
        if isinstance(host_list, str):
            host_list = host_str_to_list(host_list)
    else:
        host_list = get_command_line_host_list()
        if host_list is None:
            # Previously this iterated over None and raised TypeError.
            return 1

    # A host without an explicit count is assumed to be 1 (even for queued
    # hosts).
    return sum(1 if ncpus is None else ncpus for (_, ncpus) in host_list)
def is_valid_hostname(hostname: str) -> bool:
    """
    Check whether a host name is valid, as decided by mmjob.

    :param hostname: host name
    :return: mmjob's verdict on the host name.
    """
    verdict = mmjob.mmjob_is_valid_hostname(hostname)
    return verdict
def get_jobname(filename: Optional[str] = None) -> Optional[str]:
    """
    Figure out the jobname from the first available source:

    1) the SCHRODINGER_JOBNAME environment variable (comes from -JOBNAME
       during startup);
    2) the job control backend;
    3) the basename of a given filename.

    :param filename: if provided, and the jobname can't otherwise be
        determined (e.g., running outside job control with no -FILENAME
        argument), construct a jobname from its basename.
    :return: jobname (may be None if filename was not provided)
    """
    from_env = os.environ.get('SCHRODINGER_JOBNAME')
    if from_env:
        return from_env

    backend = get_backend()
    if backend:
        return backend.getJob().Name

    return fileutils.get_jobname(filename) if filename else None
def register_job_output(job: Job):
    """
    Register a job's output and log files with the backend, if running under
    jobcontrol.

    Each file is added via the backend's addOutputFile(). Outside job control
    this does nothing.

    :param job: job from which to extract output/log files
    """
    backend = get_backend()
    if not backend:
        return
    files_to_register = job.OutputFiles + job.LogFiles
    for path in files_to_register:
        backend.addOutputFile(path)