# Source code for schrodinger.test.memtest

"""
Parse and analyze valgrind output and suppressions.
"""

import collections
import os
import re
import sys

# Matches valgrind's heap-scan line, e.g. "==1234== Checked 2,345,678 bytes".
CHECKED_BYTES_RE = re.compile(r'^==.*== Checked [\d,]+ bytes')
# Matches the "==<pid>== " prefix valgrind prepends to every log line.
LOG_LINE_RE = re.compile(r'^==.*== ')
# Captures the executed command from the "==<pid>== Command: ..." line.
COMMAND_RE = re.compile(r'^==.*== Command:\W(.*)')
# If a leak has text (key), summarize it as (value)
DESCRIPTION_TO_SUMMARY = {
    'definitely lost': 'memory definitely lost',
    'possibly lost': 'memory possibly lost',
    'still reachable': "memory still reachable",
    'indirectly lost': 'memory indirectly lost',
    'Conditional jump or move': 'uninitialized value used in logic',
    'Use of uninitialized': 'uninitialized value used'
}


class Suppression:
    """Represent a suppression for a leak found by valgrind."""

    @classmethod
    def read(cls, fh, ignore_existing_title=True):
        """Build a Suppression from lines of `fh` up to the closing '}'.

        The first collected line is the title, the second the error
        type, and the remainder the stack-frame entries.  Returns None
        if the stream ends before a '}' line is seen.

        :param fh: iterable of lines positioned just after the opening '{'
        :param ignore_existing_title: if True, discard the title stored
            in the stream and use an empty title instead
        """
        collected = []
        for raw in fh:
            entry = raw.strip()
            if entry == '}':
                if ignore_existing_title:
                    collected[0] = ''
                return cls(collected[1], collected[2:], collected[0])
            collected.append(entry)

    def __init__(self, error_type, suppressions, title=''):
        # e.g. "Memcheck:Leak"
        self.error_type = error_type
        # Stack-frame lines ("fun:..." / "obj:...")
        self.suppressions = suppressions
        self.title = title

    def __lt__(self, other):
        """Used in sorting."""
        for mine, theirs in zip(self.suppressions, other.suppressions):
            if mine != theirs:
                return mine < theirs
        return len(self.suppressions) < len(other.suppressions)

    def __le__(self, other):
        """Used in sorting."""
        for mine, theirs in zip(self.suppressions, other.suppressions):
            if mine != theirs:
                return mine < theirs
        return len(self.suppressions) <= len(other.suppressions)

    def __eq__(self, other):
        """Used in discarding duplicates.

        NOTE: only the zipped prefix is compared, so a suppression whose
        frames are a prefix of another's compares equal to it.  This is
        deliberate: wholly-included leaks count as duplicates.
        """
        return all(mine == theirs
                   for mine, theirs in zip(self.suppressions,
                                           other.suppressions))

    def __repr__(self):
        parts = ['{', self.title, self.error_type]
        return '\n '.join(parts + self.suppressions) + '\n}'
class ValgrindWarning:
    """Bundle the warnings from one valgrind run with the run's command."""

    def __init__(self, command, warnings):
        # List of warning-message strings parsed from the log.
        self.warnings = warnings
        # Command line valgrind reported for this run.
        self.command = command
class Leak:
    """Represent a Valgrind Leak."""

    @classmethod
    def read(cls, description, fh, command=None, filename=None):
        """Parse one leak record from `fh`.

        Backtrace lines are collected until the opening '{' of the
        suppression block, which is handed off to Suppression.read.
        Returns None if the stream ends before a suppression appears.
        """
        frames = []
        for raw in fh:
            text = LOG_LINE_RE.sub('', raw).strip()
            if text == '{':
                return cls(filename, command, description, frames,
                           Suppression.read(fh))
            if text:
                frames.append(text)

    def __init__(self, filename, command, description, backtrace=None,
                 suppression=None):
        self.filename = os.path.normpath(filename)
        self.directory = os.path.dirname(self.filename)
        self.command = command
        self.description = description
        self.short_description = self._getShortDescription()
        self.backtrace = backtrace or tuple()
        self.suppression = suppression
        # Leaks later judged identical to this one are folded in here.
        self.duplicates = []

    def _getShortDescription(self):
        """Map valgrind's verbose description onto a short summary."""
        for needle, summary in DESCRIPTION_TO_SUMMARY.items():
            if needle in self.description:
                return summary
        # I believe this is only:
        # Invalid write
        # Invalid read
        # Mismatched free() / delete / delete []
        return self.description.lower()
def read_valgrind_log(filename, fh):
    """Read a valgrind log.

    :param filename: name of the log file (recorded on each Leak)
    :param fh: iterable of log lines

    :return: tuple of (list of Leak objects, ValgrindWarning or None).
        "Still reachable" leaks are dropped; warnings are collected into
        a single ValgrindWarning when a command line was seen.
    """
    leaks = []
    warnings = []
    command = None
    skip_lines = 0
    in_summary_block = False
    previous_list_was_a_warning = False
    for line in fh:
        # There may be up to one block of errors per pthread,
        # and we should parse them all, skipping any lines
        # outside these blocks.
        if skip_lines > 0:
            skip_lines -= 1
        elif 'HEAP SUMMARY:' in line:
            skip_lines = 2  # Skip the summary
        elif 'LEAK SUMMARY:' in line:
            # this summary may vary in length, depending on the
            # types of 'still reachable' allocation. It should
            # always end with a "ERROR SUMMARY".
            in_summary_block = True
        elif 'ERROR SUMMARY:' in line:
            in_summary_block = False
        elif "Searching for pointers to" in line:
            # Heap-scan progress line; carries no leak information.
            pass
        elif CHECKED_BYTES_RE.search(line):
            # "Checked N bytes" scan-result line; also ignored.
            pass
        else:
            cmd = COMMAND_RE.search(line)
            if cmd:
                command = cmd.group(1)
                skip_lines = 1  # Skip the pid line
            elif command and not in_summary_block:
                # note we only trim on the right, since we care about
                # indentation of continuation lines in warnings.
                line = LOG_LINE_RE.sub('', line).rstrip()
                if not line:
                    pass
                elif line.startswith('Warning: '):
                    # Record the warning text without its "Warning: " prefix.
                    warnings.append(line[9:])
                    previous_list_was_a_warning = True
                    continue
                elif previous_list_was_a_warning and line.startswith(' '):
                    # This is a continuation of the previous warning.
                    # Note that there is a blank line and a HEAP SUMMARY
                    # after the warnings block, so we won't be trapped
                    # here forever.
                    continue
                else:
                    # Anything else starts a leak record; Leak.read consumes
                    # lines from fh through the leak's suppression block.
                    leak = Leak.read(line, fh, command, filename)
                    if leak:
                        # We want valgrind to report 'Still reachable' leaks,
                        # but we don't want them to be considered as errors.
                        if leak.short_description == 'memory still reachable':
                            continue
                        else:
                            leaks.append(leak)
                    previous_list_was_a_warning = False
    if command and warnings:
        all_warnings = ValgrindWarning(command, warnings)
    else:
        all_warnings = None
    return leaks, all_warnings
def discover_leaks(directory, verbose=False):
    """
    Search a directory for valgrind log files.

    :param directory: root directory to walk for "*valgrind*...log" files
    :param verbose: if True, print each log file as it is read

    :rtype: dict
    :return: Key: directory path, Value: list of unique leaks seen in that
        directory
    """
    leaks = collections.defaultdict(list)
    warnings = collections.defaultdict(list)
    for root, dirs, files in os.walk(directory):
        reldirectory = os.path.relpath(root, directory)
        for filename in files:
            # Only files whose names contain "valgrind" and end in "log".
            if 'valgrind' not in filename or not filename.endswith('log'):
                continue
            absname = os.path.join(root, filename)
            relname = os.path.relpath(absname, directory)
            if verbose:
                print(f'finding leaks in {relname}')
            with open(absname) as fh:
                file_leaks, file_warnings = read_valgrind_log(relname, fh)
            if file_leaks:
                leaks[reldirectory].extend(file_leaks)
            if file_warnings:
                warnings[reldirectory].append(file_warnings)
    return leaks, warnings
def simplify(suppression):
    """Remove test code from suppression. Wildcard any absolute paths."""
    # Frames that indicate test-harness code; the suppression is truncated
    # at the first frame matching any of these.
    test_markers = (
        'fun:_ZN5boost9unit_test9ut_detail7invokerINS1_6unusedEE6invokeIPFvvEEES3_RT_',
        'fun:main',
        'fun:__libc_start_main',
        'fun:_ZN13MM_TestDriver4testEiPPcP15MM_TestCallback',
    )
    frames = suppression.suppressions
    for position, frame in enumerate(frames):
        # wildcard absolute paths
        if frame.startswith('obj:/'):
            frames[position] = 'obj:*'
        # Ignore test code
        if frame.endswith('test_methodEv') or any(
                marker in frame for marker in test_markers):
            suppression.suppressions = frames[:position]
            return
def uniquify_leaks(leaks):
    """
    Given a list of Leak objects, remove any duplicates.

    Duplicates include leaks that are wholly included in another leak.

    :param leaks: list of Leak objects; simplified, sorted and de-duplicated
        in place.
    """
    for leak in leaks:
        simplify(leak.suppression)
    # Sorting groups equal suppressions so a single adjacent-pair scan
    # finds duplicates.  NOTE: Suppression.__eq__ treats a frame-prefix
    # as equal (non-transitive), so the exact scan order matters here.
    leaks.sort(key=lambda l: l.suppression)
    i = 1
    while (i < len(leaks)):
        if leaks[i].suppression == leaks[i - 1].suppression:
            # Fold the duplicate (and its own duplicates) into the survivor,
            # then re-test the same position against the shortened list.
            leaks[i - 1].duplicates.append(leaks[i])
            leaks[i - 1].duplicates.extend(leaks[i].duplicates)
            del leaks[i]
        else:
            i += 1
def uniquify_suppression_titles(leaks):
    """
    Make sure that each suppression has a unique title.

    Colliding titles get (or continue) a numeric suffix, e.g. a second
    "foo" becomes "foo 0", a third "foo 1", and so on.

    :param leaks: list of Leak objects whose suppression titles are
        adjusted in place.
    """
    titles = set()
    for leak in leaks:
        suppression = leak.suppression
        if suppression.title in titles:
            split_title = suppression.title.split()
            try:
                # Reuse an existing trailing number as the starting index.
                index = int(split_title[-1])
                prefix = ' '.join(split_title[:-1])
            except (IndexError, ValueError):
                # IndexError: empty/whitespace-only title (split() is []),
                # which is the default title from Suppression.read.
                # ValueError: last word is not a number.
                index = 0
                prefix = suppression.title
            title = f'{prefix} {index}'
            while title in titles:
                index += 1
                title = f'{prefix} {index}'
            suppression.title = title
        titles.add(suppression.title)
def read_and_uniquify(filename):
    """Read a suppression file and make the suppressions in it unique.

    Progress is reported on stderr.

    :param filename: path to a valgrind suppression file
    :return: list of Leak objects holding the unique suppressions, in the
        order they first appeared in the file
    """
    # Removed a dead `suppressions` list that was appended to but never read.
    leaks = []
    with open(filename) as fh:
        index = 0
        for line in fh:
            if line.startswith('{'):
                # Keep the title stored in the file (don't discard it).
                suppression = Suppression.read(fh, False)
                simplify(suppression)
                leaks.append(Leak(filename, index, '', suppression=suppression))
                index += 1
    sys.stderr.write(f'{len(leaks)} suppressions found\n')
    uniquify_leaks(leaks)
    sys.stderr.write(f'{len(leaks)} unique suppressions found\n')
    # This puts the unique suppressions in the order of the original
    # suppressions. Command is overloaded to hold order.
    leaks.sort(key=lambda l: l.command)
    uniquify_suppression_titles(leaks)
    return leaks