Source code for schrodinger.application.matsci.msutils

"""
Utility functions and classes for MatSci workflows.

Copyright Schrodinger, LLC. All rights reserved.
"""

import collections
import contextlib
import copy
import csv
import os
import re
import shlex
import shutil
import textwrap
import warnings
from collections.abc import Mapping
from contextlib import nullcontext
from distutils import util as dutil

import numpy
from requests.packages.urllib3.exceptions import InsecureRequestWarning

import schrodinger
from schrodinger import structure
# Matsci modules except msprops and codeutils should not be imported here
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import msprops
from schrodinger.infra import mm
from schrodinger.structutils import analyze
from schrodinger.test import ioredirect
from schrodinger.utils import fileutils
from schrodinger.utils.license import is_opls2_available

maestro = schrodinger.get_maestro()

APP_DIR = 'matsci_templates'

# -2 is the official dummy atomic number, but some sources use 0, such as
# Maestro builder ligands (MATSCI-6328, SHARED-6160)
DUMMY_ATOMIC_NUMBERS = {0, -2}
DUMMY_ELEMENT_SYMBOLS = {'Du', 'DU'}
DUMMY_ATOM_NAMES = {'Xpi'}  # Jaguar mmjag uses this

PROP_TYPE_GET_DICT = {
    'r': mm.mmct_atom_property_get_real,
    'b': mm.mmct_atom_property_get_bool,
    'i': mm.mmct_atom_property_get_int,
    's': mm.mmct_atom_property_get_string,
}

RMTREE_FAILED = 'rmtree_failed'

Forcefield = collections.namedtuple('Forcefield', 'version name')

SDGR_INDEX = 'schrodinger_index'


[docs]@contextlib.contextmanager def with_global_as(glob_var, new_var): """ Context manager which updates global variable with new variable and restores with old value when execution leaves context manager :type glob_var: Any :param glob_var: The global variable which gets updated with given new variable :type new_var: Any :param new_var: The new variable with which global variable gets updated """ orig_var = glob_var globals().update(glob_var=new_var) yield globals().update(glob_var=orig_var)
[docs]def get_default_forcefield(): """ Returns information for S-OPLS if license is found. If no license is found, returns information for OLPS2005, which requires no license. :returns namedtuple forcefield: Named 2-tuple containing the `version` and `name` of the default forcefield, respectively """ # OPLS_2005 does not allow ZOB to sp3 carbon while OPLS3 (now known as # S-OPLS) does if is_opls2_available(): forcefield = Forcefield(mm.OPLSVersion.F16, mm.OPLS_NAME_F16) # Fall back to OPLS_2005 if an S-OPLS license is not available else: forcefield = Forcefield(mm.OPLSVersion.F14, mm.OPLS_NAME_F14) return forcefield
[docs]def remove_properties(struct, props=None, matches=None, atom_props=None, atom_matches=None): """ Remove all the matching structure and atom properties. No error is thrown if the given properties do not actually exist on the structure. :type struct: `structure.Structure` or `cms.Cms` :param struct: The structure to remove properties from :type props: list :param props: A list of structure properties to delete :type matches: list :param matches: Remove all structure properties whose name contains any of these strings :type atom_props: list :param atom_props: A list of atom properties to delete :type atom_matches: list :param atom_matches: Remove all atom properties whose name contains any of these strings """ def _get_proplist(prop_names, given, strings): if given is None: proplist = [] else: proplist = given[:] if strings is not None: for name in prop_names: for astring in strings: if astring in name: proplist.append(name) return proplist is_cms = isinstance(struct, cms.Cms) for prop in _get_proplist(struct.getPropertyNames(), props, matches): if is_cms: struct.remove_cts_property(prop) else: struct.property.pop(prop, None) for prop in _get_proplist(struct.getAtomPropertyNames(), atom_props, atom_matches): remove_atom_property(struct, prop)
[docs]def remove_atom_property(struct, prop): """ Delete atom property from all atoms in a structure (structure will be modified). :type struct: `structure.Structure` or `cms.Cms` :param struct: Structure object to be modified :type prop: str :param prop: Atom property to be removed """ if isinstance(struct, cms.Cms): remove_cms_atom_property(struct, prop) struct.deletePropertyFromAllAtoms(prop)
[docs]def remove_cms_atom_property(cms_model, propname): """ Delete an atom property from a cms model :param cms_model: cms model :type cms_model: `cms.Cms` :param propname: property name :type propname: str """ for struct in [cms_model._raw_fsys_ct, cms_model.fsys_ct ] + cms_model.comp_ct: remove_atom_property(struct, propname)
[docs]def has_atom_property(struct, prop): """ Check if structure has any atom with the property set. :param structure.Structure: Input structure :param str: Property name :raise KeyError: If property name doesn't start with: `s_`, `r_`, `i_`, `b_` :raise mm.MmException: If unexpected error occurred :return bool: True of property is present, False otherwise """ if struct.atom_total == 0: return False # get function based on the property first char funct = PROP_TYPE_GET_DICT[prop[0]] try: funct(struct, prop, 1) except mm.MmException as mmexc: if mmexc.rc == mm.MMCT_ATOM_PROPERTY_NOT_DEFINED_IN_CT: # Property does not exist on any atom return False elif mmexc.rc == mm.MMCT_ATOM_PROPERTY_UNDEFINED_ATOM: # Property exists on at least one atom, but not this one return True else: # An unexpected error occurred, don't be silent about it raise return True
[docs]def getstr(ret): """ Convert binary string (or other data) to str. :type ret: binary_type or any other type convertable to str :param ret: Value to be converted to str :rtype: str :return: Value converted to str """ if isinstance(ret, bytes): ret = ret.decode() return str(ret)
[docs]def get_project_group_hierarchy(st=None, row=None): """ Return the project group hierarchy for the given structure or row. :type st: `schrodinger.structure.Structure` :param st: the structure :type row: `schrodinger.project.ProjectRow` :param row: the project row :raise ValueError: if there is an issue :rtype: list :return: the hierarchy (outermost to innermost) """ if (not st and not row) or (st and row): msg = ('Either a structure or a project row must be given.') raise ValueError(msg) if st: hierarchy = st.property.get(mm.M2IO_DATA_SUBGROUP_TITLE) if hierarchy: return hierarchy.split(mm.M2IO_SUBGROUP_SEPARATOR) eid = st.property.get(msprops.ENTRY_ID_PROP) if eid is None: msg = ('Structure has no entry ID.') raise ValueError(msg) if not maestro: msg = ('No Maestro session is active.') raise ValueError(msg) pt = maestro.project_table_get() row = pt.getRow(eid) if not row: msg = ('Structure is not in the project.') raise ValueError(msg) if row: hierarchy = row.property.get(mm.M2IO_DATA_SUBGROUP_TITLE) if hierarchy: return hierarchy.split(mm.M2IO_SUBGROUP_SEPARATOR) hierarchy = [] group = row.group while group: hierarchy.append(group.name) group = group.getParentGroup() hierarchy.reverse() return hierarchy
[docs]def set_project_group_hierarchy(st, hierarchy, collapsed=False): """ Set the project group hierarchy for the given structure. :type st: `schrodinger.structure.Structure` :param st: the structure :type hierarchy: list :param hierarchy: the hierarchy (outermost to innermost) :param bool collapsed: Whether the group should initially be collapsed """ if hierarchy: hierarchy = mm.M2IO_SUBGROUP_SEPARATOR.join(hierarchy) st.property[mm.M2IO_DATA_SUBGROUP_TITLE] = hierarchy st.property[mm.M2IO_DATA_SUBGROUPID] = hierarchy st.property[mm.M2IO_DATA_SUBGROUP_COLLAPSED] = collapsed
[docs]def get_matsci_user_data_dir(): """ Get the absolute path to the user's local MatSci data directory for storing custom templates, protocols, etc. Directory is created if it doesn't exist. :rtype: str :return: The absolute path the Materials Science data parent directory """ data_dir = os.path.join( fileutils.get_directory_path(fileutils.LOCAL_APPDATA), APP_DIR) fileutils.mkdir_p(data_dir) return data_dir
[docs]def structure_reader(filename, log=None, do_raise=False): """ Read structures from a file until the end or the first structure with an error. :param str filename: filename :param function log: Log function, if None, nothing is called :param bool do_raise: Whether to raise on an exception :yield schrodinger.structure.Structure: Next structure in the file """ # Send to /dev/null if log is None log = log if log else lambda x: None try: with structure.StructureReader(filename) as reader: yield from reader except: log('Failed to read a structure from %s.' % filename) if do_raise: raise
[docs]def is_dummy_atom(atom): """ Return True if the given atom is a dummy atom. :type atom: schrodinger.structure._StructureAtom :param atom: the atom :rtype: bool :return: return True if the given atom is a dummy atom """ return (atom.atomic_number in DUMMY_ATOMIC_NUMBERS or atom.element in DUMMY_ELEMENT_SYMBOLS or atom.name in DUMMY_ATOM_NAMES)
[docs]def validate_no_dummy_atoms(structs): """ Validate that the passed structures don't have dummy atoms :param iterable structs: Structures to check :rtype: bool or (bool, str) :rtype: True if no structures has dummy atoms, False and error message if at least one structure does """ for struct in structs: if any(is_dummy_atom(atom) for atom in struct.atom): return False, (f'At least one structure ({struct.title}) has dummy ' 'atoms and cannot be used with this workflow.') return True
[docs]def add_or_update_bond_order(ct, atom1, atom2, bond_order): """ Create a new bond, or update the existing bond order of this bond. :type ct: schrodinger.structure.Structure :type atom1: schrodinger.structure._StructureAtom :type atom2: schrodinger.structure._StructureAtom :type bond_type: int (0-3) :rtype: schrodinger.structure._Bond """ bond = ct.getBond(atom1, atom2) if not bond: return ct.addBond(atom1, atom2, bond_order) bond.order = bond_order return bond
[docs]def add_or_update_bond_type(ct, atom1, atom2, bond_type): """ Create a new bond, or update the existing bond type of this bond. :type ct: schrodinger.structure.Structure :type atom1: schrodinger.structure._StructureAtom :type atom2: schrodinger.structure._StructureAtom :type bond_type: schrodinger.structure.BondType :rtype: schrodinger.structure._Bond """ bond = ct.getBond(atom1, atom2) if not bond: return ct.addBond(atom1, atom2, bond_type) bond.type = bond_type return bond
[docs]def trim_str(text, max_len, suffix='...'): """ Trim the string to approximately max_len. Add a suffix if the string is longer than max_len. :param text: String to trim :param int max_len: Max length of the string :param str suffix: Suffix to add if the string is to be trimmed :return str: Trimmed string """ # For usage see msutils_test.py :: test_trim_str trim_len = len(suffix) if max_len <= len(suffix): trim_len = 0 if len(text) > max_len: return text[:max_len - trim_len] + suffix return text
[docs]@contextlib.contextmanager def mmlewis_apply(quiet=True): """ Context manager that initializes mm and returns mm.mmlews_apply method. Example usage: with msutils.mmlewis_apply() as lewis_apply: assert lewis_apply(struct) is None :yield: mm.mmlewis_apply method. :raise mm.MmException: On mmlewis_apply failure """ con_man = ioredirect.IOSilence() if quiet else nullcontext() mm.mmerr_initialize() mm.mmlewis_initialize(mm.MMERR_DEFAULT_HANDLER) with con_man: yield mm.mmlewis_apply mm.mmlewis_terminate() mm.mmerr_terminate()
[docs]def get_atom_ffio_velocity(atom): """ Get FFIO atom velocities. :param structure._StructureAtom atom: Input atom :return numpy.array: Array of velocities """ vel = [ atom.property.get(msprops.FFIO_ATOM_VEL(axis), 0.) for axis in ['x', 'y', 'z'] ] return numpy.array(vel)
[docs]def set_atom_ffio_velocity(atom, velocity): """ Set FFIO atom velocities. :param structure._StructureAtom atom: Atom to modify :param list velocity: List of velocities (x, y, z components) """ for val, axis in zip(velocity, ['x', 'y', 'z']): atom.property[msprops.FFIO_ATOM_VEL(axis)] = val
[docs]def get_unique_name(new_name, existing_names): """ Add a suffix to new_name to make a unique name if it already exists in existing_names. :param str new_name: The new name :param list existing_names: Existing names :rtype: str :return: The unique version of new_name """ names_set = set(existing_names) unique_name = new_name index = 1 while unique_name in names_set: unique_name = new_name + f"_{index}" index += 1 return unique_name
[docs]def get_next_name(name): """ Get the next customer facing name. For example, 'xx' gives 'xx (1)' and 'xx (n)' gives 'xx (n + 1)' :param str name: The name based on which the next is generated :rtype: str :return: The unique version of new_name """ matches = re.compile('\\(([0-9]*?)\\)$').search(name) if not matches: name += ' (1)' return name splitted = name.split('(') splitted[-1] = str(int(splitted[-1][:-1]) + 1) + ')' name = '('.join(splitted) return name
[docs]def setting_to_bool(string, empty_is_false=True): """ Convert a yes/no/true/false/1/0/on/off type string to a Python boolean :param str string: The string to convert :param empty_is_false: If the string is empty or None, return False :rtype: bool :return: True if the string is a "true"-y word (TRUE, true, t, yes, on, 1, etc), False if it is a "false"-y word (FALSE, false, f, no, off, 0). :raise ValueError: If the string cannot be interpreted in a True/False manner :raise AttributeError: If something other than a string is passed in """ if empty_is_false and (string is None or string == ""): return False return bool(dutil.strtobool(string))
[docs]def flatten(alist, afunc=None): """ Flatten the given list into a set. :type alist: list :param alist: elements contain iterable data :type afunc: function or None :param afunc: function used to extract iterable data from the given list elements or None if there isn't one :rtype: set :return: a flattened set of data from the given list """ if afunc is None: afunc = lambda x: x return set([j for i in alist for j in afunc(i)])
[docs]def get_unique_ordered_list(l_values): """ Remove the duplicates from the list while maintaining the order. If there are duplicates then the value with lower index is kept :param l_values: The list of values to make unique :type l_values: list :returns: The unique ordered list. :rtype: list """ seen_vals = set() seen_add = seen_vals.add return [x for x in l_values if not (x in seen_vals or seen_add(x))]
[docs]def get_atomic_element(atomic_number): """Given atomic number return chemical element. :param int atomic_number: Atomic number :rtype: str :return: Chemical element """ # TODO: Remove this when SHARED-7629 is implemented return mm.mmat_get_element_by_atomic_number(atomic_number).rstrip()
[docs]@contextlib.contextmanager def ignore_ssl_warnings(): """ Context manager to temporarily ignore InsecureRequestWarning warning. """ # Disable InsecureRequestWarning - currently our ssl distribution is platform # dependent, and our clients are not uniform in their ssl configuration, # so we cannot use ssl verification. with warnings.catch_warnings(): warnings.simplefilter('ignore', InsecureRequestWarning) yield
[docs]def get_index_from_default_name(atom_name): """ Find the atom index from string of element name and atom index :type atom_name: str :param atom_name: concatenated string of element symbol with the atom index :rtype index: int or None :para atom_index: Atom index """ index = re.findall(r'\d+', atom_name) if not index: return return int(index[0])
[docs]def title_case(original, exceptions=('an', 'of', 'the', 'for'), skip_single_letters=True): """ Convert the string to title case, optionally ignoring articles and single letters Examples with default kwargs: "Number of molecules": "Number of Molecules" "axis b": "Axis b" :param str original: The string to make title case :param tuple exceptions: The words to not capitalize :param bool skip_single_letters: Whether single letters should not be capitalized :rtype: str :return: The string in title case """ parts = re.split(' ', original) title_parts = [parts[0].capitalize()] for part in parts[1:]: if part in exceptions or part.isupper() or \ (skip_single_letters and len(part) == 1): title_parts.append(part) else: title_parts.append(part.capitalize()) return ' '.join(title_parts)
[docs]def generate_smiles(struct): """ Return a SMILES string for `st`. For more options, see the `schrodinger.structutils.smiles.SmilesGenerator` class. :type struct: `Structure` :param struct: Structure for which SMILES string is desired. :rtype: str :return: SMILES string representing `st`. """ # TODO: investigate RKDIT for generating smiles return analyze.generate_smiles(struct)
[docs]def get_common_property_names(sts): """ Return the property names that all of the given structures have in common. :type sts: list[`schrodinger.structure.Structure`] :param sts: the structures to search :rtype: set[str] :return: the common property names """ names = set(sts[0].property) for st in sts[1:]: names = names.intersection(st.property) return names
[docs]def get_common_float_property_names(sts): """ Return the float property names that all of the given structures have in common. :type sts: list[`schrodinger.structure.Structure`] :param sts: the structures to search :rtype: set[str] :return: the common float property names """ names = set() for name in get_common_property_names(sts): prop = structure.PropertyName(dataname=name) if prop.type == structure.PROP_FLOAT: names.add(name) return names
[docs]def get_common_atom_property_names(sts): """ Return the property names that all atoms of the given structures have in common. :type sts: list[`schrodinger.structure.Structure`] :param sts: the structures to search :rtype: set[str] :return: the common atom property names """ names = None for st in sts: # this can be slow for large structures for atom in st.atom: if names is None: names = set(atom.property) else: names = names.intersection(atom.property) return names
[docs]def get_common_float_atom_property_names(sts): """ Return the float atom property names that all of the given structures have in common. :type sts: list[`schrodinger.structure.Structure`] :param sts: the structures to search :rtype: set[str] :return: the common float atom property names """ names = set() for name in get_common_atom_property_names(sts): prop = structure.PropertyName(dataname=name) if prop.type == structure.PROP_FLOAT: names.add(name) return names
[docs]def is_coarse_grain(struct, by_atom=False): """ Check if struct is a coarse grain structure :type struct: `schrodinger.structure.Structure` :param struct: The structure to check :type by_atom: bool :param by_atom: If True, check each atom to see if it is coarse grain and return True if any atom is coarse grin. If False, check only for the coarse grain structure property. True is useful when the structure has been obtained via maestro.workspace_get, which removes structure-level properties, or if the structure may be a mixed atomistic/coarse-grained structure. :rtype: bool :return: True if it is a coarse grain structure, False if not """ if not struct: # Handle cases where None is passed in by panels that do not have # structures loaded yet return False if by_atom: return mm.mmct_ct_has_coarse_grain_atom(struct) else: return struct.property.get(msprops.REP_TYPE_KEY) == msprops.CG_REP_TYPE
[docs]def structure_reader_to_3d(file_path, require_stereo=False, out_rep=None, debug=False): """ Read structures from a file and return 3D representations. :param str file_path: the file, can be of any format supported by `schrodinger.structure.StructureReader` or `schrodinger.structure.SmilesReader` :param bool require_stereo: see `schrodinger.structure.Structure.generate3dConformation` :type out_rep: str or None :param out_rep: For eta-bound transition metal complexes, if None then the conversion is to the opposite of the given representation, eta to centroid or centroid to eta, if a string then must be either module constant parserutils.CENTROID or parserutils.ETA in which case the conversion will always provide an output representation of the given type :param bool debug: If debug is True it prints output else it doesn't :rtype: list[`schrodinger.structure.Structure`] :return: 3D structures """ # importing locally to avoid circular imports from schrodinger.application.matsci import buildcomplex2d from schrodinger.application.matsci import etatoggle as tesu # see MATSCI-10912 where fast3d can not be used to volumize eta-bound # metal complexes if file_path.endswith(fileutils.SDF_EXT): con_man = nullcontext() if debug else ioredirect.IOSilence() with con_man: try: st = buildcomplex2d.ComplexSdfToMae(file_path).run() except buildcomplex2d.ComplexSdfToMaeException: # the input sdf could have been of a type not handled by # buildcomplex2d and so try the standard approach pass else: tesu.toggle_structure(st, out_rep=out_rep) return [st] # see SHARED-7822 for adding SMILES support to StructureReader, # but for now do this manually try: sts = list(structure.StructureReader(file_path)) except ValueError as err: try: reader = structure.SmilesReader(file_path) except Exception: # intentionally only raise the original ValueError rather # than this Exception raise err from None else: # uses canvas sts = [smi_st.get2dStructure() for smi_st in reader] # uses fast3d for st in sts: st.generate3dConformation(require_stereo=require_stereo) return sts
[docs]def keyword_string_to_dict(keystring): """ Return a dictionary whose keys are keywords and values are keyword values :type keystring: str :param keystring: The keywords are taken from this string - keywords must be in the keyword=value format and whitespace delimited. :rtype: dict :return: Dictionary of keyword/value pairs :raise ValueError: if any tokens do not match the keyword=value format """ keylist = keystring.split() keydict = {} for keyvalue in keylist: if not keyvalue[0].isalnum(): raise ValueError('Keywords must begin with a letter or number, ' '%s does not' % keyvalue) msg = 'An invalid keyword syntax was encountered: %s' % keyvalue try: key, value = keyvalue.split('=') except ValueError: raise ValueError(msg) if not key or not value: raise ValueError(msg) keydict[key] = value return keydict
[docs]def keyword_dict_to_string(keydict): """ Return a string of keywords specified by keydict. :type keydict: dict :param keydict: Dictionary - keys are Jaguar keywords, values are keyword values of str type :rtype: str :return: A string of space-separated keyword=value pairs """ # 'fix_for_cmdline' argument has been removed (MATSCI-3026) keystring = "" for key, value in keydict.items(): keystring += '%s=%s ' % (key, str(value)) keystring = keystring.rstrip() return keystring
[docs]def is_equal_formula(sts1, sts2): """ Whether lists of structures have the same total chemical formula (mass conservation). :param list[structure.Structure] sts1: List of structures :param list[structure.Structure] sts2: Other list of structures :rtype: bool :return: Whether structures have same total chemical formula """ # In principle sts1, sts2 can be replaced with *args to take arbitrary # number of structure lists # Fast check for atom totals sums = [sum(st.atom_total for st in sts) for sts in (sts1, sts2)] if len(set(sums)) != 1: return False # most_common is used since we want frozenset of touples not a dict numbers = [ frozenset( collections.Counter(atom.atomic_number for st in sts for atom in st.atom).most_common()) for sts in (sts1, sts2) ] return len(set(numbers)) == 1
[docs]def deep_update_dict(source, overrides): """ Override/append source dict values using overrides dict, return a new dict. Everything is deepcopied to prevent unexpected changes. :type source: dict :param source: Source dictionary :type overrides: dict :param overrides: Dictionary to override with :rtype: dict :return: Updated dictionary """ ret = copy.deepcopy(source) for key, value in overrides.items(): if isinstance(value, Mapping) and value: returned = deep_update_dict(source.get(key, {}), value) ret[key] = copy.deepcopy(returned) else: ret[key] = copy.deepcopy(overrides[key]) return ret
[docs]def get_interface_normal(struct): """ Define normal interface vector in c direction. :type struct: `schrodinger.structure.Structure` :param struct: structure """ max_z = struct.getXYZ().max(axis=0)[2] return '0:0:%.3f:0:0:%.3f' % (max_z, max_z + 1.)
[docs]def get_val_from_cmdline(args, var, default=None): """ Get value from the command line given variable name. :param args: Arguments. If string it will be split into a list :type args: list or str :param str var: Variable name :param Any default: Default value if var is not found """ if isinstance(args, str): args = shlex.split(args) for idx, arg in enumerate(args): if arg == var and len(args) > idx + 1: return args[idx + 1] return default
[docs]def count_waters(filename=None, struct=None): """ Count the number of waters in the first structure in the given file :type filename: str :param filename: The path to a structure file :type struct: `schrodinger.structure.Structure` :param filename: A structure object. Either filename or struct must be given :rtype: `schrodinger.structure.Structure`, int :return: The first structure in the file and the number of waters in it """ if not bool(filename) ^ bool(struct): raise RuntimeError('Either filename or struct must be given but not ' 'both') if filename: struct = structure.Structure.read(filename) # Note - can't use the 'water' alias because it detects bridging O as water # SHARED-6231, MATSCI-6464 water_asl = ('(/H0-O0-H0/ and not /H0-O0(-H0)-H0/) and atom.ele O') return struct, len(analyze.evaluate_asl(struct, water_asl))
# see MATSCI-2737 and SUPPORT-71364
[docs]def force_rmtree_resist_nfs(removal_dir, logger=None, failed_dir=RMTREE_FAILED): """ Force remove a directory tree or if it contains stale NFS handles then move it to the specified failure repository. :param str removal_dir: The directory tree to be removed :param logging.Logger logger: The logging object you want to throw an error to if we need to move the folder rather than delete it. :param str failed_dir: The name of a failure repository/directory to put the removed directory into """ try: fileutils.force_rmtree(removal_dir) except OSError: os.makedirs(failed_dir, exist_ok=True) if logger is not None: msg = dedent(f""" Removal of directory {removal_dir} has failed because it contains open files. Moving this directory to {failed_dir} and proceeding. """) logger.warning(msg) shutil.move(removal_dir, failed_dir)
[docs]def dedent(msg, rm_breaks=True): ''' Light wrapper for the `textwrapper.dedent` function, but run with with a `strip()` afterwards too. Can also remove mid-string line breaks. Useful when paired with a pattern such as: msg = (""" Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. """) This function will remove the leading & trailing linebreaks, and it will also fix the indentations at the beginning of each new line. :param str msg: The message string that you want to dedent. :param bool rm_breaks: Whether or not you want to replace all linebreaks with spaces. Typically more useful to turn off when making tooltips. ''' dedented_msg = textwrap.dedent(msg).strip() if rm_breaks: dedented_msg = dedented_msg.replace('\n', ' ') return dedented_msg
[docs]def write_csv_from_dicts(filename, rows): """ Write a csv file from a list of dicts. Column headers will be the superset of all keys in the dictionaries in the rows list. :param str filename: The name of the file to write to :param list rows: A list of dicts, one for each row in the csv. Keys are property names (column headers), values are row values for that column. Each dictionary need not have key/value pairs for all columns. """ proplist = set() for row in rows: proplist.update(row.keys()) headers = sorted(proplist) with open(filename, 'w', newline='') as propfile: writer = csv.DictWriter(propfile, fieldnames=headers) writer.writeheader() for row in rows: writer.writerow(row)
[docs]def is_family_prop(prop, family): """ Check if property is of a family. :param str prop: Property :param PROP_FAMILY family: Family :rtype: bool :return: Whether property is from family """ assert isinstance( family, msprops.PROP_FAMILY), ('Unknown property family: %s' % family) for var_type in PROP_TYPE_GET_DICT: if prop.startswith('%s_%s_' % (var_type, family.name)): return True return False