Source code for schrodinger.infra.mmcheck

import logging
import os
import re
import sys
import types
import unittest.mock

import pymmlibs
from schrodinger.utils import log

Null = object()
should_check_for_mock = False


def _trace_log(function):
    """
    Create a new function that logs its name and arguments every time it is
    called.

    """

    def trace_logger(self, *args):
        """
        Log the name and arguments, then call the original function and
        return its result.

        """
        _logger.debug("calling {} with {}".format(self.function.__name__,
                                                  str(args)))
        return function(self, *args)

    return trace_logger


class _Wrapper:
    """
    Class used to wrap a function from pymmlibs, checking the return code
    and raising an exception if it is not of the correct 'OK' enum value.

    """

    def __init__(self, function, return_code, indicates_failure):
        self.function = function
        self.return_code = return_code
        self.indicates_failure = indicates_failure
        self.__doc__ = function.__doc__

    def __call__(self, *args):
        """
        Call the wrapped mmlib function.

        """
        if should_check_for_mock:
            for arg in args:
                if isinstance(arg, unittest.mock.MagicMock):
                    raise TypeError(
                        f"Passing mock to a c++ function will turn the value into an integer, you probably want to mock this function arg={arg} function={self.function}"
                    )
        result = self.function(*args)
        if isinstance(result, tuple) or isinstance(result, list):
            result_len = len(result)
            rc = result[0]
            if result_len > 2:
                # Return a tuple of the result values.
                result = result[1:]
            elif result_len == 2:
                # If there's only one result value, return it as an object.
                result = result[1]
            else:
                result = None
        else:
            rc = result
            result = None

        ok_code = self.return_code.ok

        # Deal with void functions
        if rc is None:
            rc = ok_code

        # Check to see if this function is one that only signals failure.
        if self.indicates_failure is not None:

            # If the rc is not the one indicating failure, make the result
            # equal rc and rc equal ok.
            if rc != self.indicates_failure:
                result = rc
                rc = ok_code

            # If the rc indicates failure, make the rc the canonical failure
            # value. Note that this is necessary because some functions (e.g.
            # mmlist_get()) use numeric values to indicate failure, and
            # sometimes these values coincide with the OK values.
            else:
                rc = self.return_code.error
        # If the rc is not the ok value, raise an MmException.
        if rc != ok_code:
            raise MmException(self, args, rc)

        return result

    def __repr__(self):
        return '<wrapped function {} at {}>'.format(self.function.__name__,
                                                    hex(id(self)))


# Set _trace_logging to True to get a log of all mm function calls printed
# to stderr.
_trace_logging = False
if os.environ.get('SCHRODINGER_PYTHON_MMLIBS_TRACE'):
    _trace_logging = True

if _trace_logging:
    _Wrapper.__call__ = _trace_log(_Wrapper.__call__)

if _trace_logging:
    log.logging_config(level=logging.DEBUG, stream=sys.stderr)
else:
    log.default_logging_config()

_logger = logging.getLogger("schrodinger.infra.mm")

pymmlibs.mmerr_initialize()
pymmlibs.mmerr_queue_on(pymmlibs.MMERR_DEFAULT_HANDLER, 1)

IGNORED_SWIG_FUNCTIONS = {
    'SWIG_PyInstanceMethod_New',
    'SWIG_PyStaticMethod_New',
    'SwigBoolStringMap_swigregister',
    'SwigBoolVector2_swigregister',
    'SwigBoolVector3_swigregister',
    'SwigBoolVector_swigregister',
    'SwigChmAtomIntPairVector_swigregister',
    'SwigChmAtomIntPair_swigregister',
    'SwigDoubleVector2_swigregister',
    'SwigDoubleVector3_swigregister',
    'SwigDoubleVector_swigregister',
    'SwigFloatVector2_swigregister',
    'SwigFloatVector3_swigregister',
    'SwigFloatVector_swigregister',
    'SwigInt64Vector2_swigregister',
    'SwigInt64Vector3_swigregister',
    'SwigInt64Vector_swigregister',
    'SwigIntDoubleMap_swigregister',
    'SwigIntIntMap_swigregister',
    'SwigIntIntPair_swigregister',
    'SwigIntVector2_swigregister',
    'SwigIntVector3_swigregister',
    'SwigIntVector_swigregister',
    'SwigPyIterator_swigregister',
    'SwigStringBoolMap_swigregister',
    'SwigStringDoubleMap_swigregister',
    'SwigStringFloatMap_swigregister',
    'SwigStringInt64Map_swigregister',
    'SwigStringIntMap_swigregister',
    'SwigStringStringMap_swigregister',
    'SwigStringUint64Map_swigregister',
    'SwigStringUintMap_swigregister',
    'SwigStringVector2_swigregister',
    'SwigStringVector3_swigregister',
    'SwigStringVector_swigregister',
    'SwigUInt64DoubleMap_swigregister',
    'SwigUInt64FloatMap_swigregister',
    'SwigUInt64UintMap_swigregister',
    'SwigUInt64Vector2_swigregister',
    'SwigUInt64Vector3_swigregister',
    'SwigUInt64Vector_swigregister',
    'SwigUintDoubleMap_swigregister',
    'SwigUintFloatMap_swigregister',
    'SwigUnsignedIntVector2_swigregister',
    'SwigUnsignedIntVector3_swigregister',
    'SwigUnsignedIntVector_swigregister',
    'SwigVectorIntIntPair_swigregister',
    'ifstream_swigregister',
    'istream_swigregister',
    'new_instancemethod',
    'ofstream_swigregister',
    'ostream_swigregister',
    'weakref_proxy',
}


[docs]def wrap_module(module_dict, return_codes, ignored_functions, failure_dict, inserted_module_globals=None): """ Create wrappers for all of the mmlibs and place them into the module's global namespace. :type return_codes: dict :param return_codes: Dict of ReturnCode objects, as generated by generate_return_codes :type ignored_functions: iterable of strings :param ignored_functions: are functions that are created by swig which don't match :type failure_dict: dict :param failure_dict: error code returned if the function encounters an error, when it is not covered by the normal return_codes dict """ ignored_functions = ignored_functions.union(IGNORED_SWIG_FUNCTIONS) #Applicable if you are adding to a different globals() location than the origin #This is mostly legacy behavior to support mm.py behavior if inserted_module_globals: module_globals = inserted_module_globals else: module_globals = module_dict profile_c_code = os.environ.get("SCHRODINGER_PYTHON_CPROFILE", False) # Add all public members of the internal module to the module globals, #wrapping each function with the _Wrapper class to automate error checking. for name, o in module_dict.items(): # Skip "private" objects of pymmlibs. if name[0] == '_': continue # If failure value is Null, it means that the function can never # return a code indicating failure. Therefore, there's no need to # wrap it! fail_code = failure_dict.get(name, None) if (callable(o) and name not in ignored_functions and (fail_code is not Null or _trace_logging or profile_c_code)): prefix = _get_mm_prefix(name) # If there is no prefix, don't wrap the function. if prefix is None: module_globals[name] = o continue # This module assumes all functions are named mmfoo_*. If one # isn't, then the prefix won't be found and we won't know what # return codes to apply. If this happens, the mmlib writer can # change the name of the function or possibly modify this # module. try: rc = return_codes[prefix] except KeyError: raise Exception( "No return codes have been defined for callable function '%s'. Please add them to the appropriate *.i file in the swig directory." % name) if profile_c_code: o = _CCodeProfiler(o) module_globals[name] = _Wrapper(o, rc, fail_code) else: module_globals[name] = o
def _get_mm_prefix(name): """ Get the mmlib prefix from a function or variable name. The prefix is the lowercased string up to (but not including) the first underscore. If no underscore is present, return None. """ try: ix = name.find("_") if ix < 0: return None return name[:ix].lower() except: return None
[docs]class MmException(Exception): """ An exception class that specifically indicates the failure of an mmlibs call. The underlying value of the error code returned by the mmlib function can be retrieved through the 'rc' attribute of the exception. The name of the return code can be retrieved through the 'rc_name' attribute of the exception. """
[docs] def __init__(self, wrapped_function, args, rc): """ Initialize with the wrapped function, arguments used, and the return code. """ self.wrapped_function = wrapped_function self.rc = rc self.function_name = wrapped_function.function.__name__ self.args = args error_handler = pymmlibs.MMERR_DEFAULT_HANDLER err, msg = pymmlibs.mmerr_queue_get(error_handler) if err == pymmlibs.MMERR_OK: self.mmlib_message = msg else: self.mmlib_message = None pymmlibs.mmerr_errclear(error_handler) try: self.rc_name = wrapped_function.return_code.get_name(self.rc) except KeyError: self.rc_name = "unknown"
def __str__(self): if self.mmlib_message: msg = " with message: \"%s\"" % re.sub(r'\n*$', "", self.mmlib_message) else: msg = (". See the mmerr log (stderr by default) for messages " "from the mmlibs.") return ("%s returned error code %d (%s) for arguments %s%s" % (self.function_name, self.rc, self.rc_name, self.args, msg)) def __reduce__(self): # This allows MmExceptions to be pickled return type(self), (self.wrapped_function, self.args, self.rc)
[docs]class ReturnCode: """ A class to hold special return code values and translate return code values into names. """ # a pattern for mmlib success codes ok_pattern = re.compile("^[A-Z][A-Z0-9]*_OK$") # a pattern for mmlib failure codes error_pattern = re.compile("^[A-Z][A-Z0-9]*_(ERR(OR)?|BUMMER)$")
[docs] def __init__(self): self.ok = None self.error = None self.code_names = {}
[docs] def add_code(self, module, name, value=None): """ Add a code name and value to the object. If value is not provided, look it up in the pymmlibs module from the name given. """ if value is None: value = module[name] self.code_names[value] = name # If the name matches the ok/error regexps, set the ok/error # attributes. if ReturnCode.ok_pattern.match(name): self.ok = value elif ReturnCode.error_pattern.match(name): self.error = value
[docs] def get_name(self, value): """ Return the name associated with the value. """ return self.code_names[value]
[docs]def generate_return_code_dict(global_dict, code_lists): """ Set up a dictionary of ReturnCode objects, indexed by the mmlib prefix names. This dictionary is used to translate error codes into names so errors make more sense to the humans reading the error message. :param global_dict: globals() dict of the module with these return codes :param code_lists: Each member of the code_lists list is a list of the names of the return codes for a specific mmlib. Please keep the list sorted by mmlib prefix so it is easy to find the proper place to add new return codes. The return codes are listed explicitly to speed the import of this module. :type code_lists: list """ return_codes = {} for code_list in code_lists: rc = ReturnCode() for code in code_list: rc.add_code(global_dict, code) return_codes[_get_mm_prefix(code)] = rc return return_codes
def _c_function_proxy(fn, *args): """ This function is a python level proxy for the C function 'fn'. """ return fn(*args) class _CCodeProfiler: """ A class for profiling the time spent in C functions. Its main usefulness is in getting relative timings among the C functions. Use of this class is enabled by setting the SCHRODINGER_PYTHON_CPROFILE environment variable to any value. It works by giving each wrapped C function a python wrapper with a unique name so they can be distinguished in the profile. Obviously, this will add more overhead to each C function call, so expect profiles to show a greater time spent in C wrappers than normal. """ # Based on version 1.2 of Recipe 81535 in the Python Cookbook, # published under the Python license. # # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/81535 def __init__(self, fn): name = fn.__name__ filename = "<wrapped pymmlibs>" # The profiler only tracks python functions, so we create a python # function for each underlying C function to enable profiling. # # Use the 'types' module to create a code object that we use to # create a Python function with a name that is the same as the C # function. func_code = _c_function_proxy.__code__ new_code = types.CodeType(func_code.co_argcount, func_code.co_nlocals, func_code.co_stacksize, func_code.co_flags, func_code.co_code, func_code.co_consts, func_code.co_names, func_code.co_varnames, filename, name, 1, func_code.co_lnotab) self.fn = fn self.wrapper = types.FunctionType(new_code, globals()) self.__name__ = self.fn.__name__ def __call__(self, *args): return self.wrapper(self.fn, *args)