Source code for schrodinger.test.stu.testscripts

"""
Contains the class `TestScript`, which provides methods to create, modify,
extract and delete a test script. See also the convenience function
`getTestFromDir`, which can be used to read a test from a directory.

"""

import datetime
import inspect
import numbers
import os
import re
import shutil

from . import common
from . import constants
from . import sysinfo
from . import workup
from .outcomes.failures import READMESyntaxError

logger = common.logger

EXECUTED_DIR = re.compile(r'^(.+)_run_\d\d\d\d-\d\d-\d\d')

PATH_ENCODING = 'utf-8'


def get_test_id(directory):
    """
    Get the test ID based on the directory name. Also guesses whether the
    test has already been run.

    A leading ``stu_`` prefix is dropped from the directory name. If the
    remainder matches `EXECUTED_DIR` (``<id>_run_YYYY-MM-DD...``), the part
    before ``_run_`` is the ID and the test is reported as executed.
    Otherwise the whole remaining name is the ID.

    :param directory: path to the test directory; trailing path separators
        are ignored
    :type directory: str

    :rtype: tuple(int/str, bool)
    :return: (TestID, Was the test executed?)
    """

    def _as_id(text):
        # IDs are integers whenever the text allows it, otherwise the raw
        # string is used as-is.
        try:
            return int(text)
        except ValueError:
            return text

    # os.path.basename() returns '' for a path that ends with a separator,
    # so strip trailing separators before taking the last component.
    separators = os.sep + (os.altsep or '')
    basename = os.path.basename(directory.rstrip(separators))
    if basename.startswith('stu_'):
        basename = basename[4:]

    was_run = EXECUTED_DIR.match(basename)
    if was_run:
        return _as_id(was_run.group(1)), True
    return _as_id(basename), False
def getTestFromDir(username, directory, filename='README'):
    """
    Build a `TestScript` from the README found in a test directory.

    :param username: creator to record when the README has no ``created_by``
        field
    :param directory: directory holding the test files
    :param filename: name of the README file inside `directory`

    :return: the test object produced by `TestScript._getTestFromDict`
    """
    readme_path = os.path.join(directory, filename)
    # Scan the directory before parsing the README, as the original did.
    files_to_substitute = TestScript.find_substitution_files(directory)
    data = TestScript.read_readme(readme_path)

    test_number, was_executed = get_test_id(directory)
    data.update(
        directory=directory,
        number=test_number,
        executed=was_executed,
        substitution_files=files_to_substitute,
    )
    # The README calls the field "created_by"; the constructor wants
    # "creator".
    data['creator'] = data.pop('created_by', username)
    return TestScript._getTestFromDict(data)
class TestScript:
    """
    A single test script: its README fields, its directory on disk and,
    once it has been run, its outcome.
    """

    def __init__(
            self,
            product,
            priority,
            description,
            command,
            workup,
            build_modified=None,
            creator=None,
            number=None,
            directory=None,
            question='',
            product_subfeature=None,
            mpi_enabled=None,
            allowed_cpu="",
            jira_tracking=None,
            disabled_for_bug=False,
            unsupported_platforms=None,
            shared_files=tuple(),  # noqa: M511
            substitution_files=tuple(),  # noqa: M511
            useJC=True,
            resource_uri=None,
            download=None,
            upload=None,
            tags=None,
            minimum_version=None,
            maximum_version=None,
            executed=False,
            **kwargs):
        """
        Store the test fields, normalize a few of them and validate the
        result.

        The first five arguments are the required README fields; all other
        named arguments are optional fields. Any additional keyword
        arguments are passed to `validate`, which rejects unknown ones.

        :raise READMESyntaxError: if the workup cannot be compiled as an
            expression, or if `validate` rejects the data.
        """
        # Test number
        self.id = number
        self.product = product
        self.product_subfeature = product_subfeature
        self.priority = priority
        self.description = description
        self.question = question
        self.directory = directory
        self.expect_job_failure = False
        # Copy the tags: required tags may be appended below and the
        # caller's list must not be modified.
        self.tags = list(tags) if tags else []

        # Execution limitations
        self.mpi_enabled = mpi_enabled
        self.allowed_cpu = allowed_cpu
        if self.allowed_cpu is True:
            self.allowed_cpu = '1'
        if isinstance(self.allowed_cpu, numbers.Number):
            self.allowed_cpu = str(self.allowed_cpu)
        self.unsupported_platforms = unsupported_platforms or []
        self.disabled_for_bug = disabled_for_bug
        self.jira_tracking = jira_tracking
        self.minimum_version = minimum_version
        self.maximum_version = maximum_version

        # Script technicals
        self.build_modified = build_modified or sysinfo.LOCAL.mmshare
        self.creator = creator
        # Drop "none" placeholders and remove trailing path separators from
        # shared files/directories.
        self.shared_files = [
            name.rstrip('/\\')
            for name in shared_files
            if name.lower() != 'none'
        ]
        self.substitution_files = [
            name for name in substitution_files if name.lower() != 'none'
        ]

        # Automation options
        # None can be returned from DB and means default, which is True
        self._useJC = True if useJC is None else useJC

        self.command = command
        self.workupstr = workup
        if self.workupstr:
            # Only checks that the workup is a syntactically valid
            # expression; nothing is evaluated here.
            try:
                compile(self.workupstr, str(self), 'eval')
            except TypeError as e:
                raise READMESyntaxError(
                    f'Test {self} workup causes TypeError: {e}') from e
            except SyntaxError as e:
                raise READMESyntaxError(
                    f"Workup is not valid for test {self}\n\n"
                    f"Workup String: {self.workupstr} \n\nError: {e}") from e

        # Add remote installation tag if it's needed and not present.
        if 'pdxgpu' in self.command:
            if 'require:pdxgpu_install' not in self.tags:
                self.tags.append('require:pdxgpu_install')
        if 'host bolt' in self.command.lower():
            if 'require:bolt_install' not in self.tags:
                self.tags.append('require:bolt_install')

        # expect_job_failure must be handled explicitly. A missing/blank
        # workup simply cannot request it.
        if self.workupstr and 'expect_job_failure' in self.workupstr:
            self.expect_job_failure = True

        # Simplify future communication with the web service.
        self.resource_uri = resource_uri
        self.download = download
        self.upload = upload

        # The following will be filled in if/when the script is run.
        self.executed = executed
        # bool : Success or Failure of the test
        self.outcome = None
        # str : Messages from the workup
        self.workup_messages = ''
        # str : Failure category as per SHARED-3037
        self.failure_type = ''
        # float : how long the test took to run (s)
        self.timing = 0
        # str : exit status
        self.exit_status = constants.JOB_NOT_STARTED

        self.validate(**kwargs)

    @classmethod
    def _getTestFromDict(cls, data):
        """
        Creates a test object using a dictionary keyed on the names used in
        the repo and DB.

        :param data: field name -> value; not modified
        :type data: dict

        :raise READMESyntaxError: if a required constructor argument is
            missing from `data`.
        """
        testdata = data.copy()
        # These variables have historically had a different internal and
        # external name. The long term solution is to make these names match.
        external_to_internal = dict(use_JC='useJC',
                                    automate_cmd='command',
                                    outcome_workup='workup',
                                    priority_level='priority')
        for external, internal in external_to_internal.items():
            if external in testdata:
                testdata[internal] = testdata.pop(external)

        try:
            return cls(**testdata)
        except TypeError:
            argspec = inspect.getfullargspec(cls.__init__)
            # remove self and the args that have default values
            required_args = argspec.args[1:-len(argspec.defaults)]
            # Keep signature order so the message is deterministic.
            unfulfilled_args = [
                arg for arg in required_args if arg not in testdata
            ]
            if not unfulfilled_args:
                # Something else went wrong.
                raise
            msg = 'Missing required README fields: '
            msg += ', '.join(unfulfilled_args)
            if 'workup' in unfulfilled_args:
                # Basically, we want to encourage people to write a workup,
                # but in some situations the exit status really is a
                # sufficient check.
                msg += (' workup can be blank, but the field must be present '
                        'in the README.')
            # Add information about the test to the error message.
            if 'product' in data and 'number' in data:
                msg += f' {data["product"]} test {data["number"]}.'
            raise READMESyntaxError(msg)

    @classmethod
    def read_readme(cls, readme):
        """
        Read README and extract script information. Format is keyword=value
        pairs. Also does limited boolean parsing.

        Scalar values are converted to `bool` ("yes"/"true"/"1" and
        "no"/"false"/"0", case-insensitive) or `int` when possible and are
        otherwise kept as `str`. The comma-separated fields
        ``unsupported_platforms`` (or its old name ``unsupported_plats``),
        ``shared_files`` and ``tags`` are returned as lists of strings. When
        a keyword is repeated, the first occurrence wins.

        :param readme: path of the README file
        :return: keyword -> value; an empty dict if the file cannot be
            opened or contains a non-blank line without "="
        :rtype: dict
        """
        list_keys = ('unsupported_platforms', 'shared_files', 'tags')
        # List-valued fields must stay textual until they are split, so they
        # skip the bool/int conversion below.
        raw_keys = set(list_keys) | {'unsupported_plats'}
        truthy = ("yes", "true", "1")
        falsy = ("no", "false", "0")

        try:
            inh = open(readme)
        except (OSError, TypeError, ValueError):
            logger.critical("WARNING: Failed to read the %s." % readme)
            return {}

        readme_data = {}
        with inh:
            for line in inh:
                # discard '\r' characters (shouldn't be necessary) and
                # surrounding whitespace, including the newline.
                line = line.replace('\r', '').strip()
                # Recognize blank lines
                if not line:
                    continue

                keyword, separator, value = line.partition('=')
                if not separator:
                    logger.warning(
                        "WARNING: Failed to read line in , \"%s\"\n%s" %
                        (readme, line))
                    return {}
                keyword = keyword.strip()
                value = value.strip()

                # If possible, convert the field to a bool or an int. If
                # this is not possible, we want a str anyway.
                if keyword not in raw_keys:
                    lowered = value.lower()
                    if lowered in truthy:
                        value = True
                    elif lowered in falsy:
                        value = False
                    else:
                        try:
                            value = int(value)
                        except ValueError:
                            pass

                if keyword in readme_data:
                    logger.info(
                        "WARNING: keyword, %s, exists multiple times in ,"
                        "%s. Using first occurrence in file." %
                        (keyword, readme))
                else:
                    readme_data[keyword] = value

        # Check "unsupported_plats" for compatibility with old READMEs.
        if 'unsupported_plats' in readme_data:
            readme_data["unsupported_platforms"] = readme_data.pop(
                "unsupported_plats")

        for key in list_keys:
            if key in readme_data:
                readme_data[key] = re.split(r"\s*,\s*", readme_data[key])

        return readme_data

    def write_readme(self, fileobj=None):
        """
        Write or return the README data.

        :param fileobj: where to write. A `str` is treated as a path and
            (over)written; any other truthy value is treated as an open,
            writable text file. If falsy, nothing is written.
        :return: the README text when `fileobj` is falsy (entries are
            separated by a blank line), otherwise None

        :raise READMESyntaxError: if `validate` rejects the current data.
        """
        self.validate()
        rm = [
            'product = %s\n' % self.product,
            'priority = %s\n' % self.priority,
            'description = %s\n' % self.description,
            'command = %s\n' % self.command,
            'workup = %s\n' % self.workupstr,
            'created_by = %s\n' % self.creator,
        ]
        if self.question:
            rm.append('question = %s\n' % self.question)
        if self.product_subfeature:
            rm.append('product_subfeature = %s\n' % self.product_subfeature)
        if self.shared_files:
            rm.append('shared_files = %s\n' % ', '.join(self.shared_files))
        if self.mpi_enabled:
            rm.append('mpi_enabled = %s\n' % self.mpi_enabled)
        if self.unsupported_platforms:
            rm.append('unsupported_platforms = %s\n' %
                      ', '.join(self.unsupported_platforms))
        if self._useJC is not None:
            rm.append('useJC = %s\n' % self._useJC)
        if self.disabled_for_bug:
            rm.append('disabled_for_bug = %s\n' % self.disabled_for_bug)
        if self.jira_tracking:
            rm.append('jira_tracking = %s\n' % self.jira_tracking)
        if self.allowed_cpu:
            rm.append('allowed_cpu = %s\n' % self.allowed_cpu)
        if self.tags:
            rm.append('tags = %s\n' % ', '.join(self.tags))
        if self.minimum_version:
            rm.append('minimum_version = %d\n' % self.minimum_version)
        if self.maximum_version:
            rm.append('maximum_version = %d\n' % self.maximum_version)

        if not fileobj:
            return '\n'.join(rm)
        if isinstance(fileobj, str):
            with open(fileobj, 'w') as outfh:
                outfh.writelines(rm)
        else:
            # An already-open file object: write to it directly instead of
            # silently doing nothing.
            fileobj.writelines(rm)

    def validate(self, **kwargs):
        """
        Validate the data stored in a TestScript object. Should be done when
        instantiating one or dumping it to file.

        :param kwargs: extra README fields that were not consumed by the
            constructor; only a fixed set of names is tolerated.

        :raise READMESyntaxError: on the first problem found.
        """
        if kwargs:
            extra = set(kwargs) - {
                'date_created', 'date_modified', 'modifier', 'component',
                'interested_users', 'test_directory'
            }
            if extra:
                raise READMESyntaxError('Unrecognized fields in README: '
                                        f'{extra} for {self}')

        for tag in self.tags:
            if ':' in tag and not tag.startswith('require:'):
                raise READMESyntaxError(
                    f'":" is not allowed in tag names (found "{tag}") {self}')

        version_limits = (('minimum', self.minimum_version),
                          ('maximum', self.maximum_version))
        for label, version in version_limits:
            if not version:
                continue
            if not (isinstance(version, int) and len(str(version)) == 5):
                raise READMESyntaxError(
                    f'The {label} version needs to be a 5 digit integer '
                    f'(not {version}) {self}')

        if self.disabled_for_bug and not self.jira_tracking:
            raise READMESyntaxError(
                'If you are disabling a test with "disabled_for_bug", you '
                'must provide a JIRA ID and explanation in the '
                f'"jira_tracking" field. {self}')

        if not self.description.startswith('SciVal'):
            # SciVal STU tests are not actually run by STU, so they don't need
            # this validation.
            # NOTE(review): the source this was reconstructed from had lost
            # its indentation; the -LOCAL/-NOJOBID/-WAIT checks are assumed
            # to belong to this branch - confirm against the repository.
            validate_command_for_host(self.command, self.tags, self,
                                      self.product)
            if '-LOCAL' in self.command:
                raise READMESyntaxError(
                    '-LOCAL is not allowed STU command argument')
            if self.useJC():
                if '-NOJOBID' in self.command:
                    raise READMESyntaxError(
                        '-NOJOBID argument is not allowed for STU tests with useJC=True'
                    )
                elif '-WAIT' in self.command:
                    raise READMESyntaxError(
                        '-WAIT argument is not allowed for STU tests with useJC=True'
                    )

    def runWorkup(self, job=None, registered_workups=None):
        """
        Run my workup.

        :param job: if given, the workup runs in ``job.getCommandDir()``
            instead of the test directory
        :param registered_workups: passed through to
            `workup.workup_outcome`
        :return: ``self.outcome`` as it stands after the workup call
        """
        directory = job.getCommandDir() if job else self.directory
        workup.workup_outcome(self,
                              directory,
                              registered_workups=registered_workups,
                              job_dj_job=job)
        return self.outcome

    def getNewExecuteDirectory(self, attempts=120):
        """
        Get a new directory name.

        The name has the form ``<name>_run_<YYYY-MM-DD>_<NNN>``. For a test
        with an ID, ``<name>`` is ``stu_<id>`` and the directory is placed
        in the current working directory; otherwise the name and location
        of the test's own directory are used.

        :param attempts: how many three-digit indices (starting at 000) to
            try
        :return: the first candidate path that does not exist yet
        :raise Exception: if every candidate already exists.
        """
        if self.id:
            name = f'stu_{self.id}'
            basedir = os.getcwd()
        else:
            # original_directory only exists after copyToScratch() has run.
            directory = (getattr(self, 'original_directory', None) or
                         self.directory)
            basedir, name = os.path.split(directory)

        # Build the path directly rather than through a second str.format()
        # pass, which would choke on braces in the directory name.
        prefix = os.path.join(
            basedir, f'{name}_run_{datetime.datetime.now():%Y-%m-%d}_')
        for index in range(attempts):
            path = f'{prefix}{index:0=3}'
            if not os.path.exists(path):
                return path
        raise Exception(
            f'No unique new directory found after {attempts} attempts.')

    def copyToScratch(self):
        """
        Copy files to a scratch folder.

        Remembers the current directory as ``original_directory`` and points
        ``directory`` at the new copy.

        :return: False if the copy failed on a non-Darwin platform, True
            otherwise
        :rtype: bool
        """
        self.original_directory = self.directory
        self.directory = self.getNewExecuteDirectory()
        try:
            shutil.copytree(self.original_directory, self.directory)
        except Exception as err:
            # Ignore errors on Darwin, see QA-646
            if not sysinfo.LOCAL.isDarwin:
                logger.debug('WARNING: Failed to copy "%s" to "%s"' %
                             (self.original_directory, self.directory))
                import traceback
                logger.debug(traceback.format_exc())
                logger.debug(err)
                return False
        return True

    def recoverFromScratch(self, get_license=True):
        """
        Remove scratch folder and prepare to add/modify the test.

        :param get_license: if true, first copy the license check file back
            to the original directory (see `_getReferenceLicense`)
        """
        if get_license:
            self._getReferenceLicense()
        shutil.rmtree(self.directory)
        self.directory = self.original_directory

    def _getReferenceLicense(self):
        """
        Get the license file created in the scratch directory and move it to
        the test directory.

        Assumes both self.directory and self.original_directory are set,
        should be run from `recoverFromScratch`. An existing reference file
        is never overwritten.
        """
        license = os.path.join(self.directory, constants.LICENSE_CHECK_FILE)
        reference = os.path.join(self.original_directory,
                                 constants.REF_LICENSE_CHECK_FILE)
        if os.path.isfile(license) and not os.path.isfile(reference):
            logger.info('Creating %s for %s' %
                        (constants.REF_LICENSE_CHECK_FILE, self.id))
            shutil.copy(license, reference)

    @classmethod
    def find_substitution_files(cls, directory):
        """
        Run through all files in `directory` and look for the string
        "${SHARED} or ${CWD}". Only text files need to be processed.

        Files named ``README`` and files whose name ends in ``gz`` are
        skipped.

        :param directory: directory to walk recursively
        :return: paths, with the leading `directory` removed, of the files
            that contain a substitution expression
        :rtype: list(str)
        """
        subfiles = []
        for root, dirs, files in os.walk(directory):
            for filename in files:
                if filename == 'README' or filename.endswith('gz'):
                    continue
                fullpath = os.path.join(root, filename)
                # Read as bytes so that binary files cannot cause decoding
                # errors.
                with open(fullpath, 'rb') as inh:
                    needs_substitution = any(
                        b'${SHARED}' in line or b'${CWD}' in line
                        for line in inh)
                if not needs_substitution:
                    continue

                newpath = fullpath.replace(directory, "", 1)
                # Workaround for residual / character that may or may not
                # be found. Only the single leading separator is dropped.
                if newpath[:1] in ('/', '\\'):
                    newpath = newpath[1:]
                logger.debug(f"{newpath} requires substitution "
                             f"(test {directory}.")
                subfiles.append(newpath)

        if not subfiles:
            logger.debug("DEBUG: ${SHARED} or ${CWD} not found in any files "
                         "in \"%s\"" % (directory))
        return subfiles

    def substituteFiles(self):
        """
        Replace substitution expressions in files that require it. Requires
        that the substitution files already be identified.

        :return: True on success; False (after logging the exception) if
            any file could not be processed
        :rtype: bool
        """
        try:
            for filename in self.substitution_files:
                self._substituteInPlace(filename)
        except Exception:
            logger.exception("Failed to substitute files for %s:\n" % (self))
            return False
        return True

    def _substituteInPlace(self, filename):
        """
        Replace ${SHARED} and ${CWD} in `filename` with the shared directory
        and test directory, respectively.

        The shared directory is ``<current working directory>/shared`` and
        the test directory is the absolute form of ``self.directory``. The
        file is read completely and then rewritten.

        :param filename: path relative to ``self.directory``
        :return: True
        :raise Exception: whatever reading or writing the file raised, after
            diagnostic information has been logged.
        """
        path = os.path.join(self.directory, filename)
        test_dir = os.path.abspath(self.directory)
        # The file is handled as bytes, so the replacement values must be
        # bytes as well. ${SHARED} is replaced before ${CWD}.
        replacements = (
            (b"${SHARED}",
             os.path.join(os.getcwd(), 'shared').encode(PATH_ENCODING)),
            (b"${CWD}", test_dir.encode(PATH_ENCODING)),
        )

        try:
            outlines = []
            with open(path, 'rb') as fh:
                for line in fh:
                    for token, value in replacements:
                        line = line.replace(token, value)
                    outlines.append(line)
            with open(path, 'wb') as fh:
                fh.writelines(outlines)
        except Exception:
            logger.critical('REPORTING INFO FOR SHARED-2572')
            logger.critical('Current directory: %s' %
                            os.path.abspath(os.getcwd()))
            logger.critical(f'Test directory: {self.directory} ({test_dir})')
            logger.critical('{} exists: {}'.format(self.directory,
                                                   os.path.exists(test_dir)))
            logger.critical('{} exists: {}'.format(path,
                                                   os.path.exists(path)))
            # Collecting the listing must never hide the error that is
            # being reported.
            try:
                listing = ', '.join(os.listdir(test_dir))
            except OSError as list_err:
                listing = f'<unavailable: {list_err}>'
            logger.critical('All files in test directory: %s' % listing)
            raise

        return True

    def useJC(self):
        """
        Determines whether the script will be run under jobcontrol, uses
        self._useJC as a default value.

        :return: Should this script be run under jobcontrol?
        :rtype: bool
        """
        return bool(self._useJC)

    def runsRemotely(self):
        """
        A job is available to run on a remote host if:

        * It is a jobcontroljob that doesn't have the require:localhost tag
        * It is not a jobcontroljob, but it has ${HOST} in the command.

        :rtype: bool
        """
        if self.useJC():
            return 'require:localhost' not in self.tags
        return '${HOST}' in self.command

    def toDict(self):
        """
        Dump test object as a dict

        Run-time state (outcome, timing, directories, ...) is left out, and
        the internal names ``_useJC``, ``id`` and ``workupstr`` are exported
        as ``useJC``, ``number`` and ``workup``.

        :raise READMESyntaxError: if `validate` rejects the current data.
        """
        self.validate()
        rdict = self.__dict__.copy()
        unserialized_fields = {
            'executed', 'exit_status', 'workup_messages', 'failure_type',
            'expect_job_failure', 'timing', 'original_directory',
            'directory', 'outcome', 'id'
        }
        for field in unserialized_fields:
            rdict.pop(field, None)

        rdict['useJC'] = rdict.pop('_useJC', None)
        rdict['number'] = self.id
        rdict['workup'] = rdict.pop('workupstr')
        return rdict

    def __str__(self):
        return f'{self.product} test {self.id}'
def validate_command_for_host(command, tags, test_id, product):
    """
    Check the host information encoded in a test command.

    In general, there should be no -HOST information so stu can decide what
    hosts to run with. Nothing is checked when `product` is 'Job control',
    when the test carries the "require:specific_host" tag, or when the
    command does not contain "-HOST" at all.

    NOTE: This is same as code in forms validation for stu server. Use this
    code once we integrate stu server to use from mmshare.

    :param command: commandline which will be executed
    :type command: str

    :param tags: tags associated with test
    :type tags: collection of str

    :param test_id: the test, or its name; only interpolated into error
        messages
    :type test_id: object

    :param product: name of Product test is associated with
    :type product: str

    :raise READMESyntaxError: if "-HOST ... localhost" is used without the
        "require:localhost" tag, or if "-HOST" is used in any form other
        than ``-HOST ${HOST}`` or ``-HOST "${HOST}:${NCPU}"``.
    """
    # `or` short-circuits, so `tags` is only inspected for products other
    # than 'Job control'.
    is_exempt = (product == 'Job control' or
                 'require:specific_host' in tags or '-HOST' not in command)
    if is_exempt:
        return

    if re.search('-HOST.*localhost', command):
        if 'require:localhost' in tags:
            return
        raise READMESyntaxError(
            f'Tests which specify "-HOST localhost" in the '
            f'command must use the tag "require:localhost". {test_id} ')

    generic_host_forms = ('-HOST ${HOST}', '-HOST "${HOST}:${NCPU}"')
    if not any(form in command for form in generic_host_forms):
        raise READMESyntaxError(
            f'Tests which contain "-HOST" in the command must '
            f'use the tag "require:specific_host". {test_id}')
    # Other variants of command are considered to be success
# Mapping from README field names to test attribute names (currently empty).
README2TEST = dict()
"""Correspondence between README values and test values."""

# Inverse of README2TEST: test attribute name -> README field name.
TEST2README = dict(
    (test_name, readme_name) for readme_name, test_name in README2TEST.items())
"""Correspondence between README values and test values."""

# Names of the values that are lists (currently empty).
LIST_VALUES = list()
"""Values that are lists."""

# *****************************************************************************
# This module is meant to be imported, not executed. The guard below keeps the
# message from being printed on import.
if __name__ == "__main__":
    print("Nothing to see here.")