# Source code for schrodinger.test.stu.outcomes.standard_workups

"""
Contains the class `_PropertyCompare`, and several functions to perform test
workups.

The most commonly used workup is `compare_ct_property`

@copyright: Schrodinger, LLC. All rights reserved.
"""

import csv
import glob
import gzip
import inspect
import os
import re
import shutil
import sys
import tarfile
from zipfile import ZipFile

import schrodinger.job.util as util
from schrodinger import structure
from schrodinger.test.stu import common
from schrodinger.test.stu.outcomes import failures
from schrodinger.utils import csv_unicode
from schrodinger.utils import subprocess

logger = common.logger


class WorkupFailure(failures.WorkupFailure):
    """
    Workup failure whose message is automatically prefixed with the name and
    argument values of the function that raised it.
    """

    def __init__(self, msg, *args):
        """
        :param msg: Description of the failure. It is stored as
            "<calling function><arguments>: <msg>".
        :param args: Additional positional arguments handed unchanged to the
            base class.
        """
        caller = self._callingWorkup()
        super().__init__(f'{caller}: {msg}', *args)

    def _callingWorkup(self):
        """Returns a string with the calling workup's name and arguments."""
        # Stack layout: [0] is this method, [1] is __init__ and [2] is the
        # code that instantiated the exception. This method must therefore
        # only ever be called directly from __init__.
        caller_frame = inspect.stack()[2][0]
        arg_values = inspect.getargvalues(caller_frame)
        formatted_args = inspect.formatargvalues(*arg_values)
        function_name = inspect.getframeinfo(caller_frame).function
        return function_name + formatted_args
class _PropertyCompare():
    """
    Compare properties between a ct and a csv file generated from a ct via
    proplister.

    The standard file is read as csv and consists of two blocks separated by
    one or more blank rows:

    1. The property block. Its first row lists the property name(s) used as
       the matching key. Every following row is `<property>, <tolerance>`;
       a tolerance containing `%` is interpreted as a percentage of the
       reference value, any other tolerance as an absolute value.
    2. The data block. Its first row is a header of property names and every
       following row holds the reference values of one ct.

    Rows whose first field starts with `#` are skipped as comments.
    """

    def __init__(self, std_file):
        """
        :param std_file: Standard file to import; it is handed to
            `csv_unicode.reader_open`.
        """
        self.title_key = []  # Stores matching key property name
        self.title_index = []  # Stores matching key index in CSV data
        # Stores reference property values by matching key
        self.std_properties = {}
        # Stores tolerance by comparison property title
        self.property_tol = {}
        # Stores tolerance mode ('PERCENT' or 'ABSVALUE') by comparison
        # property title
        self.property_cmp_mode = {}
        # Stores property index by comparison property title
        self.property_index = {}
        self.import_std_file(std_file)

    def _make_unique_key_ct(self, ct, title_key):
        """
        Make a unique string from a number of properties stored in ct for
        properties in title_key.

        :param ct: Object whose `property` mapping holds the values.
        :param title_key: Property names (short names allowed) forming the key.
        :return: Tuple `(key, missing_key)`; `missing_key` is True when at
            least one property was absent (an empty value is used for it).
        """
        parts = []
        missing_key = False
        for count, elem in enumerate(title_key):
            local_title_key = self._get_expanded_property_title(elem)
            if local_title_key in ct.property:
                prop_value = ct.property[local_title_key]
            else:
                missing_key = True
                prop_value = ""
            sep = "0_" if count == 0 else "_%d_" % count
            parts.append(f"{sep}{prop_value}")
        return ("".join(parts), missing_key)

    def _make_unique_key_csv(self, line, title_index):
        """
        Make a unique string from a number of properties stored in line for
        properties in title_index.

        :param line: Fields of one csv data row.
        :param title_index: Column indexes of the matching key properties.
        :return: Key in the same format as `_make_unique_key_ct` produces.
        """
        parts = []
        for count, index in enumerate(title_index):
            sep = "0_" if count == 0 else "_%d_" % count
            parts.append(sep + line[index].strip())
        return "".join(parts)

    def import_std_file(self, std_file):
        """
        Import the property/values and the key to use in comparison. Used to
        identify cts to perform the comparison on.

        usage: import_std_file(std_file)

        :raises ValueError: If the file is not a well-formed PropertyCompare
            file (see `_import_rows`).
        """
        with csv_unicode.reader_open(std_file) as inh:
            self._import_rows(csv.reader(inh))

    def _import_rows(self, rows):
        """
        Populate the comparison tables from already-split csv rows.

        :param rows: Iterable of rows, each a list of field strings.
        :raises ValueError: On a malformed row, a header that lacks the
            matching key, duplicate matching keys, or tolerance properties
            that do not appear in the data header.
        """
        line_count = 0
        in_prop_block = True
        for row in rows:
            line = [elem.strip() for elem in row]
            if not any(line):
                # A blank row ends the property block and makes the next
                # non-comment row a header.
                in_prop_block = False
                line_count = 0
                continue
            # startswith() rather than line[0][0]: the first field may be
            # empty (e.g. a missing reference value in the first column).
            if line[0].startswith("#"):
                continue
            if in_prop_block and line_count == 0:
                # First line always gives the property used as the
                # matching key
                self.title_key.extend(line)
                line_count += 1
                continue
            if in_prop_block:
                if len(line) < 2 or not line[0]:
                    # Formatted exactly once so that braces in the offending
                    # line cannot break the error message itself.
                    raise ValueError(
                        "WARNING(import_std_file): PropertyCompare file "
                        "not properly formatted.\n"
                        " Offending line: {}".format(', '.join(line)))
                self._import_tolerance(line[0], line[1])
            elif line_count == 0:
                self._import_header(line)
            else:
                self._import_data_row(line)
            line_count += 1

        # Verify that all expected properties were found in the csv data
        if len(self.property_tol) != len(self.property_index):
            msg = [
                "Found only %s/%s comparision properties" %
                (len(self.property_index), len(self.property_tol)),
                "Following properties not found in csv data:"
            ]
            msg.extend(" %s" % elem
                       for elem in self.property_tol
                       if elem not in self.property_index)
            raise ValueError('\n'.join(msg))

    def _import_tolerance(self, name, tolerance):
        """Record the comparison mode and tolerance of one property."""
        if '%' in tolerance:
            self.property_cmp_mode[name] = 'PERCENT'
            tolerance = tolerance.replace('%', '')
        else:
            self.property_cmp_mode[name] = 'ABSVALUE'
        self.property_tol[name] = float(tolerance)

    def _import_header(self, line):
        """Locate the matching key and comparison columns in a header row."""
        for elem in self.title_key:
            for i, column in enumerate(line):
                if column == elem:
                    self.title_index.append(i)
        for i, column in enumerate(line):
            if column in self.property_tol:
                self.property_index[column] = i
        if (not self.title_index or
                len(self.title_index) != len(self.title_key)):
            raise ValueError("Failed to find title key in header of csv file")

    def _import_data_row(self, line):
        """Store the reference values of one ct under its matching key."""
        title_value = self._make_unique_key_csv(line, self.title_index)
        local_hash = {
            elem: self._convert_value(elem, line, index)
            for elem, index in self.property_index.items()
        }
        if title_value in self.std_properties:
            raise ValueError("WARNING(import_std_file): Match keys "
                             "are not unique; Try using multiple "
                             "properties for the match key.")
        self.std_properties[title_value] = local_hash

    @staticmethod
    def _convert_value(name, line, index):
        """
        Convert one csv field according to the property name's first letter
        ('r' -> float, 'i'/'b' -> int, otherwise the string is kept).

        A missing or unparsable field yields "" which marks "no data".
        """
        try:
            raw = line[index]
            if name[0] == 'r':
                return float(raw)
            if name[0] in ('i', 'b'):
                return int(raw)
            return raw
        except (ValueError, IndexError):
            return ""

    def _get_expanded_property_title(self, name):
        """
        Return full m2io title for a property if given title/entry short
        name from proplister output. This could be expanded to return full
        title for all shortened names.
        """
        if name == "title":
            return "s_m_title"
        elif name == "entry":
            return "s_m_entry_name"
        return name.strip()

    def check_for_all_std_cts(self, str_file):
        """
        Check that all expected std cts are found in the comparison structure
        file.

        usage: check_for_all_std_cts(str_file)

        :return: True when every matching key was found; False when the
            structure file could not be opened.
        :raises AssertionError: Listing the matching keys that were not found.
        """
        try:
            inh = structure.StructureReader(str_file)
        except Exception as err:
            # Deliberately best-effort, but do not swallow KeyboardInterrupt
            # or SystemExit the way a bare except would.
            logger.error("Unable to open structure file '%s': %s" %
                         (str_file, err))
            return False

        found = set()
        for ct in inh:
            (unique_key,
             missing_key) = self._make_unique_key_ct(ct, self.title_key)
            if missing_key:
                logger.warning(
                    "WARNING(check_for_all_std_cts): Not all title keys "
                    "were found: %s %s" % (ct.title, self.title_key))
            if unique_key in self.std_properties:
                found.add(unique_key)

        # Report in the order the keys appear in the standard file
        match_props = [key for key in self.std_properties if key not in found]
        if match_props:
            msg = ["Following match key cts not found in '%s'" % str_file]
            msg.extend(" %s" % elem for elem in match_props)
            raise AssertionError('\n'.join(msg))
        return True

    def compare_ct_properties(self, ct):
        """
        Perform comparison against a standard for a single ct.

        usage: compare_ct_properties(ct)

        :return: True when the ct matches, and also when the ct has no entry
            in the standard file.
        :raises AssertionError: When a property is missing from the ct or is
            outside the allowed range.
        """
        # Deal with short property names in title property
        (title, missing_key) = self._make_unique_key_ct(ct, self.title_key)

        # Deal with potential missing data
        if missing_key:
            logger.warning("WARNING(compare_ct_properties): Not all title keys "
                           "were found: %s %s" % (ct.title, self.title_key))

        # ct not found in csv file - return true
        if title not in self.std_properties:
            return True

        # loop over properties to compare. If any aren't within range
        # raise AssertionError (failure)
        for elem, expected in self.std_properties[title].items():
            # Deal with short property names in property; only use
            # with ct - not csv data
            ct_elem = self._get_expanded_property_title(elem)

            # Skip properties with no data in the standard file
            if expected == "":
                if ct_elem in ct.property:
                    logger.warning(
                        "WARNING: For ct %s property '%s' in "
                        "reference file has no value but in ct has a "
                        "value of '%s'" % (title, elem, ct.property[ct_elem]))
                continue

            # Return failure if property not in test ct
            if ct_elem not in ct.property:
                msg = ("For ct {} property '{}' does not exist"
                       " in the test ct, but does in the reference"
                       " property file")
                raise AssertionError(msg.format(title, elem))

            actual = ct.property[ct_elem]
            if ct_elem.startswith('s'):
                # Perform string comparison
                if actual != expected:
                    msg = ("Failed comparison for '%s' in '%s': "
                           "|%s| vs |%s|" % (elem, title, actual, expected))
                    raise AssertionError(msg)
                continue

            mode = self.property_cmp_mode[elem]
            if mode == "ABSVALUE":
                # Perform real/int/bool comparisons using absolute value range
                allowed = abs(self.property_tol[elem])
            elif mode == "PERCENT":
                # Perform real/int/bool comparisons using percent range
                allowed = abs(abs(self.property_tol[elem] / 100.0) * expected)
            else:
                msg = ("ERROR(compare_ct_properties): Internal "
                       "configuration error: %s, %s" % (mode, ct_elem))
                raise AssertionError(msg)

            if abs(actual - expected) > allowed:
                msg = ("Failed comparison for '%s' in '%s': "
                       "|%f| vs |%f| with range %f" %
                       (elem, title, actual, expected, allowed))
                raise AssertionError(msg)
        return True
def lines_in_file(file_expr, num_lines_target, tolerance):
    """
    Sum the line counts of every file matching the shell expansion expression
    `file_expr` and check that the total is within +/- `tolerance` of
    `num_lines_target`.

    usage: lines_in_file('file', num_lines_target, tolerance)

    :return: True when the total is in range; False when no file matches or
        a file cannot be opened (the problem is logged).
    :raises TypeError: If `tolerance` is a string.
    :raises WorkupFailure: If the total is out of range.
    """
    if isinstance(tolerance, str):
        raise TypeError(f"tolerance must be a number (found \"{tolerance}\")")

    matches = glob.glob(file_expr)
    if not matches:
        logger.error("ERROR: unable to find specified file(s) in test "
                     "directory")
        return False

    total = 0
    for path in matches:
        try:
            with open(path) as handle:
                total += sum(1 for _ in handle)
        except OSError as err:
            logger.error('File: "%s" could not be opened' % path)
            logger.error(str(err))
            return False

    in_range = (total <= num_lines_target + tolerance and
                total >= num_lines_target - tolerance)
    if in_range:
        return True
    raise WorkupFailure('%i lines found for %s' % (total, ', '.join(matches)))
def _check_files(filenames, exist=True):
    """
    Verify that every entry matches at least one file (default), or that no
    entry matches any file.

    :type filenames: list(str)
    :param filenames: List of filenames or shell expansion expressions
        representing filenames.

    :type exist: bool
    :param exist: Whether to expect the files to exist or not.

    :return: True when every entry meets the expectation.
    :raises failures.WorkupFailure: Carrying the first entry that does not.
    """
    for pattern in filenames:
        found = bool(glob.glob(pattern))
        if found != exist:
            raise failures.WorkupFailure(pattern)
    return True
def files_exist(*filenames):
    """
    Check that every given name exists. Each input is a filename or a shell
    expansion expression; an expression passes as soon as _any_ file matches
    it (not _all_).

    usage: files_exist('file1', 'file2', ...)

    :return: True when every input matched.
    :raises WorkupFailure: Naming the first input that matched nothing.
    """
    try:
        outcome = _check_files(filenames)
    except failures.WorkupFailure as err:
        raise WorkupFailure('%s not found' % err.args[0])
    return outcome
# `file_exists` is a second name bound to the very same function object as
# `files_exist`.
file_exists = files_exist
"""Alias for backwards compatibility."""
def many_files_exist(input_file):
    """
    Check that every file listed in `input_file` exists. `input_file` holds
    one filename or shell expansion expression per line; surrounding
    whitespace on each line is ignored.

    :return: The result of `files_exist` for the listed names, or False when
        `input_file` itself cannot be opened (the problem is logged).
    """
    try:
        with open(input_file) as fh:
            listed = [entry.strip() for entry in fh]
    except OSError as err:
        logger.error('File: "%s" could not be opened' % input_file)
        logger.error(str(err))
        return False
    return files_exist(*listed)
def files_dont_exist(*filenames):
    """
    Check that none of the given names exist. Each input is a filename or a
    shell expansion expression representing filenames.

    usage: files_dont_exist('file1', 'file2', ...)

    :return: True when no input matched a file.
    :raises WorkupFailure: Naming the first input that matched a file.
    """
    try:
        outcome = _check_files(filenames, exist=False)
    except failures.WorkupFailure as err:
        raise WorkupFailure('%s found' % err.args[0])
    return outcome
def _get_ct_count(filename):
    """
    Return number of cts contained in a given file, counted by iterating a
    `structure.StructureReader` over it.
    """
    return sum(1 for _ in structure.StructureReader(filename))
def cts_in_file(filename, num_cts_target, tolerance=0):
    """
    Count the cts in a file and check that the count is within +/-
    `tolerance` of `num_cts_target`.

    usage: cts_in_file(file, num_cts_target, tolerance)

    :return: True when the count is in range.
    :raises WorkupFailure: Reporting the count when it is out of range.
    """
    found = _get_ct_count(filename)
    in_range = ((found <= (num_cts_target + tolerance)) and
                (found >= (num_cts_target - tolerance)))
    if not in_range:
        raise WorkupFailure("Found %s cts" % found)
    return True
def cts_less_than(filename, num_cts_upper_bound):
    """
    Check that `filename` has fewer than `num_cts_upper_bound` cts.

    :return: True when the count is strictly below the bound.
    :raises WorkupFailure: Otherwise, reporting the count and the bound.
    """
    found = _get_ct_count(filename)
    if not (found < num_cts_upper_bound):
        raise WorkupFailure("%s contains %s cts; expected < %s" %
                            (filename, found, num_cts_upper_bound))
    return True
def cts_greater_than(filename, num_cts_lower_bound):
    """
    Check that `filename` has greater than `num_cts_lower_bound` cts.

    :return: True when the count is strictly above the bound.
    :raises WorkupFailure: Otherwise, reporting the count and the bound.
    """
    found = _get_ct_count(filename)
    if not (found > num_cts_lower_bound):
        raise WorkupFailure("%s contains %s cts; expected > %s" %
                            (filename, found, num_cts_lower_bound))
    return True
def compare_ct_property(filename,
                        inp_property,
                        inp_property_target,
                        tolerance=0):
    """
    Compare property in the first ct to a user-provided value.

    example usage:
    compare_ct_property('filename', 'inp_property', 7.5, tolerance=0.01)

    The first character of `inp_property` selects the comparison: 'b' and
    's' properties must match exactly, 'i' and 'r' properties must be within
    `tolerance` of the target. Tolerance only works for properties that start
    with i or r (integers or floats).

    :return: True when the property matches.
    :raises TypeError: If `tolerance` is a string.
    :raises WorkupFailure: If the property title has an unknown type prefix
        or the value does not match.
    """
    if isinstance(tolerance, str):
        raise TypeError(f"tolerance must be a number (found \"{tolerance}\")")

    reader = structure.StructureReader(filename)

    # Convert target property to appropriate format
    converters = {'i': int, 'r': float, 'b': bool, 's': str}
    kind = inp_property[0]
    if kind not in converters:
        raise WorkupFailure("Invalid property title, '%s'" % inp_property)
    property_target = converters[kind](inp_property_target)

    ct = next(reader)

    # For boolean and string do exact comparison.
    # For int and float do range comparison
    if kind == 'b':
        actual = bool(ct.property[inp_property])
        if property_target != actual:
            raise WorkupFailure('%s is actually "%s"' % (inp_property, actual))
    elif kind == 's':
        actual = ct.property[inp_property]
        if property_target != actual:
            raise WorkupFailure('{} is actually "{}"'.format(
                inp_property, actual))
    else:
        actual = ct.property[inp_property]
        if abs(actual - property_target) > tolerance:
            raise WorkupFailure('%s is actually "%s"' % (inp_property, actual))
    return True
def parse_log_file(file_expr, pattern, occurrences=1):
    """
    Search files for a string. The string must appear at least the required
    number of times in each file.

    :param file_expr: file(s) to search; treated as a glob expression
    :type file_expr: str

    :param pattern: string to search; treated as a simple substring
    :type pattern: str

    :param occurrences: minimum number of times the pattern must appear in
        each file. As a special case, when occurrences=0, it is treated as a
        maximum.
    :type occurrences: int

    :return: True when every matching file satisfies the requirement.
    :raises WorkupFailure: When no file matches `file_expr`, a file cannot
        be opened, or a file does not satisfy the requirement.

    Examples::

        parse_log_file('job.log', 'Job succeeded!')
        parse_log_file('dock.log', 'Wrote pose', occurrences=42)
        parse_log_file('*.out', 'ERRROR', occurrences=0)
    """
    # ensure that pattern is a string.
    pattern = str(pattern)

    matches = glob.glob(file_expr)
    if not matches:
        raise WorkupFailure('ERROR: using "%s", found no files in the test '
                            "directory" % file_expr)

    for filename in matches:
        try:
            with open(filename) as fh:
                contents = fh.read()
        except OSError as err:
            raise WorkupFailure('File: "%s" could not be opened: %s' %
                                (filename, err))

        if occurrences == 0:
            if pattern in contents:
                raise WorkupFailure('"%s" found' % pattern)
            continue

        # Resume one character after each hit, so overlapping matches count.
        start = 0
        for seen in range(occurrences):
            hit = contents.find(pattern, start)
            if hit != -1:
                start = hit + 1
                continue
            if seen == 0:
                raise WorkupFailure("Couldn't find \"%s\"" % pattern)
            raise WorkupFailure('Found \"%s\" only %i time(s)' %
                                (pattern, seen))
    return True
def strict_evaluate_ct_properties(stdvalue_file, structure_file):
    """
    Deprecated: Please use `compare_ct_properties` if possible.

    Strictly compare a subset of ct level properties for the cts in a file
    against standard values. Every property must match within its per-value
    range, and every ct (row) of the standard file must have a match in the
    structure file. The structure file may contain additional cts.

    Use the `evaluate_ct_properties` workup instead to prevent a failure from
    being triggered when a ct in the standard file is not found in the
    structure file.

    :param str stdvalue_file: Path to a PropertyCompare file (must be a csv
        file with specific columns; see the _PropertyCompare class for
        details)
    :param str structure_file: Path to a Maestro structure file
    """
    comparer = _PropertyCompare(stdvalue_file)
    reader = structure.StructureReader(structure_file)
    if not all(comparer.compare_ct_properties(ct) for ct in reader):
        return False
    return bool(comparer.check_for_all_std_cts(structure_file))
def evaluate_ct_properties(stdvalue_file, structure_file):
    """
    Deprecated: Please use `compare_ct_properties` if possible.

    Compare a subset of ct level properties for the cts in a file against
    standard values. Every property must match within its per-value range.

    Note that a failure will NOT be triggered if a ct (row) in the standard
    file is not located in the structure file. To ensure that all cts in the
    standard match an entry in the comparison file, please use the workup
    `strict_evaluate_ct_properties`.

    :param str stdvalue_file: Path to a PropertyCompare file (must be a csv
        file with specific columns; see the _PropertyCompare class for
        details)
    :param str structure_file: Path to a Maestro structure file

    :return: True when all cts compare successfully; False when a comparison
        reports failure or the structure file holds no cts.
    :raises WorkupFailure: If building the comparison raises TypeError.
    """
    try:
        comparer = _PropertyCompare(stdvalue_file)
    except TypeError as err:
        raise WorkupFailure("ERROR: %s" % err)

    # We should probably be doing this the other way around
    # i.e. iterating over the pc file instead of the structure to avoid
    # cases where the structure file is blank
    saw_ct = False
    for ct in structure.StructureReader(structure_file):
        if not comparer.compare_ct_properties(ct):
            return False
        saw_ct = True

    # Quick and dirty fix for false positives on blank structure files
    if not saw_ct:
        logger.warning("No CTs found in structure file.")
        return False
    return True
def check_for_all_expected_cts(stdvalue_file, structure_file):
    """
    Look for the existence of all expected cts. The structure file can have
    additional cts.

    Uses PropertyCompare standard file format:
    https://wiki.schrodinger.com/QA/PropertyCompare

    usage: check_for_all_expected_cts('std_file', 'str_file')

    :raises WorkupFailure: If building the comparison raises TypeError.
    """
    try:
        comparer = _PropertyCompare(stdvalue_file)
    except TypeError as err:
        raise WorkupFailure("ERROR: %s" % err)
    return bool(comparer.check_for_all_std_cts(structure_file))
def compare_files(filea, fileb, *options):
    """
    Find the diff of two files and compare the output.

    The output of `diff filea fileb` is piped into the `compare.pl` script
    located under the directory returned by `util.hunt('mmshare')`. The
    compare script takes the following arguments.

    usage: compare_files('filea', 'fileb', '-option1', 'value1_if_any',
    '-option2', 'value2_if_any')

    Options::

        <cutoff>            Absolute or relative diff allowed btw numerical
                            fields. Must be a positive number. The default is
                            0.00001.
        -r|-rel             Compare relative differences btw differing
                            fields. This is the default.
        -a|-abs             Compare absolute differences btw differing
                            fields. The default is to compare relative
                            differences.
        -m|-mag <mag>       Minimum magnitude to use for assessing relative
                            diffs. Must be positive. The default value is 1.0.
        -z                  Compare magnitudes only (ignore sign differences).
        -s|-skip <file>     File containing regexps for lines to ignore. By
                            default the file './skip_lines' is used.

    :return: True when the compare script exits with status 0.
    :raises ValueError: If either file name ends in .maegz or .gz, or the
        compare script reports a usage error.
    :raises OSError: If diff cannot be started or exits with a status other
        than 0 or 1.
    :raises WorkupFailure: If the compare script writes to stderr or reports
        that the files differ.
    """
    banned_extensions = (".maegz", ".gz")
    if filea.endswith(banned_extensions):
        raise ValueError(
            f"gzipped files {filea} are not allowed in compare_files workup")
    elif fileb.endswith(banned_extensions):
        raise ValueError(
            f"gzipped files {fileb} are not allowed in compare_files workup")

    compare_path = util.hunt('mmshare')
    compare_path = os.path.join(compare_path, 'compare.pl')
    diff_cmd = ['diff', filea, fileb]
    # Numbers are tolerances, so no need to worry about precision when casting
    # to string.
    compare_cmd = ['perl', compare_path] + [str(x) for x in options]

    try:
        diff_process = subprocess.Popen(diff_cmd,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        universal_newlines=True)
    except OSError as err:
        # On Darwin, there can be a race condition in terminating the open
        # file handles when a command fails. See discussion in SHARED-3909.
        import errno
        if sys.platform == 'darwin' and err.errno == errno.EINVAL:
            raise OSError(f'File {filea} or {fileb} is not available')
        raise

    # Run the compare script in the C locale.
    my_env = os.environ.copy()
    my_env['LC_ALL'] = 'C'
    # diff's stdout is connected directly to the compare script's stdin.
    compare_process = subprocess.Popen(compare_cmd,
                                       stdin=diff_process.stdout,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       env=my_env,
                                       universal_newlines=True)
    # The compare script is drained first; only afterwards is diff waited on
    # and its stderr collected.
    stdout, stderr = compare_process.communicate()
    diff_stderr = diff_process.communicate()[1]

    # If file is not a real file, or permissions problems
    # (presumably diff uses 0 for "identical" and 1 for "different", so any
    # other status is treated as trouble - confirm against diff's docs).
    diff_rc = diff_process.returncode
    if diff_rc not in (0, 1):
        raise OSError(diff_stderr, diff_rc)

    compare_rc = compare_process.returncode
    if compare_rc == 0:
        return True
    # Bad Usage
    if compare_rc == 1 and 'This is a postprocessor' in stderr:
        raise ValueError('Arguments incorrectly formatted. Usage:\n\n' + stderr)
    # Problem running compare command:
    if stderr:
        raise WorkupFailure('Problem running compare: %s' % stderr)
    # Diff in files
    raise WorkupFailure('{filea} and {fileb} differ:\n{diff}'.format(
        filea=filea, fileb=fileb, diff=stdout))
def untar(tar_name, *filenames):
    """
    Extract files from a tar file into the current working directory.

    The files file1, file2, etc. will be extracted from `tar_name`. All the
    files in `tar_name` are extracted if no filename is specified.

    usage: untar(tar_name, file1, file2, ...)

    :return: True on successful extraction.
    :raises WorkupFailure: If the archive cannot be opened or a member cannot
        be extracted; the original exception is chained as the cause.
    """
    try:
        # The context manager closes the archive even when extraction fails.
        with tarfile.open(tar_name) as tar_file:
            if filenames:
                for filename in filenames:
                    tar_file.extract(filename)
            else:
                tar_file.extractall()
    except Exception as E:
        raise WorkupFailure('Failed with error: %s' % E) from E
    return True  # Successful extraction
def unzip(zip_name, *filenames):
    """
    Extract files from a zip file into the current working directory.

    The files file1, file2, etc. will be extracted from `zip_name`. All the
    files in `zip_name` are extracted if no file name is specified.

    usage: unzip(zip_name, file1, file2, ...)

    :return: True on successful extraction.
    :raises WorkupFailure: If the archive cannot be opened or a member cannot
        be extracted; the original exception is chained as the cause.
    """
    try:
        # The context manager closes the archive even when extraction fails.
        with ZipFile(zip_name) as zip_file:
            if filenames:
                for filename in filenames:
                    zip_file.extract(filename)
            else:
                zip_file.extractall()
    except Exception as E:
        raise WorkupFailure('Failed with error: %s' % E) from E
    return True  # Successful extraction
def gunzip(filename):
    """
    Decompress a gz file. The original file is untouched, and the extracted
    file has the same name minus the gz extension (`x.gz` -> `x`,
    `x.maegz` -> `x.mae`).

    usage: gunzip(filename)

    :return: True on successful extraction.
    :raises WorkupFailure: If `filename` does not end in `gz` or cannot be
        decompressed; the original exception is chained as the cause.
    """
    try:
        # Explicit check instead of `assert`: asserts are stripped under -O,
        # and without the check the output name would equal the input name,
        # truncating the input file.
        if not filename.endswith('gz'):
            raise ValueError('"%s" does not have a gz extension' % filename)
        decompressed_filename = re.sub(r'\.?gz$', '', filename)
        with gzip.open(filename, 'rb') as src, \
                open(decompressed_filename, 'wb') as dest:
            shutil.copyfileobj(src, dest)
    except Exception as E:
        raise WorkupFailure('Failed with error: %s' % E) from E
    return True  # Successful extraction
def extract_ct(input_file, output_file, index):
    """
    Extract the nth structure from input_file and write it to output_file.

    usage: extract_ct(input_file, output_file, index)

    :param input_file: Structure file to read from.
    :param output_file: File the extracted structure is written to.
    :param index: Index handed to `structure.Structure.read` to select the
        structure.

    :return: True on success, like the other extraction workups in this
        module (a falsy result is reported as a failure by the command-line
        runner at the bottom of this file).
    :raises WorkupFailure: If the structure cannot be read or written.
    """
    try:
        st = structure.Structure.read(input_file, index)
        st.write(output_file)
    except StopIteration:
        raise WorkupFailure('Failed with error: reached EOF')
    except Exception as E:
        raise WorkupFailure('Failed with error: %s' % E) from E
    return True  # Successful extraction
def expect_job_failure():
    """
    Invert the usual exit-status check: expect the job to have a "bad" exit
    status.

    If this workup is included, the test will FAIL if it has a successful
    exit status and SUCCEED if it has a bad exit status. This is opposite of
    the usual behavior.
    """
# NOTE on `expect_job_failure`: it doesn't actually DO anything. It is here
# as a dummy for documentation purposes. The real code responsible is in
# stu.workup and stu.testscripts.

# ****************************************************************************
# Command-line runner: `<script> <workup> [arguments ...]` looks the workup up
# by name among the module globals, calls it with the remaining arguments
# (passed as strings) and exits with status 1 unless it returns a truthy
# value.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        # Guard against indexing sys.argv[1] when no workup was named.
        print(' no workup specified; usage: <workup> [arguments ...]')
        sys.exit(1)
    workup = globals().get(sys.argv[1])
    # Imported modules and other non-callable globals are not workups.
    if not callable(workup):
        print(' workup', sys.argv[1], 'not found')
        sys.exit(1)
    result = workup(*sys.argv[2:])
    if not result:
        print(' Failed')
        sys.exit(1)
    print(' Pass!')