Source code for schrodinger.test.stu.base_executable

"""
Base class for STU utilities.  Each utility should probably implement
`getParser` and `_doCommand`

@copyright: Schrodinger, Inc. All rights reserved.
"""

# Add _strptime to sys.modules before anything else happens to avoid threading
# bug in requests module.  See SHARED-2608.

import argparse
import datetime
import future.utils
import os.path
import sys

from schrodinger.utils import cmdline
from schrodinger.utils import log

from . import client
from . import common
from . import constants
from . import run
from . import testscripts
from . import workup

# Add encodings.idna to sys.modules before anything else happens to import
# bug in requests module.  See SHARED-2618.

# This creates the logger.  If we ever need to log with a logger,
# we must import this module
logger = common.logger


[docs]class store_server(argparse.Action): """Argparse action to monkeypatch the server address.""" def __call__(self, parser, namespace, values, option_string=None): if not values.startswith('http'): values = 'http://' + values common.BASE_URL = values
def _check_dir(string): """Check whether a directory exists. Used in parser.""" if not os.path.isdir(string): raise argparse.ArgumentTypeError("Directory %s is not accessible." % string) return os.path.abspath(string)
[docs]class TestRunnerParser(argparse.ArgumentParser): "Subclass that allows easy adding of options."
[docs] def __init__(self, present_tense, past_tense, *args, **kwargs): super().__init__(*args, **kwargs) self.past_tense = past_tense.lower() self.cap_present_tense = present_tense.capitalize()
[docs] def addSkipArg(self): self.add_argument("-t", "--test_skip", action="store_false", dest="verify", help=("Skip test execution of job before adding " "to database."))
[docs] def addTestIDArg(self): self.add_argument( "--id", dest="test_ids", type=common.str2list, help= "%s one or more tests with ID numbers indicated. Format: 1,7,10-13 or '1 7 10-13' will choose tests 1, 7, 10, 11, 12, and 13." % self.cap_present_tense) self.add_argument("-s", dest="test_ids", type=common.str2list, help=argparse.SUPPRESS)
[docs] def addExtractArgs(self): self.addTestIDArg() self.add_argument( "-p", "--priority", dest="priority", metavar='1,2', type=common.str2list, help= "%s jobs with priority that can be a single value, a range, or a comma- or space-separated list. Allowed values are integers 1-5. (No default)." % self.cap_present_tense) self.add_argument( "-prod", "--product", dest="products", type=common.str2strlist, help= "%s scripts for specified products, may be a comma-separated list (No default)." % self.cap_present_tense) self.add_argument( "--skip_products", dest="skip_products", type=common.str2strlist, help= "Do not %s scripts for specified products, may be a comma-separated list (No default)." % self.cap_present_tense.lower()) self.add_argument( "--components", dest="components", type=common.str2strlist, help= "%s scripts for specified project components, may be a comma-separated list (No default)." % self.cap_present_tense) self.add_argument( "--skip_components", dest="skip_components", type=common.str2strlist, help= "Do not %s scripts for specified project components, may be a comma-separated list (No default)." % self.cap_present_tense.lower()) self.add_argument( '--tag', metavar='tag1,tag2,tag3', dest='tags', default=tuple(), type=common.str2strlist, help= "%s tests tagged with any of the provided tags. Comma-separated list." % self.cap_present_tense) self.add_argument( '--skip_tag', metavar='tag1,tag2,tag3', dest='skip_tags', default=client.SCIVAL_TAGS, type=common.str2strlist, help= "Do not %s tests tagged with any of the provided tags. Comma-separated list." % self.cap_present_tense.lower())
[docs] def addDirectories(self, required=False, noscratch=False): if noscratch: unless = (' Tests will be run in a scratch directory unless the ' '--noscratch option is specified.') else: unless = '' help_msg = 'One or more directories to be {}.{}'.format( self.past_tense, unless) if required: self.add_argument('directories', metavar='DIRECTORY', type=_check_dir, nargs='+', help=help_msg) else: self.add_argument('--directories', metavar='DIRECTORY', type=_check_dir, nargs='+', help=help_msg) if noscratch: self.add_argument('--noscratch', dest='scratch', action='store_false', help=('%s in the test directory instead of a ' 'scratch directory.' % self.cap_present_tense))
[docs] def addExpandArg(self): self.add_argument("--expand_variables", action="store_true", dest="expand_variables", help=("Expand ${CWD} and ${SHARED} variables upon " "test extraction. (Default: False)"))
[docs] def addTestLimitations(self): launch_location_mutex = self.add_mutually_exclusive_group() launch_location_mutex.add_argument( "--remote_only", action="store_true", help=("Only run tests that can be executed on a remote host.")) launch_location_mutex.add_argument( "--local_only", action="store_true", help=("Only run tests that must be run on the local host.")) self.add_argument('--allow', metavar='tag1,tag2,tag3', dest='allow_tags', type=common.str2strlist, help="Don't exclude tests from being %s that " "\"require:\" the requested resources." % self.past_tense) self.add_argument("--mpi", action="store_true", dest="mpi_enabled", help=("Execute *only* MPI-enabled jobs from " "selected test tests. Running without " "this option runs only non-MPI-enabled " "tests")) self.add_argument("-mpi", action="store_true", dest="mpi_enabled", help=argparse.SUPPRESS) self.addForce()
[docs] def addCPUArg(self): self.add_argument("--cpu", dest="ncpu", metavar="<number>", default=1, type=int, help=("Number of CPUs each subjob will run on. " "(Default: 1)")) self.add_argument("-cpu", dest="ncpu", metavar="<number>", default=1, type=int, help=argparse.SUPPRESS)
[docs] def addForce(self): self.add_argument("-f", "--force", dest="force", action="store_true", help=("Force execution of tests, overriding " "any limitations such as unsupported " "platforms, bug disabled, \"require:\" " "labels, # of processor limitations, MPI " "requirements, etc."))
[docs] def addExecuteOptions(self): self.add_argument( "--additionalArgs", default='', help= "Additional arguments to append to every test's command. Specify as a (quoted) space separated list" ) self.add_argument( "-timeout", "--timeout", type=int, help= "Max duration of a test in seconds. Jobs longer than this will be killed." )
[docs] def addVerbosityArgs(self): self.add_argument('--quiet', action='store_true', help="Print only the most important information") self.add_argument( '--verbose', action='store_true', help= "Print additional information. Note that -DEBUG implies -verbose.")
[docs] def addServerArg(self, user_option=True): # Specify the STU server to use for running. Suppressed, because it is # only used in testing STU. self.add_argument('--server', dest='server', default=common.BASE_URL, action=store_server, help=argparse.SUPPRESS) self.add_argument('--user', dest='username', default=client.get_stu_username(), help='Username on the STU server.')
[docs] def addJobControlOptions(self): cmdline.add_jobcontrol_options(self, options=[cmdline.HOST, cmdline.DEBUG])
[docs]class TestUtility: present_tense = None past_tense = None
[docs] def __init__(self, arguments=None): self.utility = '$SCHRODINGER/utilities/stu_%s' % self.present_tense self.starttime = None if arguments is None: arguments = sys.argv[1:] self.cmd_run = self.utility + ' ' + ' '.join(arguments) self.getOptions(arguments)
[docs] def printHeader(self): """ Print header """ logger.info("************************************************") logger.info("* System Test Utility (STU) *") logger.info("************************************************\n") logger.debug('User: "%s"' % (self.username)) self.starttime = datetime.datetime.now() logger.info(str(self.starttime.ctime()) + '\n') logger.debug("Input command: ") logger.debug(self.cmd_run) logger.debug('STU server: %s' % common.BASE_URL) logger.debug('')
[docs] def printFooter(self): logger.warning('\nRequest complete.') now = datetime.datetime.now() logger.info(str(now.ctime()))
[docs] def getParser(self, prog=None, description=None): """ :rtype: argparse.ArgumentParser """ if not self.present_tense or not self.past_tense: raise NotImplementedError("This is a blank base class.") if not description: description = self.__doc__ if not prog: prog = self.utility return TestRunnerParser(present_tense=self.present_tense, past_tense=self.past_tense, prog=prog, description=description)
[docs] def getOptions(self, arguments=None, user_option=True): parser = self.getParser() parser.addServerArg(user_option=user_option) parser.parse_args(arguments, self) if self.verbose or getattr(self, 'debug', False): logger.setLevel(log.DEBUG) self.verbosity = 'verbose' elif self.quiet: logger.setLevel(log.WARNING) self.verbosity = 'quiet' else: logger.setLevel(log.INFO) self.verbosity = 'normal' return parser
[docs] def getTests(self): """ Get the TestScript objects required based on the mode. """ if hasattr(self, 'directories') and self.directories: for directory in self.directories: yield testscripts.getTestFromDir(self.username, directory) else: test_ids = getattr(self, 'test_ids', []) products = getattr(self, 'products', []) components = getattr(self, 'components', []) priorities = getattr(self, 'priority', []) tags = getattr(self, 'tags', []) not_products = set( getattr(self, 'failed_products', None) or tuple()) for source in getattr(self, 'failed_products', None) or tuple(): if source.lower() == 'none': continue not_products.update(constants.source2product[source]) not_products.update(getattr(self, 'skip_products', None) or tuple()) not_components = getattr(self, 'skip_components', tuple()) not_tags = getattr(self, 'skip_tags', tuple()) for test in client.retrieve(self.username, test_ids, products, components=components, priorities=priorities, tags=tags, not_products=not_products, not_components=not_components, not_tags=not_tags): try: test_ids.pop(test_ids.index(test.id)) except (ValueError, AttributeError): pass yield test if test_ids: logger.info( '\nThese test IDs were requested but not available: ' + ', '.join(str(test_id) for test_id in test_ids))
[docs] def runJobs(self, inscratch=False): """ :param report: Should results be reported to the DB? :param inscratch: Should the tests be copied to a scratch directory before execution? :return: Did test execution complete as expected? :rtype: bool """ status = True if inscratch: self.tests = [] self.runner = run.Runner(self) for test in self.getTests(): if inscratch: # copy test to scratch directory # set test.directory attribute to scratch in a copy of test. test.copyToScratch() self.tests.append(test) test.substituteFiles() self.runner.addScript(test) logger.debug("%s tests queued." % self.runner.job_runner.total_added) try: self.runner() except run.JobDJError as err: logger.error(err) status = False return self.runner, status
[docs] def printRunSummary(self, runner): if not runner.tests: return True if not self.quiet: workup.print_summary(runner.tests) # Add a blank line between brief and extended summary. logger.info('') success = self.printOnlyFailed(runner.tests) return success or not getattr(self, 'return_code', True)
[docs] def printOnlyFailed(self, tests): failed_tests = [ test_id for (test_id, test) in future.utils.listitems(tests) if not test.outcome ] if failed_tests: logger.warning("{} tests run, {} failed.".format( len(tests), len(failed_tests))) logger.warning('Failed test IDs: ' + ' '.join( str(test_id) for test_id in workup.sort_mixed_list(failed_tests))) else: logger.warning("{} tests run, all passed.".format(len(tests))) return not failed_tests
[docs] def printTestNumbers(self, test_ids): for i, test in enumerate(test_ids): print("%8s" % test, end=' ') if not i + 1 % 8: print()
def __call__(self): self.printHeader() successes = [] success = True for test in self.getTests(): if self._doCommand(test): if hasattr(test, 'id'): successes.append(test.id) else: successes.append(test) else: success = False if successes: logger.info("\n%d tests %s:" % (len(successes), self.past_tense)) self.printTestNumbers(successes) else: logger.warning("\nWARNING: No tests %s" % self.past_tense) self.printFooter() return success def _doCommand(self, test): raise NotImplementedError("This is a blank base class.")
[docs] def requireApiKey(self, parser): try: common.get_api_key() except (OSError, RuntimeError) as err: parser.error(err)