Source code for schrodinger.test.stu.run

"""
Contains `TestJCJob`, `TestSPJob`, `TestQueue`, and `Runner` classes.

`TestJCJob` is a test utility-specific subclass of queue.JobControlJob.
`TestSPJob` is a test utility-specific subclass of queue.SubprocessJob.  These
classes allow the test utility to track some information that we care about as
a job executes (for instance job duration and test_id).  `TestQueue` is a
subclass of queue.JobDJ, and allows further control over reporting.

`Runner` controls all job running parameters.  It is also responsible for
actually running the jobs and requesting their workups.  The meat is in
`Runner.__call__`.
"""
import contextlib
import datetime
import functools
import logging
import os
import platform
import shlex
import sys
import textwrap
import time
import traceback
from typing import Callable
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

import psutil

from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import log
from schrodinger.utils import subprocess
from schrodinger.utils.env import EnvironmentContext

from . import common
from . import client
from . import constants
from . import process_management
from . import sysinfo
from . import testscripts
from . import workup
from .joberrors import run_postmortem

if TYPE_CHECKING:
    from . import base_executable  # circular import

logger = common.logger

# Globals
DOWNLOAD_FAILED = 'file download failed'
SCHRODINGER_LICENSE_CHECKOUTS = 'SCHRODINGER_LICENSE_CHECKOUTS'
SCHRODINGER_STU_TEST_ID = 'SCHRODINGER_STU_TEST_ID'
STDOUT_FILENAME = 'stdout.txt'


class JobDJError(RuntimeError):
    pass


# ****************************************************************************
class TestJob:
    """
    Interface for the TestJCJob and TestSPJob classes
    """

    @property
    def duration(self) -> Optional[int]:
        return self._duration

    @property
    def exit_status(self) -> str:
        """
        Returns an exit status from the list of available Jobcontrol exit
        statuses.

        - For TestJCJobs this comes from job.ExitStatus, which will return a
          Jobcontrol exit status (from Job.pm in MMSHARE_EXEC) if a job
          object (i.e. schrodinger.job.jobcontrol.job) is found. If a job
          object is not found, it could be because the job has not started or
          it failed to launch. These special cases are handled by setting the
          initial exit status to "Job not started" and resetting it to
          "Failed to launch" if the job fails to launch (see
          TestJCJob.doCommand).

        - For TestSPJobs we limit the available exit statuses to "finished",
          "died", and "killed" because there is no job object (i.e.
          schrodinger.job.jobcontrol.job) associated with TestSPJobs. Like
          TestJCJobs, the initial exit status is set to "Job not started" and
          is reset to "Failed to launch" if the job fails to launch (see
          TestSPJob.doCommand).
        """
        raise NotImplementedError("Needs to be defined in job specific class")

    @property
    def test_id(self) -> int:
        return self._test_id

    @property
    def canceled_by_timeout(self) -> bool:
        """
        Return True only if the job was cancelled by JobDJ, mostly due to
        timeout.
        """
        # This variable comes from the BaseJob class
        return self.abort_job

    @contextlib.contextmanager
    def setupTestEnvironment(self, host: str = 'localhost'):
        """
        Set environment variables used during testing.

        SCHRODINGER_LICENSE_CHECKOUTS to a file in the directory: SHARED-2727
        SCHRODINGER_STU_TEST_ID to current test. (SHARED-3352)
        Remove TOPLEVEL_HOST_ARGS for Jaguar. (SHARED-4089)
        """
        # Only set SCHRODINGER_LICENSE_CHECKOUTS if the job runs locally. For
        # remote jobs, this record of licenses is not guaranteed to be
        # consistent. See SHARED-2727.
        # Also skip license testing if the test is tagged with the skip
        # license testing tag.
        if use_license_checking(self._test):
            # Already in the test execution directory.
            license_check_file = os.path.abspath(constants.LICENSE_CHECK_FILE)
            if os.path.exists(license_check_file):
                os.remove(license_check_file)
        else:
            license_check_file = None
        with EnvironmentContext(SCHRODINGER_STU_TEST_ID, str(self.test_id)):
            with EnvironmentContext(SCHRODINGER_LICENSE_CHECKOUTS,
                                    license_check_file):
                yield

    def infoStatus(self, status: str):
        self._jobdj.printStatus(self, status)

    def debugStatus(self, status: str):
        status = self._jobdj.formatStatus(self, status)
        queue.logger.debug(status)

    def warnStatus(self, status: str):
        status = self._jobdj.formatStatus(self, status)
        queue.logger.warning(status)

# ****************************************************************************

class TestJCJob(queue.JobControlJob, TestJob):
    """
    Like a normal JobControlJob, but:

    - Ignore failures to launch.
    - Be aware of scriptID.
    - Easily access job duration and exit status.
    """

    def __init__(self,
                 command: List[str],
                 command_dir: Optional[str],
                 test_id: Union[int, str] = None,
                 test: testscripts.TestScript = None,
                 timeout: Optional[int] = None,
                 runs_locally: bool = False,
                 **kwargs):
        """
        Overridden to add the test_id.

        :param command: Command to be run.
        :param command_dir: Directory to run in.
        :param test_id: Unique identifier of script.
        :param test: The representation of all test data for the job that is
            being run.
        :param timeout: Duration in seconds after which to kill the job.
            None is never.
        :param runs_locally: Should this job be launched on localhost (never
            remote hosts)?
        """
        self._test_id = test_id
        """The test number in the database (or the directory name in
        validate mode)."""
        if not test_id:
            self._test_id = os.path.basename(command_dir)
        # The STU test object
        self._test = test
        # Set initial value to be pulled by exit_status if a job has not been
        # launched
        self._exit_status = constants.JOB_NOT_STARTED
        # Job dies after this period.
        self.timeout = timeout
        self._runs_locally = runs_locally
        super().__init__(command=command,
                         command_dir=command_dir,
                         timeout=timeout,
                         **kwargs)
        self.name = kwargs.get('name', 'STU#%s' % self._test_id)
        self.exceptions_caught = 0

    def doCommand(self, host: str = 'localhost', *args, **kwargs):
        """
        Overridden to ignore errors.

        Executes the command described by self._command. The parent class
        has two required arguments, but has the call signature
        `doCommand(*args, **kwargs)`, hence its usage here.
        """
        self.debugStatus('prelaunch')
        try:
            with self.setupTestEnvironment():
                with add_jobcontrol_handler(STDOUT_FILENAME):
                    super().doCommand(host, *args, **kwargs)
        except Exception as err:
            self.state = queue.JobState.FAILED
            queue.logger.exception(" Jobcontrol error message: %s" % err)
            with open(STDOUT_FILENAME, 'a') as stdout:
                stdout.write(f'{err}\n')
        else:
            with open(STDOUT_FILENAME, 'a') as stdout:
                if self.launch_error:
                    stdout.write(f'{self.launch_error}\n')
            self._exit_status = None
        self.host = host

    def runsLocally(self) -> bool:
        "Force the test to be run on the host on which the JobDJ is running."
        return self._runs_locally

    @property
    def duration(self) -> Optional[int]:
        """
        Duration of the job, according to the job record.

        Implemented as a property to provide a consistent interface with
        `TestSPJob`. Also gives duration for RUNNING jobs, so as to be
        consistent with `TestSPJob`.
        """
        job = self.getJob()
        # Still running
        if job and job.StartTime and not job.StopTime:
            current = time.time()
            start = time.mktime(
                time.strptime(job.StartTime, jobcontrol.timestamp_format))
            return current - start
        # Not started or complete: returns None or the duration value.
        return self.getDuration()

    @property
    def exit_status(self) -> str:
        """
        Exit status of the job, according to the job record.

        Implemented as a property to provide a consistent interface with
        `TestSPJob`.
        """
        # Not simply returning self._getState() because we want a consistent
        # exit status across jobcontrol and non jobcontrol jobs.
        job = self.getJob()
        if job and not self._exit_status:
            try:
                return job.ExitStatus
            except RuntimeError as err:
                if 'stranded' in str(err):
                    return 'stranded'
                if 'exited' in str(err):
                    return 'exited'
                raise
        else:
            return self._exit_status

    @exit_status.setter
    def exit_status(self, value: str):
        self._exit_status = value

# ***************************************************************************

class TestSPJob(queue.SubprocessJob, TestJob):
    """
    Like a normal SubprocessJob, but:

    - Ignore failures to launch.
    - Kill subjobs when killing this job.
    - Be aware of scriptID.
    - Access job duration and status.
    """

    def __init__(self,
                 command: List[str],
                 command_dir: Optional[str] = None,
                 test_id: Union[int, str] = None,
                 test: testscripts.TestScript = None,
                 timeout: Optional[int] = None,
                 **kwargs):
        """
        Overridden to add the test_id.

        :param command: Command to be run.
        :param command_dir: Directory to run in.
        :param test_id: Unique identifier of script.
        :param test: The representation of all test data for the job that is
            being run.
        :param timeout: Duration in seconds after which to kill the job.
            None is never.
        """
        self._test_id = test_id
        """The test number in the database (or the directory name in
        validate mode)."""
        if not test_id:
            self._test_id = os.path.basename(command_dir)
        # The STU test object
        self._test = test
        # Set initial value to be pulled by exit_status if a job has not been
        # launched
        self._exit_status = constants.JOB_NOT_STARTED
        super().__init__(command=command,
                         command_dir=command_dir,
                         timeout=timeout,
                         **kwargs)
        self.name = kwargs.get('name', 'STU#%s' % self._test_id)

    def preCommand(self, *args, **kwargs):
        """
        Overridden to open standard files for recording standard error and
        standard out. Also marks the start time of the job.
        """
        super().preCommand(*args, **kwargs)
        # Do this AFTER super, so that the files are opened in the correct
        # directory
        self._stdout = open(STDOUT_FILENAME, 'w')
        self._stderr = subprocess.STDOUT
        self.addFinalizer(TestSPJob.cleanUp)
        self._start = time.time()

    def doCommand(self, *args, **kwargs):
        """
        Overridden to ignore errors.

        Executes the command described by self._command.
        """
        self.debugStatus('prelaunch')
        try:
            with self.setupTestEnvironment():
                super().doCommand(*args, **kwargs)
        except Exception as err:
            self.state = queue.JobState.FAILED
            self._exit_status = constants.FAILED_TO_LAUNCH
            queue.logger.exception(" Subprocess error message: %s" % err)

    def update(self):
        """
        Overridden to make errors non fatal.
        """
        try:
            super().update()
        except Exception as err:
            self.state = queue.JobState.FAILED
            queue.logger.exception(" Subprocess error message: %s" % err)

    def cleanUp(self):
        """
        Close standard files for recording standard error and standard out.
        """
        # Force data to be moved from OS buffer to disk. Required on Windows.
        if not self._stdout.closed:
            self._stdout.flush()
            os.fsync(self._stdout.fileno())
            self._stdout.close()

    def getStatusStrings(self) -> Tuple[str, str, str]:
        """
        Return a tuple of status strings for printing by `JobDJ`.

        :return: (status, jobid, host)
        """
        if self.hasExited():
            status_string = self.exit_status
        else:
            status_string = "unknown"
        jobid = ' [none]'
        host = platform.node()
        return status_string, jobid, host

    @property
    def exit_status(self) -> str:
        """
        Exit status of the job, according to subprocess.

        Implemented as a property to provide a consistent interface with
        `TestJCJob`.
        """
        if self.state == queue.JobState.DONE:
            if self.abort_job:
                return 'killed'
            return queue.FINISHED
        elif self.state in (queue.JobState.FAILED,
                            queue.JobState.FAILED_RETRYABLE):
            if self._exit_status == constants.FAILED_TO_LAUNCH:
                return self._exit_status
            else:
                return 'died'
        else:
            return self._exit_status

    def kill(self):
        """First, kill children, then myself"""
        logger.warning(f" Killing {self.test_id}")
        try:
            test_process = psutil.Process(self._subprocess.pid)
            logger.warning(f"status of {self.test_id}: " +
                           " ".join(test_process.cmdline()))
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Don't worry about processes that have already ended.
            # Windows: NoSuchProcess, Mac: AccessDenied
            self.cleanUp()
            if self.state == queue.JobState.ACTIVE:
                self.state = queue.JobState.DONE
            return
        status = process_management.format_process_status(test_process)
        logger.warning(status)
        self._stdout.write("process status at kill point:\n" + status)
        self._stdout.flush()
        messages = process_management.kill_launched_jobcontrol_jobs(
            self.test_id)
        if messages:
            logger.warning(messages)
            self._stdout.write(messages)
            self._stdout.flush()
        process_management.kill_process_children(test_process)
        super().kill()
        try:
            if test_process.is_running():
                test_process.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Windows: Don't worry about processes that have already ended.
            pass
        self.cleanUp()


class TestQueue(queue.JobDJ):
    """
    Like a normal JobDJ, but:

    - Print the script ID at status points
    - Run workups
    """

    def __init__(self,
                 hosts: Optional[List[Tuple[str, int]]] = None,
                 verbosity: str = 'quiet',
                 timeout: Optional[int] = None):
        update_delay = min(5, timeout or 5)
        super().__init__(hosts=hosts,
                         verbosity=verbosity,
                         max_retries=0,
                         max_failures=queue.NOLIMIT,
                         update_delay=update_delay,
                         job_class=TestJCJob)
        self.disableSmartDistribution()
        self.timeout = timeout

    def printStatus(self,
                    job: Optional[TestJob] = None,
                    action: Optional[str] = None):
        if job is None:
            self.printHeader()
            return
        status = self.formatStatus(job, action)
        if action in ('launched', 'started'):
            queue.logger.warning(status)
        else:
            queue.logger.info(status)

    def printHeader(self):
        """Print the header"""
        nspaces = len(str(self.total_added))
        spaces = '-' * nspaces
        host_text, host_blanks = '', ''
        if self._original_verbosity != 'quiet':
            host_text = ' Host'
            host_blanks = ' -------------------------------'
        timestamp_text, timestamp_blanks = '', ''
        if self._original_verbosity == 'verbose':
            timestamp_text = ' ' * 28 + 'Time'
            timestamp_blanks = ' ' + '-' * 26
        header = """
        STU columns:
        C: Number of completed tests
        A: Number of active tests (e.g., submitted, running)
        W: Number of waiting/pending tests
        """
        logger.info(textwrap.dedent(header))
        logger.warning("%*s %*s %*s | Activity Test# JobId%s%s" %
                       (nspaces, 'C', nspaces, 'A', nspaces, 'W', host_text,
                        timestamp_text))
        logger.info(
            "%*s %*s %*s | ----------------- ----- -------------------------%s%s"
            % (nspaces, spaces, nspaces, spaces, nspaces, spaces, host_blanks,
               timestamp_blanks))
        return textwrap.dedent(header)

    def formatStatus(self,
                     job: Optional[TestJob] = None,
                     action: Optional[str] = None):
        """
        Override to print script ID.

        :param job: `queue.BaseJob` object of interest.
        :param action: Status to be printed.
        """
        # Had to copy WAY too much of this function. Ugly.
        # Spacing is adjusted for the total number of jobs.
        # nspaces is the width of the 'jobs' column: nspaces==1 when #jobs is
        # 1-9, 2 when #jobs is 10-99, 3 when #jobs is 100-999, etc.
        nspaces = len(str(self.total_added))
        status_string, jobid, host = job.getStatusStrings()
        if action is not None:
            status_string = action
        num_active = len(self._running)
        num_completed = len(self._finished) + self.num_failed_jobs
        num_waiting = self.total_added - num_active - num_completed
        timestamp_text = ''
        if self._verbosity == 'quiet':
            host = ''
        if self._verbosity == 'verbose':
            now = datetime.datetime.now()
            timestamp_text = now.strftime(' %Y-%m-%d %H:%M:%S.')
            timestamp_text += now.strftime('%f')[:3]
        status = ("%*d %*d %*d | %-17s %4s %-24s %-31s%s" %
                  (nspaces, num_completed, nspaces, num_active, nspaces,
                   num_waiting, status_string, job.test_id, jobid, host,
                   timestamp_text))
        return status

    def _setLoggerVerbosity(self):
        if self._verbosity == "quiet":
            queue.logger.setLevel(log.WARNING)
        elif self._verbosity == "normal":
            queue.logger.setLevel(log.INFO)
        elif self._verbosity == "verbose":
            queue.logger.setLevel(log.DEBUG)
        elif self._verbosity == "silent":
            queue.logger.setLevel(log.CRITICAL)
        else:
            self._verbosity = "quiet"
            queue.logger.setLevel(log.WARNING)

    def _start(self, **kwargs):
        try:
            self._original_verbosity = self._verbosity
            self._verbosity = "quiet"
            super()._start(**kwargs)
        finally:
            self._verbosity = self._original_verbosity
            self._setLoggerVerbosity()

    def addJob(self,
               job: TestJob,
               add_connected: bool = True,
               timeout: Optional[int] = None,
               **kwargs):
        if timeout is None:
            timeout = self.timeout
        if isinstance(job, TestJob):
            job.timeout = self.timeout
        return super().addJob(job, add_connected, timeout=timeout, **kwargs)

    def _jobFailed(self, job: TestJob):
        super()._jobFailed(job)
        try:
            job.cleanUp()
        except AttributeError:
            pass

# ****************************************************************************

class Runner:
    """
    Runner controls all job running parameters within the backend test
    utility code. It is also responsible for actually running the jobs and
    requesting their workups. The meat is in `Runner.addScript` and
    `Runner.__call__`.
    """

    # ************************************************************************

    def __init__(self, ui: "base_executable.TestUtility"):
        """
        Initialize Runner Class

        :param ui: Contains information about the user interface (i.e. the
            command line arguments)
        """
        self.additionalArgs = ui.additionalArgs
        self.verbosity = ui.verbosity
        self.ncpu = ui.ncpu
        self.postmortem = getattr(ui, 'postmortem', False)
        self.timeout = getattr(ui, 'timeout', None)
        self.reporter = getattr(ui, 'reporter', None)
        # keys off test_id, values are testscripts.TestScript
        self.tests = {}
        self.job_runner = TestQueue(verbosity=ui.verbosity,
                                    timeout=self.timeout)
        self.job_runner.disableSmartDistribution()
        self.xvfb_cmd = None
    # ************************************************************************

    def addScript(self, test: testscripts.TestScript):
        """
        Add test to be executed by self.__call__. Adds the test information
        to self.job_runner.

        :param test: Test to be executed.
        """
        test_id = test.id
        try:
            # Should replace this with the attribute of the test, but be
            # careful with validate mode.
            test_id = int(test_id)
        except ValueError:
            test_id = test_id or test.directory
        # This protects against tests that do not have an automate command
        cmd = test.command
        if not cmd:
            logger.warning('Test "%s" has no command, skipping' % test_id)
            return
        if not test.directory and not test_id:
            raise TypeError("I don't know where to run this job (no test "
                            "directory or test id)")
        self.tests[test_id] = test
        jobdir = test.directory or str(test_id)
        jobdir = os.path.join(os.getcwd(), jobdir)
        # Add additional args to the cmd
        cmd += ' ' + self.additionalArgs
        # use shlex for shell-like parsing
        # makes for standard space/quote protection
        cmd_list = []  # initial list for prepped commands to be added to
        # make tests that require a display run with xvfb on Linux
        if "require:display" in test.tags and sys.platform.startswith(
                "linux") and 'DISPLAY' not in os.environ:
            # Determine the appropriate xvfb invocation (see get_xvfb_cmd for
            # an explanation) and cache it on first use because the version
            # of xvfb is unlikely to change between commands.
            if not self.xvfb_cmd:
                self.xvfb_cmd = get_xvfb_cmd()
            cmd_list = self.xvfb_cmd + cmd_list
        try:
            for i_cmd in shlex.split(cmd):
                # Replace keywords in cmd with run specific values
                i_cmd = i_cmd.replace("${NCPU}", str(self.ncpu))
                i_cmd = i_cmd.replace("${HOST}", sysinfo.REMOTE.host)
                i_cmd = i_cmd.replace("${CWD}", jobdir)
                i_cmd = i_cmd.replace("${SHARED}",
                                      os.path.join(os.getcwd(), "shared"))
                i_cmd = i_cmd.replace('$SCHRODINGER',
                                      os.getenv('SCHRODINGER'))
                i_cmd = i_cmd.replace('${SCHRODINGER}',
                                      os.getenv('SCHRODINGER'))
                # Add element of cmd to list which will be passed to JC or SP
                cmd_list.append(i_cmd)
        except Exception:
            logger.exception(
                f"WARNING: Failed to parse command string for {test_id}")
        # amend the command line for jobs that need DRIVERHOST to be
        # specified (this will only be done when a remote host is specified)
        if "require:driverhost" in test.tags:
            cmd_list.append("-DRIVERHOST")
            cmd_list.append(sysinfo.REMOTE.host)
        # Need ${SLASH} keyword to work around QA-618
        cmd_list = [c.replace("${SLASH}", "/") for c in cmd_list]
        if test.useJC():
            runs_locally = not test.runsRemotely()
            jobdict = dict(command_dir=jobdir,
                           test_id=test_id,
                           test=test,
                           runs_locally=runs_locally)
            if "${NCPU}" not in cmd:
                jobdict['procs'] = self.ncpu
            job = TestJCJob(cmd_list, **jobdict)
        else:
            job = TestSPJob(cmd_list,
                            command_dir=jobdir,
                            test_id=test_id,
                            test=test)
        self.job_runner.addJob(job)
        return job

    # ************************************************************************
    def __call__(self) -> bool:
        """
        Execute and run workups on all tests added to this Runner instance.
        Outcomes are stored in the TestScript objects in self.tests.

        This needs to be separate from the extraction step so that we can
        validate new tests.
        """
        if not self.job_runner.total_added:
            return True
        logger.debug("Executing tests.")
        self.problems = []
        workups = workup.discover_workups()
        try:
            self.job_runner.run(status_change_callback=functools.partial(
                self.jobStatusChange, workups))
        except Exception as err:
            msg = 'ERROR: Failure in executing jobs, Killing jobs: %s' % err
            logger.exception(msg)
            self.job_runner.killJobs()
            raise JobDJError(msg)
        if self.problems:
            msg = ('ERROR: Critical problems during test execution:\n %s' %
                   '\n '.join(self.problems))
            raise JobDJError(msg)
        return True

    def jobStatusChange(self, workups: Dict[str, Callable], job: TestJob):
        if job.state in {
                queue.JobState.FAILED, queue.JobState.FAILED_RETRYABLE
        }:
            if job.launch_error:
                job._exit_status = constants.FAILED_TO_LAUNCH
                with open(os.path.join(job.getCommandDir(), STDOUT_FILENAME),
                          'a') as stdout:
                    stdout.write(f'{job.launch_error}\n')
        elif job.state == queue.JobState.ACTIVE:
            job._exit_status = None
        if not job.hasExited() or self.tests[job.test_id].outcome is not None:
            return
        test = self.tests[job.test_id]
        _update_test_script_on_job_completion(test, job)
        self.problems.extend(
            run_workup(test, job, workups, self.postmortem, self.reporter))
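

# Illustrative sketch (not part of this module): the calling executable is
# expected to feed tests to a Runner and then invoke it, roughly as below.
# The names `ui` and `tests_to_run` are assumptions for the example; the
# real wiring lives in base_executable.
#
#     runner = Runner(ui)
#     for test in tests_to_run:
#         runner.addScript(test)
#     runner()  # runs the jobs and workups; outcomes land on runner.tests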


def _update_test_script_on_job_completion(test: testscripts.TestScript,
                                          job: TestJob):
    """
    After running a job, the test object needs to be updated to include
    additional information.
    """
    # Timing is -1 to distinguish between jobs that round to zero time, and
    # this value is serialized to DB as an integer
    if job.duration is None:
        test.timing = -1
    else:
        test.timing = job.duration
    test.exit_status = job.exit_status


def run_workup(test: testscripts.TestScript, job: TestJob,
               workups: Dict[str, Callable], generate_postmortem: bool,
               reporter: Optional[client.ResultReporter]) -> List[str]:
    """
    Run workup code and report results.

    If an upload failed or an unexpected error occurred (a workup failure is
    considered expected, since it can be reported upon), then return a list
    of str error messages.
    """
    problems = []
    # Run workup and record results
    try:
        job.debugStatus('start workup')
        test.runWorkup(job, registered_workups=workups)
        if test.outcome:
            job.infoStatus('workup succeeded')
        else:
            job.warnStatus('workup failed')
    # This should be protected in workup_outcome, instead.
    except Exception as err:
        msg = 'Failure in workup: %s' % err
        logger.exception(msg)
        job.infoStatus('FAILURE EXECUTING WORKUP')
        test.outcome = False
        if not test.workup_messages:
            test.workup_messages = msg
    finally:
        jc_exit_status = constants.JC_outcome_codes.get(test.exit_status,
                                                        False)
        # If the exit status was unexpected, run a postmortem.
        # Also, all KNIME tests with a failing workup should
        # include a postmortem.
        if generate_postmortem and not test.outcome:
            if not jc_exit_status or test.product == 'KNIME':
                try:
                    # Set SCHRODINGER_STU_TEST_ID so that this gets reported
                    # from errors within postmortem filestore connections.
                    with EnvironmentContext(SCHRODINGER_STU_TEST_ID,
                                            str(job.test_id)):
                        run_postmortem(job, test.product)
                except Exception:
                    job.warnStatus("failed to run postmortem")
                    msg = f"Reporting failed for test {job.test_id}"
                    logger.exception(msg)
                    problems.append(f"{msg}: {traceback.format_exc()}")
    if reporter:
        try:
            reporter.report(test)
            job.debugStatus('reported result')
        except Exception as err:
            job.warnStatus('failed to report result')
            msg = 'Reporting failed for test %s' % job.test_id
            logger.exception(msg)
            print(msg + ': ' + str(err))
            problems.append(msg + ': ' + str(err))
    return problems


def get_xvfb_cmd() -> List[str]:
    """
    xvfb-run needs the -a (auto server number) option, except on CentOS 7
    (and possibly other OSes) where that option is superseded by the -d
    (auto display) option. As a bonus, the long option --auto-display is
    shown in the help on CentOS 7 but does not actually work. Furthermore,
    xvfb-run does not have a --version flag which could be used to reason
    about supported options.

    To determine what flags are supported, try to run a no-op command with
    `xvfb-run -d`, and if that fails, use -a.
    """
    xvfb_cmd = ['xvfb-run', '-d']
    try:
        subprocess.run(
            xvfb_cmd + [':'],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        xvfb_cmd = ['xvfb-run', '-a']
    return xvfb_cmd
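

# Illustrative use, mirroring Runner.addScript: the returned list is simply
# prepended to the command of a display-requiring test on Linux, e.g.
#
#     xvfb_cmd = get_xvfb_cmd()       # ['xvfb-run', '-d'] or ['xvfb-run', '-a']
#     cmd_list = xvfb_cmd + cmd_list  # run the test under a virtual display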


def use_license_checking(test: testscripts.TestScript) -> bool:
    """
    Return True if license checking should be enabled for this test run.
    """
    # in production test is not None, but is sometimes None in unittest
    if not test:
        return True
    elif constants.SKIP_LICENSE_TAG in test.tags:
        return False
    elif "require:specific_host" in test.tags:
        return False
    elif sysinfo.REMOTE.name != sysinfo.LOCAL.name:
        return False
    return True


@contextlib.contextmanager
def add_jobcontrol_handler(filename: str):
    """
    Within a context, modify jobcontrol logger to log messages to a
    particular file.

    :param filename: name of file to log jobcontrol launch messages to
    """
    handler = logging.FileHandler(filename)
    original_level = jobcontrol.logger.getEffectiveLevel()
    jobcontrol.logger.setLevel(logging.INFO)
    jobcontrol.logger.addHandler(handler)
    try:
        yield
    finally:
        jobcontrol.logger.removeHandler(handler)
        jobcontrol.logger.setLevel(original_level)
        handler.close()
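

# Illustrative use, mirroring TestJCJob.doCommand: capture jobcontrol launch
# messages in the test's stdout file while a job is launched. `launch_job`
# is a hypothetical stand-in for the actual launch call.
#
#     with add_jobcontrol_handler(STDOUT_FILENAME):
#         launch_job()  # jobcontrol.logger INFO messages go to stdout.txt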