Source code for schrodinger.job.queue

"""
Provides a `JobDJ` class for running multiple concurrent jobs. Jobs can have
dependencies, and `JobDJ` will not start a job until all of its prerequisite
jobs have finished.

Step by step instructions for basic use:

1. Create a `JobDJ` instance. For example::

        job_dj = queue.JobDJ()


2. Add jobs to the `JobDJ` instance by calling the `JobDJ.addJob` method.
   For example::

        job_dj.addJob(["jaguar", "input1.inp"])

3. Run all jobs with the `JobDJ.run` method. This is simple::

        job_dj.run()
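
Putting the steps together, a minimal driver script might look like the
following sketch (the "jaguar" input file names are placeholders)::

    from schrodinger.job import queue

    job_dj = queue.JobDJ(verbosity="normal")
    job_dj.addJob(["jaguar", "input1.inp"])
    job_dj.addJob(["jaguar", "input2.inp"])
    job_dj.run()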

Copyright Schrodinger, LLC. All rights reserved.

"""

import dataclasses
import enum
import functools
import heapq
import logging
import math
import os
import pathlib
import pickle
import shutil
import sys
import textwrap
import time
import uuid
import warnings
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

from schrodinger.infra import jobhub
from schrodinger.infra import mm
from schrodinger.infra import mmjob
from schrodinger.Qt import QtCore
from schrodinger.utils import log
from schrodinger.utils import mmutil
from schrodinger.utils import qt_utils
from schrodinger.utils import subprocess

from . import jobcontrol
from . import resource
from .jobcontrol import LOCALHOST_ENTRY_NAME

# Job control exit statuses:
FINISHED = "finished"

# For max_failures option
NOLIMIT = -1

# Maximum attempts for killing jobs of JobDJ
MAX_KILL_ATTEMPTS = 2

#############################################################################
# Global data structures and objects

# This dict holds status updates from job control indexed by JobId.
_status_messages = dict()

# Logging object for user messages:
logger = log.get_output_logger("output.schrodinger.job.queue")

# Logging (for info, warning, and debug messages):
syslog = logging.getLogger("schrodinger.job.queue")
if "SCHRODINGER_JOBDJ_TRACE" in os.environ:
    # Add a handler so that all messages are logged to stdout with timestamps
    handler = logging.StreamHandler(sys.stdout)
    logger.removeHandler(log.get_output_logger_handler())
    for _logger in {logger, syslog}:
        _logger.addHandler(log.get_stream_handler(sys.stdout))
        _logger.setLevel(logging.DEBUG)

# A dict to cache checks on whether a host is a queue or regular host.
_host_is_queue_dict = {}

#############################################################################
# Global parameters

# A time to sleep (in seconds) after a jlaunch command, to keep from
# overwhelming the system.
SUBMIT_DELAY = 1

# A per-job delay for jobdb reads to minimize disk churn; if less than this
# time in seconds has elapsed since the job's last jobdb read, assume the
# old data is still valid.
UPDATE_DELAY = 10

# Since the update delay is now enforced on a per-job basis, this delay is
# meant to keep JobDJ from pegging the cpu.
MONITOR_DELAY = 1

# USE_JOB_CONTROL_MESSAGES determines whether to try to use subjob status
# messages from job control. To default to reading the database for all
# jobs, set this variable to False. This setting is only used for legacy
# job control.
USE_JOB_CONTROL_MESSAGES = True

# At MESSAGE_TIMEOUT seconds after the last status
# message was received for a subjob, the database will
# be polled for current status. This is to prevent
# hanging forever in the event of lost messages.
MESSAGE_TIMEOUT = 600

# MINIMUM_UPDATE_TIME indicates the minimum number of seconds to wait before
# polling for new job information. This only affects JOB_SERVER jobs.
MINIMUM_UPDATE_TIME = 15

# Constant indicating "infinite" CPU resources
UNLIMITED_CPUS = None

#############################################################################


@enum.unique
class PrintableStatus(str, enum.Enum):
    FAILED_TO_LAUNCH = "failed to launch"
    LAUNCHED = "launched"
    RESTARTED = "restarted"
    RESTARTING = "restarting"
    STARTED = "started"

    def __str__(self):
        return self.value


@enum.unique
class JobState(str, enum.Enum):
    WAITING = "waiting"
    LAUNCHING = "launching"
    ACTIVE = "active"
    FAILED = "failed"
    FAILED_RETRYABLE = "failed but retryable"
    DONE = "done"

    def __str__(self):
        return self.value


class MissingResourceError(RuntimeError):
    template = ("A job requires a '{}' type of compute resource not provided "
                "by any hosts in JobDJ: {}")

    def __init__(self, required_type, available_resources):
        message = self.template.format(required_type, available_resources)
        super().__init__(message)


class MaxJobFailureError(RuntimeError):
    msg = ('Maximum number of failed jobs has been reached. '
           'All other subjobs will be killed and program will stop.')

    def __init__(self):
        super().__init__(self.msg)


def get_default_launch_timeout() -> int:
    """
    Get the default launch timeout in seconds.
    """
    if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
        # For new jobcontrol, the timeout needs to be large enough to
        # account for file uploading.
        DEFAULT_LAUNCH_TIMEOUT = 40000
    else:
        # For old jobcontrol, this is set higher than the JPROXYPORT
        # timeout (5 min).
        DEFAULT_LAUNCH_TIMEOUT = 400
    return DEFAULT_LAUNCH_TIMEOUT


def get_update_delay() -> int:
    """
    Return the delay to use for jobdb reads in seconds.
    """
    return UPDATE_DELAY


def backup_file(log_file: pathlib.Path, copy: bool):
    """
    Back up log_file by copying or renaming it to <name>.failed.1,
    <name>.failed.2, etc.

    :param log_file: name of the file to back up
    :param copy: If True, copy the file to the backup location; otherwise
        rename it.
    """
    if not os.path.isfile(log_file):
        return
    i = 1
    while True:
        new_name = f"{log_file}.failed.{i}"
        if not os.path.isfile(new_name):
            if copy:
                shutil.copy(log_file, new_name)
            else:
                os.rename(log_file, new_name)
            break
        i += 1


def _host_is_queue(hostname: str) -> bool:
    """
    Return True if the specified host is a queue.
    """
    if hostname not in _host_is_queue_dict:
        hostobj = jobcontrol.get_host(hostname)
        if hostobj:
            _host_is_queue_dict[hostname] = hostobj.isQueue()
        else:
            _host_is_queue_dict[hostname] = False
    return _host_is_queue_dict[hostname]


def get_command(base_command: List[str],
                host: str = LOCALHOST_ENTRY_NAME,
                procs: int = 1,
                local: bool = False) -> List[str]:
    """
    Given a base command and additional launching specifications, return a
    modified command that is ready to pass to jobcontrol.launch_job.
    """
    additional_args = []
    if procs > 1:
        additional_args += ['-HOST', f'{host}:{procs}']
    elif '-HOST' not in base_command:
        # avoid two '-HOST' arguments in the resulting command
        additional_args += ['-HOST', host]
    if local:
        additional_args += ['-LOCAL']

    trace_cmd = []
    if ("SCHRODINGER_JOBDJ_TRACE" in os.environ and
            sys.platform.startswith('linux')):
        trace_cmd = [
            "strace",
            os.environ.get("SCHRODINGER_JOBDJ_STRACE_CMD_OPTIONS", "-ft"),
            "-o", "strace-" + str(uuid.uuid4())
        ]
        # Fix the base command before attaching to strace.
        base_command = subprocess._fix_call_cmd(base_command)
    return trace_cmd + base_command + additional_args


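
For illustration, a sketch of the arguments `get_command` produces (assuming
SCHRODINGER_JOBDJ_TRACE is unset; the host entry name "my_cluster" is a
placeholder)::

    get_command(["jaguar", "input1.inp"], host="my_cluster")
    # -> ["jaguar", "input1.inp", "-HOST", "my_cluster"]

    get_command(["jaguar", "input1.inp"], host="my_cluster", procs=4)
    # -> ["jaguar", "input1.inp", "-HOST", "my_cluster:4"]
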
def _wrap_job_finalizer(function: Callable[["BaseJob"], None],
                        run_dir=None) -> Callable[["BaseJob"], None]:
    """
    Wrap a function that takes a single job as an argument so that it runs
    in a separate directory. If `run_dir` is not specified, run it in the
    working directory of the job.
    """

    def callback(job: BaseJob):
        start_dir = os.getcwd()
        dir_ = run_dir
        if dir_ is None:
            dir_ = job.getCommandDir()
        try:
            if dir_:
                os.chdir(dir_)
            function(job)
        finally:
            if dir_:
                os.chdir(start_dir)

    return callback


class BaseJob:
    """
    A base job class for jobs run under `JobDJ`.

    The main methods to be implemented in subclasses are:

    1. `doCommand` - The method that does the real work of the job, either
       running a simple local calculation or submitting a job to job
       control.
    2. `update` - A method called periodically while a job is running to
       update its current state.
    3. `_getState` - The get method used in the `state` property, used by
       `JobDJ` to determine the job's current state.

    A few additional methods only need to be implemented in special
    situations:

    1. `finalize` - If you want custom behavior in your finalize method,
       override this method.
    2. `cancelSubmitted` - If the job can run under a queue, implementing
       this method allows jobs that are waiting in the `submitted` state to
       be restarted immediately on a newly available non-queue host.
    3. `getStatusStrings` - If you want to use the `JobDJ` print summary,
       this method should be updated to provide more useful information.

    The execution point for all jobs is in the `BaseJob.run` method. That
    method calls `preCommand`, `doCommand` and `postCommand` in order. For
    jobs that are run locally, all main computation should be done in the
    `doCommand` method. Note that `doCommand` blocks until completion, so
    no additional work will be done (e.g. job updates or submissions) until
    it returns. For this reason, only short jobs should be run locally
    without job control.
    """

    init_count = 0

[docs] def __init__( self, command_dir: Optional[pathlib.Path] = None, resource_requirement: Optional[resource.ComputeRequirement] = None): """ :param command_dir: The directory from which to run the command. :param resource_requirement: Whether the job will require special compute resources, such as GPU. """ if resource_requirement: self._resource_requirement = resource_requirement else: self._resource_requirement = resource.ComputeRequirement( resource.ComputeType.CPU) self._command_dir = None if command_dir: self._command_dir = command_dir self._start_dir = None self._finalizers = [] # Dependency tracking. self._prereqs = set() self._dependents = set() # An attribute to keep track of the order in which this job was # added to the JobDJ object. self._add_priority = 0 # Necessary because JobDJ sometimes looks for the host. A value of # None means that it is running locally. self.host = None self._state = JobState.WAITING self.num_failures = 0 # The error message for the most recent failed job launch. # It will be logged when a failure has exhausted its retries. self.launch_error = "" self.procs = 1 self.name = f"basejob[{BaseJob.init_count}]" BaseJob.init_count += 1 # This will be set when the BaseJob is added to a JobDJ instance # with addJob. self._jobdj = None # Track if job is asked to terminate self.abort_job = False
[docs] def runsLocally(self) -> bool: """ Return True if the job runs on the `JobDJ` control host, False if not. Jobs that run locally don't need hosts. There is no limit on the number of locally run jobs. """ return True
[docs] def update(self): """ Update the current job status, stored in the `state` property. When a job is running, this method will be called periodically by `JobDJ` until the job `state` property is `JobState.DONE`. """ # Implementations of update must be sure to set 'state' equal to # DONE when the job is finished. Other valid settings of 'state' are # to leave it unchanged at ACTIVE, or to set it to FAILED or # FAILED_RETRYABLE. raise NotImplementedError
[docs] def maxFailuresReached(self, msg: str): """ This is a method that will be called after the job has failed and the maximum number of failures per `JobDJ` run has been reached. After invoking this method, `JobDJ` will raise a `RuntimeError` and the process will exit. """ logger.error(f"ERROR: {msg}")
[docs] def finalize(self): """ Clean up after a job successfully runs. """ for fn in self._finalizers: fn(self)
[docs] def doCommand(self, *args, **kwargs): """ Execute the command associated with this job. """ raise NotImplementedError
[docs] def run(self, *args, **kwargs): """ Run the job. The steps taken are as follows: 1. Execute the preCommand method for things like changing the working directory. 2. Call the doCommand to do the actual work of computation or job launching. 3. Call the postCommand method to undo the changes from the preCommand that need to be undone. """ self.state = JobState.LAUNCHING try: self.preCommand() self.setup() self.doCommand(*args, **kwargs) except Exception as err: name = getattr(self, "name", "<unknown>") logger.error(f"ERROR: Job {name} failed with an exception: {err}") self.state = JobState.FAILED raise finally: self.postCommand()
[docs] def getCommandDir(self) -> str: """ Return the launch/command directory name. If None is returned, the job will be launched in the current directory. """ return self._command_dir
[docs] def getCommand(self) -> List[str]: """ Return the command used to run this job. """ return self._command
[docs] def preCommand(self): """ A method to make pre-command changes, like cd'ing to the correct directory to run the command in. """ self._start_dir = os.getcwd() command_dir = self.getCommandDir() if command_dir: if not os.path.exists(command_dir): os.mkdir(command_dir) os.chdir(command_dir)
[docs] def setup(self): """ A method to do initial setup; executed after `preCommand`, just before `doCommand`. """
[docs] def postCommand(self): """ A method to restore things to the pre-command state. """ if self.getCommandDir(): os.chdir(self._start_dir)
@property def state(self) -> JobState: """ Return the current state of the job. Note that this method can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling `BaseJob` instances. For example, by examining some external condition (e.g. presence of output files) the state JobState.DONE could be returned immediately and the job would not run. """ return self._state @state.setter def state(self, state: JobState): """ Set the current state of the job. Note that this method can be overridden by subclasses that wish to take action every time the state of a job changes. """ self._state = JobState(state)
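
As the property docstring suggests, a subclass can provide coarse
restartability by consulting an external condition before falling back to
the stored state. A sketch, assuming a hypothetical output file whose
presence marks the work as already done::

    class RestartableJob(JobControlJob):
        """Skip work whose output file already exists."""

        def __init__(self, command, outfile, **kwargs):
            super().__init__(command, **kwargs)
            self._outfile = outfile  # hypothetical completion marker

        @property
        def state(self):
            # Report DONE if the expected output is already on disk, so
            # JobDJ never launches the job.
            if os.path.isfile(self._outfile):
                return JobState.DONE
            return self._state

        @state.setter
        def state(self, state):
            self._state = JobState(state)
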
[docs] def hasExited(self) -> bool: """ Returns True if this job finished, successfully or not. """ return self._state in { JobState.DONE, JobState.FAILED, JobState.FAILED_RETRYABLE }
[docs] def isComplete(self) -> bool: """ Returns True if this job finished successfully """ return self._state == JobState.DONE
[docs] def hasStarted(self) -> bool: """ Returns True if this job has started (not waiting) """ return self._state != JobState.WAITING
[docs] def getStatusStrings(self) -> Tuple[str, str, str]: """ Return a tuple of status strings for printing by `JobDJ`. The strings returned are (status, jobid, host). """ # This default implementation doesn't produce much useful info, so # override if necessary. if self.state == JobState.DONE: status_string = FINISHED else: status_string = "unknown" jobid = "[none]" host = "[local]" return status_string, jobid, host
[docs] def getJobDJ(self) -> "JobDJ": """ Return the JobDJ instance that this job has been added to. """ return self._jobdj
def __lt__(self, other: "BaseJob"): """ This comparison method determines the order of execution for jobs in the JobDJ _jobqueue heap. """ if not isinstance(other, BaseJob): raise TypeError("Comparing with a non-BaseJob class is not " f"valid: ({self}, {other})") # Note that this method returns True if the current job is *higher* # priority. This is because of the "min heap" behavior of the heapq # module. Think of it as sorting things into the order you want them # executed in. # If this method is re-implemented in a subclass take care to # preserve anti-symmetry - i.e. cmp(a, b) == -1 * cmp(b, a). # Give jobs that run locally priority, as they don't consume a job # host. if self.runsLocally() and not other.runsLocally(): return True elif not self.runsLocally() and other.runsLocally(): return False else: return self._add_priority < other._add_priority # Dependency methods.
[docs] def addPrereq(self, job: "BaseJob"): """ Add a job that is an immediate prerequisite for this one. """ # Check for circular dependencies. if self._dependents and self in job.genAllPrereqs(): raise RuntimeError(f"{job} is a dependent of {self} " "and cannot also be made a prerequisite.") self._prereqs.add(job) job._dependents.add(self)
[docs] def addGroupPrereq(self, job: "BaseJob"): """ Make all jobs connected to `job` prerequisites of all jobs connected to this Job. """ final_jobs = [] for j in job.genAllJobs(): if not j._dependents: final_jobs.append(j) initial_jobs = [] for j in self.genAllJobs(): if not j._prereqs: initial_jobs.append(j) for ij in initial_jobs: for fj in final_jobs: ij.addPrereq(fj)
[docs] def getPrereqs(self): """ Return a set of all immediate prerequisites for this job. """ return self._prereqs
def _pruneGraph(self) -> List["BaseJob"]: """ Remove this job from the prerequisites list of any dependents and return a list of any dependent jobs that no longer have unfinished prerequisites. """ runnable_jobs = [] for job in self._dependents: job._prereqs.remove(self) if not job._prereqs: runnable_jobs.append(job) return runnable_jobs
[docs] def genAllPrereqs(self, seen=None) -> Generator["BaseJob", None, None]: """ A generator that yields all jobs that are prerequisites on this one. """ for job in self._prereqs: yield from job.genAllPrereqs(seen) yield job
[docs] def genAllJobs( self, seen: Set["BaseJob"] = None) -> Generator["BaseJob", None, None]: """ A generator that yields all jobs connected to this one. """ if seen is None: seen = set() seen.add(self) for job in self._dependents: if job in seen: continue yield from job.genAllJobs(seen) for job in self._prereqs: if job in seen: continue yield from job.genAllJobs(seen) yield self
[docs] def addFinalizer(self, function: Callable[["BaseJob"], None], run_dir: str = None): """ Add a function to be invoked when the job completes successfully. See also the add_multi_job_finalizer function. """ cb = _wrap_job_finalizer(function, run_dir) self._finalizers.append(cb)
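
The class docstring above lists the methods a subclass must provide. A
minimal sketch of a locally-run subclass (the squaring "work" is a stand-in
for a real computation)::

    class SquareJob(BaseJob):
        """Toy job that squares a number in-process."""

        def __init__(self, value, **kwargs):
            super().__init__(**kwargs)
            self._value = value
            self.result = None

        def doCommand(self, *args, **kwargs):
            # All of the real work happens here, synchronously; long-running
            # work belongs in a JobControlJob instead.
            self.result = self._value * self._value

        def update(self):
            # Nothing to poll: the work completed inside doCommand.
            self.state = JobState.DONE
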
[docs]class SubprocessJob(BaseJob): """ A job for running an external process. By default, stdout and stderr are collected and made available as the 'stdout' and 'stderr' attributes when the job is completed. """ init_count = 0
[docs] def __init__(self, command: List[str], command_dir: Optional[List[str]] = None, timeout: Optional[int] = None, stdout=subprocess.PIPE, stderr=subprocess.PIPE): """ If stdout or stderr are expected to be large, you can pass an open file object instead of using PIPE. :param command: The command to be run. :param timeout: Timeout (in seconds) after which the subprocess will be killed. If None, the subprocess is allowed to run indefinitely. :param stdout: The stdout argument to be passed to the subprocess Popen constructor. :param stderr: The stderr argument to be passed to the subprocess Popen constructor. """ super().__init__(command_dir) self.host = LOCALHOST_ENTRY_NAME self.name = f"localjob[{SubprocessJob.init_count}]" SubprocessJob.init_count += 1 self._command = command self._subprocess = None self._stdout = stdout # The kwarg to pass to Popen self._stderr = stderr # The kwarg to pass to Popen self.timeout = timeout self.stdout = None # process stdout, set once the job is complete self.stderr = None # process stderr, set once the job is complete self.kill_start_time = 0 # Track the request time for kill self._duration = None # Time spent running the job.
def __getstate__(self): """ Return the state to be pickled. """ state_dict = dict(self.__dict__) state_dict['_subprocess'] = None return state_dict
[docs] def update(self): """ Update the current job status and set state. """ return_code = self._subprocess.poll() if return_code is None: if self.kill_start_time: # We tried to kill the job before. # If it was more than a minute ago, kill it forcefully if (get_current_time() - self.kill_start_time) > 60: self._endJob(force=True) self.kill_start_time = 0 elif self.timeout: if (get_current_time() - self._start_time) > self.timeout: self.kill() return self.stdout, self.stderr = self._subprocess.communicate() fn_time = get_current_time() # best guess at process terminated self._duration = fn_time - self._start_time if return_code == 0 or self.abort_job: self.state = JobState.DONE else: self.state = JobState.FAILED
[docs] def doCommand(self, *args, **kwargs): """ Execute the command associated with this job via subprocess. """ self._start_time = get_current_time() self._subprocess = subprocess.Popen(self._command, stdout=self._stdout, stderr=self._stderr, universal_newlines=True)
[docs] def kill(self): """ Send termination request to subprocess managed job. """ self.cancel()
[docs] def cancel(self): """ Send termination request to subprocess managed job. This method will eventually deprecate SubprocessJob.kill """ if not self.abort_job: self.abort_job = True self.kill_start_time = get_current_time() self._endJob()
def _endJob(self, force: bool = False): """ End a subprocess, ignoring errors from processes that have already completed. Sends a "Terminate" signal by default; sends a "Kill" signal if force is True. :param force: Should the process be sent a "Kill" signal instead of a "Terminate" signal? """ try: if force: self._subprocess.kill() else: self._subprocess.terminate() except OSError as err: if getattr(err, 'winerror', None) != 5 and err.errno != 3: raise # There is no contract about the state of killed jobs. This is set # to make sure that the job will be removed from running lists. if self.state == JobState.ACTIVE: self.state = JobState.DONE finally: # Collect end time now. fn_time = get_current_time() self._duration = fn_time - self._start_time
[docs] def getDuration(self) -> Optional[int]: """Return the CPU time of the job in seconds. If the job is still running, returns None. """ if self._duration is not None: return int(math.ceil(self._duration))
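
A usage sketch for `SubprocessJob`: run a short external command, then read
its captured output once `JobDJ` has finished (the command is a
placeholder)::

    job_dj = JobDJ()
    sp_job = SubprocessJob(["echo", "hello"], timeout=60)
    job_dj.addJob(sp_job)
    job_dj.run()

    if sp_job.isComplete():
        print(sp_job.stdout)         # captured because stdout defaults to PIPE
        print(sp_job.getDuration())  # duration in whole seconds
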
[docs]class JobControlJob(BaseJob): """ This class defines a job control job to be run under `JobDJ`. """
[docs] def __init__( self, command: List[str], command_dir: Optional[str] = None, name: Optional[str] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, launch_timeout: Optional[int] = None, launch_env_variables: Optional[Dict[str, str]] = None, resource_requirement: Optional[resource.ComputeRequirement] = None, **kwargs): """ Job constructor. :param command: The command that runs the job. :param command_dir: The directory from which to run the command. :param name: The name of the job. :param max_retries: Number of allowed retries for this job. If this *is* set, it is never overridden by the `SCHRODINGER_MAX_RETRIES` environment variable. If it *is not* set, the value of max_retries defined in JobDJ is used, and `SCHRODINGER_MAX_RETRIES` can be used to override this value at runtime. To prevent this job from being restarted altogether, set max_retries to zero. :param timeout: Timeout (in seconds) after which the job will be killed. If None, the job is allowed to run indefinitely. :param launch_timeout: Timeout (in seconds) for the job launch process to complete. If None, a default timeout will be used for jobserver and old jobcontrol jobs ( see get_default_timeout() ) unless a value for job timeout parameter is passed and is not greater than the default timeout. :param launch_env_variables: A dictionary with the environment variables to add when the jobcontrol job is launched. The name of any additional variables to set should be in the keyword of the dict and the value should be the corresponding value. These will be added to any environment variables already present, but removed after the job has been launched. :param kwargs: Additional keyword arguments. Provided for consistency of interface in subclasses. :param resource_requirement: Whether the job will require special compute resources, such as GPU. """ super().__init__(command_dir, resource_requirement) self._job_id = None self._job_obj = None self.max_retries = max_retries if name is not None: self.name = name self.procs = kwargs.get('procs', 1) if not self.procs: self.procs = 1 # If self._use_messages is False, the jobdb will be checked on every # call to update(). self._use_messages = True # _last_job_update is the value of get_current_time() at which the last # update to the job record took place. In the situation that job # control messages are being used, it is the time at which the last # message was received from jmonitor. self._last_job_update = 0 self.update_delay = get_update_delay() self.status_string = None self._command = command self.timeout = timeout if launch_timeout is not None: self.launch_timeout = int(launch_timeout) else: DEFAULT_LAUNCH_TIMEOUT = get_default_launch_timeout() if not self.timeout or self.timeout > DEFAULT_LAUNCH_TIMEOUT: self.launch_timeout = DEFAULT_LAUNCH_TIMEOUT else: self.launch_timeout = int(self.timeout) if launch_env_variables is not None: self._launch_env_variables = launch_env_variables else: self._launch_env_variables = {} self._job_downloader = None self._job_launcher = None
[docs] def getJob(self) -> Optional[jobcontrol.Job]: """ Return the job record as a schrodinger.job.jobcontrol.Job instance. Returns None if the job hasn't been launched. """ if self._job_id: self._readAgain() return self._job_obj
[docs] def getDuration(self) -> Optional[int]: """ Return the duration of the Job as recorded by job server. The duration does not include queue wait time. If the job is running or has not launched, returns None. Note that this method makes a blocking call to the job server. """ job = self.getJob() if job is not None: return job.getDuration()
[docs] def runsLocally(self) -> bool: return False
[docs] def usesJobServer(self) -> bool: """ Detect, by looking at the jobId, whether this job uses a job server. """ return self._job_id and mmjob.mmjob_is_job_server_job(self._job_id)
def _readAgain(self): """ Update the jobcontrol.Job object from the database. :raise RuntimeError: If the job record is missing. """ self._job_obj = jobcontrol.Job(self._job_id) def __getstate__(self): """ Return the object state to be serialized by pickle. """ # This method uses a copy of the instance __dict__ attribute and # modifies two values. First, it sets _job_obj to None, as this is a # jobcontrol.Job instance that relies on mmlib functions and # therefore can't be pickled. Second, it sets _use_messages to False # if the state of the job is ACTIVE. This is because any restarted # JobDJ will not receive any messages from job control for jobs that # were started under a different JobDJ. state_dict = dict(self.__dict__) state_dict['_job_downloader'] = None state_dict['_job_launcher'] = None state_dict['_job_obj'] = None if self.state == JobState.ACTIVE: state_dict['_use_messages'] = False return state_dict def __setstate__(self, state_dict): """ Reset state from a pickle. """ self.__dict__.update(state_dict) # Retry failed jobs. if self.state == JobState.FAILED_RETRYABLE: self.state = JobState.WAITING self.status_string = None self.num_failures = 0 # Reset _last_job_update so the first update after de-serialization # reads the jobdb. self._last_job_update = 0
[docs] def update(self): """ Checks for changes in job status, and updates the object appropriately (marks for restart, etc). :raise RuntimeError: if an unknown Job Status or ExitStatus is encountered. """ global _status_messages if self.state != JobState.ACTIVE: # Was NOT running at last check return # no need to update Status, ExitStatus = None, None check_database = False # If using messages, check the _status_messages dict for status # updates; otherwise explicitly check the jobdb. If it's been more # than MESSAGE_TIMEOUT seconds since the last update, check the # jobdb. if self._use_messages: Status, ExitStatus = _status_messages.pop(self._job_id, (None, None)) if Status: self._last_job_update = get_current_time() elif (get_current_time() - self._last_job_update) >= MESSAGE_TIMEOUT: check_database = True else: check_database = True if check_database: # If we checked the job database less than self.update_delay # seconds ago, don't bother to update it. if (get_current_time() - self._last_job_update) <= self.update_delay: return try: self._readAgain() self._last_job_update = get_current_time() except RuntimeError: # job database file is missing: logger.exception(f"error reading {self._job_id} job record") self._hasDied('missing') return Status = self._job_obj.Status try: ExitStatus = self._job_obj.ExitStatus except RuntimeError: ExitStatus = None elif Status is None: # If we're not checking the database and the messages didn't # give us the Status, then there has been no change, so return. return # Goes here if the job object is successfully acquired if Status in [ 'launched', 'submitted', 'started', 'running', 'paused', 'exited' ]: if self._hasTimedOut(): self.kill() return # not a status in new JOB_SERVER elif Status in ['stranded', 'unreachable']: # Can't connect to the job, restart it. self._hasDied(Status) return elif Status in ['completed', 'incorporated']: # self._job_downloader is set while download is occurring if self._job_downloader: return elif self.usesJobServer(): self._startAsyncDownload(Status, ExitStatus) else: self._completeJob(Status, ExitStatus) return else: raise RuntimeError(f"ERROR: JobDJ: Unknown job status: {Status}")
def _startAsyncDownload(self, status: str, exit_status: str): """ Start a download process in the background. :param status: last status of Job :type status: str :param exit_status: last ExitStatus of code :type exit_status: str """ syslog.debug(f"Start async download for {self}") self._job_downloader = jobhub.JobDownloader(self._job_id) self._job_downloader.downloadFinished.connect( functools.partial( self._downloadFinished, status=status, exit_status=exit_status, ), QtCore.Qt.QueuedConnection) self._job_downloader.download() def _markDownloadFailure(self, output: str): """ Mark download as failed with the following output. :param str output: Description of download error. """ logger.error(f"download of {self._job_id} failed with {output}") self._hasDied("download failure") def _downloadFinished(self, output: str, status: str, exit_status: str): """ Handle finished download, whether successful or not. :param str output: Output from JobDownloader. If empty, download was successful, otherwise contains error description. :param str status: string status representation, output of schrodinger.job.jobcontrol.Job.Status :param str exit_status: exit status representation, output of schrodinger.job.jobcontrol.Job.ExitStatus """ # Safe to do here since _startAsyncDownload invokes this method via a # QueuedConnection. if self._job_downloader: self._job_downloader = None if output: self._markDownloadFailure(output) return self._completeJob(status, exit_status) def _completeJob(self, status: str, exit_status: str): """ Perform a state transition to finished. :param status: string status representation, output of schrodinger.job.jobcontrol.Job.Status :param exit_status: exit status representation, output of schrodinger.job.jobcontrol.Job.ExitStatus """ if status == 'completed': if exit_status == FINISHED: # The job is done self._markFinished(FINISHED) return elif exit_status == "stopped": # Stopped by user. consider done. self._markFinished('stopped') return elif exit_status in ['died', 'killed', 'fizzled']: if self.abort_job: self._markFinished(exit_status) else: self._hasDied(exit_status) return else: raise RuntimeError( f"ERROR: JobDJ: Unknown job exit status: {exit_status}") elif status == 'incorporated': # Incorporated, is done. self._markFinished('incorporated') return def _hasTimedOut(self) -> bool: """ Return True if the job has surpassed its timeout value. """ if not self.timeout: return False start_time = self._job_obj.LaunchTime or self._job_obj.StartTime start = time.mktime( time.strptime(start_time, jobcontrol.timestamp_format)) current = get_current_time() elapsed = current - start if elapsed > self.timeout: return True return False def _markFinished(self, action: str): """ Marks the job as finished. """ self.state = JobState.DONE self.status_string = action def _hasDied(self, action: str): """ Tells `JobDJ` to restart the job next-time-around. """ self._job_downloader = None self.state = JobState.FAILED_RETRYABLE self.status_string = action if self._job_id: if self._job_obj: # Backup any log files from the failed run: command_dir = self.getCommandDir() for filename in self._job_obj.LogFiles: if command_dir: filename = os.path.join(command_dir, filename) backup_file(filename, copy=True) logger.error( f"Subjob '{self.name}' died ({action}) [jobid {self._job_id}]") else: logger.error(f"Subjob '{self.name}' died ({action})")
[docs] def doCommand(self, host: str, local: bool = False): """ Launch job on specified `host` using jobcontrol.launch_job(). :param host: Host on which the job will be executed. :param local: Removed in JOB_SERVER. """ # reset launch state self.launch_error = "" self._job_id = None self._job_obj = None cmd = get_command(self.getCommand(), host=host, procs=self.procs, local=local) self.host = host env = os.environ.copy() if self._launch_env_variables: env.update(self._launch_env_variables) if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER): syslog.debug(f"launch subjob in asynchronous mode {cmd}") self._launchAsynchronous(cmd, env) else: syslog.debug(f"launch subjob in synchronous mode {cmd}") self._launchSynchronous(cmd, env)
def _launchSynchronous(self, cmd: List[str], env: Dict[str, str]): """ Launch job in a blocking fashion. :param cmd: command line for launch command :param env: environment to use when launching the job """ try: if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER): jobobj = self._launch_with_retries(cmd, env, num_retries=1) else: jobobj = self._launch_with_retries(cmd, env) except RuntimeError as exc: return self._processLaunchFailed(err=exc) return self._processJobStarted(job=jobobj) def _launch_with_retries(self, cmd: List[str], env: Dict[str, str], num_retries=2) -> jobcontrol.Job: """ Launch a job with retries. Sometimes launching fails, for reasons we don't entirely understand. One situation is a failure of the qsub command when submitting to a queue. :param cmd: command line for launch command :param env: environment to use when launching the job :param num_retries: number of times to retry launch """ for retry_count in range(1, num_retries + 1): try: return jobcontrol.launch_job(cmd, print_output=False, timeout=self.launch_timeout, env=env) except RuntimeError as e: logger.warning("WARNING: Job launch failed for " f"{retry_count}/{num_retries} times.\n% " f"{subprocess.list2cmdline(cmd)}\nOutput\n{e}") if retry_count == num_retries: raise def _deleteJobLauncher(self, state: str): """ Change the state of a job after launch. This also deletes the job launcher to free c++ memory. """ self._job_launcher = None self.state = state def _processJobStarted(self, job: jobcontrol.Job): """ Update the state of this class when job is launched successfully. :param job: representation of the job at launch """ name = job.Name if name: self.name = name self._job_id = job.JobId self._job_obj = job self._last_job_update = get_current_time() self.launch_error = "" state = JobState.ACTIVE # Delete job launcher on a timer, since deleting the _job_launcher # from a slot of the _job_launcher will cause a crash. Don't change job # state until deleting the launcher if self._job_launcher: QtCore.QTimer.singleShot( 0, functools.partial(self._deleteJobLauncher, state)) else: self.state = state def _processLaunchFailed(self, err: str): """ Update state of this class when job launch failed. :param job: representation of the job at launch :param err: err message if launch fails, will be empty or None on a succesful launch """ logger.warning(f"WARNING: Job launch failed: {err}") self.status_string = PrintableStatus.FAILED_TO_LAUNCH self.launch_error = err state = JobState.FAILED_RETRYABLE # Delete job launcher on a timer, since deleting the _job_launcher # from a slot of the _job_launcher will cause a crash. Don't change job # state until deleting the launcher if self._job_launcher: QtCore.QTimer.singleShot( 0, functools.partial(self._deleteJobLauncher, state)) else: self.state = state def _launchAsynchronous(self, cmd: List[str], env: Dict[str, str]): """ Launch a job in a nonblocking fashion. 
:param cmd: command line for launch command :param env: environment to use when launching the job """ job_cmd = jobhub.JobCommand(cmd, "UNUSED_VIEWNAME_ARGUMENT", os.getcwd()) launch_env = QtCore.QProcessEnvironment() for k, v in env.items(): launch_env.insert(k, v) job_cmd.setTimeout(self.launch_timeout) job_cmd.setEnvironment(launch_env) self._job_launcher = self._startJobLauncher( job_cmd, successCallback=self._processJobStarted, failureCallback=self._processLaunchFailed, ) return def _startJobLauncher( self, cmd: jobhub.JobCommand, successCallback: Callable[[jobcontrol.Job], None], failureCallback: Callable[[str], None]) -> jobhub.JobLauncher: """ Create a JobLauncher with the given callbacks attached. """ job_launcher = jobhub.JobLauncher(cmd) job_launcher.jobStarted.connect(successCallback) job_launcher.jobLaunchFailed.connect(failureCallback) job_launcher.launch() return job_launcher
[docs] def cancelSubmitted(self) -> bool: """ If the job is still in the 'submitted' state, cancel it, purge the jobrecord and set the job handle to None. Return True if this was successful, False otherwise. """ if not _host_is_queue(self.host): return False self._readAgain() if self._job_obj.Status == 'submitted': syslog.debug("canceling a submitted job") self._job_obj.kill_for_smart_distribution() try: self._job_obj.purgeRecord() except Exception: pass self._job_obj = None return True else: return False
[docs] def retryFailure(self, max_retries: int = 0) -> bool: """ This method will be called when the job has failed, and JobDJ needs to know whether the job should be retried or not. JobDJ's value for the max_retries parameter is passed in, to be used when the job doesn't have its own max_retries value. Return True if this job should be retried, otherwise False. """ if self.max_retries is not None: return self.num_failures <= self.max_retries return self.num_failures <= max_retries
[docs] def kill(self): """ Send kill request to jobcontrol managed job """ self.cancel()
[docs] def cancel(self): """ Send kill request to jobcontrol managed job. This method will eventually deprecate JobControlJob.kill """ if not self.abort_job: if self._job_obj: # if not self._job_obj, this job is probably being launched. # FIXME: JOBCON-7438 terminate launch process self._job_obj.kill() self.abort_job = True
[docs] def maxFailuresReached(self, msg: str): """ Print an error summary, including the last 20 lines from each log file in the LogFiles list of the job record. """ logger.error(f"ERROR: {msg}") if self.launch_error: logger.error(self.launch_error) return elif not self._job_obj: logger.error('\nNo job control errors were found.') return log_files = self._job_obj.LogFiles errors = self._job_obj.Errors for log_file in log_files: command_dir = self.getCommandDir() if command_dir: log_file = os.path.join(command_dir, log_file) if os.path.exists(log_file): try: with open(log_file, errors='backslashreplace') as fh: lines = fh.readlines() logger.error(f"\nLast 20 lines of '{log_file}':\n") logger.error(' ' + ' '.join(lines[-20:]),) except OSError: logger.exception(f"\nCould not read log: {log_file}") else: logger.error(f"\nLog file '{log_file}' does not exist.") if errors: logger.error("\nJob control errors (from job record):") for error in errors: logger.error(f' ERROR: {error}') elif not self.launch_error: logger.error('\nNo job control errors were found.')
[docs] def getStatusStrings(self) -> Tuple[str, str, str]: # See BaseJob.getStatusStrings docstring. status_string = self.status_string jobid = self._job_id if self._job_obj: machine = self._job_obj.JobHost else: machine = None if machine is not None and machine != self._job_obj.QueueHost: machine = machine.split('.', 1)[0] if machine == self.host: host = self.host else: host = f"{self.host} [{machine}]" else: host = self.host return status_string, jobid, host
def __str__(self): cmd_str = subprocess.list2cmdline(self._command) return f"JobControlJob({cmd_str})"
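
A usage sketch for `JobControlJob`: run a job control command in its own
subdirectory, allow a single retry, and report when it finishes (the
command, directory name and host list are placeholders)::

    def report(job):
        print(f"{job.name} finished in {job.getDuration()} seconds")

    jc_job = JobControlJob(
        ["jaguar", "input1.inp"],
        command_dir="subjob_1",   # created on demand by preCommand()
        name="jaguar_subjob_1",
        max_retries=1,            # overrides JobDJ's value for this job only
        timeout=3600,             # kill the job if it runs longer than an hour
    )
    jc_job.addFinalizer(report)

    job_dj = JobDJ(hosts=[("localhost", 2)])
    job_dj.addJob(jc_job)
    job_dj.run()
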
##############################################################################
[docs]class LinkedListNode: """ A node for the LinkedList class, holding a value, and a reference to the previous and next node in the list. """
[docs] def __init__(self, value, prev=None): self.value = value self.prev = prev self.next = None self.removed = False
[docs]class LinkedList: """ A doubly linked list, providing constant time addition, size, and truth checks. It provides for constant time removal if you have the node object in hand. It provides for linear time iteration without copying while allowing removals or additions to the list during iteration. """
[docs] def __init__(self): self.tail = None self.head = None self.size = 0
def __bool__(self): return bool(self.size)
[docs] def __len__(self): return self.size
def _iter_from(self, start: LinkedListNode, reverse: bool = False): """ Iterate over the list in some direction from some start point. Common code from __iter__() and reverse_iter(). """ node = start if reverse: direction = 'prev' else: direction = 'next' while node: yield node, node.value node = getattr(node, direction) while node and node.removed: node = getattr(node, direction) def __iter__(self): """ Iterate from head to tail over the list, yielding a (node, value) tuple for each element. """ return self._iter_from(self.head)
[docs] def reverse_iter(self): """ Iterate from tail to head over the list, yielding a (node, value) tuple for each element. """ return self._iter_from(self.tail, reverse=True)
def __getstate__(self): return {"list_values": list(value for (node, value) in self)} def __setstate__(self, state): # EV 101813 - old pickle state used to return a list of the linked # list values, but __setstate__ isn't called if __getstate__ returns # a false value, so it now returns a dictionary with a list_values # key. if isinstance(state, list): values = state else: values = state['list_values'] self.__init__() for v in values: self.add(v)
[docs] def remove(self, node: LinkedListNode): """ Remove a node from the list. """ node.removed = True if self.head == node: self.head = node.next else: node.prev.next = node.next if self.tail == node: self.tail = node.prev else: node.next.prev = node.prev self.size -= 1
[docs] def add(self, value: LinkedListNode): """ Add a node to the list. """ node = LinkedListNode(value, self.tail) if self.head is None: self.head = node else: self.tail.next = node self.tail = node self.size += 1
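
A small sketch of the behavior the class docstring describes: iteration
yields (node, value) pairs, so a node can be removed in constant time while
the iteration is still in progress::

    values = LinkedList()
    for name in ("a", "b", "c"):
        values.add(name)

    for node, value in values:
        if value == "b":
            values.remove(node)  # safe mid-iteration; removed nodes are skipped

    print(len(values))           # 2
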
[docs]class RunningJobs(LinkedList): """ A LinkedList subclass that tracks running jobs and keeps a tally of jobs running on each machine. """
[docs] def __init__(self): self._jobs_count = {} LinkedList.__init__(self)
[docs] def add(self, job: BaseJob): """ Add a running job. """ if job.host not in self._jobs_count: self._jobs_count[job.host] = 0 self._jobs_count[job.host] += job.procs LinkedList.add(self, job)
[docs] def remove(self, node: LinkedListNode): """ Remove a linked list node. """ job = node.value self._jobs_count[job.host] -= job.procs if self._jobs_count[job.host] == 0: del self._jobs_count[job.host] LinkedList.remove(self, node)
[docs] def jobsCount(self) -> Dict[str, int]: """ Return a dict telling how many jobs are running on each host. """ return self._jobs_count
[docs]def add_multi_job_finalizer(function: Callable[[BaseJob], None], jobs: List[BaseJob], run_dir: str = None): """ Create a finalizer function that will be called when all jobs in the jobs iterator are complete. """ multi_finalizer = _MultiJobFinalizer(function, jobs, run_dir) for job in jobs: job._finalizers.append(multi_finalizer)
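
A usage sketch: run a single callback only after every job in a group has
completed successfully (the subjob commands are placeholders)::

    def combine_outputs(job):
        # Invoked exactly once, with whichever job of the group finished last.
        print(f"group complete; last job to finish was {job.name}")

    jobs = [JobControlJob(["jaguar", f"input{i}.inp"]) for i in (1, 2, 3)]
    add_multi_job_finalizer(combine_outputs, jobs)

    job_dj = JobDJ()
    for job in jobs:
        job_dj.addJob(job)
    job_dj.run()
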
class _MultiJobFinalizer: """ A class to allow specification of a function to be invoked when the final job of a group completes successfully. """ def __init__(self, function: Callable[[BaseJob], None], jobs: List[BaseJob], rundir: str = None): self.function = _wrap_job_finalizer(function, rundir) self._jobs = set() for job in jobs: self._jobs.add(job) def __call__(self, job): self._jobs.remove(job) if not self._jobs: self.function(job)
[docs]class PriorityQueue: """ This is a general priority queue. """
[docs] def __init__(self): self._heap = []
[docs] def __len__(self): return len(self._heap)
def __iter__(self): """ Note that this method is only meant to provide access to the items in the queue, and that it does not return them in sorted order. """ yield from self._heap
[docs] def push(self, item): """ Add an item to the heap. This item must have a __lt__ method as per the heapq module requirement. """ heapq.heappush(self._heap, item)
[docs] def pop(self): """ Get the highest priority item, removing it from the heap. """ return heapq.heappop(self._heap)
[docs] def remove(self, item): """ Remove any copies of item from the heap. """ while True: try: self._heap.remove(item) except ValueError: return
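
The pop order is determined by the items' `__lt__` methods (see
`BaseJob.__lt__` above): locally-running jobs are returned before job
control jobs, with ties broken by the order of addition. A small sketch::

    pq = PriorityQueue()
    pq.push(JobControlJob(["jaguar", "input1.inp"]))
    pq.push(BaseJob())

    first = pq.pop()   # the BaseJob: it runs locally, so it sorts first
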
[docs]class JobDJ: """ Class for running commands/jobs in parallel under jobcontrol. Create an instance of this class, add commands to run with .addJob(), and then call run(). """
[docs] def __init__( self, hosts: Optional[List[Tuple[str, int]]] = None, local: bool = False, max_retries: Optional[int] = None, default_max_retries: int = 0, max_failures: Optional[int] = None, verbosity: str = "quiet", job_class: BaseJob = JobControlJob, update_delay: Optional[int] = None, ): """ Constructor. :param hosts: A list of (<hostname>, <maximum_concurrent_subjobs>) tuples, where <hostname> is a string and <maximum_concurrent_subjobs> is an integer. The default value of None is determined automatically and is usually desired. :param local: No longer functional in JOB_SERVER. :param max_retries: Number of allowed retries per subjob. If this *is* set, it is never overridden by the `SCHRODINGER_MAX_RETRIES` environment variable. If it *is not* set, the value in default_max_retries is used, and `SCHRODINGER_MAX_RETRIES` is allowed to override. If you wish to disable restarting altogether, set this value to zero. :param default_max_retries: Number of allowed retries per subjob. This value can always be overridden by the `SCHRODINGER_MAX_RETRIES` environment variable. Default is zero. :param max_failures: Total number of allowed subjob failures before `JobDJ` exits. If it is not defined, a default of zero will be used (exit on any failure after attempting to restart), but this can be overridden with the `SCHRODINGER_MAX_FAILURES` environment variable. To allow an unlimited number of subjob failures, set max_failures to the module level NOLIMIT constant. :param verbosity: There are three allowed verbosity levels: "quiet" - only warnings and errors are printed; "normal" - `JobDJ` progress is printed; and "verbose" - additional debugging info is printed. Default is "quiet". :param job_class: The class to use as the default job constructor when the `addJob` argument is not a `BaseJob` instance. :param update_delay: The number of seconds to wait between job control database reads for `JobControlJob` jobs. (This delay is for an individual job, not for any job database read.) Default is None, which causes the module level constant UPDATE_DELAY to be used. """ # Latest jobs tracks the timestamp of the last update and allows # us to get updated jobs from the job server. self._latest_jobs = None self._latest_update_time = get_current_time() # All jobs are in one of the following collections. self._jobqueue = PriorityQueue() # A heap of jobs yet to be started. self._finished = dict() # A dict with successfully completed # jobs as the key and time of # completion as the value. self._failed = [] # A list of failed jobs. self._launching = set() self._running = RunningJobs() # A LinkedList of running jobs. # A note on the _jobqueue collection: # These jobs are all ready to be started, i.e. they have no # unfinished prerequisites. Because every job is a node in a job # dependency graph, jobs with prerequisites only exist as # connections to jobs without prerequisites that are in in the # '_jobqueue' heap. # Ev:98326 Store both a set and a list. A list allows the job order to # be preserved, and a set allows a quick look-up whether a job has # been added or not. self._added_jobs_list = [] self._added_jobs_set = set() self._job_class = job_class # The default argument for update_delay is None so that changes # to the module level constant UPDATE_DELAY will be used here. # (With a default argument of update_delay=UPDATE_DELAY, the # default will always be the value of UPDATE_DELAY at import.) 
if update_delay is not None: self._update_delay = update_delay else: self._update_delay = get_update_delay() # The maximum number of jobs to start before pausing to check for job # status updates. self.max_starts = 5 # Time that the las job was started (sec) self.last_submit = 0 self.smart_distribution = True self._keep_one_job_on_localhost = False # Ev:57495 self.num_failed_jobs = 0 self._use_messages = self._checkMessaging() self._hosts = None # Ordered dict with host key, maxjobs value. # If None, means use host set by toplevel. self.local = local if verbosity not in ("quiet", "normal", "verbose", "silent"): raise Exception("Unrecognized verbosity level - {verbosity}") self._verbosity = verbosity # 1) Use max_retries if specified # 2) Otherwise, default to default_max_retries, but use # SCHRODINGER_MAX_RETRIES if it is set if max_retries is None: self.max_retries = default_max_retries if 'SCHRODINGER_MAX_RETRIES' in os.environ: self.max_retries = int(os.environ['SCHRODINGER_MAX_RETRIES']) else: self.max_retries = max_retries if max_failures is None: # Use SCHRODINGER_MAX_FAILURES (if set): env_max_failures = os.getenv('SCHRODINGER_MAX_FAILURES') if env_max_failures: self.max_failures = int(env_max_failures) else: self.max_failures = 0 # Use default of zero else: self.max_failures = int(max_failures) # Use the passed host list, if present: if hosts is not None: host_list = [] # List of (host, maxjobs) tuples if isinstance(hosts, list): for host, n in hosts: host_list.append((host, n)) else: raise Exception(f"Invalid type for host argument: {hosts}") self.setHostList(host_list) self._job_id = None # If the user never specified the hosts, get the host list # from jobcontrol. Note that this code will NOT run if the # user instructed JobDJ to use an empty host list (Ev:52954) if self._hosts is None: self.setHostList(jobcontrol.get_host_list()) backend = jobcontrol.get_backend() if backend: self._job_id = backend.job_id # This get function caches environment variables and # removes them for subjobs jobcontrol.get_backend_host_list() # backdoor for VSW to avoid checking if job resources are invalid. This # is needed since VSW starts a JobDJ without valid hosts, so jobs may # not be runnable. Using this option can cause JobDJ to have an # infinite loop (as does not specifying hosts upfront). self._ignore_host_resource_checking = False
[docs] def hasStarted(self) -> bool: """ Returns True if JobDJ has started already """ if self._finished or self._failed or self._running: return True return False
[docs] def isComplete(self) -> bool: """ Returns True if JobDJ has completed, False otherwise. """ if self._running or self._jobqueue: return False else: return True
[docs] def markForRestart(self, job: BaseJob, action: str): """ Mark a job as dead, but make sure that it gets restarted. :param action: Describes the reason the job is being restarted. """ # Used by VSW (Pipeline). job._hasDied(action) if job in self._failed: self._failed.remove(job) job.state = JobState.WAITING self._queueJob(job) self.printStatus(job, PrintableStatus.RESTARTING) elif job in self._finished: del self._finished[job] job.state = JobState.WAITING self._queueJob(job) self.printStatus(job, PrintableStatus.RESTARTING) return
def _checkMessaging(self) -> bool: """ Check to see if use of job control messaging is possible. For legacy jobcontrol it is necessary to get a backend object to communicate via jmonitor messages. :returns: True if messaging is possible, False if not. """ # Check if job control messages need to be used for subjob status. if USE_JOB_CONTROL_MESSAGES: if mmutil.feature_flag_is_enabled( mmutil.JOB_SERVER) or jobcontrol.get_backend() is not None: return True else: return False else: return False def __getstate__(self): state_dict = dict(self.__dict__) state_dict['_latest_jobs'] = None state_dict['_latest_update_time'] = get_current_time() state_dict['_launching'] = set() return state_dict def __setstate__(self, state_dict): """ Reset state from a pickle. """ self.__dict__.update(state_dict) if self._use_messages: self._use_messages = self._checkMessaging() @property def waiting_jobs(self) -> List[BaseJob]: """ Jobs waiting to be started. """ return list(self._jobqueue) @property def done_jobs(self) -> List[BaseJob]: """ Successfully completed jobs, sorted into the order they were marked as completed by JobDJ. """ return sorted(list(self._finished), key=lambda x: self._finished[x]) @property def active_jobs(self) -> List[BaseJob]: return [job for (node, job) in self._running] @property def failed_jobs(self) -> List[BaseJob]: return [job for job in self._failed] @property def all_jobs(self) -> List[BaseJob]: return [job for job in self._added_jobs_list]
[docs] def killJobs(self): """ Kill all active jobs """ raise_failure = False for (node, job) in self._running: attempt = 1 while True: try: job.cancel() except Exception as e: jobid = getattr(job, "_job_id", None) if jobid: name = f"Job {jobid}" else: name = str(job) logger.exception(f"ERROR: {name} cancelation failed") if attempt >= MAX_KILL_ATTEMPTS: raise_failure = True else: attempt += 1 continue break if raise_failure: raise RuntimeError("Unable to kill some of the jobs.")
@property def total_added(self) -> int: """ The number of individual jobs that have been added to the JobDJ instance. """ return len(self._added_jobs_set) @property def total_active(self) -> int: """ The number of jobs currently running. """ return len(self._running) - len(self._launching) @property def total_finished(self) -> int: """ The number of jobs that have finished successfully. """ return len(self._finished) @property def total_failed(self) -> int: """ The number of jobs that have failed. """ return len(self._failed)
[docs] def addJob(self, job: Union[BaseJob, List], add_connected: bool = True, **kwargs): """ Add a job to run. If `job` is not a `BaseJob` instance, a `BaseJob` instance is constructed with `job` as the first argument. The default `BaseJob` class for the `JobDJ` instance can be specified in the constructor for `JobDJ`. Additional keyword arguments are passed on to the job constructor. All job prerequisites and dependencies need to be specified before adding a job to `JobDJ`. :param add_connected: If True, for jobs with dependencies only one job per connected group should be added and all connected jobs will be discovered and added automatically. If False, it is the user's responsibility to make sure that any prerequisites of a job are also added. """ if 'jobdir' in kwargs: warnings.warn( "The 'jobdir' keyword argument for addJob is deprecated. " "Please use 'command_dir' as the functional equivalent.", DeprecationWarning, stacklevel=2) kwargs['command_dir'] = kwargs.pop('jobdir') # If the job isn't a 'BaseJob' subclass, create an instance from the # default job class for this JobDJ instance. if not isinstance(job, BaseJob): if not isinstance(job, list): warnings.warn( "Using JobDJ.addJob() with strings is likely to cause " "problems with arguments that have spaces (such as " "filenames). Please provide 'command' as a list of " "arguments to have launch_job deal with this robustly.", DeprecationWarning, stacklevel=2) else: # Given a cmd list. Make sure each arg is a str for i, arg in enumerate(job, start=1): if not isinstance(arg, str): raise TypeError(f"Argument {i} is of type {type(arg)} " f"(expecting a string)\nCommand: {job}") job = self._job_class(job, **kwargs) procs = kwargs.get('procs', 1) if procs > 1 and len(self._hosts) > 1: raise RuntimeError("Calling addJob() with procs > 1 " "is only supported for single hosts.") add_priority = kwargs.get('add_priority', self.total_added) if add_connected: jobs = list(job.genAllJobs()) else: jobs = [job] for job in jobs: job._jobdj = self for j in jobs: if j in self._added_jobs_set: # Behavior here is questionable; it may be better to raise # an exception or just to be completely silent about jobs # that have been added multiple times. I went for the middle # ground. if j._prereqs: logger.info( textwrap.fill( f"Job '{j}' has already been added, " "probably as a dependency of another job. " "To quiet this message, avoid adding prerequisite " "jobs directly or use the add_connected=False " "option of the JobDJ.addJob() method.", subsequent_indent=" ")) else: logger.debug(f"Job '{j}' has been added multiple times. " "It will only be run once.") continue if hasattr(j, '_use_messages'): j._use_messages = self._use_messages if hasattr(j, "update_delay"): j.update_delay = self._update_delay j._add_priority = add_priority if j.state == JobState.DONE: self._jobFinished(j) elif not j.getPrereqs(): # If a job doesn't have prerequisites, queue it up. If it # does have prerequisites it will only be added to a queue # heap when all of its prereqs are finished. self._queueJob(j) self._added_jobs_set.add(j) self._added_jobs_list.append(j)
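
As the docstring above notes, dependencies must be set up before jobs are
added. A sketch of a two-stage workflow in which a combining job waits for
two independent subjobs (the commands are placeholders)::

    stage1_a = JobControlJob(["jaguar", "input1.inp"])
    stage1_b = JobControlJob(["jaguar", "input2.inp"])
    combine = JobControlJob(["jaguar", "combine.inp"])

    combine.addPrereq(stage1_a)
    combine.addPrereq(stage1_b)

    # With the default add_connected=True, adding any one job of the
    # connected group pulls in the whole graph, so one addJob call suffices.
    job_dj = JobDJ()
    job_dj.addJob(combine)
    job_dj.run()
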
    def _queueJob(self, job: BaseJob):
        """
        Add a job to a queue heap.
        """
        self._jobqueue.push(job)
    def dump(self, filename: pathlib.Path):
        """
        Pickle the `JobDJ` instance to the specified file name.
        """
        with open(filename, "wb") as fh:
            pickle.dump(self, fh)
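    # Illustrative sketch (not part of the original source): dump() pairs
    # with a plain pickle.load() to resume a previous run; restart behavior
    # itself is handled by _start()/run(restart_failed=...). The file name is
    # hypothetical:
    #
    #     job_dj.dump("jobdj.pkl")
    #     ...
    #     with open("jobdj.pkl", "rb") as fh:
    #         job_dj = pickle.load(fh)
    #     job_dj.run()   # previously failed jobs are re-queued by default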
    def setSmartDistribution(self, state: bool):
        """
        Set smart distribution of jobs.

        :param bool state: Whether to enable smart distribution
        """
        self.smart_distribution = state
    def disableSmartDistribution(self):
        """
        Disable smart distribution of jobs.

        Smart distribution allows subjobs to run on the machine that JobDJ is
        running on when JobDJ itself is running under a queuing system. This
        is usually desirable since the JobDJ process doesn't generally
        consume significant computational resources and you don't want to
        leave a queue slot mostly idle.
        """
        self.setSmartDistribution(False)
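    # Illustrative sketch (not part of the original source): a driver that
    # needs to keep its own slot free for subjob bookkeeping or heavy I/O can
    # opt out before run(). The constructor host argument and host entry name
    # are hypothetical:
    #
    #     job_dj = JobDJ(hosts=[("cluster_queue", 10)])
    #     job_dj.disableSmartDistribution()
    #     job_dj.run()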
    def _checkSmartDistribution(self):
        """
        Check to see if we should use smart distribution. Smart distribution
        runs jobs on the localhost when a host is consumed by the JobDJ
        driver.

        Smart distribution is used if:

        1. smart distribution is enabled (the default),
        2. `JobDJ` is running under Job Control itself,
        3. the JobDJ job is running on a queue, and
        4. "localhost" is not in the `JobDJ` host list already.

        In this situation, the preference is to always keep a single job
        running on localhost. If the JobDJ job has a HostEntry that is
        present in the host list, the maximum number of jobs for that host is
        decreased by one. This ensures a "smart" job distributor that runs
        jobs on the same host that the JobDJ driver is on, rather than
        leaving it mostly idle.
        """
        # Complain if more than one host is specified and some of the hosts
        # have "unlimited" ncpus:
        if len(self._hosts) > 1:
            unlimited = any([
                cpus for cpus in self._hosts.values() if cpus == UNLIMITED_CPUS
            ])
            if unlimited:
                logger.warning("WARNING: JobDJ: Can't use unlimited cpus "
                               "if more than one host is specified.")

        self._keep_one_job_on_localhost = False
        if self.smart_distribution:
            if not self._hosts:
                # Added this check to fix STU#766
                logger.warning("WARNING: Can't use smart distribution, as "
                               "host list is empty")
                return
            host_entry = [host for host in self._hosts][0]

            parentjob = None
            if not jobcontrol.get_backend():
                return
            parentjob = jobcontrol.get_backend().getJob()

            # Check whether 'localhost' is already in the _hosts ordered
            # dict. If it is, treat it just like any other host.
            # (self._keep_one_job_on_localhost will be False)
            if LOCALHOST_ENTRY_NAME in self._hosts:
                return

            if not _any_ancestor_is_queued(parentjob):
                return

            # Add one localhost cpu and decrement HostEntry maxjobs by
            # one:
            self._keep_one_job_on_localhost = True  # Ev:57495
            if self._hosts[host_entry] != UNLIMITED_CPUS:
                self._hosts[host_entry] -= 1

    def _availableHost(self,
                       *,
                       required_procs: int = 1,
                       resource_requirement) -> Optional[str]:
        """
        Return the next available host name from the host list. Return None
        if all hosts are saturated.

        :param required_procs: Number of processors that this subjob will
            use.

        :param resource_requirement: Type of resource the job requires
        """
        active_counts = self.getActiveProcCounts()

        # Attempt to keep the localhost saturated, but note that only jobs
        # using a single processor should be run there. When smart
        # distribution is enabled, localhost is always listed under the
        # "localhost" entry name.
        if self._keep_one_job_on_localhost and required_procs == 1 and jobcontrol.get_host(
                LOCALHOST_ENTRY_NAME).matchesRequirement(resource_requirement):
            # Number of jobs currently active on localhost:
            if active_counts.get(LOCALHOST_ENTRY_NAME, 0) == 0:
                return LOCALHOST_ENTRY_NAME

        # If we got here, then either 'localhost' is saturated, or smart
        # distribution is inactive.
        compute_resources = []
        for host_entry_name, maximum_concurrent_subjobs in self._hosts.items():
            compute_resources.append(
                _ComputeResource(
                    host_entry=jobcontrol.get_host(host_entry_name),
                    maximum_concurrent_subjobs=maximum_concurrent_subjobs,
                    active_subjobs=active_counts.get(host_entry_name, 0)))

        return _find_optimal_host(
            compute_resources,
            resource_requirement,
            required_procs,
            validate_job_requirements=not self._ignore_host_resource_checking)
    def getActiveProcCounts(self) -> Dict[str, int]:
        """
        Return a dictionary containing the number of active jobs on each
        host.
        """
        return self._running.jobsCount()
    def setHostList(self, host_list: List[Tuple[str, int]]):
        """
        Define compute hosts to run subjobs on. Active jobs are not affected
        by a change in the host list.

        :param host_list: A list of (<host_entry_name>,
            <maximum_concurrent_subjobs>) tuples, where <host_entry_name> is
            a string and <maximum_concurrent_subjobs> is an integer.
        """
        self._hosts = {}
        # Repeated hostnames will combine the available cpus and keep the
        # list position of the first occurrence.
        # For every non-queued host, if NCPUS was NOT specified (None),
        # set it to 1. Leave it as None (unlimited) for queued hosts:
        for (hostname, ncpus) in host_list:
            if ncpus is not None and ncpus < 1:
                raise ValueError(
                    f'Invalid number of max jobs ({ncpus}) for host {hostname}')
            # -HOST localhost
            if ncpus is None and not _host_is_queue(hostname):
                ncpus = 1
            if hostname in self._hosts:
                self._hosts[hostname] += ncpus
            else:
                self._hosts[hostname] = ncpus
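    # Illustrative sketch (not part of the original source): ncpus=None means
    # "use the host default" (1 for non-queue hosts, no explicit limit for
    # queue hosts), and repeated entries are merged. Host entry names are
    # hypothetical:
    #
    #     job_dj.setHostList([
    #         ("localhost", 2),       # at most 2 concurrent subjobs
    #         ("my_cluster", None),   # queued host: no explicit limit
    #         ("localhost", 1),       # merged: localhost now allows 3
    #     ])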
    def getFirstHost(self):
        """
        Get the first host from the list of hosts.

        :rtype: str
        :return: First host
        """
        return next(iter(self._hosts))
    def _areProcessorsAvailableOnLocalhost(self) -> bool:
        """
        Are there available processors on the localhost?

        This is treated specially, because the caller may not indicate that
        there are slots to be had on the localhost. However, if the job can
        only be run on the localhost, we need to make it available.
        """
        maxjobs = self._hosts.get(LOCALHOST_ENTRY_NAME, 1)
        proc_counts = self.getActiveProcCounts()
        return proc_counts.get(LOCALHOST_ENTRY_NAME, 0) < maxjobs
    def printStatus(self,
                    job: Optional[BaseJob] = None,
                    action: Optional[str] = None):
        """
        Print the status of `JobDJ` and the action/status for the job.

        If no job is specified, print the status header. If no action is
        specified, the `status_string` attribute of the job is used.
        """
        # Spacing is adjusted for the total number of jobs.
        # m is the width of the 'jobs' column: m==1 when #jobs is 1-9,
        # 2 when #jobs is 10-99, 3 when #jobs is 100-999, etc.
        m = len(str(self.total_added))

        if job is None:
            # Print the header
            s = "-" * m
            header = """\
            JobDJ columns:
              C: Number of completed subjobs
              A: Number of active subjobs (e.g., submitted, running)
              W: Number of waiting/pending subjobs

            %*s %*s %*s | Activity  JobId  JobName  JobHost
            %*s %*s %*s | --------- -----------------------""" % (
                m, "C", m, "A", m, "W", m, s, m, s, m, s)
            logger.info(textwrap.dedent(header))
        else:
            status_string, jobid, host = job.getStatusStrings()
            if action is not None:
                status_string = action
            num_active = self.total_active
            num_completed = self.total_finished + self.total_failed
            num_waiting = self.total_added - num_active - num_completed
            status = "%*d %*d %*d | %-9s %s %s %s" % \
                (m, num_completed, m, num_active, m, num_waiting,
                 status_string, jobid, job.name, host)
            logger.info(status)
        sys.stdout.flush()
    def _start(self, *, restart_failed: bool = True):
        """
        Perform startup activities for `JobDJ`, including printing headers
        and marking jobs for restart if `JobDJ` is restarting.

        :param restart_failed: True (default) if previously failed jobs
            should be restarted, False if not.

        :raise RuntimeError: If no jobs have been added to `JobDJ`.
        """
        # This code must be kept here because the global logging state will
        # need to be set if the JobDJ is being recovered from a pickle.
        if self._verbosity == "quiet":
            logger.setLevel(log.WARNING)
        elif self._verbosity == "normal":
            logger.setLevel(log.INFO)
        elif self._verbosity == "verbose":
            logger.setLevel(log.DEBUG)
        elif self._verbosity == "silent":
            logger.setLevel(log.CRITICAL)
        else:
            self._verbosity = "quiet"
            logger.setLevel(log.WARNING)

        num_jobs = self.total_added
        if num_jobs == 0:
            msg = "ERROR: No jobs to run. Use JobDJ.addJob() to add jobs."
            raise RuntimeError(msg)

        # NOTE: At this point, NCPUS will be None ONLY for queued hosts
        if len(self._hosts) == 0:
            logger.info("\nRunning subjobs on hosts:")
            logger.info("  Not available yet")
        else:
            logger.info("\nRunning subjobs on hosts:")
            for hostname, host_maxjobs in self._hosts.items():
                if host_maxjobs is UNLIMITED_CPUS:
                    host_maxjobs = 'No limit'
                logger.info(f"  {hostname} (Max: {host_maxjobs})")

        logger.info(f"Number of jobs: {num_jobs}")
        logger.info(f"Max retries per job: {self.max_retries}")
        if self.max_failures == NOLIMIT:
            logger.info("Max allowed failures: unlimited")
        else:
            logger.info(f"Max allowed failures: {self.max_failures}")
        logger.info(f"Verbosity: {self._verbosity}\n")

        if self.isComplete() and not self._failed:
            # All jobs have completed successfully. (Failed jobs will be
            # restarted below.)
            logger.info("JobDJ has already completed.")
            return

        restarted_job_control = False
        if self._finished or self._failed or self._running:
            logger.info("Resuming JobDJ...\n")
            # Note that the previous JobDJ instance was running under job
            # control.
            if self._job_id:
                restarted_job_control = True
            # Update the _job_id attribute for the current process
            backend = jobcontrol.get_backend()
            if backend:
                self._job_id = backend.job_id
            else:
                self._job_id = None
        else:
            logger.info("Starting JobDJ...\n")
            # Do this only when starting, because _checkSmartDistribution
            # can modify _hosts.
            self._checkSmartDistribution()
            syslog.debug(
                f"hosts after _checkSmartDistribution(): {self._hosts}")
            logger.info(
                f"Keep one job on driver host: {self._keep_one_job_on_localhost}"
            )

        sys.stdout.flush()

        # Print the header:
        self.printStatus()

        # If failed jobs are present, this is a restarted JobDJ.
        # Put failed jobs back in the queue; if state needs to be reset it
        # should be done in __setstate__.
        if restart_failed:
            while self._failed:
                job = self._failed.pop(0)
                self._queueJob(job)

        # If running jobs are present, this is a restarted JobDJ.
        # Update the job status but ignore failures since we are starting
        # anew.
        for node, job in self._running:
            job.update()
            # Any jobs that were running under a previous instance of JobDJ
            # should be marked as killed, since if the previous JobDJ was
            # a job itself, a job failure will kill all subjobs.
            if (restarted_job_control and not job.hasExited() and
                    isinstance(job, JobControlJob)):
                job._hasDied("killed")
            if job.state == JobState.DONE:
                self._running.remove(node)
                self._jobFinished(job)
                self.printStatus(job)
            elif job.state == JobState.FAILED_RETRYABLE:
                self._running.remove(node)
                self._queueJob(job)
                self.printStatus(job)
                # If the job is going to be restarted, make its message use
                # match the current default.
                if hasattr(job, '_use_messages'):
                    job._use_messages = self._use_messages
                # Reset failures and state.
                job.num_failures = 0
                job.state = JobState.WAITING

        # Set num_failed_jobs to zero (this only matters if this is a
        # restart from pickle).
        self.num_failed_jobs = 0

        # If there are no available processors, print a message:
        if not self._hosts:
            # No hosts are given to this JobDJ to use
            logger.info("Waiting for available processors to use...")
    def run(self,
            *,
            status_change_callback: Optional[Callable[[BaseJob], None]] = None,
            periodic_callback: Optional[Callable[[BaseJob], None]] = None,
            callback_interval: int = 300,
            restart_failed: bool = True):
        """
        Call this method to run all jobs that have been added. The method
        will return control when all jobs have completed.

        :param status_change_callback: A function to call every time a job
            status changes. For example, JobState.RUNNING->JobState.DONE.
            This function takes a single argument of a
            schrodinger.job.queue.BaseJob object.

        :param periodic_callback: A function to call periodically, regardless
            of whether job status has changed or not. The function will be
            called without any arguments.

        :param callback_interval: The interval at which the periodic callback
            will be called. This time is only approximately enforced and will
            depend on the timing delay settings (e.g. MONITOR_DELAY).

        :param restart_failed: True (default) if previously failed jobs
            should be restarted, False if not.
        """
        # Make sure QCoreApplication/QApplication is present
        jobcontrol._launch_qapp()
        self._start(restart_failed=restart_failed)
        event_loop = QtCore.QEventLoop()
        if periodic_callback:

            @qt_utils.exit_event_loop_on_exception
            def _callback(**kwargs):
                periodic_callback()

            periodic_timer = QtCore.QTimer()
            periodic_timer.timeout.connect(
                functools.partial(_callback, event_loop=event_loop))
            periodic_timer.start(int(callback_interval * 1000))

        # Call the function immediately to not wait for event loop
        QtCore.QTimer.singleShot(
            0,
            functools.partial(self._updateAndLaunchJobs,
                              MONITOR_DELAY * 1000,
                              status_change_callback=status_change_callback,
                              event_loop=event_loop))
        event_loop.exec()

        if periodic_callback:
            # Explicitly stop the callback timer. We don't want to rely on
            # garbage collection to stop the timer when this method exits
            # since it's possible that the top level exception handler will
            # hold on to a traceback object that keeps this method's
            # namespace alive.
            periodic_timer.stop()

        # Raise an exception if the event_loop quit with an exception.
        exception = qt_utils.get_last_exception()
        if exception:
            raise exception

        if self.isComplete():
            self._finish()
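    # Illustrative sketch (not part of the original source): both callbacks
    # are optional; status_change_callback receives the BaseJob whose state
    # changed, while periodic_callback is called with no arguments. The
    # function names below are hypothetical:
    #
    #     def on_change(job):
    #         print(f"{job.name}: {job.state}")
    #
    #     def heartbeat():
    #         print("still running...")
    #
    #     job_dj.run(status_change_callback=on_change,
    #                periodic_callback=heartbeat,
    #                callback_interval=60)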
    def _checkSubmitted(self, last_job: BaseJob):
        """
        Cancel a job in the 'submitted' state if possible and move it back
        into the job queue to be restarted.

        This method is only useful to call when a known non-queue host is
        available.
        """
        if not last_job.hasExited():
            return

        # If the last job didn't run on localhost then there's no need to
        # cancel a queued job to run on the localhost.
        if not last_job.host == "localhost":
            return

        # If the job would never be launched on localhost then there's no
        # need to cancel a queued job.
        if "localhost" not in self._hosts and not self._keep_one_job_on_localhost:
            return

        # If any of the jobs in the _jobqueue are not run via job control,
        # they will be started up on localhost, so don't cancel queued jobs.
        for job in self._jobqueue:
            if not job.runsLocally():
                return

        # Iterate in reverse order so that the subjob that has been in
        # the queue for the shortest time is canceled.
        for node, job in self._running.reverse_iter():
            # we should not kill a job we are launching
            if not isinstance(job, JobControlJob) or job.state != JobState.ACTIVE:
                continue
            if job.cancelSubmitted():
                backend = jobcontrol.get_backend()
                if backend:
                    backend.deleteSubJob(job._job_id)
                self._running.remove(node)
                self._queueJob(job)
                break

    def _updateRunningJobs(self) -> Generator[BaseJob, None, None]:
        """
        A generator to monitor the running jobs and update their state.
        Yields a job every time it finishes or fails.
        """
        for node, job in self._running:
            job.update()
            # Make sure we always print one launched line, even if the job
            # is already JobState.DONE or JobState.FAILED.
            if job.state != JobState.LAUNCHING:
                if job in self._launching:
                    self._launching.remove(job)
                    self._logLaunchStatus(job)
                    yield job
            if job.state == JobState.DONE:
                self._running.remove(node)
                self._jobFinished(job)
                self.printStatus(job)
                yield job
            elif job.state == JobState.FAILED_RETRYABLE:
                job.num_failures += 1
                self._running.remove(node)
                # If the job can't be retried, add it to the failed list,
                # otherwise push it back on the job queue.
                if job.retryFailure(self.max_retries):
                    job.state = JobState.WAITING
                    self._queueJob(job)
                else:
                    self._jobFailed(job)
                self.printStatus(job)
                yield job
            elif job.state == JobState.FAILED:
                job.num_failures += 1
                self._running.remove(node)
                self._jobFailed(job)
                self.printStatus(job)
                yield job

    @qt_utils.exit_event_loop_on_exception
    def _updateAndLaunchJobs(
            self,
            monitor_delay: int,
            *,
            status_change_callback: Optional[Callable[[BaseJob], None]] = None,
            event_loop=None):
        """
        Process running job events. This function will call itself unless
        all jobs are complete.

        :param monitor_delay: number of milliseconds to wait before
            processing events again
        :type monitor_delay: int

        :param status_change_callback: A function to call every time a job
            status changes. For example, RUNNING->DONE. This function takes
            a single argument of a schrodinger.job.queue.BaseJob object.

        :param event_loop: event loop used for processing
        :type event_loop: PyQt5.QtCore.QAbstractEventDispatcher
        """
        did_start_job = self._processJobEvents(
            status_change_callback=status_change_callback)
        if self.isComplete():
            event_loop.quit()
            return False

        # If we start a job, immediately process job events again; otherwise
        # wait to poll.
        if did_start_job:
            time_to_next_processing = 0
        else:
            time_to_next_processing = monitor_delay

        QtCore.QTimer.singleShot(
            time_to_next_processing,
            functools.partial(self._updateAndLaunchJobs,
                              monitor_delay,
                              status_change_callback=status_change_callback,
                              event_loop=event_loop))

    def _processJobEvents(
        self,
        status_change_callback: Optional[Callable[[BaseJob], None]] = None,
    ) -> bool:
        """
        Check and update statuses of all running jobs and launch new jobs.
        Run the status_change_callback function on each job that was started
        or completed.

        Returns whether or not a job was started.
        """
        did_start_job = False
        self._updateStatusFromMessages()
        for job in self._updateRunningJobs():
            if status_change_callback:
                status_change_callback(job)
            self._checkSubmitted(job)
        for started_job in self._startJobs(max_starts=self.max_starts):
            if status_change_callback:
                status_change_callback(started_job)
            self._updateStatusFromMessages()
            did_start_job = True
        return did_start_job

    def _finish(self):
        """
        Log footer upon JobDJ completion.
        """
        logger.info("\nAll jobs have completed.\n")
        logger.info(
            f"{self.total_finished} of {self.total_added} job(s) succeeded; "
            f"{self.total_failed} job(s) failed.\n")
        failed_jc_job = False
        for failed_job in self._failed:
            # The code below lists the names of failed jobs that have a Job
            # object. Jobs that failed at the launch stage are not shown.
            if isinstance(failed_job, JobControlJob) and failed_job._job_obj:
                if not failed_jc_job:
                    logger.debug("Failed JobControlJobs are -")
                    failed_jc_job = True
                logger.debug(f"... {failed_job._job_obj.Name}")

    def _updateStatusFromMessages(self):
        """
        Update the status of all jobs monitored by this JobDJ, via the global
        _status_messages dict.
        """
        global _status_messages
        if not self._use_messages:
            return
        if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            new_job_changes = self._getJobUpdates()
        else:
            new_job_changes = self._getJobUpdatesLegacy()
        _status_messages.update(new_job_changes)

    def _getJobUpdates(self) -> Dict[str, Tuple[str, str]]:
        """
        Retrieve new status changes from jobs in JOB_SERVER.

        :rtype: dict(str, tuple(str, str))
        :return: dict with keys of jobid, value is (job_status, exit_status)
        """
        if (get_current_time() - self._latest_update_time) < MINIMUM_UPDATE_TIME:
            return {}
        if self._latest_jobs is None:
            self._latest_jobs = mmjob.LatestJobs()
            if not self._job_id:
                # If this JobDJ isn't a job, ignore subjobs to reduce the
                # number of irrelevant updates on the first query.
                self._latest_jobs.setAllowSubJobs(False)
        self._latest_jobs.update()
        job_changes = {}
        for job in self._latest_jobs.getUpdatedJobs():
            if job.isComplete():
                exit_status = job.ExitStatus
            else:
                exit_status = None
            job_changes[job.JobId] = job.Status, exit_status
            syslog.debug(
                f"_getMessages: {job.JobId} {job.Status} {exit_status}")
        self._latest_update_time = get_current_time()
        return job_changes

    def _getJobUpdatesLegacy(self) -> Dict[str, Tuple[str, str]]:
        """
        Update job status by reading the backend fifo; legacy job control
        only.

        :rtype: dict(str, tuple(str, str))
        :return: dict with keys of jobid, value is (job_status, exit_status)
        """
        job_changes = {}
        # Pull all messages off the queue and update the _message dict.
        try:
            while True:
                job_id, status, exit_status = \
                    mmjob.mmjobbe_subjob_status_changed()
                job_changes[job_id] = status, exit_status
                syslog.debug("_getMessages: {} {} {}".format(
                    job_id, status, exit_status))
        except mm.MmException as e:
            if e.rc != mmjob.MMJOBBE_EMPTY_LIST:
                raise
        return job_changes

    def _jobFailed(self, job: BaseJob):
        """
        Keep track of completely failed jobs and die if max_failures is
        exceeded.
        """
        self._failed.append(job)
        self.num_failed_jobs += 1
        syslog.debug(f"Job failed: {job}")
        msg = f"Subjob '{job.name}' has failed"
        if job.num_failures > 1:
            msg += f" after {job.num_failures - 1} retries."
        else:
            msg += "."
        # Ensure state is FAILED, not FAILED_RETRYABLE
        job.state = JobState.FAILED
        self.printStatus(job)
        job.maxFailuresReached(msg)
        if self.num_failed_jobs > self.max_failures and \
                self.max_failures != NOLIMIT:
            # Reached max_failures; abort the JobDJ.run loop and assume
            # jobcontrol will handle canceling all remaining subjobs
            raise MaxJobFailureError

    def _jobFinished(self, job: BaseJob):
        """
        Take some standard steps on successful completion of a job. Add the
        completed job to the finished list, call its finalize method, and
        look for new jobs from completed dependencies.
        """
        # Make sure that finalization and dependent job addition are only
        # done once per job.
        if job in self._finished:
            return
        self._finished[job] = get_current_time()
        job.finalize()
        for j in job._pruneGraph():
            if j.state == JobState.DONE:
                self._jobFinished(j)
            elif not j.getPrereqs():
                self._queueJob(j)

    def _startJobs(
            self,
            max_starts: Optional[int] = None
    ) -> Generator[BaseJob, None, None]:
        """
        A generator to start as many jobs as possible, given the current open
        hosts. Each job that is successfully started (or restarted) is
        yielded.

        The optional max_starts argument can be used to limit the number of
        jobs that will be started before this generator terminates.

        :param max_starts: The maximum number of jobs to start. If None,
            there is no limit on the number of starts.
        """
        started_jobs_count = 0
        failed_launches = []
        should_requeue = []
        while self._jobqueue:
            if max_starts and started_jobs_count >= max_starts:
                break
            job = self._jobqueue.pop()
            if job.runsLocally():
                if self._areProcessorsAvailableOnLocalhost():
                    job.run()
                    if job.state == JobState.DONE:
                        self._jobFinished(job)
                    else:
                        self._running.add(job)
                    self.printStatus(job, PrintableStatus.STARTED)
                    started_jobs_count += 1
                    yield job
                else:
                    # If no jobs are running, then avoid an infinite loop
                    # and exit
                    if len(self._running) == 0:
                        raise RuntimeError(
                            'Unable to launch jobs; no processors available on localhost'
                        )
                    should_requeue.append(job)
            else:
                # If no host is available, push the job back on the queue
                # and break (to clean up failed launches)
                host = self._availableHost(
                    required_procs=job.procs,
                    resource_requirement=job._resource_requirement)
                if not host:
                    self._queueJob(job)
                    break
                if not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
                    # Make sure that host jobs aren't started more frequently
                    # than SUBMIT_DELAY seconds.
                    now = get_current_time()
                    time_since_submit = now - self.last_submit
                    if time_since_submit < SUBMIT_DELAY:
                        time.sleep(SUBMIT_DELAY - time_since_submit)
                        now = get_current_time()
                    self.last_submit = now
                job.run(host, self.local)
                if job.state == JobState.FAILED_RETRYABLE:
                    failed_launches.append(job)
                elif job.state == JobState.DONE:
                    self._jobFinished(job)
                    self.printStatus(job)
                else:
                    self._running.add(job)
                    # using async mode in JOB_SERVER
                    if job.state == JobState.LAUNCHING:
                        self._launching.add(job)
                    # using legacy jobcontrol or synchronous mode
                    else:
                        self._logLaunchStatus(job)
                started_jobs_count += 1
                yield job

        for j in should_requeue:
            self._queueJob(j)

        for job in failed_launches:
            job.num_failures += 1
            # If the job can't be retried, add it to the failed list,
            # otherwise push it back on the job queue.
            if job.retryFailure(self.max_retries):
                job.state = JobState.WAITING
                self._queueJob(job)
            else:
                self._jobFailed(job)
            yield job

    def _logLaunchStatus(self, job: BaseJob):
        """
        Log a line about a job that has been launched.
        """
        if job.launch_error:
            # launch errors will be logged later
            return
        elif job.num_failures > 0:
            self.printStatus(job, PrintableStatus.RESTARTED)
        else:
            self.printStatus(job, PrintableStatus.LAUNCHED)
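# Illustrative sketch (not part of the original source): retry accounting as
# implemented above. A job is pushed back onto the queue while
# job.retryFailure(max_retries) allows it, so with max_retries=2 a subjob may
# be attempted up to 3 times before _jobFailed() records it; once
# num_failed_jobs exceeds max_failures, MaxJobFailureError aborts the run.
# The constructor keywords and command below are hypothetical:
#
#     job_dj = JobDJ(max_retries=2, max_failures=0)
#     job_dj.addJob(["flaky_tool", "input.txt"])
#     job_dj.run()   # raises MaxJobFailureError if the job fails 3 times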
def _any_ancestor_is_queued(job: jobcontrol.Job) -> bool:
    """
    Return True if any job in the hierarchy of parent(s) is running in an
    HPC queue.

    :param job: job to query
    """
    if _host_is_queue(job.HostEntry):
        return True
    # The OrigLaunchHost represents the ultimate parent of the job. This
    # check is probably true whenever the one above is, and will also be
    # true in some cases where it is not, e.g. a driver launched to
    # localhost on a cluster node with a queue -HOST argument.
    if job.OrigLaunchHost != job.JobHost:
        return True
    return False
def get_current_time() -> float:
    """
    Return time, suitable for mocking.
    """
    return time.time()
@dataclasses.dataclass
class _ComputeResource:
    host_entry: jobcontrol.Host
    maximum_concurrent_subjobs: int
    active_subjobs: int


def _remove_gpu_hosts(
        compute_resources: List[_ComputeResource]) -> List[_ComputeResource]:
    """
    Remove gpu hosts if there are separate CPU hosts.
    """
    valid_compute_resources = []
    for compute_resource in compute_resources:
        if compute_resource.host_entry.matchesRequirement(
                resource.ComputeRequirement(resource.ComputeType.GPU)):
            continue
        valid_compute_resources.append(compute_resource)
    # if we filter out all hosts, return the original set of hosts
    if not valid_compute_resources:
        return compute_resources
    return valid_compute_resources


def _find_optimal_host(compute_resources: List[_ComputeResource],
                       resource_requirement: resource.ComputeRequirement,
                       required_procs: int,
                       validate_job_requirements: bool = True
                      ) -> Optional[str]:
    """
    Find an optimal host to run a job matching resource_requirement.

    :return: host_entry_name best matching the requirement, or None if the
        host is busy running jobs

    :raises MissingResourceError: if no hosts match the resource_requirement
    """
    valid_compute_resources = []
    for compute_resource in compute_resources:
        if compute_resource.host_entry.matchesRequirement(resource_requirement):
            valid_compute_resources.append(compute_resource)

    if not valid_compute_resources and validate_job_requirements:
        raise MissingResourceError(resource_requirement.compute_type.name,
                                   compute_resources)

    # prune mixed gpu / cpu compute resources
    if resource_requirement.compute_type == resource.ComputeType.CPU:
        valid_compute_resources = _remove_gpu_hosts(valid_compute_resources)

    # Submit the next subjob to a host that has available processors
    # AND has the fewest number of subjobs on it.
    # This will make sure host lists such as "-HOST monica,nina"
    # work correctly (both have unlimited cpus).
    least_run_host = None
    least_run_num = sys.maxsize
    for compute_resource in valid_compute_resources:
        if compute_resource.host_entry.name == LOCALHOST_ENTRY_NAME and \
                compute_resource.active_subjobs == 0:
            return compute_resource.host_entry.name
        if ((compute_resource.maximum_concurrent_subjobs is UNLIMITED_CPUS or
             (compute_resource.maximum_concurrent_subjobs -
              compute_resource.active_subjobs >= required_procs)) and
                compute_resource.active_subjobs < least_run_num):
            least_run_host = compute_resource.host_entry.name
            least_run_num = compute_resource.active_subjobs
    return least_run_host
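# Illustrative sketch (not part of the original source): given the selection
# rule above, an idle localhost entry is preferred outright, otherwise the
# least-loaded host with enough free slots wins. Hypothetical host entry
# names "hostA" and "hostB":
#
#     resources = [
#         _ComputeResource(host_entry=jobcontrol.get_host("hostA"),
#                          maximum_concurrent_subjobs=4, active_subjobs=3),
#         _ComputeResource(host_entry=jobcontrol.get_host("hostB"),
#                          maximum_concurrent_subjobs=4, active_subjobs=1),
#     ]
#     # For a 1-processor CPU job, "hostB" is returned: it has the fewest
#     # active subjobs and at least one free slot.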