Source code for schrodinger.test.scival

"""
Tools used by SciVal test writers. Of most common use will be
`JobControlJob` and `Reporter`. Every group of SciVal tests should
probably have a `Reporter`.
"""

import inspect
import os
import signal
import sys
import textwrap
import time
import traceback
from abc import ABC
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import _pytest.runner
import pytest

from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.test.performance import reporter
from schrodinger.utils import fileutils


class JobControlJob(queue.JobControlJob):

    def __init__(self,
                 command: List[str],
                 suite: str,
                 name: str,
                 command_dir: Optional[str] = None,
                 max_retries: Optional[int] = None,
                 launch_env_variables: Optional[Dict[str, str]] = None,
                 **kwargs):
        """
        Initialize a job object for SciVal jobs, which also includes suite
        and name attributes.

        :param command: commandline for launching job
        :param suite: name of scival suite
        :param name: job name, for annotation for JobDJ before job is running
        :param command_dir: directory to launch job from, default to CWD
        :param max_retries: Number of times to retry failed jobs; the default
            is controlled globally by JobDJ.
        :param launch_env_variables: Extra environment variables to add to the
            existing environment for job launching.
        """
        super().__init__(command,
                         command_dir=command_dir,
                         name=name,
                         max_retries=max_retries,
                         launch_env_variables=launch_env_variables,
                         **kwargs)
        self.suite = suite

    def kill(self, *args, **kwargs):
        start_time = getattr(self._job_obj, 'LaunchTime') or getattr(
            self._job_obj, 'StartTime', None)
        start = time.mktime(
            time.strptime(start_time, jobcontrol.timestamp_format))
        current = time.time()
        elapsed = current - start
        self.elapsed_at_timeout = elapsed
        print(f" Killing {self} after {elapsed}s")
        super().kill(*args, **kwargs)

    def __repr__(self):
        return f'<scival.JobControlJob {self.suite}/{self.name}>'
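

# A minimal usage sketch: constructing a SciVal job. The command, suite name,
# and test name here are placeholders invented for illustration; real tests
# supply their own values.
def _example_make_job():
    # 'run_test.py' and its input file are hypothetical.
    return JobControlJob(command=['run_test.py', '-i', 'input.mae'],
                         suite='example_suite',
                         name='example_test')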


class Reporter(ABC):
    """
    Report results to the performance database when all jobs are complete.

    Based on queue._MultiJobFinalizer.
    """

    # These must be supplied by child classes
    NAME = None
    PRODUCT = None
    DESCRIPTION = None

    def __init__(self,
                 buildtype: Optional[str] = None,
                 build_id: Optional[str] = None):
        """
        :param buildtype: NB or OB, or None if reporting not enabled
        :param build_id: build-XXX or None if reporting not enabled
        """
        if not all((self.NAME, self.PRODUCT, self.DESCRIPTION)):
            msg = 'NAME, PRODUCT, and DESCRIPTION must be defined'
            raise NotImplementedError(msg)
        # analyze() must be overridden by the subclass
        if type(self).analyze is Reporter.analyze:
            msg = 'analyze() must be implemented'
            raise NotImplementedError(msg)
        if buildtype and build_id:
            self.reporter = reporter.create_performance_test_reporter(
                name=self.NAME,
                product=self.PRODUCT,
                description=self.DESCRIPTION,
                scival=True,
                upload=True)
        else:
            self.reporter = None
        self.build_id = build_id
        self.buildtype = buildtype
        self._running_jobs = set()
        self.tests = set()

    def __call__(self, job: queue.BaseJob):
        """Call analyze() once all jobs have completed."""
        self._running_jobs.remove(job)
        if self._running_jobs:
            return
        print(f"Analyzing results for {self.NAME} in {self.PRODUCT}")
        try:
            self.analyze(self.tests)
        except Exception:
            # Catch all exceptions, because we can't know what exceptions a
            # user-written analyze function may throw. Log the exception to
            # the same place the jobdj is logging.
            queue.logger.exception(
                f"Exception while analyzing results for {self.NAME}")
            # Add the suites that the jobs were part of to the set of suites
            # that reporting failed on. (This is expected to add one suite
            # to one jobdj.)
            msg = traceback.format_exc()
            for job in self.tests:
                job._jobdj.failed_reports[job] = msg
            return
        if self.reporter:
            self.reporter.report(build_id=self.build_id,
                                 buildtype=self.buildtype)

    def addJob(self, job: JobControlJob):
        """Add a job to the reporter."""
        self._running_jobs.add(job)
        self.tests.add(job)
        if not hasattr(job, 'PRODUCT'):
            setattr(job, 'PRODUCT', self.PRODUCT)
        if not hasattr(job, 'LONG_SUITE_NAME'):
            setattr(job, 'LONG_SUITE_NAME', self.NAME)
        job.addFinalizer(self)

    def addAsFinalizer(self, job: queue.BaseJob):
        return self.addJob(job)

    def addResult(self,
                  name: str,
                  value: Union[int, float],
                  units: Optional[str] = None):
        """
        Print the result, and report it when running under an automated
        build (either NB or OB).

        :param name: name of metric to report
        :param value: value of metric
        :param units: Optional units for value
        """
        reporter.validate_types(name, value, units)
        if self.reporter:
            print(f"Reporting to STU metric {name}: {value}{units or ''}")
            self.reporter.addResult(name, value, units)
        else:
            print(f"Found metric {name}: {value}{units or ''} but not "
                  "reporting to STU database.")

    def analyze(self, jobs: List[JobControlJob]):
        """
        Once all jobs have completed, loop over self.tests and use
        self.addResult() to generate the data to be reported.
        """
        raise NotImplementedError('Required')
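

# A minimal sketch of a concrete Reporter, assuming a hypothetical suite that
# only reports how many jobs finished. The NAME, PRODUCT, and DESCRIPTION
# values below are placeholders invented for illustration.
class _ExampleReporter(Reporter):
    NAME = 'example_suite_metrics'
    PRODUCT = 'example_product'
    DESCRIPTION = 'Illustrative reporter for a hypothetical SciVal suite.'

    def analyze(self, jobs):
        # Each addResult() call prints the metric and, when a build reporter
        # is configured, uploads it as well.
        self.addResult('num_completed_jobs', len(jobs))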


class _KillOnError:
    """
    Context manager to kill all jobs in a jobdj if an error is encountered.
    """

    def __init__(self, jobdj: queue.JobDJ):
        self.jobdj = jobdj
        self._original_handler = None

    def __enter__(self):
        self._original_handler = signal.signal(signal.SIGINT,
                                               self._getHandler())

    def __exit__(self, exc_type, exc_value, trb):
        signal.signal(signal.SIGINT, self._original_handler)
        if exc_value:
            print(f"Encountered {exc_type}. Killing jobs: {exc_value}")
            print(traceback.format_exc())
            self._kill(exc_type)

    def _getHandler(self):

        def handler(signum, frame):
            msg = ''.join(traceback.format_stack(frame))
            self._kill(f'signal {signum} at {msg}')
            sys.exit(signum)

        return handler

    def _kill(self, msg: str):
        """Kill all jobs, with timeout set to 5 minutes."""
        print(f"Encountered {msg}. Killing jobs")
        self.jobdj.killJobs()


class JobDJ(queue.JobDJ):

    def __init__(self, *args, **kwargs):
        kwargs['job_class'] = kwargs.pop('job_class', JobControlJob)
        self.timeout = kwargs.pop('timeout', 60 * 60 * 24)
        super().__init__(*args, **kwargs)
        self.failed_reports = {}
        self.killed_jobs = set()
        self._command_dir = None
        self.pytest_item = None
        self.pytest_items = {}

    def run(self):
        with _KillOnError(self):
            super().run(restart_failed=False)

    def printStatus(self,
                    job: Optional[queue.BaseJob] = None,
                    action: Optional[str] = None):
        """
        Override base printStatus. Change formatting, replace printing the
        jobname with printing the test name (usually the same, but...), and
        add printing of the suite.

        :param job: job object to print the status of, otherwise prints
            general status
        :param action: custom string to print as status of a job, otherwise
            use standard status
        """
        m = len(str(self.total_added)) or 1
        activity_len = 10
        jobhost_len = max([len(x[0]) for x in self._hosts] +
                          [len('localhost'), len('pdx-desk-l12')])
        jobid_len = jobhost_len + 12
        name_len = lambda j: len(getattr(j, 'name', ''))
        testname_len = max(map(name_len, self.all_jobs)) or 1
        suite_len = lambda j: len(getattr(j, 'suite', ''))
        suite_len = max(map(suite_len, self.all_jobs)) or 1
        fmt = ('{c:>{m}} {a:>{m}} {w:>{m}} | '
               '{activity:{activity_len}} {jobid:{jobid_len}} '
               '{suite:{suite_len}} {testname:{testname_len}} '
               '{jobhost:{jobhost_len}}')
        base_fmt_dict = dict(m=m,
                             activity_len=activity_len,
                             jobid_len=jobid_len,
                             suite_len=suite_len,
                             testname_len=testname_len,
                             jobhost_len=jobhost_len)
        if job is None:
            # Print the header
            s = "-" * m
            header = """
            JobDJ columns:
              C: Number of completed subjobs
              A: Number of active subjobs (e.g., submitted, running)
              W: Number of waiting/pending subjobs
            """
            queue.logger.info(textwrap.dedent(header))
            queue.logger.warning(
                fmt.format(c='C',
                           a='A',
                           w='W',
                           activity='Activity',
                           jobid='JobId',
                           suite='Suite',
                           testname='Test Name',
                           jobhost='Job Host',
                           **base_fmt_dict))
            queue.logger.warning(
                fmt.format(c=s,
                           a=s,
                           w=s,
                           activity='-' * activity_len,
                           jobid='-' * jobid_len,
                           suite='-' * suite_len,
                           testname='-' * testname_len,
                           jobhost='-' * jobhost_len,
                           **base_fmt_dict))
        else:
            status_string, jobid, host = job.getStatusStrings()
            if action is not None:
                status_string = action
            num_active = len(self._running)
            num_completed = (len(self._finished) + len(self._failed) +
                             len(self.killed_jobs))
            num_waiting = self.total_added - num_active - num_completed
            queue.logger.warning(
                fmt.format(c=num_completed,
                           a=num_active,
                           w=num_waiting,
                           activity=status_string or '',
                           jobid=jobid or '(none)',
                           suite=getattr(job, 'suite', ''),
                           testname=getattr(job, 'name', ''),
                           jobhost=host or '(none)',
                           **base_fmt_dict))
        sys.stdout.flush()

    def addJob(self,
               job: JobControlJob,
               add_connected: bool = False,
               **kwargs):
        kwargs['timeout'] = kwargs.pop('timeout', self.timeout)
        if not isinstance(job, list) and not (hasattr(job, 'name') and
                                              hasattr(job, 'suite')):
            raise TypeError('SciVal jobs are required to have the "name" '
                            'and "suite" attributes.')
        if isinstance(job, queue.BaseJob) and job.timeout is None:
            job.timeout = self.timeout
        return super().addJob(job, add_connected=add_connected, **kwargs)

    def addTests(self,
                 item: pytest.Item,
                 build_id: Optional[str] = None,
                 buildtype: Optional[str] = None):
        """
        Call the add_tests function in the Pytest Item.

        :param item: Current node that represents an add_tests function
        :param build_id: build-XXX matching the current build; can be None if
            not reporting
        :param buildtype: NB or OB; can be None if not reporting
        """
        exec_dir = self.setItem(item)
        try:
            add_tests = _assert_any_jobs_added(item.function, self)
            with fileutils.chdir(exec_dir):
                call_and_report(item, 'setup', add_tests, self, build_id,
                                buildtype, exec_dir)
        finally:
            self.clearItem()

    def _queueJob(self, job: queue.BaseJob):
        """
        Add a job to a queue heap.
        """
        if job not in self._added_jobs_set:
            if self._command_dir and not job.getCommandDir():
                job._command_dir = self._command_dir
            self.pytest_items[self.pytest_item].append(job)
        super()._queueJob(job)

    def setItem(self, pytest_item: pytest.Item):
        """
        Set the current Pytest Item.

        Jobs added to the jobdj while the current item is set to pytest_item
        will be associated with it for error reporting, etc.
        """
        self.pytest_item = pytest_item
        self.pytest_items[self.pytest_item] = []
        self._command_dir = os.path.normpath(
            os.path.abspath(os.path.dirname(pytest_item.nodeid)))
        return self.getCommandDir()

    def clearItem(self):
        """Clear the current Pytest Item."""
        self.pytest_item = None
        self._command_dir = None

    def getCommandDir(self) -> Optional[str]:
        """
        The default job launching directory (matches the BaseJob interface).

        :return: directory for job launching, or None if CWD
        """
        return self._command_dir

    def _jobFinished(self, job: queue.BaseJob):
        """
        Customize behavior for killed jobs.

        Skips finalizers (so test reporting isn't attempted for killed jobs)
        and kills all other jobs from the same suite. Also tracks killed jobs
        separately from completed jobs.
        """
        if job.abort_job:
            self.killed_jobs.add(job)
            self.killAllJobsForSuite(job.suite)
        else:
            super()._jobFinished(job)

    def killAllJobsForSuite(self, suite: str):
        """
        Kill all running jobs in the named suite and remove all waiting jobs
        in the named suite from the queue.
        """
        for job in self.active_jobs:
            if job.suite == suite:
                job.kill()
        to_remove = set()
        for job in self._jobqueue:
            if job.suite == suite:
                to_remove.add(job)
        for job in to_remove:
            self._jobqueue.remove(job)
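

# A minimal sketch of how the pieces fit together, assuming a list of
# JobControlJob objects and a concrete Reporter subclass already exist.
# Leaving build_id/buildtype unset on the reporter keeps reporting local
# (metrics are printed, not uploaded). The function name and arguments are
# invented for illustration.
def _example_run_suite(jobs, suite_reporter):
    jobdj = JobDJ()
    for job in jobs:
        jobdj.addJob(job)
        # Registering the job with the reporter also installs the reporter
        # as a finalizer, so analyze() runs once every job has completed.
        suite_reporter.addJob(job)
    jobdj.run()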


def call_and_report(item: pytest.Item, when: str, fxn: Callable, *args):
    """
    Execute a function and report its result to pytest.

    A slightly modified version from _pytest.runner to deal with functions
    that don't return the word "passed".
    """
    item._skipped_by_mark = False

    def inner():
        fxn(*args)
        return 'passed'

    hook = item.ihook
    hook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
    call = _pytest.runner.CallInfo.from_call(inner, when=when)
    report = hook.pytest_runtest_makereport(item=item, call=call)
    hook.pytest_runtest_logreport(report=report)


def _assert_any_jobs_added(add_tests: Callable, jobdj: JobDJ):
    """Assert that the add_tests function adds jobs to the jobdj."""

    def inner(*args, **kwargs):
        initial_job_count = len(jobdj.all_jobs)
        rvalue = add_tests(*args, **kwargs)
        final_job_count = len(jobdj.all_jobs)
        if final_job_count == initial_job_count:
            name = inspect.getsourcefile(add_tests)
            name += ':' + add_tests.__name__ + '()'
            raise AssertionError(
                f"Expected {name} to add jobs to the jobdj, but it did not. "
                "Double check that it is working as intended.")
        return rvalue

    return inner