Source code for schrodinger.application.matsci.rxn_channel

"""
Classes and functions for building reaction channels.

Copyright Schrodinger, LLC. All rights reserved."""

# Contributor: Thomas F. Hughes

import os
import re
import sys
import time
import warnings
from collections import OrderedDict

import numpy

import schrodinger.structure as structure
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import msprops
from schrodinger.infra import fast3d
from schrodinger.infra import mm
from schrodinger.structutils import analyze
from schrodinger.structutils import build
from schrodinger.structutils import minimize
from schrodinger.structutils import rings
from schrodinger.structutils import ringspear
from schrodinger.structutils import transform
from schrodinger.test import ioredirect
from schrodinger.utils import fileutils

# MATSCI-5124 - in contrast to the random module
# the methods in the numpy.random module are consistent
# between Python 2 and 3
my_random = numpy.random.RandomState()

# ring spear elimination failure key
FAILED_SPEAR_ELIM_KEY = 'b_matsci_Failed_Ring_Spear_Elimination'

# Two vectors with a dot product < this are considered perpendicular
PERPENDICULAR_DOT_PROD_THRESH = 0.05


[docs]def detect_tetrahedral(bond_vecs): """ Detect tetrahedral geometry :param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for the vector from the atom in question along a bond :rtype: bool :return: Whether the atom with these bond vectors is tetrahedral (4 bonds with a bond angle sum >> 360 """ # Must have 4 bonds to be tetrahedral if len(bond_vecs) != 4: return False # Accumulate the angles between all bond vectors angles = [] for index, vec1 in enumerate(bond_vecs): for vec2 in bond_vecs[index + 1:]: radians = transform.get_angle_between_vectors(vec1, vec2) angles.append(numpy.degrees(radians)) # For the square planar case (which is what we are trying to distinguish # from, there will be 4 nearest neighbor (cis) angles that are near 90, and # 2 angles with trans bonds that are near 180. The tetrahedral case will # have all 6 angles near 109.5. So sort the bond angles and check the sum of # the smallest 4 angles. angles.sort() nearest_neighbor_angles = angles[:4] angle_sum = sum(nearest_neighbor_angles) # Square planer = 360, tetrahedral ~= 438 return angle_sum > 400
[docs]def trigonal_bonding_vector(bond_vecs, director=None): """ Find the optimal 4th bond vector to add to a system with 3 existing bond vectors. For the case of an atom with 3 bonds, the 4th bond vector should be normal to the plane formed by the 3 bonded atoms. The is true whether the system is trigonal planar or trigonal pyramidal. :param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for the vector from the atom in question along a bond :param numpy.array director: In the case where the chosen bond vector is at a 90 degree angle to the existing bond_vecs, the up/down direction of the bond vector is ambiguous. Choose the direction that best aligns with director. :rtype: numpy.array :return: The optimal next bonding vector pointing away from the atom :raise RuntimeError: If bond_vecs doesn't have 3 items """ if len(bond_vecs) != 3: raise RuntimeError('bond_vecs must be of length 3') # vec1 points from the first bonding atom to the second, vec2 from the # second to the third. The two vectors define the plane of the three atoms vec1 = bond_vecs[0] - bond_vecs[1] vec2 = bond_vecs[1] - bond_vecs[2] # Find the normal to the plane. normal = numpy.cross(vec1, vec2) vector = transform.get_normalized_vector(normal) # The normal can point above or below the plane. Place the normal on the # central atom and check wither it is aligned (dot product > 0) or # anti-aligned (dot product < 0). We want the vector to be anti-aligned so # that it points away from the other bonds. (Note - reversing vectors with # dot product = 0 (completely planar bond vecs) keeps results consistent # with those from 19-4 and previous) dotprod = vector.dot(bond_vecs[0]) if abs(dotprod) < PERPENDICULAR_DOT_PROD_THRESH and director is not None: # The vector is almost perpendicular to a bond vector, which means all # bond vectors lie almost in the same plane. Choose the perpendicular # direction that is best aligned with the supplied director vector = find_best_match_with_director([vector, -vector], director) elif dotprod >= 0: # The vector points in the same direction as the bond vectors, reverse # it vector = -vector return vector
[docs]def tetrahedral_bonding_vector(bond_vecs, director=None): """ Find the optimal 5th bond vector to add to a system with 4 existing tetrahedral bond vectors. For tetrahedral systems, the 5th vector gets added anti-parallel to one of the bonding vectors (classic Sn2-type reaction geometry). The first bond is chosen unless director is given, in which case the bond is chosen so that the new bond vector best aligns with the director. :param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for the vector from the atom in question along a bond :param numpy.array director: Chose the antiparallel bond vector that best aligns with this vector. :rtype: numpy.array :return: The optimal next bonding vector pointing away from the atom :raise RuntimeError: If bond_vecs doesn't have 4 items """ if len(bond_vecs) != 4: raise RuntimeError('bond_vecs must be of length 4') if director is None: return -bond_vecs[0] # Take the negative of each bond vector and use the one with the most # positive dot product (best alignment) with the director candidates = [-x for x in bond_vecs] return find_best_match_with_director(candidates, director)
[docs]def find_best_match_with_director(candidates, director): """ Find the vector in candidates that has the best alignment with director :param list(numpy.array) candidates: The vectors to choose from :param numpy.array director: The chosen vector should be the one that aligns most strongly with this vector :rtype: numpy.array :return: The vector from candidates that best aligns with director """ best_dot_prod = -numpy.inf best_vector = None for candidate in candidates: dotp = director.dot(candidate) if dotp > best_dot_prod: best_dot_prod = dotp best_vector = candidate return best_vector
def _find_perpendicular_bond_vector(bond_vecs, director=None): """ For the given bond vectors, find pairs (or pairs of (bond vector, cardinal axis)) that are non-colinear and produce perpendicular vectors by taking the cross product. Choose an arbitrary resulting vector or the one that best aligns with the director if supplied. Because the three cardinal axes are used as a backup, this method is gauranteed to find a pependicular vector. :param list bond_vecs: Each item is a numpy.array for one bond vector :param numpy.array director: If supplied, choose the resulting perpendicular vector that best aligns (positive dot product) with this vector :rtype: numpy.array :return: The perpendicular bond vector (not normalized) """ def colinear_vectors(vec_one, vec_two): """ Check if two vectors are colinear or not :param numpy.array vec_one: The first vector :param numpy.array vec_two: The first vector :rtype: bool :return: If the two vector are colinear """ colinear_thresh = 0.01 if abs(1 - abs(numpy.dot(vec_one, vec_two))) > colinear_thresh: return False else: return True def _find_candidates(vec1, other_vecs, candidates): """ Find cross products between vec1 and and other vector that is not co-linear with it :param numpy.array vec1: The reference vector :param list other_vecs: numpy arrays that represent other vectors than can be used to obtain a cross product with vec1 :param list candidates: A list of cross-products that have been found so far. Any new cross products will be added to it """ for ovec in other_vecs: if not colinear_vectors(vec1, ovec): # Cross product gives a vector normal to the plane of # the two vectors. There's no reason to assume one of the two # possible directions for a cross product is the better one, so # use them both. crossp = numpy.cross(vec1, ovec) candidates.append(crossp) candidates.append(-crossp) candidates = [] # Prefer to use existing bond vectors for index, bv1 in enumerate(bond_vecs): _find_candidates(bv1, bond_vecs[index + 1:], candidates) # If no two bonds are not colinear, use cardinal axes. It's guaranteed one # of them will be non-colinear if not candidates: cards = [transform.X_AXIS, transform.Y_AXIS, transform.Z_AXIS] for bv1 in bond_vecs: _find_candidates(bv1, cards, candidates) if director is None: # If there is no director, choose an arbitrary vector new_bond_vector = candidates[0] else: # If there is a director, choose the vector that aligns best # with the director new_bond_vector = find_best_match_with_director(candidates, director) return new_bond_vector
[docs]def find_good_bond_vector(atom, length=1.0, director=None, pbc=None): """ Find a good vector to add a new bond given the existing bonds to this atom :param 'schrodinger.structure._StructureAtom` atom: The atom to find the bond to :param float length: The desired length of the bond vector :param numpy.array director: In some cases, such as trigonal planar and tetrahedral, a simple choice is made by this function between several directions. If a director is supplied, the choice that best aligns with the director will be chosen. :param `infrastructure.PBC` pbc: If given, will be used to find the closest image of all neighboring atoms :rtype: numpy.array :return: An array pointing from the given atom along the new bond vector """ bond_sum_thresh = 0.1 atom_vec = numpy.array(atom.xyz) # if the molecule is just an atom then grab the x-axis as a # handle, otherwise obtain the handle from the list of existing # bonds taking account for symmetric cases. there are a few # different protocols that can be used here but I think this # one will work nicely though not exhaustively if not atom.bond_total: if director is None: new_bond_vector = numpy.array(transform.X_AXIS) else: new_bond_vector = numpy.copy(director) else: # make a handle that is opposite the sum of existing bond # vectors bond_vecs = [] for neighbor in atom.bonded_atoms: neighbor_vec = numpy.array(neighbor.xyz) if pbc: # Make sure the correct image is used for the neighbor atom bond_vec = pbc.getShortestVector(*atom_vec, *neighbor_vec) else: bond_vec = neighbor_vec - atom_vec bond_vec = transform.get_normalized_vector(bond_vec) bond_vecs.append(bond_vec) num_bonds = len(bond_vecs) if num_bonds == 3: new_bond_vector = trigonal_bonding_vector(bond_vecs, director=director) elif detect_tetrahedral(bond_vecs): new_bond_vector = tetrahedral_bonding_vector(bond_vecs, director=director) else: new_bond_vector = -1 * sum(bond_vecs) # if that handle vector is small, for example for a fully symmetric # system, then pick a preexisting bond and find another non-colinear # bond or cardinal vector for which their normal is non-zero. if transform.get_vector_magnitude(new_bond_vector) < bond_sum_thresh: new_bond_vector = _find_perpendicular_bond_vector( bond_vecs, director) new_bond_vector = transform.get_normalized_vector(new_bond_vector) return new_bond_vector * length
[docs]def random_choice(collection): """ Return a random element from the given collection. Wraps numpy.random.RandomState.choice to allow for multi-dimensional input, i.e. cases where elements are also iterables. Choices are made using a uniform distribution. :type collection: iterable :param collection: the collection from which to randomly choose an element :rtype: any :return: the chosen element, potentially an iterable """ # numpy.random.RandomState.choice requires that input be 1D so # choose from an iterable of iterables by indexing elements = list(collection) idx = my_random.randint(len(elements)) return elements[idx]
[docs]class Constants(object): """ Manage some constants. """ CHANNEL_FILE = 'CHANNEL_FILE' NUM_RANDOM_CHANNELS = 0 ALL_RANDOM_CHANNELS = 'all' NUM_RANDOM_CHANNELS_METAVAR = 'INT or \'%s\'' % ALL_RANDOM_CHANNELS ASSOCIATION_TYPE = 'association' DISSOCIATION_TYPE = 'dissociation' SINGLE_DISPLACEMENT_TYPE = 'single_displacement' DOUBLE_DISPLACEMENT_TYPE = 'double_displacement' ALL_TYPES = 'all' RANDOM_TYPES_CHOICES = [ ASSOCIATION_TYPE, DISSOCIATION_TYPE, SINGLE_DISPLACEMENT_TYPE, DOUBLE_DISPLACEMENT_TYPE, ALL_TYPES ] DEFAULT_RANDOM_TYPES = [DOUBLE_DISPLACEMENT_TYPE] RANDOM_SEED = None ALLOW_ADSORPTION_ONTO = OrderedDict([ ('graphene', ('[c-0X3][c-0X3]([c-0X3]([c-0X3]([c-0X3])' '[c-0X3])[c-0X3]([c-0X3])[c-0X3])[c-0X3]')) ]) ALL_SMARTS = 'all' SMARTS_WILDCARD = '[*]' ALLOW_ADSORPTION_ONTO_METAVAR = 'SMARTS or \'%s\'' % ALL_SMARTS DISSOCIATION_BOND_LENGTH = 10.0 NO_MINIMIZATION = False ISOLATE_PRODUCTS = False NO_REACTIVE_H = False SUPPORTED_IN_EXTS = ['.mae', '.mae.gz', '.maegz'] UNIQUE = False
[docs]def init_random_seed(random_seed, logger): """ Process the seed to the random number generator. :type random_seed: None or int :param random_seed: the seed for the random number generator :type logger: logging.Logger :param logger: output logger :rtype: int :return: final random seed """ # system time comes out with 6 digits, just use the integer part if random_seed is None: random_seed = int(time.time()) # print some stuff SEED_MSG = ('Seed used is:') if logger: logger.info(SEED_MSG) logger.info('-' * len(SEED_MSG)) logger.info('%s' % random_seed) logger.info('') return random_seed
[docs]def is_polymer_builder_grow_atom(atom): """ Return True if the given atom is a grow atom prepared by the polymer builder module. :type atom: schrodinger.structure._StructureAtom :param atom: the atom :rtype: bool :return: True if a polymer builder grow atom """ head = atom.property.get(amorphous.HEAD_ATOM_PROP) return ((head and atom.property.get(amorphous.COUPLER_ATOM_PROP)) or (head and atom.property.get(amorphous.GROWER_ATOM_PROP)))
[docs]def is_mark_monomer_grow_atom(atom): """ Return True if the given atom is a grow atom prepared by the mark monomer module. :type atom: schrodinger.structure._StructureAtom :param atom: the atom :rtype: bool :return: True if a mark monomer grow atom """ head_or_tail = atom.property.get(msprops.ROLE_PROP) return head_or_tail in [msprops.HEAD, msprops.TAIL]
[docs]def get_grow_idxs(st, idxs=None): """ Return a list of grow indices for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type idxs: list :param idxs: the indices to check :rtype: list :return: grow indices """ if idxs is None: idxs = range(1, st.atom_total + 1) grow_idxs = [] for idx in idxs: atom = st.atom[idx] if is_polymer_builder_grow_atom(atom) or is_mark_monomer_grow_atom( atom): grow_idxs.append(idx) return grow_idxs
[docs]def remove_grow_properties(st, idxs=None): """ Remove the grow properties from atoms in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type idxs: list :param idxs: the atom indices """ keys = (amorphous.HEAD_ATOM_PROP, amorphous.COUPLER_ATOM_PROP, amorphous.GROWER_ATOM_PROP, msprops.ROLE_PROP) if idxs is None: idxs = range(1, st.atom_total + 1) for idx in idxs: atom = st.atom[idx] for key in keys: atom.property.pop(key, None)
[docs]def serves_as_backbone_bond(st, idx, jdx): """ Return True if the given bond serves as a backbone bond. :type st: schrodinger.structure.Structure :param st: the structure :type idx: int :param idx: the first atom index of the bond :type jdx: int :param jdx: the second atom index of the bond :rtype: bool :return: True if serves as a backbone bond """ # if the molecule containing the given bond is not a # monomer then return True mol_idxs = st.atom[idx].getMolecule().getAtomIndices() if not get_grow_idxs(st, idxs=mol_idxs): return True a_grow_idxs = get_grow_idxs(st, idxs=indicies_from_bonds_deep(st, idx, [jdx])) b_grow_idxs = get_grow_idxs(st, idxs=indicies_from_bonds_deep(st, jdx, [idx])) return a_grow_idxs and b_grow_idxs
[docs]def serves_as_sidechain_bond(st, idx, jdx): """ Return True if the given bond serves as a sidechain bond. :type st: schrodinger.structure.Structure :param st: the structure :type idx: int :param idx: the first atom index of the bond :type jdx: int :param jdx: the second atom index of the bond :rtype: bool :return: True if serves as a sidechain bond """ # if the molecule containing the given bond is not a # monomer then return True mol_idxs = st.atom[idx].getMolecule().getAtomIndices() if not get_grow_idxs(st, idxs=mol_idxs): return True a_grow_idxs = get_grow_idxs(st, idxs=indicies_from_bonds_deep(st, idx, [jdx])) b_grow_idxs = get_grow_idxs(st, idxs=indicies_from_bonds_deep(st, jdx, [idx])) # if the given bond contains grow atoms then return False, # this could happen for a bond that acts as an end cap # for the growth if idx in a_grow_idxs or jdx in b_grow_idxs: return False return (len(a_grow_idxs) >= 2 and not b_grow_idxs) or (not a_grow_idxs and len(b_grow_idxs) >= 2)
[docs]def valid_polymer_monomer_double_displacement_rxn(st, idx, jdx, kdx, ldx, allow_backbone=True, allow_sidechain=True): """ Return True if the given input is for a valid double displacement polymer monomer reaction. :type st: schrodinger.structure.Structure :param st: the structure containing two molecules :type idx: int :param idx: the first atom index of the bond in the first molecule :type jdx: int :param jdx: the second atom index of the bond in the first molecule :type kdx: int :param kdx: the first atom index of the bond in the second molecule :type ldx: int :param ldx: the second atom index of the bond in the second molecule :type allow_backbone: bool :param allow_backbone: whether to allow backbone reactions :type allow_sidechain: bool :param allow_sidechain: whether to allow sidechain reactions :rtype: bool :return: True if a valid reaction """ # if not a polymer monomer reaction if not get_grow_idxs(st): return True # use only the minimal number of polymer atom properties, for example # those that are used in get_grow_idxs, so as to prevent having to update # all of them as new monomers are successively created, specifically # during fragment mutation for example if allow_backbone and serves_as_backbone_bond(st, idx, jdx) and \ serves_as_backbone_bond(st, kdx, ldx): return True if allow_sidechain and serves_as_sidechain_bond(st, idx, jdx) and \ serves_as_sidechain_bond(st, kdx, ldx): return True return False
[docs]def query_monomers(sts): """ Return a list of bools indicating whether the given structures are monomers, i.e. have at least two grow indices. :type sts: list :param sts: contains schrodinger.structure.Structure :rtype: list :return: contains bools indicating whether the given structures are monomers """ return [len(get_grow_idxs(st)) >= 2 for st in sts]
[docs]def are_monomers(sts): """ Return True if the given structures are monomers, i.e. have at least two grow indices. :type sts: list :param sts: contains schrodinger.structure.Structure :rtype: bool :return: True if the given structures are monomers """ return all(query_monomers(sts))
[docs]def get_deduped_monomers(sts): """ Return a list of smiles deduped monomers. :type sts: list :param sts: contains schrodinger.structure.Structure :rtype: list, list :return: contains smiles deduped monomers, contains filtered titles """ # this is a safety measure for symmetric reactants to ensure # that when deduping that a true monomer, with proper atom # properties, isn't replaced by a fake monomer with the same # smiles but that lacks proper atom properties smiles_sts_dict = OrderedDict() for st in sts: smiles = analyze.generate_smiles(st) smiles_sts_dict.setdefault(smiles, []).append(st) f_sts = [] titles = [] for t_sts in smiles_sts_dict.values(): f_st = None for st in t_sts: if not f_st and are_monomers([st]): f_st = st else: titles.append(st.property[CheckInput.TITLE_KEY]) if not f_st: f_st = t_sts[0] titles.remove(f_st.property[CheckInput.TITLE_KEY]) f_sts.append(f_st) return f_sts, titles
[docs]class CacheInfo(object): """ Holds caches of computed data to reduce timings """
[docs] def __init__(self): """ Create a CacheInfo instance """ # keys = atom indexes, values = SMARTS pattern for that atom self.smarts_cache = {} # atom objects that are adsorption sites self.ads_cache = set() # Whether the ads_cache has been filled yet or not self.ads_cache_filled = False # keys = (atom index1, atom index2) for bonds, values = (SMILES, SMILES) # for the two fragments created by breaking that bond self.smiles_cache = {} # keys = frozenset([atom index1, atom index2]) for bonds, # see checkIfNotSimpleBond for values self.bond_cache = {}
[docs] def fillAdsCache(self, struct, matches): """ Fill the adsorption site cache with atoms from struct :type struct: `schrodinger.structure.Structure` :param struct: The structure with the atoms to check :param set matches: Atom indexes that are automatically considered adsorption sites """ if not self.ads_cache_filled: data = [ x for x in struct.atom if not CheckInput().checkIfNotAdsorbable(struct, x.index, matches) ] self.ads_cache.update(data) self.ads_cache_filled = True
[docs]class ChannelDefinitions(object): """ Manage a collection of reaction channel definitions. """ BLANK_PATTERN = re.compile(r'\s*\n$') RANDOM_ALL_TO_ON_THE_FLY_THRESH = 50 MAX_NUM_UNPRODUCTIVE_CYCLES = 1000 DESCRIPTOR_DICT = { Constants.DISSOCIATION_TYPE: 'Dissociation: ', Constants.ASSOCIATION_TYPE: 'Association: ', Constants.SINGLE_DISPLACEMENT_TYPE: 'Single Displacement: ', Constants.DOUBLE_DISPLACEMENT_TYPE: 'Double Displacement: ' }
[docs] def __init__(self, logger=None): """ Create an instance. :type logger: logging.getLogger :param logger: output logger """ self.logger = logger self.definitions = [] self.bad_defs = [] self.allow_adsorption_onto = None self.matches = None
[docs] @staticmethod def flattenChannels(all_channel_defs): """ Return a flattened list of channels. :type all_channel_defs: OrderedDict :param all_channel_defs: all possible channel definitions, keyed first by channel type and second by reaction type :rtype: list :return: a flattened list of channels """ return [ rxn for rxn_dict in list(all_channel_defs.values()) for rxns in list(rxn_dict.values()) for rxn in rxns ]
[docs] def addDefinition(self, str_definition): """ Add a reaction channel definition to the list of definitions. :type str_definition: str :param str_definition: the string representation of the definition :rtype: bool :return: not_accepted, False for an accepted definition, True for a definition that was not accepted """ definition = ChannelDefinition(str_definition, self.logger) if not definition.not_valid and definition not in self.definitions: self.definitions.append(definition) not_accepted = False else: self.bad_defs.append(str_definition.strip('\n')) not_accepted = True return not_accepted
[docs] def addDefinitionsFromFile(self, channels_file): """ Add reaction channel definitions from an input file to the list of definitions. :type channels_file: str :param channels_file: input file containing the definitions of the reaction channels """ channels_handle = open(channels_file, 'r') for aline in channels_handle: if not self.BLANK_PATTERN.match(aline): self.addDefinition(aline) channels_handle.close()
[docs] def buildChannelStrDef(self, atom_a_one, atom_b_one, atom_a_two=None, atom_b_two=None): """ Build the string representation of a reaction channel from a sequence of integers. :type atom_a_one: int :param atom_a_one: the first atom of the first part of the definition :type atom_b_one: int :param atom_b_one: the first atom of the second part of the definition :type atom_a_two: int :param atom_a_two: the second atom of the first part of the definition :type atom_b_two: int :param atom_b_two: the second atom of the second part of the definition :rtype: str :return: str_def, the string representation of the provided reaction channel """ FIRST_SEP = ChannelDefinition.FIRST_SEP SECOND_SEP = ChannelDefinition.SECOND_SEP str_def = str(atom_a_one) if atom_a_two: str_def += SECOND_SEP + str(atom_a_two) str_def += FIRST_SEP + SECOND_SEP + str(atom_b_one) if atom_b_two: str_def += SECOND_SEP + str(atom_b_two) return str_def
[docs] def getDissociationChannels(self, reactants, no_reactive_h, caches=None): """ Return an OrderedDict of dissociation channel string definitions keyed by type. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :param `CacheInfo` caches: Data caches to speed up the calculation :rtype: OrderedDict :return: keys are type, values are dissociation channels """ NO_DISSOCIATION_DEFS = """There are no dissociation channels available for the reactants that you have provided. This is likely because there are no single bonds outside of rings or because you have disabled reactivity of R-H bonds. Please reconsider the types of channels that you are trying to sample.""" dissociation_defs = OrderedDict() if not caches: caches = CacheInfo() # these are expensive functions and so are only called for small # systems for bond in reactants.bond: if not any(CheckInput().checkIfNotSimpleBond( reactants, bond.atom1.index, bond.atom2.index, no_reactive_h, bond_cache=caches.bond_cache)): idxs = [(bond.atom1.index, bond.atom2.index)] ahash = tuple( get_sorted_atomic_smarts(reactants, idxs, smarts_cache=caches.smarts_cache)) str_def = self.buildChannelStrDef(*idxs[0]) dissociation_defs.setdefault(ahash, []).append(str_def) if not dissociation_defs and self.logger: self.logger.warning(NO_DISSOCIATION_DEFS) return dissociation_defs
[docs] def getAssociationChannels(self, reactants, caches=None): """ Return an OrderedDict of association channel string definitions keyed by type. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :param `CacheInfo` caches: Data caches to speed up the calculation :rtype: OrderedDict :return: keys are type, values are association channels """ NO_ASSOCIATION_DEFS = """There are no association channels available for the reactants that you have provided. This is likely because you either have only a single reactant molecule or you do not have at least two adsorbable atoms. Please reconsider the types of channels that you are trying to sample as well as the possibility of adding custom adsorption sites.""" association_defs = OrderedDict() if caches is None: caches = CacheInfo() caches.fillAdsCache(reactants, self.matches) # these are expensive functions and so are only called for small # systems for molecule_one in reactants.molecule: for molecule_two in reactants.molecule: if molecule_one.number >= molecule_two.number: continue for atom_one in molecule_one.atom: if atom_one not in caches.ads_cache: continue for atom_two in molecule_two.atom: if atom_two not in caches.ads_cache: continue idxs = [(atom_one.index, atom_two.index)] ahash = tuple( get_sorted_atomic_smarts( reactants, idxs, smarts_cache=caches.smarts_cache)) str_def = self.buildChannelStrDef(*idxs[0]) association_defs.setdefault(ahash, []).append(str_def) if not association_defs and self.logger: self.logger.warning(NO_ASSOCIATION_DEFS) return association_defs
[docs] def getSingleDisplacementChannels(self, reactants, no_reactive_h, unique, caches=None): """ Return an OrderedDict of single displacement channel string definitions keyed by type. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :type unique: bool :param unique: provide only unique products :param `CacheInfo` caches: Data caches to speed up the calculation :rtype: OrderedDict :return: keys are type, values are single displacement channels """ NO_SINGLE_DISPLACEMENT_DEFS = """There are no single displacement channels available for the reactants that you have provided. This is likely because you either have only a single reactant molecule, have no single bonds outside of rings, have disabled the reactivity of R-H bonds, have no adsorbable atoms, or if unique has been specified have no unique products. Please reconsider the types of channels that you are trying to sample as well as the possibility of adding custom adsorption sites.""" single_displacement_defs = OrderedDict() if not caches: caches = CacheInfo() caches.fillAdsCache(reactants, self.matches) # these are expensive functions and so are only called for small # systems, this type of channel involves an atom in one molecule # and a bond in another and so to handle the asymmetry we loop over # all pairs of molecules and take an atom from the first and a bond # from the second for molecule_one in reactants.molecule: for molecule_two in reactants.molecule: if molecule_one.number != molecule_two.number: for atom in molecule_one.atom: if not atom in caches.ads_cache: continue for bond in get_bonds_in_molecule(molecule_two): if any(CheckInput().checkIfNotSimpleBond( reactants, bond.atom1.index, bond.atom2.index, no_reactive_h, bond_cache=caches.bond_cache)): continue if unique: if not any(CheckInput().checkChannelUniqueness( reactants, atom.index, None, bond.atom1.index, bond.atom2.index, smiles_cache=caches.smiles_cache)): continue idxs = [(atom.index,), (bond.atom1.index, bond.atom2.index)] ahash = tuple( get_sorted_atomic_smarts( reactants, idxs, smarts_cache=caches.smarts_cache)) str_def = self.buildChannelStrDef( atom.index, bond.atom1.index, None, bond.atom2.index) single_displacement_defs.setdefault( ahash, []).append(str_def) if not single_displacement_defs and self.logger: self.logger.warning(NO_SINGLE_DISPLACEMENT_DEFS) return single_displacement_defs
[docs] def getDoubleDisplacementChannels( self, reactants, no_reactive_h, unique, no_reactive_polymer_monomer_backbone=False, no_reactive_polymer_monomer_sidechain=False, caches=None): """ Return an OrderedDict of double displacement channel string definitions keyed by type. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :type unique: bool :param unique: provide only unique products :type no_reactive_polymer_monomer_backbone: bool :param no_reactive_polymer_monomer_backbone: specify that polymer monomer backbone bonds be considered inert :type no_reactive_polymer_monomer_sidechain: bool :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer sidechain bonds be considered inert :param `CacheInfo` caches: Data caches to speed up the calculation :rtype: OrderedDict :return: keys are type, values are double displacement channels """ NO_DOUBLE_DISPLACEMENT_DEFS = """There are no double displacement channels available for the reactants that you have provided. This is likely because you either have only a single reactant molecule, have no single bonds outside of rings, have disabled the reactivity of R-H bonds, have disabled the reactivity of polymer monomer backbone and/or sidechain bonds, or if unique has been specified have no unique products. Please reconsider the types of channels that you are trying to sample.""" double_displacement_defs = OrderedDict() allow_backbone = not no_reactive_polymer_monomer_backbone allow_sidechain = not no_reactive_polymer_monomer_sidechain if caches is None: caches = CacheInfo() # these are expensive functions and so are only called for small # systems is_polymer = get_grow_idxs(reactants) for molecule_one in reactants.molecule: for molecule_two in reactants.molecule: if molecule_one.number < molecule_two.number: for bond_one in get_bonds_in_molecule(molecule_one): b1i1 = bond_one.atom1.index b1i2 = bond_one.atom2.index if any(CheckInput().checkIfNotSimpleBond( reactants, b1i1, b1i2, no_reactive_h, bond_cache=caches.bond_cache)): continue for bond_two in get_bonds_in_molecule(molecule_two): b2i1 = bond_two.atom1.index b2i2 = bond_two.atom2.index if any(CheckInput().checkIfNotSimpleBond( reactants, b2i1, b2i2, no_reactive_h, bond_cache=caches.bond_cache)): continue # It's important speed-wise to check is_polymer # first before calling valid_polymer if (is_polymer and not valid_polymer_monomer_double_displacement_rxn( reactants, b1i1, b1i2, b2i1, b2i2, allow_backbone=allow_backbone, allow_sidechain=allow_sidechain)): continue if unique: if not any(CheckInput().checkChannelUniqueness( reactants, b1i1, b1i2, b2i1, b2i2, smiles_cache=caches.smiles_cache)): continue idxs = [(b1i1, b1i2), (b2i1, b2i2)] str_def = self.buildChannelStrDef( b1i1, b2i1, b1i2, b2i2) ahash = tuple( get_sorted_atomic_smarts( reactants, idxs, outer_sort=True, smarts_cache=caches.smarts_cache)) double_displacement_defs.setdefault( ahash, []).append(str_def) if not double_displacement_defs and self.logger: self.logger.warning(NO_DOUBLE_DISPLACEMENT_DEFS) return double_displacement_defs
[docs] def getAllChannels(self, reactants, random_types, no_reactive_h, unique, no_reactive_polymer_monomer_backbone=False, no_reactive_polymer_monomer_sidechain=False): """ Return a nested OrderedDict of all channel string definitions keyed first by type of channel and second by type of reaction. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type random_types: list :param random_types: list of strings specifying which types of channels should be sampled when generating the random reaction channels :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :type unique: bool :param unique: provide only unique products :type no_reactive_polymer_monomer_backbone: bool :param no_reactive_polymer_monomer_backbone: specify that polymer monomer backbone bonds be considered inert :type no_reactive_polymer_monomer_sidechain: bool :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer sidechain bonds be considered inert :rtype: OrderedDict :return: all possible channel definitions, keyed first by channel type and second by reaction type """ channel_defs_to_sample = OrderedDict() caches = CacheInfo() if Constants.DISSOCIATION_TYPE in random_types: dissociation_defs = self.getDissociationChannels(reactants, no_reactive_h, caches=caches) if dissociation_defs: channel_defs_to_sample[Constants.DISSOCIATION_TYPE] = \ dissociation_defs if Constants.ASSOCIATION_TYPE in random_types: association_defs = self.getAssociationChannels(reactants, caches=caches) if association_defs: channel_defs_to_sample[Constants.ASSOCIATION_TYPE] = \ association_defs if Constants.SINGLE_DISPLACEMENT_TYPE in random_types: single_displacement_defs = self.getSingleDisplacementChannels( reactants, no_reactive_h, unique, caches=caches) if single_displacement_defs: channel_defs_to_sample[Constants.SINGLE_DISPLACEMENT_TYPE] = \ single_displacement_defs if Constants.DOUBLE_DISPLACEMENT_TYPE in random_types: double_displacement_defs = self.getDoubleDisplacementChannels( reactants, no_reactive_h, unique, no_reactive_polymer_monomer_backbone= no_reactive_polymer_monomer_backbone, no_reactive_polymer_monomer_sidechain= no_reactive_polymer_monomer_sidechain, caches=caches) if double_displacement_defs: channel_defs_to_sample[Constants.DOUBLE_DISPLACEMENT_TYPE] = \ double_displacement_defs return channel_defs_to_sample
[docs] def printAllChannels(self, all_channel_defs): """ For each type of sampled channel log the total number of such channels. :type all_channel_defs: OrderedDict :param all_channel_defs: all possible channel definitions, keyed first by channel type and second by reaction type """ types_to_sizes_dict = OrderedDict() for channel_type, rxn_types in all_channel_defs.items(): total = sum([len(rxns) for rxns in rxn_types.values()]) types_to_sizes_dict[channel_type] = total TOTAL_TAG = 'Total:' max_descriptor = max( [len(descriptor) for descriptor in self.DESCRIPTOR_DICT.values()]) max_size = max([len(str(size)) for size in \ list(types_to_sizes_dict.values())]) HEADER_ONE = 'Number of possible reaction channels by type' HEADER_TWO = '-' * len(HEADER_ONE) if self.logger: self.logger.info(HEADER_ONE) self.logger.info(HEADER_TWO) self.logger.info('') for key, value in types_to_sizes_dict.items(): descriptor = self.DESCRIPTOR_DICT[key] aline = descriptor.ljust(max_descriptor) + \ str(value).rjust(max_size) self.logger.info(aline) if len(types_to_sizes_dict) > 1: self.logger.info('-' * (max_descriptor + max_size)) total = sum(types_to_sizes_dict.values()) total_line = TOTAL_TAG.ljust(max_descriptor) + \ str(total).rjust(max_size) self.logger.info(total_line) self.logger.info('')
[docs] def addRandomChannelsFromAll(self, num_random_channels, random_seed, all_channel_defs, bin_rxn_types=False): """ Add the specified number of random channels to the list of channels to be computed by picking a random sample of the size given from the list of all possible channels. If the number of random channels is 'all' or greater than or equal to the number of possible channels then pick all of them in order. :type num_random_channels: int or str 'all' :param num_random_channels: the desired number of random channels to generate :type random_seed: None or int :param random_seed: the seed for the random number generator :type all_channel_defs: OrderedDict :param all_channel_defs: all possible channel definitions, keyed first by channel type and second by reaction type :type bin_rxn_types: bool :param bin_rxn_types: if True then when generating random channels first bin by reaction type followed by selecting a random type followed by a random instance, if False then just select a random instance from all instances """ ASKED_FOR_TOO_MANY_MSG = """ You have specified that %s random reaction channels should be generated but this system, given the other options you have specified, only has %s possible reaction channels. Proceeding with all possible reaction channels.""" flattened_channels = self.flattenChannels(all_channel_defs) total_num_channels = len(flattened_channels) if num_random_channels == Constants.ALL_RANDOM_CHANNELS or \ num_random_channels == total_num_channels: defs_to_add = list(flattened_channels) elif num_random_channels > total_num_channels: if self.logger: self.logger.warning(ASKED_FOR_TOO_MANY_MSG % (num_random_channels, total_num_channels)) defs_to_add = list(flattened_channels) else: random_seed = init_random_seed(random_seed, self.logger) my_random.seed(random_seed) if bin_rxn_types: defs_to_add = [] while len(defs_to_add) < num_random_channels: channel_type = random_choice(all_channel_defs) rxn_type = random_choice(all_channel_defs[channel_type]) rxn = random_choice( all_channel_defs[channel_type][rxn_type]) if rxn not in defs_to_add: defs_to_add.append(rxn) else: defs_to_add = list( my_random.choice(flattened_channels, size=num_random_channels, replace=False)) for def_to_add in defs_to_add: self.addDefinition(def_to_add)
def _getRandomAtomsFromBins(self, mols, cache=None): """ Return a list of random atom indicies, one per given molecule, drawn by first binning by atom type. :type mols: tuple :param mols: contains schrodinger.structure._Molecule :type cache: dict :param cache: an atom type dict cache, keys are molecule numbers, values are atom type dicts :rtype: list :return: contains single tuples of atom indices, i.e. (1,) """ singles = [] cache = cache or {} for mol in mols: atom_dict = cache.get(mol.number) if not atom_dict: st = mol.extractStructure() atom_dict = get_atom_type_dict(st) if atom_dict: old = mol.getAtomIndices() new = list(range(1, len(old) + 1)) new_to_old = dict(list(zip(new, old))) atom_dict = _update_type_dict(atom_dict, new_to_old) cache[mol.number] = atom_dict atype = random_choice(atom_dict) single = random_choice(atom_dict[atype]) singles.append(single) return singles def _getRandomBondsFromBins(self, mols, cache=None): """ Return a list of random bonding atom index pairs, one per given molecule, drawn by first binning by bond type. :type mols: tuple :param mols: contains schrodinger.structure._Molecule :type cache: dict :param cache: a bond type dict cache, keys are molecule numbers, values are bond type dicts :rtype: list :return: contains pair tuples of atom indices, i.e. (1, 2) """ pairs = [] cache = cache or {} for mol in mols: bond_dict = cache.get(mol.number) if not bond_dict: st = mol.extractStructure() bond_dict = get_bond_type_dict(st) if bond_dict: old = mol.getAtomIndices() new = list(range(1, len(old) + 1)) new_to_old = dict(list(zip(new, old))) bond_dict = _update_type_dict(bond_dict, new_to_old) cache[mol.number] = bond_dict atype = random_choice(bond_dict) pair = random_choice(bond_dict[atype]) pairs.append(pair) return pairs
[docs] def addRandomChannelsOnTheFly(self, reactants, num_random_channels, random_types, no_reactive_h, unique, bin_rxn_types=False, no_reactive_polymer_monomer_backbone=False, no_reactive_polymer_monomer_sidechain=False): """ Generate the specified number of random reaction channels without ever precomputing the list of all possible channels. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type num_random_channels: int :param num_random_channels: the desired number of random channels to generate :type random_types: list :param random_types: list of strings specifying which types of channels should be sampled when generating the random reaction channels :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :type unique: bool :param unique: provide only unique products :type bin_rxn_types: bool :param bin_rxn_types: if True then when generating random channels first bin by reaction type followed by selecting a random type followed by a random instance, if False then just select a random instance from all instances :type no_reactive_polymer_monomer_backbone: bool :param no_reactive_polymer_monomer_backbone: specify that polymer monomer backbone bonds be considered inert :type no_reactive_polymer_monomer_sidechain: bool :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer sidechain bonds be considered inert """ def wrapper_check_bonding(): """ This is just a convenience wrapper to check the validity of any random bond(s) generated on-the-fly for the current random reaction channel type. :rtype: boolean :return: True if at least one of the bonds is not a simple bond and therefore to skip this random definition, False otherwise """ if atom_a_two: if any(CheckInput().checkIfNotSimpleBond( reactants, atom_a_one, atom_a_two, no_reactive_h)): return True if atom_b_two: if any(CheckInput().checkIfNotSimpleBond( reactants, atom_b_one, atom_b_two, no_reactive_h)): return True def get_bonded_atoms(central_atom_ind): """ Return a list of atom indicies for atoms bound to the central atom. This function just collects indicies provided by the schrodinger.structure.Structure._StructureAtom.bonded_atoms generator so that a sample from which we can randomly choose can be obtained. :type central_atom_ind: int :param central_atom_ind: the central atom for which the bonded atoms will be returned :rtype: list of ints :return: atoms bonded to the central atom """ central_atom = reactants.atom[central_atom_ind] bonded_atoms = [] for bonded_atom in central_atom.bonded_atoms: bonded_atoms.append(bonded_atom.index) return bonded_atoms def break_out(): if num_unproductive_cycles > self.MAX_NUM_UNPRODUCTIVE_CYCLES: UNPRODUCTIVE_MSG = """Over the last %d attempts this script has been unable to generate a valid randomly defined reaction channel. It is likely that you have either asked for a number of random channels that is greater than the number of available channels given your input or you are trying to sample a rare channel. This script will proceed with the %d random channels that have been determined thus far out of the %d that you asked for.""" if self.logger: self.logger.warning( UNPRODUCTIVE_MSG % (self.MAX_NUM_UNPRODUCTIVE_CYCLES, num_random_channels_added, num_random_channels)) return True def get_random_molecule_and_atom(): mol = random_choice(reactant_molecules) atom_one, atom_two = random_choice(mol.getAtomIndices()), None atom_one_bonded_atoms = get_bonded_atoms(atom_one) return mol, atom_one, atom_two, atom_one_bonded_atoms allow_backbone = not no_reactive_polymer_monomer_backbone allow_sidechain = not no_reactive_polymer_monomer_sidechain # begin generating random reaction channels # # in this case we have to use a while loop because we have no # upper bound given that it is too expensive to calculate by # default (if the user wants to they can pass 'all' as the # argument to the number of random reaction channels) num_random_channels_added = 0 num_unproductive_cycles = 0 reactant_molecules = [molecule for molecule in reactants.molecule] atom_dict_cache = {} bond_dict_cache = {} while num_random_channels_added < num_random_channels: if break_out(): break # pick a random type of channel channel_type = random_choice(random_types) # pick a random molecule and a random atom mol_a, atom_a_one, atom_a_two, atom_a_one_bonded_atoms = \ get_random_molecule_and_atom() if channel_type != Constants.DISSOCIATION_TYPE and \ len(reactant_molecules) != 1: # pick another random molecule and a random atom # and compare mol_b, atom_b_one, atom_b_two, \ atom_b_one_bonded_atoms = \ get_random_molecule_and_atom() if mol_a.number == mol_b.number: num_unproductive_cycles += 1 continue # check association type if channel_type == Constants.ASSOCIATION_TYPE: if bin_rxn_types: mols = (mol_a, mol_b) (atom_a_one,), (atom_b_one,) = \ self._getRandomAtomsFromBins(mols, cache=atom_dict_cache) if CheckInput().checkIfNotAdsorbable( reactants, atom_a_one, self.matches): num_unproductive_cycles += 1 continue if CheckInput().checkIfNotAdsorbable( reactants, atom_b_one, self.matches): num_unproductive_cycles += 1 continue # check single displacement type if channel_type == Constants.SINGLE_DISPLACEMENT_TYPE: if not (atom_a_one_bonded_atoms or atom_b_one_bonded_atoms): num_unproductive_cycles += 1 continue elif atom_a_one_bonded_atoms: if not bin_rxn_types: atom_a_two = random_choice(atom_a_one_bonded_atoms) else: mols = (mol_a,) (atom_a_one, atom_a_two), = \ self._getRandomBondsFromBins(mols, cache=bond_dict_cache) mols = (mol_b,) (atom_b_one,), = self._getRandomAtomsFromBins( mols, cache=atom_dict_cache) if CheckInput().checkIfNotAdsorbable( reactants, atom_b_one, self.matches): num_unproductive_cycles += 1 continue elif atom_b_one_bonded_atoms: if not bin_rxn_types: atom_b_two = random_choice(atom_b_one_bonded_atoms) else: mols = (mol_a,) (atom_a_one,), = self._getRandomAtomsFromBins( mols, cache=atom_dict_cache) mols = (mol_b,) (atom_b_one, atom_b_two), = \ self._getRandomBondsFromBins(mols, cache=bond_dict_cache) if CheckInput().checkIfNotAdsorbable( reactants, atom_a_one, self.matches): num_unproductive_cycles += 1 continue # check double displacement type if channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE: if not atom_a_one_bonded_atoms or not atom_b_one_bonded_atoms: num_unproductive_cycles += 1 continue elif not bin_rxn_types: atom_a_two = random_choice(atom_a_one_bonded_atoms) atom_b_two = random_choice(atom_b_one_bonded_atoms) else: mols = (mol_a, mol_b) (atom_a_one, atom_a_two), (atom_b_one, atom_b_two) = \ self._getRandomBondsFromBins(mols, cache=bond_dict_cache) # check bonding if wrapper_check_bonding(): num_unproductive_cycles += 1 continue # check polymer monomers if channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE and \ not valid_polymer_monomer_double_displacement_rxn( reactants, atom_a_one, atom_a_two, atom_b_one, atom_b_two, allow_backbone=allow_backbone, allow_sidechain=allow_sidechain): num_unproductive_cycles += 1 continue # check channel uniqueness if unique and (channel_type == Constants.SINGLE_DISPLACEMENT_TYPE or \ channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE): if not any(CheckInput().checkChannelUniqueness( reactants, atom_a_one, atom_a_two, atom_b_one, atom_b_two)): num_unproductive_cycles += 1 continue elif channel_type == Constants.DISSOCIATION_TYPE: # check bonding if not atom_a_one_bonded_atoms: num_unproductive_cycles += 1 continue elif not bin_rxn_types: atom_b_one, atom_b_two = \ random_choice(atom_a_one_bonded_atoms), None else: mols = (mol_a,) (atom_a_one, atom_b_one), = self._getRandomBondsFromBins( mols, cache=bond_dict_cache) if any(CheckInput().checkIfNotSimpleBond( reactants, atom_a_one, atom_b_one, no_reactive_h)): num_unproductive_cycles += 1 continue # wrap up, do not store any random defs that are bad defs # because we are directly creating the definitions from # structural considerations and the contents here will # only be due to duplicates and the user doesn't need to # see them random_def = self.buildChannelStrDef(atom_a_one, atom_b_one, atom_a_two, atom_b_two) if self.addDefinition(random_def): num_unproductive_cycles += 1 self.bad_defs.remove(random_def.strip('\n')) continue else: num_random_channels_added += 1 num_unproductive_cycles = 0
[docs] def addRandomChannels(self, reactants, num_random_channels=Constants.NUM_RANDOM_CHANNELS, random_types=Constants.DEFAULT_RANDOM_TYPES, random_seed=Constants.RANDOM_SEED, allow_adsorption_onto=None, no_reactive_h=Constants.NO_REACTIVE_H, unique=Constants.UNIQUE, bin_rxn_types=False, no_reactive_polymer_monomer_backbone=False, no_reactive_polymer_monomer_sidechain=False): """ Generate the specified number of random reaction channels. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type num_random_channels: int or str :param num_random_channels: the desired number of random channels to generate or 'all' :type random_types: list :param random_types: list of strings specifying which types of channels should be sampled when generating the random reaction channels :type random_seed: None or int :param random_seed: the seed for the random number generator :type allow_adsorption_onto: list :param allow_adsorption_onto: SMARTS patterns of arbitrary adsorption sites to support :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :type unique: bool :param unique: provide only unique products :type bin_rxn_types: bool :param bin_rxn_types: if True then when generating random channels first bin by reaction type followed by selecting a random type followed by a random instance, if False then just select a random instance from all instances :type no_reactive_polymer_monomer_backbone: bool :param no_reactive_polymer_monomer_backbone: specify that polymer monomer backbone bonds be considered inert :type no_reactive_polymer_monomer_sidechain: bool :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer sidechain bonds be considered inert """ # check input needed for generating random channels num_random_channels, random_types, \ self.allow_adsorption_onto, self.matches = \ CheckInput().checkRandomChannelsInput(reactants, num_random_channels, random_types, random_seed, allow_adsorption_onto, self.logger) if not num_random_channels: # MATSCI-1310 - reset allow_adsorption_onto just to be safe self.allow_adsorption_onto = None return # log adsorption channels print_allow_adsorption_onto(self.allow_adsorption_onto, self.logger) # if all possible channels are wanted or if it is # inexpensive to calculate them all then calculate # them all and print them. note that this use of the # word 'all' when referring to reaction channels means # 'all' of each type specified in the list of random # types to be sampling. if the latter is also 'all' # then that will literally be all of the possible # reaction channels that are calculable by this script. # # if the user either asked for all of the channels or # for an inexpensive system they specified a number of # reaction channels that is larger than or equal to the # number of possible channels then all of the channels # will be computed. otherwise a random sample of channels # will be selected from the list of all possible channels # or are generated on the fly, without ever precomputing the # list of all possible channels, depending on system size. # we do it this way to avoid having to precompute the huge # list of all possible channels for large systems when the # user might only want a small number of them. the sacrifice # being that the number of channels by type will never be # reported to the user and the exit conditions from the on # the fly version are indeterministic. for example, without # the implemented time-out it would be possible to enter an # infinite loop in a situation where the user has asked for # a number of channels that is greater than the number of # available channels given that the latter is never computed # for large systems. also, it will be possible that the # script will prematurely give up on trying to randomly # find valid reaction channels if too much time is spent # trying to find a rare channel if num_random_channels == Constants.ALL_RANDOM_CHANNELS or \ reactants.atom_total < self.RANDOM_ALL_TO_ON_THE_FLY_THRESH: all_channel_defs = self.getAllChannels( reactants, random_types, no_reactive_h, unique, no_reactive_polymer_monomer_backbone= no_reactive_polymer_monomer_backbone, no_reactive_polymer_monomer_sidechain= no_reactive_polymer_monomer_sidechain) if not all_channel_defs: return self.printAllChannels(all_channel_defs) self.addRandomChannelsFromAll(num_random_channels, random_seed, all_channel_defs, bin_rxn_types=bin_rxn_types) else: random_seed = init_random_seed(random_seed, self.logger) my_random.seed(random_seed) self.addRandomChannelsOnTheFly( reactants, num_random_channels, random_types, no_reactive_h, unique, bin_rxn_types=bin_rxn_types, no_reactive_polymer_monomer_backbone= no_reactive_polymer_monomer_backbone, no_reactive_polymer_monomer_sidechain= no_reactive_polymer_monomer_sidechain)
[docs]class ChannelDefinition(object): """ Manage a reaction channel definition. """ PATTERN = \ re.compile(r'\s*(\d+)\s*(\d+)?\s*;\s*(\d+)\s*(\d+)?\s*$') FIRST_SEP = ';' SECOND_SEP = ' '
[docs] def __init__(self, str_definition, logger=None): """ Create an instance. :type str_definition: str :param str_definition: the string representation of the definition :type logger: logging.getLogger :param logger: output logger """ self.str_definition = str_definition self.logger = logger self.not_valid = False self.r_one_a_one = None self.r_one_a_two = None self.r_two_a_one = None self.r_two_a_two = None self.r_one = [] self.r_two = [] self.listdef = [] self.flatdef = [] self.representation = '' self.do_which_types = None self.checkDefinition()
def __eq__(self, other): """ Define class equality. """ state = self.flatdef == other.flatdef return state def __repr__(self): """ Define representation format. """ return self.representation
[docs] def setChannelAttrs(self, r_one_a_one, r_two_a_one, r_one_a_two=None, r_two_a_two=None): """ Set up some attributes for this class. :type r_one_a_one: int :param r_one_a_one: the first atom of the first reactant :type r_two_a_one: int :param r_two_a_one: the first atom of the second reactant :type r_one_a_two: int :param r_one_a_two: the second atom of the first reactant :type r_two_a_two: int :param r_two_a_two: the second atom of the second reactant """ # sort the lists on either side of the separator and then # do a final sort so that the pair with the smallest index # comes first, this is just a convention self.r_one_a_one = int(r_one_a_one) self.r_one = [self.r_one_a_one] self.r_two_a_one = int(r_two_a_one) self.r_two = [self.r_two_a_one] self.r_one_a_two = r_one_a_two if self.r_one_a_two: self.r_one_a_two = int(self.r_one_a_two) self.r_one += [self.r_one_a_two] self.r_one.sort() self.r_two_a_two = r_two_a_two if self.r_two_a_two: self.r_two_a_two = int(self.r_two_a_two) self.r_two += [self.r_two_a_two] self.r_two.sort() if self.r_one[0] <= self.r_two[0]: self.listdef = [self.r_one, self.r_two] self.flatdef = self.r_one + self.r_two else: self.listdef = [self.r_two, self.r_one] self.flatdef = self.r_two + self.r_one r_one, r_two = self.listdef r_one = self.SECOND_SEP.join([str(index) for index in r_one]) r_two = self.SECOND_SEP.join([str(index) for index in r_two]) self.representation = r_one + self.FIRST_SEP + \ self.SECOND_SEP + r_two
[docs] def checkDefStrFormat(self): """ Check the format of the string representation of the definition. """ def_match = self.PATTERN.match(self.str_definition) if def_match: r_one_a_one, r_one_a_two, r_two_a_one, r_two_a_two = \ def_match.groups() self.setChannelAttrs(r_one_a_one, r_two_a_one, r_one_a_two, r_two_a_two) else: self.not_valid = True return self.not_valid
[docs] def checkAtomUniqueness(self): """ Check for unique atoms. """ if len(self.flatdef) != len(set(self.flatdef)): self.not_valid = True return self.not_valid
[docs] def checkAtomZeroIndex(self): """ Check for an atom index that is zero. """ if 0 in self.flatdef: self.not_valid = True return self.not_valid
[docs] def checkDefinition(self): """ Check this reaction channel definition. """ if self.checkDefStrFormat(): return if self.checkAtomUniqueness(): return if self.checkAtomZeroIndex(): return
[docs]class CheckInput(object): """ Manage checking user input. """ TITLE_KEY = 's_m_title' ENTRY_NAME_KEY = 's_m_entry_name' REACTANT_NAME_BASE = 'reactant'
[docs] def checkFileExists(self, infile, logger=None): """ Check if the provided input file exists. :type infile: str :param infile: the input file :type logger: logging.getLogger :param logger: output logger """ NOFILE_MSG = ('The following input file that you have specified\n' 'can\'t be found: \'%s\'. Please ensure that this\n' 'is the correct file and that the file exists.\n' 'This calculation will now exit.') if not os.path.exists(infile): if logger: logger.error(NOFILE_MSG % infile) print(NOFILE_MSG % infile) sys.exit(1)
[docs] def checkInputFile(self, infile, logger=None): """ Check user input file. :type infile: str :param infile: the input file :type logger: logging.getLogger :param logger: output logger """ NUM_STRUCTURES_MSG = ( 'The input Maestro file that you have provided\n' 'contains multiple structures. Input files can\n' 'only contain a single structure. Please remove\n' 'extraneous structures from your input file. This\n' 'calculation will now exit.') if structure.count_structures(infile) != 1: if logger: logger.error(NUM_STRUCTURES_MSG) print(NUM_STRUCTURES_MSG) sys.exit(1)
[docs] def checkReactants(self, reactants, logger=None): """ Check the provided reactants structure. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type logger: logging.getLogger :param logger: output logger """ def set_name_prop(key): name = reactants.property.get(key) if not name: reactants.property[key] = self.REACTANT_NAME_BASE ATOMIC_REACTANTS_MSG = """ The reactants structure that you have specified in your input is an atom. The reactants structure provided must have at least two atoms. This script will exit.""" if reactants.atom_total == 1: if logger: logger.error(ATOMIC_REACTANTS_MSG) sys.exit(1) set_name_prop(self.TITLE_KEY) set_name_prop(self.ENTRY_NAME_KEY)
[docs] def checkRandomChannelsInput(self, reactants, num_random_channels, random_types, random_seed, allow_adsorption_onto, logger=None): """ Check parameters for generating random channels. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type num_random_channels: int or 'all' :param num_random_channels: the number of random channels to generate or 'all' :type random_types: list :param random_types: list of strings specifying which types of channels should be sampled when generating the random reaction channels :type random_seed: None or int :param random_seed: the seed for the random number generator :type allow_adsorption_onto: list :param allow_adsorption_onto: SMARTS patterns of arbitrary adsorption sites to support :type logger: logging.getLogger :param logger: output logger :rtype: int or 'all', list, list, set :return: num_random_channels, random_types, allow_adsorption_onto as defined above, and matches is a set of atom indicies matching the SMARTS present in allow_adsorption_onto """ # check and process reactants self.checkReactants(reactants, logger) # check number of random channels num_random_channels = \ self.checkNumRandomChannels(num_random_channels, logger) # check random types random_types = self.checkRandomTypes(random_types, logger) # check random seed self.checkRandomSeed(random_seed, logger) # check and process allow adsorption onto option allow_adsorption_onto, matches = \ self.checkAndProcessAllowAdsorptionOnto(reactants, allow_adsorption_onto, logger) return num_random_channels, random_types, \ allow_adsorption_onto, matches
[docs] def checkAndProcessAllowAdsorptionOnto(self, astructure, allow_adsorption_onto, logger=None): """ Check and process the allow adsorption onto option. :type astructure: schrodinger.structure.Structure :param astructure: the structure :type allow_adsorption_onto: list :param allow_adsorption_onto: SMARTS patterns of arbitrary adsorption sites to support :type logger: logging.getLogger :param logger: output logger :rtype: list, set :return: allow_adsorption_onto and matches, final list of SMARTS patterns on which to allow adsorption and a set of atom indicies matching the SMARTS present in allow_adsorption_onto """ if not allow_adsorption_onto: allow_adsorption_onto = list( Constants.ALLOW_ADSORPTION_ONTO.values()) elif Constants.ALL_SMARTS in allow_adsorption_onto: # if all atoms are to serve as adsorption sites then # just set the general wildcard pattern which will # match any atom allow_adsorption_onto = [Constants.SMARTS_WILDCARD] # the allow_adsorption_onto list of SMARTS is now finalized so # we will determine all of the matches now and then pass the matches # around so as to avoid computing the matches multiple times. first # flatten and uniqueify the matched indicies. for this application # all that we need is a unique list of all atom indicies that match # at least one of the provided pattern in allow_adsorption_onto. we # don't know that the provided SMARTS are valid so validate them. # Also, catch non-matching patterns and log them to the user because # it will help them clarify the difference between [C-0X3] and [c-0X3] # for benzene-like structures, as well as catch patterns that the # validator may have missed but are safely not matched. # find matches of valid patterns, run SMARTS evaluation with defaults # except do not allow interchangeable hydrogens # Pre-generate the ChmMol so it's not generated once per valid pattern mol = analyze.create_chmmol_from_structure( astructure, stereo=analyze.STEREO_FROM_ANNOTATION_AND_GEOM) matches = set() skipped_patterns = [] for pattern in allow_adsorption_onto: valid, msg = analyze.validate_smarts_canvas(pattern) if valid: tmp_matches = analyze.evaluate_smarts_canvas( mol, pattern, start_index=1, uniqueFilter=True, allowRelativeStereo=False, rigorousValidationOfSource=False, hydrogensInterchangeable=False) if not tmp_matches and pattern not in \ list(Constants.ALLOW_ADSORPTION_ONTO.values()): skipped_patterns.append(pattern) continue else: skipped_patterns.append(pattern) continue for match in tmp_matches: matches.add(match[0]) # log and remove skipped patterns if skipped_patterns: BAD_SMARTS = """ The following list of provided SMARTS patterns can not be matched to the provided reactant structure because they are either incorrectly formatted or simply do not match and so will be skipped: %s.""" if logger: logger.warning(BAD_SMARTS % ', '.join(skipped_patterns)) for pattern in skipped_patterns: allow_adsorption_onto.remove(pattern) return allow_adsorption_onto, matches
[docs] def checkChannelDefs(self, channeldefs, logger=None): """ Check properties of the supplied channel definitions. :type channeldefs: ChannelDefinitions :param channeldefs: list of ChannelDefinitions :type logger: logging.getLogger :param logger: output logger """ NONINSTANCE_MSG = """ The data type of the channel definitions argument needs to be an instance of ChannelDefinitions. This script can not recover and will thus exit.""" if not isinstance(channeldefs, ChannelDefinitions): if logger: logger.error(NONINSTANCE_MSG) sys.exit(1) BAD_DEF_MSG = """ The following reaction channel definitions that you have provided have either an incorrect format or have already been defined: \'%s\'. Please reformat the definitions according to the instructions provided in the script help. These definitions will be skipped.""" if channeldefs.bad_defs: if logger: logger.warning(BAD_DEF_MSG % channeldefs.bad_defs)
[docs] def checkIfNotAdsorbable(self, astructure, atom_index, matches): """ Check if the provided atom can function as an adsorption site, i.e. check if the atom has any open bonding sites or if the atom belongs to the list of arbitary adsorption sites to allow. :type astructure: schrodinger.structure.Structure :param astructure: the structure that contains the atom whose candidacy for adsorption will be checked :type atom_index: int :param atom_index: the index of the atom to check :type matches: set :param matches: a set of atom indicies matching the SMARTS present in the list of allowed adsorption sites :rtype: bool :return: False if atom can function as an adsorption site, True otherwise """ if atom_index in matches: return if open_bonding_sites(astructure, atom_index): return return True
[docs] def checkIfNotSimpleBond(self, astructure, atom_one, atom_two, no_reactive_h, bond_cache=None): """ Check if the provided two atoms do not form a simple bond, i.e. a single bond that is not in a ring and possibly does not involve a hydrogen atom. :type astructure: schrodinger.structure.Structure :param astructure: the structure on which the simple bond will be checked :type atom_one: int :param atom_one: the first atom in the potential bond :type atom_two: int :param atom_two: the second atom in the potential bond :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :param dict bond_cache: keys are (atom1 index, atom2 index), values are the data returned by this method for that bond. Used to speed up multiple calls to this method. This method will populate the dictionary as new bonds are checked. :rtype: list of boolean :return: Contains the bool values of whether (1) the two atoms are not bound, (2) the bond they form is not a single bond, (3) the bond they form is in a ring, and (4) if -no_reactive_h has been specified that the bond involves a hydrogen atom. If any of these is True then the bond is not a simple bond. """ indexes = frozenset((atom_one, atom_two)) if bond_cache and indexes in bond_cache: return bond_cache[indexes] not_bound = False not_single_bond = False in_ring = False involves_h = False if not astructure.areBound(atom_one, atom_two): not_bound = True elif astructure.getBond(atom_one, atom_two).order > 1: not_single_bond = True elif (astructure.atom[atom_one].element == 'H' or astructure.atom[atom_two].element == 'H') and no_reactive_h: involves_h = True else: ringbonds = rings.find_ring_bonds(astructure) in_ring = set([atom_one, atom_two]) in ringbonds result = [not_bound, not_single_bond, in_ring, involves_h] if bond_cache is not None: bond_cache[indexes] = result return result
[docs] def checkChannelUniqueness(self, astructure, bond1idx1, bond1idx2, bond2idx1, bond2idx2, smiles_cache=None): """ For the provided structure use SMILES analysis to determine if the defined single or double displacement reaction channels, i.e. A + B-C --> A-B + C (1st type) or A-C + B (2nd type) or A-B + C-D --> A-C + B-D (1st type) or A-D + B-C (2nd type), respectively, lead to unique products. :type astructure: schrodinger.structure.Structure :param astructure: the structure for which the channels will be checked for uniqueness :type bond1idx1: int :param bond1idx1: the first atom index of the bond in the first molecule :type bond1idx2: int or None :param bond1idx2: the second atom index of the bond in the first molecule or None if this is a single displacement channel and there is no bond :type bond2idx1: int :param bond2idx1: the first atom index of the bond in the second molecule :type bond2idx2: int or None :param bond2idx2: the second atom index of the bond in the second molecule or None if this is a single displacement channel and there is no bond :param dict smiles_cache: keys are (atom1 index, atom2 index), values are (smiles1, smiles2) the SMILES patterns for the two fragments caused by breaking the atom1-atom2 bond. Used to speed up multiple calls to this method. This method will populate the dictionare as new bonds/SMILES strings are checked. :rtype: list :return: two booleans, (1) True if the 1st type should be computed, False otherwise and (2) True if the 2nd type should be computed, False otherwise """ def handle_double_displacement(): num_unique = len(set([frag_a, frag_b, frag_c, frag_d])) if num_unique == 1: # A-A + A-A --> A-A + A-A (1st type) or A-A + A-A (2nd type) return [False, False] elif num_unique == 2: if frag_a == frag_b and frag_c == frag_d: # A-A + C-C --> A-C + A-C (1st type) or A-C + A-C (2nd type) return [True, False] elif frag_a == frag_c and frag_b == frag_d: # A-B + A-B --> A-A + B-B (1st type) or A-B + A-B (2nd type) return [True, False] elif frag_a == frag_d and frag_b == frag_c: # A-B + B-A --> A-B + B-A (1st type) or A-A + B-B (2nd type) return [False, True] else: # A-A + A-D --> A-A + A-D (1st type) or A-D + A-A (2nd type) return [False, False] elif num_unique == 3: if frag_a == frag_b or frag_c == frag_d: # A-A + C-D --> A-C + A-D (1st type) or A-D + A-C (2nd type) return [True, False] elif frag_a == frag_c or frag_b == frag_d: # A-B + A-D --> A-A + B-D (1st type) or A-D + B-A (2nd type) return [True, False] elif frag_a == frag_d or frag_b == frag_c: # A-B + C-A --> A-C + B-A (1st type) or A-A + B-C (2nd type) return [False, True] elif num_unique == 4: # A-B + C-D --> A-C + B-D (1st type) or A-D + B-C (2nd type) return [True, True] def handle_single_displacement(frag_partner1, frag_partner2): required_smiles = [frag_a, frag_c] required_smiles.remove(frag_partner2) frag_atomic = required_smiles[0] num_unique = len(set([frag_a, frag_c, frag_partner1])) if num_unique == 1: # A-A + A --> A-A + A (1st type) or A + A-A (2nd type) return [False, False] elif num_unique == 2: if frag_partner2 == frag_partner1: # A-A + B --> A-B + A (1st type) or A + A-B (2nd type) return [True, False] elif frag_a == frag_c: # A-B + A --> A-A + B (1st type) or A + B-A (2nd type) return [True, False] elif frag_partner1 == frag_atomic: # B-A + A --> B-A + A (1st type) or B + A-A (2nd type) return [False, True] elif num_unique == 3: # A-B + C --> A-C + B (1st type) or A + B-C (2nd type) return [True, True] if smiles_cache is None: smiles_cache = {} ab_indexes = (bond1idx1, bond1idx2) cd_indexes = (bond2idx1, bond2idx2) # get the SMILES of the reaction fragments try: frag_a, frag_b = smiles_cache[ab_indexes] except KeyError: frag_a, frag_b = get_fragment_smiles(astructure, *ab_indexes) smiles_cache[ab_indexes] = (frag_a, frag_b) try: frag_c, frag_d = smiles_cache[cd_indexes] except KeyError: frag_c, frag_d = get_fragment_smiles(astructure, *cd_indexes) smiles_cache[cd_indexes] = (frag_c, frag_d) # do double displacement, handle A-B + C-D products if frag_b and frag_d: return handle_double_displacement() # do single displacement, handle A-B + C products if frag_b: return handle_single_displacement(frag_b, frag_a) if frag_d: return handle_single_displacement(frag_d, frag_c)
[docs] def checkChannelDefsVsReactants(self, channeldefs, reactants, matches, no_reactive_h, unique, logger=None): """ Check user defined channels against user defined reactants. :type channeldefs: list :param channeldefs: list of ChannelDefinitions :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type matches: set :param matches: a set of atom indicies matching the SMARTS present in the list of allowed adsorption sites :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :type unique: bool :param unique: provide only unique products :type logger: logging.getLogger :param logger: output logger :rtype: list :return: final_channeldefs, list of good ChannelDefinitions """ def process_bools(): if bools[0]: bad_defs[2].append((index, channeldef)) elif bools[1]: bad_defs[5].append((index, channeldef)) elif bools[2]: bad_defs[4].append((index, channeldef)) elif bools[3]: bad_defs[7].append((index, channeldef)) return any(bools) INDEX_MSG = """ In the following reaction channel definitions you have specified atom indicies that are out of bounds with respect to your specified input reactant structure: \'%s\'. These definitions will be skipped.""" BONDING_MSG = """ In the following reaction channel definitions you have specified a reactive bond however the structure you have provided does not have those atoms bonded: \'%s\'. These definitions will be skipped.""" MOLECULE_MSG = """ In the following reaction channel definitions the two atom sequences must correspond to atoms in different molecules: \'%s\'. These definitions will be skipped.""" RING_MSG = """ In the following reaction channel definitions you have specified a reactive bond that belongs to a ring: \'%s\'. Reactive bonds for which both atoms belong to the same ring are not supported. These definitions will be skipped.""" BOND_ORDER_MSG = """ In the following reaction channel definitions you have specified a reactive bond that not a single bond: \'%s\'. Reactive bonds must be single bonds. These definitions will be skipped.""" NOT_ADSORBABLE_MSG = """ In the following reaction channel definitions you have specified at least a single reactive atomic site that is not available for bonding and is not in the list of arbitrary adsorption sites to support: \'%s\'. Please consider either adding a custom adsorption site or in the input reactants structure file changing the formal charge of the atom to which you wish to adsorb. These definitions will be skipped.""" HYDROGEN_BOND_MSG = """ In the following reaction channel definitions you have specified a reactive bond that involves a hydrogen atom: \'%s\'. When \'-no_reactive_h\' is active reactive bonds must not involve hydrogen atoms. These definitions will be skipped.""" NON_UNIQUE_MSG = """ You have specified that you want unique products, yet the following reaction channel definitions that you have specified will result in non-unique products: \'%s\'. These definitions will therefore be skipped.""" ERROR_CODES = { 1: INDEX_MSG, 2: BONDING_MSG, 3: MOLECULE_MSG, 4: RING_MSG, 5: BOND_ORDER_MSG, 6: NOT_ADSORBABLE_MSG, 7: HYDROGEN_BOND_MSG, 8: NON_UNIQUE_MSG } bad_defs = OrderedDict({ 1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: [], 8: [] }) final_channeldefs = [] for index, channeldef in enumerate(channeldefs): # check if indicies are out of bounds if max(channeldef.flatdef) > reactants.atom_total: bad_defs[1].append((index, channeldef)) continue # check candidacy of bonds r_one, r_two = channeldef.listdef if len(r_one) == 2: bools = self.checkIfNotSimpleBond(reactants, r_one[0], r_one[1], no_reactive_h) if process_bools(): continue if len(r_two) == 2: bools = self.checkIfNotSimpleBond(reactants, r_two[0], r_two[1], no_reactive_h) if process_bools(): continue # if a bond is specified on either side of the # definition then the definition must span two # molecules, otherwise a single molecule mol_one = reactants.atom[r_one[0]].getMolecule().number mol_two = reactants.atom[r_two[0]].getMolecule().number if len(r_one) == 2 or len(r_two) == 2: if mol_one == mol_two: bad_defs[3].append((index, channeldef)) continue else: if mol_one == mol_two: bools = self.checkIfNotSimpleBond(reactants, r_one[0], r_two[0], no_reactive_h) if process_bools(): continue # if association or single displacement then check # candidacy of atoms for adsorption if mol_one != mol_two: if len(r_one) == 1: if self.checkIfNotAdsorbable(reactants, r_one[0], matches): bad_defs[6].append((index, channeldef)) continue if len(r_two) == 1: if self.checkIfNotAdsorbable(reactants, r_two[0], matches): bad_defs[6].append((index, channeldef)) continue # check channel uniqueness for single and double displacement state = len(r_one) == 1 and len(r_two) == 1 if unique and mol_one != mol_two and not state: bond1idx1 = r_one[0] bond2idx1 = r_two[0] if len(r_one) == 1: bond1idx2 = None bond2idx2 = r_two[1] elif len(r_two) == 1: bond1idx2 = r_one[1] bond2idx2 = None else: bond1idx2 = r_one[1] bond2idx2 = r_two[1] channeldef.do_which_types = self.checkChannelUniqueness(reactants, \ bond1idx1, bond1idx2, bond2idx1, bond2idx2) if not any(channeldef.do_which_types): bad_defs[8].append((index, channeldef)) continue final_channeldefs.append(channeldef) for key, value in bad_defs.items(): if value: definitions = [] for index, definition in value: definitions.append(definition) if logger: logger.warning(ERROR_CODES[key] % definitions) return final_channeldefs
[docs] def checkDissociationBondLength(self, dissociation_bond_length, logger=None): """ Check the specified dissociation bond length. :type dissociation_bond_length: float :param dissociation_bond_length: final bond length to use for dissociative reaction channels :type logger: logging.getLogger :param logger: output logger :rtype: float :return: dissociation_bond_length, final bond length to use for dissociative reaction channels """ dissociation_bond_length_msg = """ You have specified a negative or zero value for the dissociation bond length which is outside the range of acceptable values. Proceeding with the default value of %s.""" if dissociation_bond_length <= 0: if logger: logger.warning(dissociation_bond_length_msg % Constants.DISSOCIATION_BOND_LENGTH) dissociation_bond_length = \ Constants.DISSOCIATION_BOND_LENGTH return dissociation_bond_length
[docs] def checkNumRandomChannels(self, num_random_channels, logger=None): """ Check the specified number of random channels. :type num_random_channels: int or 'all' :param num_random_channels: the desired number of random channels to generate or 'all' :type logger: logging.getLogger :param logger: output logger :rtype: int or 'all' :return: num_random_channels, the desired number of random channels to generate or 'all' """ num_random_channels_msg = """ You have specified a value for the number of random reaction channels to generate that is not supported. This value must be either zero or a positive integer or the string \'%s\'. Proceeding with the default value of %s.""" # it might be a string that is an integer so first try that bad_value = False try: num_random_channels = int(num_random_channels) if num_random_channels < 0: bad_value = True except (ValueError, TypeError): if num_random_channels != Constants.ALL_RANDOM_CHANNELS: bad_value = True if bad_value: if logger: logger.warning(num_random_channels_msg % (Constants.ALL_RANDOM_CHANNELS, Constants.NUM_RANDOM_CHANNELS)) num_random_channels = Constants.NUM_RANDOM_CHANNELS return num_random_channels
[docs] def checkRandomTypes(self, random_types, logger=None): """ Check the specified types of random channels. :type random_types: list of strs :param random_types: the desired types of random channels to generate :type logger: logging.getLogger :param logger: output logger :rtype: list of strs :return: random_types, the desired types of random channels to generate """ random_types_msg = """ You have specified a type(s) of random reaction channel(s) to sample that are not supported. Random channel types must be choosen from the following list: %s. Proceeding with either those specified which are supported or in case there aren't any supported will proceed with the default value of %s.""" # if all are wanted then process the list if Constants.ALL_TYPES in random_types: random_types = list(Constants.RANDOM_TYPES_CHOICES) random_types.remove(Constants.ALL_TYPES) # collect unsupported requests to_removes = [] for random_type in random_types: if random_type not in Constants.RANDOM_TYPES_CHOICES: to_removes.append(random_type) # remove unsupported requests if to_removes: if logger: logger.warning(random_types_msg % (Constants.RANDOM_TYPES_CHOICES, Constants.DEFAULT_RANDOM_TYPES)) for to_remove in to_removes: random_types.remove(to_remove) # never leave with nothing if not random_types: random_types = list(Constants.DEFAULT_RANDOM_TYPES) return random_types
[docs] def checkRandomSeed(self, random_seed, logger=None): """ Check the specified random seed. :type random_seed: None or int :param random_seed: the seed for the random number generator :type logger: logging.Logger :param logger: output logger """ BAD_SEED_MSG = ( 'The following random seed that you have ' 'specified is invalid: %s. The input argument for the ' 'random seed can only be None or a positive integer. This ' 'script will now exit.') if random_seed is not None: if type(random_seed) != int or random_seed < 0: if logger: logger.error(BAD_SEED_MSG % random_seed) sys.exit(BAD_SEED_MSG % random_seed)
[docs]def get_fragment_smiles(astructure, idx1, idx2): """ Return two SMILES patterns of the two molecule fragments that would result from breaking the bond specified by the given two atoms or return the SMILES pattern of the molecule containing idx1 if idx2 is None. :type astructure: schrodinger.structure.Structure :param astructure: the structure containing the molecule for which fragment SMILES are wanted :type idx1: int :param idx1: an atom index :type idx2: int :param idx2: the atom index for the target bond formed with idx1 or None :rtype: str, str or str, None :return: either two strings one for each of two fragments or a single string for the molecule containing idx1 and None """ mol = astructure.atom[idx1].getMolecule() orig_to_new = dict( list(zip(mol.getAtomIndices(), list(range(1, len(mol.atom) + 1))))) mol = mol.extractStructure() if idx2: newidx1 = orig_to_new[idx1] newidx2 = orig_to_new[idx2] frag_a_indicies = indicies_from_bonds_deep(mol, newidx1, [newidx2]) frag_a_st = mol.extract(frag_a_indicies) frag_a_smiles = analyze.generate_smiles(frag_a_st) frag_b_indicies = indicies_from_bonds_deep(mol, newidx2, [newidx1]) frag_b_st = mol.extract(frag_b_indicies) frag_b_smiles = analyze.generate_smiles(frag_b_st) else: frag_a_st = mol.copy() frag_a_smiles = analyze.generate_smiles(frag_a_st) frag_b_smiles = None return frag_a_smiles, frag_b_smiles
[docs]def indicies_from_bonds_deep(astructure, start_atom_index, exclude_atom_indicies=None, depth=None): """ Return a list of atom indicies obtained by collecting all atoms that are connected, in the bond traversal sense, to the given start atom by a number of bonds that is less than or equal to that given by the provided depth parameter. The start atom is included in the returned list. One can set a direction for the traversal by specifying exclusion atoms that are bound to the given start atom; the traversal will be done away from these atoms. Useful for doing SMARTS analysis from a central atom and proceeding outwards from that atom, as in nearest-neighbor, next-nearest-neighbor, etc., i. e. shells. Also useful for splitting a molecule into fragments about a given bond. :type astructure: schrodinger.structure.Structure :param astructure: the structure containing all atoms :type start_atom_index: int :param start_atom_index: the atom index from which to start the traversal :type exclude_atom_indicies: list of int :param exclude_atom_indicies: atom indicies to exclude from the bond traversal which basically sets the direction of the bond traversal, for example in the direction of a certain bond :type depth: int :param depth: the maximum depth that the traversal is allowed to traverse in terms of the number of bonds from the starting atom :rtype: list of ints :return: indicies, atom indicies connected to the start atom by up to the specified bond depth """ # if only radial shells are wanted then the exclusion list # should be empty if exclude_atom_indicies is None: exclude_atom_indicies = [] # if only fragments from bond breaking are wanted then the # depth doesn't matter so we will set it to somthing # guaranteed to be larger than the number of connections, # i.e. set it to the number of atoms plus one (the latter # needed to handle the case where the number of atoms is # the same as the number of connections) if depth is None: depth = astructure.atom_total + 1 # recursion function, if needed set the direction with # exclusion atoms, and if needed set the number of times # through the recursion with the depth. The num_times_through # variable gives the current depth of the traversal, i.e. it # tells you which 'shell' you are on either the first shell # (nearest neighbors to the start atom), the second shell # (next nearest neighbors to the start atom), etc. It is # bounded by the depth argument. It starts at zero and is # incremented by one each time a valid bonding atom is # found in the next shell. Each next shell enters the problem # with the next call of continue_traversal_recursion. This is # done until the desired depth is acheived at which point we # to return from the current call and get back to the most recent # branch point. We first need to decrement the num_times_through # counter by 2, once for incrementing-by-one that was just done # with the current call and once to get back to the last branch # point. If the current atom marks the end of a branch then # we only decrement by one to get back to the last branch point. # Remember that the structure of this set of recursive calls is # that starting from atom A, with first shell [B, C, D], and # second shell [E, F, G, H, I, J] and is greedy, that is that # the code block will reach A->B->E before it reaches even A->C, # etc. def continue_traversal_recursion(start_atom_index, exclude_atom_indicies): global num_times_through for bonded_atom in \ astructure.atom[start_atom_index].bonded_atoms: if bonded_atom.index not in indicies and \ bonded_atom.index not in exclude_atom_indicies: num_times_through += 1 if num_times_through <= depth: indicies.add(bonded_atom.index) continue_traversal_recursion(bonded_atom.index, [start_atom_index]) else: num_times_through -= 2 return num_times_through -= 1 return # put the start atom in indicies = set([start_atom_index]) # this global will track the number of times the # resursion is called global num_times_through num_times_through = 0 # start it off continue_traversal_recursion(start_atom_index, exclude_atom_indicies) # return a sorted list indicies = list(indicies) indicies.sort() return indicies
[docs]def set_zob_to_sob(astructure, atom_one, atom_two): """ If the given structure has a zero-order bond for the given atom indicies then make this bond a single-order bond. :type astructure: schrodinger.structure.Structure :param astructure: the structure for which the bond order may be changed :type atom_one: int :param atom_one: the first atom index of the bond whose order needs changing :type atom_two: int :param atom_two: the second atom index of the bond whose order needs changing """ bond = astructure.getBond(atom_one, atom_two) if bond.order == 0: bond.order = 1
[docs]def wrapper_build_attach_structure(mol_one_st, mol_one_from_atom, mol_one_to_atom, mol_two_st, mol_two_from_atom, mol_two_to_atom): """ A wrapper that corrects the atom reorder map returned from build.attach_structure to account for deleted fragment atoms and wraps the case where the active bonds may be of zero-order. :type mol_one_st: schrodinger.structure.Structure :param mol_one_st: the base structure to which the fragment will be added :type mol_one_from_atom: int :param mol_one_from_atom: defines the point at which the fragment structure will be added to the base structure :type mol_one_to_atom: int :param mol_one_to_atom: this atom defines the part of the base structure that will be replaced by the fragment :type mol_two_st: schrodinger.structure.Structure :param mol_two_st: the fragment structure to be added to the base structure :type mol_two_from_atom: int :param mol_two_from_atom: defines the point at which the base structure will be added to the fragment structure :type mol_two_to_atom: int :param mol_two_to_atom: this atom defines the part of the fragment structure that will be replaced by the base :rtype: dict, dict :return: two mapping dictionaries, (1) orig_to_new_map, a mapping of original indices of atoms in input structures to atom indices in the newly constructed structure. Indices for atoms from the second structure were offset by the number of atoms in the first structure, and (2) new_to_orig_map, the reverse of (1). """ # handle zero-order bonds for both structures set_zob_to_sob(mol_one_st, mol_one_from_atom, mol_one_to_atom) set_zob_to_sob(mol_two_st, mol_two_from_atom, mol_two_to_atom) # add atom indices to structures as properties before modification orig_to_new_map = {} atom_renumbering_index_property_key = 'i_matsci_atomRenumberingIndex' mol_two_offset = 0 # initialize all fragment atoms to None in the renumber map for at in mol_one_st.atom: at.property[atom_renumbering_index_property_key] = at.index orig_to_new_map[at.index] = None mol_two_offset = at.index for at in mol_two_st.atom: at.property[ atom_renumbering_index_property_key] = mol_two_offset + at.index orig_to_new_map[mol_two_offset + at.index] = None build.attach_structure(mol_one_st, mol_one_from_atom, mol_one_to_atom, mol_two_st, mol_two_from_atom, mol_two_to_atom) # track atom renumbering and update the renumber map for at in mol_one_st.atom: if at.property.get(atom_renumbering_index_property_key) is not None: orig_to_new_map[at.property.get( atom_renumbering_index_property_key)] = at.index mol_one_st.deletePropertyFromAllAtoms(atom_renumbering_index_property_key) # while in here return the new to original index map for # convenience, it will skip None atoms new_to_orig_map = {} for key, value in orig_to_new_map.items(): if value is not None: new_to_orig_map[value] = key return orig_to_new_map, new_to_orig_map
[docs]def dict_delete_and_decrement_keys(indict, key_to_delete): """ Return the dictionary obtained from the input dictionary by deleting an entry with the specified key and then decrementing all keys larger than the specified key. Leave the values unchanged. :type indict: dict :param indict: dictionary with integer keys and values from which to remove and decrement the specified key :type key_to_delete: int :param key_to_delete: the key to delete from the input dictionary :rtype: dict :return outdict: the new dictionary without the specifed key and with decremented keys """ outdict = {} for key, value in indict.items(): if key < key_to_delete: outdict[key] = value elif key > key_to_delete: outdict[key - 1] = value return outdict
[docs]def open_bonding_sites(astructure, atom_index): """ Return the number of open bonding sites available for the provided atom in the given structure. This number is the same as the number of unpaired electrons for the atom in its current state in the given structure. :type astructure: schrodinger.structure.Structure :param astructure: the structure that contains the provided atom :type atom_index: int :param atom_index: the index of the atom for which the number of open bonding sites will be reported :rtype: int :return: atom_open_val, the number of open bonding sites for the provided atom """ # using older MacroModel atom types because mmlewis doesn't # have an API to compute this atom_obj = astructure.atom[atom_index] atom_type = mm.mmat_get_atom_type(atom_obj.atom_type_name) atom_max_val = mm.mmat_get_valence(atom_type) atom_curr_val = sum([bond.order for bond in atom_obj.bond]) atom_open_val = atom_max_val - atom_curr_val return atom_open_val
[docs]def minimize_geometry(astructure): """ Perform a geometry minimization on the provided structure. :type astructure: schrodinger.structure.Structure :param astructure: the structure object needing minimization :rtype: bool :return: False if the minimization was successful, True otherwise """ version = Channels.OPLS_VERSION # using a bare except because I am not entirely sure of the # different ways that this function will traceback try: astructure.retype() with ioredirect.IOSilence(): with minimize.minimizer_context(ffld_version=version, struct=astructure, cleanup=False) as minimize_obj: min_res = minimize_obj.minimize() astructure.property[ mm.OPLS_POTENTIAL_ENERGY] = min_res.potential_energy except: return True
[docs]def get_bonds_in_molecule(amolecule): """ This is a convenience function to get a list of bond objects from a molecule object, since no such function exists in our Python API. :type amolecule: schrodinger.structure._Molecule :param amolecule: the molecule object from which bonds will be defined :rtype: list of schrodinger.structure._StructureBond :return: bonds, list of bond objects for bonds in the specified molecule """ # originally this was a set but it was later discovered while # testing the GA that consecutive runs using the same random seed # gave different results, not because of the seed but due to the # fact that the sets were being ordered differently for the two # runs bonds = OrderedDict() for atom in amolecule.atom: for bond in atom.bond: key = [bond.atom1.index, bond.atom2.index] key.sort() key = tuple(key) if key not in list(bonds): bonds[key] = bond return list(bonds.values())
[docs]def get_sorted_atomic_smarts(st, all_idxs, outer_sort=False, smarts_cache=None): """ Return a tuple of sorted atomic SMARTS for the given collection of indicies in the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type all_idxs: list :param all_idxs: contains tuples of atom indices :type outer_sort: bool :param outer_sort: whether to sort the tuples in the returned list :type dict smarts_cache: keys are atom indexes, values are the SMARTS pattern for that atom. Used to speed up multiple calls to this function. This function will populate the dictionary as new atoms are computed. :rtype: list :return: contains tuples of sorted atomic SMARTS having the same structure as the input indicies """ smarts = [] if smarts_cache is None: smarts_cache = {} for idxs in all_idxs: patterns = [] for index in idxs: try: patterns.append(smarts_cache[index]) except KeyError: with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning, message="analyze.generate_smarts") atsm = analyze.generate_smarts(st, atom_subset=[index]) patterns.append(atsm) smarts_cache[index] = atsm smarts.append(tuple(sorted(patterns))) if outer_sort: smarts = sorted(smarts, key=lambda x: x[0]) return smarts
def _update_type_dict(adict, amap): """ Return a copy of the given type dictionary with atom indicies updated according to the given index map. :type adict: OrderedDict :param adict: the type dictionary to be updated :type amap: dict :param amap: maps the indicies used in the given dict to another set of indicies :rtype: OrderedDict :return: the updated dict """ new_adict = OrderedDict() for key, values in adict.items(): new_values = [] for value in values: new_value = [amap[x] for x in value] new_values.append(tuple(new_value)) new_adict[key] = new_values return new_adict
[docs]def get_atom_type_dict(st): """ Return an atom type dictionary for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: OrderedDict :return: atom type dict, keys are single tuples of SMARTS atom types, like (smarts,), values are lists of single tuples of atom indicies, like (1,) """ atom_type_dict = OrderedDict() for atom in st.atom: idxs = [(atom.index,)] ahash = tuple(get_sorted_atomic_smarts(st, idxs)) atom_type_dict.setdefault(ahash, []).append(idxs[0]) return atom_type_dict
[docs]def get_bond_type_dict(st): """ Return a bond type dictionary for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: OrderedDict :return: bond type dict, keys are pair tuples of SMARTS bond types, like (smarts, smarts), values are lists of pair tuples of atom indicies, like (1, 2) """ bond_type_dict = OrderedDict() for bond in st.bond: idxs = [(bond.atom1.index, bond.atom2.index)] ahash = tuple(get_sorted_atomic_smarts(st, idxs)) bond_type_dict.setdefault(ahash, []).append(idxs[0]) return bond_type_dict
[docs]def get_structure_with_contiguous_molecules(molecules): """ Return a single structure object containing the given molecules but such that the atom ordering per molecule is contiguous. :type molecules: list of schrodinger.structure._Molecule :param molecules: molecules that need to be contiguously assembled into a single structure :rtype: schrodinger.structure.Structure and two dicts :return: mols_st, single structure containing all molecules in contiguous form, and mols_new_to_old and mols_old_to_new, two reordering dictionaries """ def extract_molecule(molecule, start=1): old_order = list(molecule.getAtomIndices()) new_order = list(range(start, len(old_order) + start)) new_to_old = dict(list(zip(new_order, old_order))) old_to_new = dict(list(zip(old_order, new_order))) molecule_st = molecule.extractStructure(copy_props=True) return molecule_st, new_to_old, old_to_new mols_st, mols_new_to_old, mols_old_to_new = extract_molecule(molecules[0]) for molecule in molecules[1:]: mol_st, mol_new_to_old, mol_old_to_new = \ extract_molecule(molecule, mols_st.atom_total + 1) mols_st.extend(mol_st) mols_new_to_old.update(mol_new_to_old) mols_old_to_new.update(mol_old_to_new) return mols_st, mols_new_to_old, mols_old_to_new
[docs]class ReactantMolecule(object): """ Manage a reactant molecule in the reactants structure object. """ HYDROGEN_SYM = 'H' HYDROGEN_BOND_ORDER = 1
[docs] def __init__(self, molecule): """ Create an instance. :type molecule: schrodinger.structure._Molecule :param molecule: reactant molecule to be extracted from the reactants structure """ self.mol_obj = molecule self.new_to_orig_inds = {} self.orig_to_new_inds = {} for new, orig in enumerate(molecule.getAtomIndices(), 1): self.new_to_orig_inds[new] = orig self.orig_to_new_inds[orig] = new self.mol_struct = molecule.extractStructure(copy_props=True) self.reactive_inds_orig = [] self.reactive_inds_new = [] self.tmp_hydrogen_index = None
[docs] def setReactiveInds(self, reactive_inds_orig): """ Set the two lists of reactive indicies attributes using the provided list of original reactive indicies :type reactive_inds_orig: list :param reactive_inds_orig: original reactive indicies for this reactant molecule """ self.reactive_inds_orig = list(reactive_inds_orig) self.reactive_inds_new = [ self.orig_to_new_inds[orig] for orig in reactive_inds_orig ]
[docs] def combineMapWith(self, other_molecule): """ Return the new to orig atom reordering map for this molecule extended by that of the other molecule by offsetting the keys of the map of the other molecule. :type other_molecule: schrodinger.structure._Molecule :param other_molecule: molecule whose new to orig atom reordering map will be combined with that of the current molecule :rtype: dict :return: final_map, the combined new to original atom reordering maps of this molecule and the other molecule """ current_num_atoms = self.mol_struct.atom_total final_map = self.new_to_orig_inds.copy() for key, value in other_molecule.new_to_orig_inds.items(): final_map[key + current_num_atoms] = value return final_map
[docs] def addTempHydrogens(self): """ Add a temporary hydrogen to the reactive atom of this molecule to act as a bond handle for performing the reaction. All traces of this temporary hydrogen will be removed from this instance once the reactions are complete. Note that the family of functions that handles the temporary hydrogens are put in place so that all reaction channels can be handled just like the bond-bond double displacement type, i.e. the association and single displacement types then become nothing more than double displacement with a fake hydrogen. """ atom_to_hydrogenate = self.mol_struct.atom[self.reactive_inds_new[0]] atom_to_hydrogenate_vec = numpy.array(atom_to_hydrogenate.xyz) vec_to_hydrogen = find_good_bond_vector(atom_to_hydrogenate) # set the absolute position and bonding of the new temporary # hydrogen handle, store the new atom index, and append it to # the list of reactive atoms hydrogen_xyz = atom_to_hydrogenate_vec + vec_to_hydrogen hydrogen = self.mol_struct.addAtom(self.HYDROGEN_SYM, hydrogen_xyz[0], hydrogen_xyz[1], hydrogen_xyz[2]) self.mol_struct.addBond(atom_to_hydrogenate, hydrogen, self.HYDROGEN_BOND_ORDER) self.tmp_hydrogen_index = hydrogen.index self.reactive_inds_new.append(hydrogen.index)
[docs] def delTempHydrogens(self): """ Delete all traces of the previously added temporary hydrogen. """ self.mol_struct.deleteAtoms([self.tmp_hydrogen_index]) self.reactive_inds_new.remove(self.tmp_hydrogen_index) self.tmp_hydrogen_index = None
[docs] def reactWith(self, other_molecule, reverse=False): """ React this molecule with the provided other molecule according to their reactive indicies and return the product object. :type other_molecule: ReactantMolecule :param other_molecule: the input molecule that this molecule will react with :type reverse: bool :param reverse: specifies to reverse the order of reactive indicies for the other molecule :rtype: Products :return products: the products object containing the components that result from reacting this molecule with the input molecule """ # reverse handled whether A in the following A-B + C-D # reaction is bound to C or D in the products mol_one_from, mol_one_to = self.reactive_inds_new if reverse: mol_two_to, mol_two_from = other_molecule.reactive_inds_new else: mol_two_from, mol_two_to = other_molecule.reactive_inds_new # call our wrapped verion of build.attach_structure, this # is the first product (A-C or A-D), for which the A part # will be fixed in space product_st = self.mol_struct.copy() mol_two_st = other_molecule.mol_struct.copy() product_map, rev_product_map = \ wrapper_build_attach_structure(product_st, mol_one_from, mol_one_to, mol_two_st, mol_two_from, mol_two_to) # call our wrapped verion of build.attach_structure, this # is the second product (D-B or C-B), for which the D or C # parts will be fixed in space mol_one_st = self.mol_struct.copy() other_product_st = other_molecule.mol_struct.copy() other_product_map, rev_other_product_map = \ wrapper_build_attach_structure(other_product_st, mol_two_to, mol_two_from, mol_one_st, mol_one_to, mol_one_from) # get Products object to finish this off products = Products(product_st, product_map, rev_product_map, other_product_st, other_product_map, rev_other_product_map) return products
[docs] def dissociate(self, dissociation_bond_length): """ Dissociate this molecule according to its reactive indicies. :type dissociation_bond_length: float :param dissociation_bond_length: final bond length to use for dissociative reaction channels :rtype: Products :return products: the products object containing the components that result from reacting this molecule with the input molecule """ product_st = self.mol_struct.copy() # adjust the bond length atom_one_ind, atom_two_ind = self.reactive_inds_orig product_st.adjust(dissociation_bond_length, atom_one_ind, atom_two_ind) # delete the bond product_st.deleteBond(atom_one_ind, atom_two_ind) # even though the Products class isn't perfect here we can # easily still use it by passing the map twice, the map is # lame because no real structure manipulations have been # performed and so indexing hasn't changed product_map = self.new_to_orig_inds.copy() products = Products(product_st, product_map, product_map) return products
[docs]class Products(object): """ Manage the properties of the products that result from applying a reaction channel definition to the reactants. """ RXN_REPRESENTATION_KEY = 's_matsci_RXN_representation' PRODUCT_NAME_BASE = 'product'
[docs] def __init__(self, prod_one_st, prod_one_map, prod_one_rev_map, prod_two_st=None, prod_two_map=None, prod_two_rev_map=None): """ Create an instance. """ self.prod_one_st = prod_one_st.copy() self.prod_one_map = prod_one_map.copy() self.prod_one_rev_map = prod_one_rev_map.copy() if prod_two_st: self.prod_two_st = prod_two_st.copy() else: self.prod_two_st = None if prod_two_map: self.prod_two_map = prod_two_map.copy() if prod_two_rev_map: self.prod_two_rev_map = prod_two_rev_map.copy() self.new_total_reorder = [] self.product_st = self.prod_one_st.copy() self.rxn_representation = None self.index = None self.product_sts = []
[docs] def deleteTmpHydrogens(self, mol_one, mol_two): """ Remove any of the previously added temporary hydrogens from the product structures. :type mol_one: schrodinger.structure._Molecule :param mol_one: the first reactant molecule from which these products came :type mol_two: schrodinger.structure._Molecule :param mol_two: the second reactant molecule from which these products came """ def delete_tmp_hydrogens(amap, hydrogen, arevmap, struct): hydrogen_new = amap.get(hydrogen) if hydrogen_new: arevmap = dict_delete_and_decrement_keys(arevmap, hydrogen_new) struct.deleteAtoms([hydrogen_new]) return arevmap # if the reactant molecule one had a temporary hydrogen # then that hydrogen is now on product two from which # it must be removed, for product two it is the second # reactant molecules indexing which comes first so use # that to offset the hydrogen index from reactant molecule # one to get the final original hydrogen index to remove # from product two, then find the new verion of this index # and delete the atom and update the reverse atom map. Note # that the reactant molecule one will never be left with # its own temporary hydrogen in a product because of the # way that the reverse option is applied only to reactant # molecule two (that is why in the code block following # the code block directly below you will see two called # to delete_tmp_hydrogens, one for product one and one # for product two, while in the block directly below you # will only see one call). if mol_one.tmp_hydrogen_index: final_hydrogen_index = mol_one.tmp_hydrogen_index + \ mol_two.mol_struct.atom_total self.prod_two_rev_map = \ delete_tmp_hydrogens(self.prod_two_map, final_hydrogen_index, self.prod_two_rev_map, self.prod_two_st) # if the reactant molecule two had a temporary hydrogen # then that hydrogen can now be on either product one or # two from which it must be removed, for the product one # it is the first reactant molecules indexing which comes # first so use that to offset the hydrogen index from # reactant molecule two to get the final original hydrogen # index to remove from product # one, for the product two # no offsetting will be needed because the temporary # hydrogen belonged to the reactant from which the product # came, then find the new verions of these indicies and # delete the atoms and update the reverse atom maps if mol_two.tmp_hydrogen_index: final_hydrogen_index = mol_two.tmp_hydrogen_index + \ mol_one.mol_struct.atom_total self.prod_one_rev_map = \ delete_tmp_hydrogens(self.prod_one_map, final_hydrogen_index, self.prod_one_rev_map, self.prod_one_st) final_hydrogen_index = mol_two.tmp_hydrogen_index self.prod_two_rev_map = \ delete_tmp_hydrogens(self.prod_two_map, final_hydrogen_index, self.prod_two_rev_map, self.prod_two_st)
[docs] def buildReorderList(self, mol_one, mol_two): """ Build the atom reorder list needed to make the product structure ordering consistent with that of the original reactants. :type mol_one: schrodinger.structure._Molecule :param mol_one: the first reactant molecule from which these products came :type mol_two: schrodinger.structure._Molecule :param mol_two: the second reactant molecule from which these products came """ # get the reorder map for product one using two maps, # one from product to reactant molecules and another from # reactant molecules to reactants prod_one_map_to_orig = mol_one.combineMapWith(mol_two) prod_one_reorder = [] for atom in self.prod_one_st.atom: if atom.index in self.prod_one_rev_map: orig_ind = \ prod_one_map_to_orig[self.prod_one_rev_map[atom.index]] prod_one_reorder.append(orig_ind) # get the reorder map for product two using two maps, # one from product to reactant molecules and another from # reactant molecules to reactants prod_two_map_to_orig = mol_two.combineMapWith(mol_one) prod_two_reorder = [] for atom in self.prod_two_st.atom: if atom.index in self.prod_two_rev_map: orig_ind = \ prod_two_map_to_orig[self.prod_two_rev_map[atom.index]] prod_two_reorder.append(orig_ind) # sum and invert the two product maps to get something that # build.reorder_atoms can process total_reorder = prod_one_reorder + prod_two_reorder for element in range(1, len(total_reorder) + 1): index = total_reorder.index(element) self.new_total_reorder.append(index + 1)
[docs] def combineAndReorder(self): """ Assemble the final product structure for this reaction channel by combining the structure objects for each product molecule followed by reordering the product atoms to be consistent with the reactants. """ # merge the product structures and apply the reorder map self.product_st = self.prod_one_st.copy() self.product_st.extend(self.prod_two_st) self.product_st = build.reorder_atoms(self.product_st, self.new_total_reorder)
[docs] def extendWithSpectator(self, spectator_st, map_to_orig): """ Extend the product structure object with the spectator structure object and reorder the atoms to be consistent with that of the reactants from which they came. :type spectator_st: schrodinger.structure.Structure :param spectator_st: the structure object containing all spectator molecules :type map_to_orig: dict :param map_to_orig: an atom reordering dictionary where the keys are the reactant atom indicies followed by the spectator atom indicies and the values are their values """ # if there were non-reactive spectator reactant structures # then append them to the product structure if spectator_st: self.product_st.extend(spectator_st) # build the reorder map from the map back to original atom # indicies, i.e. prior to splitting off the spectators and # prior to reaction reorder_map = [] for atom in self.product_st.atom: orig_ind = map_to_orig[atom.index] reorder_map.append(orig_ind) # invert the reorder map new_reorder_map = [] for index in range(1, len(reorder_map) + 1): new_index = reorder_map.index(index) new_reorder_map.append(new_index + 1) # apply reorder map self.product_st = build.reorder_atoms(self.product_st, new_reorder_map)
[docs] def updateProperties(self, index, channeldef, reverse=False): """ Set some properties on this product structure. :type index: int :param index: index of this set of products :type channeldef: ChannelDefinition :param channeldef: the channel definition from which this product came :type reverse: bool :param reverse: specifies that the order of the reactive indicies for the second molecule were reversed """ def separate_mol_part(mol): mol = mol.strip().split(SECOND_SEP) if len(mol) == 2: atom_a, atom_b = mol else: atom_a, atom_b = mol[0], None return atom_a, atom_b def get_bond(atom_a, atom_b): if atom_b: bond_ab = atom_a + BOND_SEP + atom_b else: bond_ab = atom_a return bond_ab NAME_SEP = '.' self.index = index current_name = self.product_st.property[CheckInput.TITLE_KEY] name_tag = current_name + NAME_SEP + self.PRODUCT_NAME_BASE + \ NAME_SEP + str(self.index) self.product_st.property[CheckInput.TITLE_KEY] = name_tag self.product_st.property[CheckInput.ENTRY_NAME_KEY] = name_tag FIRST_SEP = ChannelDefinition.FIRST_SEP SECOND_SEP = ChannelDefinition.SECOND_SEP BOND_SEP = '-' REACT_SEP = ' + ' RXN_SEP = ' --> ' mol_one_def, mol_two_def = channeldef.__repr__().split(FIRST_SEP) atom_a, atom_b = separate_mol_part(mol_one_def) atom_c, atom_d = separate_mol_part(mol_two_def) left_side = get_bond(atom_a, atom_b) + REACT_SEP + \ get_bond(atom_c, atom_d) if atom_b and atom_d: if reverse: right_side = get_bond(atom_a, atom_d) + REACT_SEP + \ get_bond(atom_b, atom_c) else: right_side = get_bond(atom_a, atom_c) + REACT_SEP + \ get_bond(atom_b, atom_d) elif atom_b: if reverse: right_side = get_bond(atom_a, atom_d) + REACT_SEP + \ get_bond(atom_b, atom_c) else: right_side = get_bond(atom_a, atom_c) + REACT_SEP + \ get_bond(atom_b, atom_d) elif atom_d: if reverse: right_side = get_bond(atom_c, atom_b) + REACT_SEP + \ get_bond(atom_a, atom_d) else: right_side = get_bond(atom_a, atom_c) + REACT_SEP + \ get_bond(atom_d, atom_b) else: right_side = get_bond(atom_a, atom_c) if self.prod_two_st: self.rxn_representation = left_side + RXN_SEP + right_side else: self.rxn_representation = right_side + RXN_SEP + left_side self.product_st.property[self.RXN_REPRESENTATION_KEY] = \ self.rxn_representation
[docs] def handleMetalBonding(self, rxn_def, reverse): """ Handle zero-order bonds for metals. :type rxn_def: list :param rxn_def: a list of two lists containing the reaction channel definitions in terms of atomic indicies :type reverse: bool :param reverse: specifies that the order of the reactive indicies for the second molecule were reversed """ def set_zob(ind1, ind2): atom_one = mol_1[ind1] atom_two = mol_2[ind2] if atom_one in metals or atom_two in metals: self.product_st.getBond(atom_one, atom_two).order = 0 # return if there is nothing that would need a ZOB metals = analyze.evaluate_asl(self.product_st, 'metals') if not metals: return # get reactive indicies mol_1, mol_2 = rxn_def # Do single displacement and double displacement # Not using ZOBs for association channels per decision on MATSCI-7972 if len(mol_1) == 2 and len(mol_2) == 1 or \ len(mol_1) == 1 and len(mol_2) == 2: if reverse: if len(mol_1) == 2: set_zob(1, 0) else: set_zob(0, 1) else: set_zob(0, 0) elif len(mol_1) == 2 and len(mol_2) == 2: if reverse: set_zob(0, 1) set_zob(1, 0) else: set_zob(0, 0) set_zob(1, 1)
[docs] def isolateProducts(self, isolate_products, flatdef): """ If requested isolate the individual products to their own structure objects. :type isolate_products: bool :param isolate_products: specifies that products should be returned individually rather than as a whole :type flatdef: list :param flatdef: list containing the original reactive atom indicies of the input structure """ if not isolate_products: self.product_sts = [self.product_st] return NAME_SEP = '.' title_tag = self.product_st.property[CheckInput.TITLE_KEY] title_tag += NAME_SEP entry_tag = self.product_st.property[CheckInput.ENTRY_NAME_KEY] entry_tag += NAME_SEP # from all molecules find the product molecules that # resulted from a reaction and extract them, update # properties, and append to list of product structures, # skip over products that are atoms flatdef_set = set(flatdef) index = 1 for molecule in self.product_st.molecule: if set(molecule.getAtomIndices()).intersection(flatdef_set) \ and len(molecule.atom) > 1: product_st = molecule.extractStructure(True) product_st.property[CheckInput.TITLE_KEY] = \ title_tag + str(index) product_st.property[CheckInput.ENTRY_NAME_KEY] = \ entry_tag + str(index) self.product_sts.append(product_st) index += 1
[docs]class Reactants(object): """ Manage the properties of the reactants. """
[docs] def __init__(self, allreactants, listdef): """ Create an instance. :type allreactants: schrodinger.structure.Structure :param allreactants: the structure object of the input reactants :type listdef: list :param listdef: a list of two lists containing the reaction channel definitions """ self.allreactants = allreactants self.def_part_one = list(listdef[0]) self.def_part_two = list(listdef[1]) self.reactant_st = None self.reactant_new_to_orig = {} self.reactant_orig_to_new = {} self.spectator_st = None self.spectator_new_to_orig = {} self.spectator_orig_to_new = {} self.map_to_orig = {} self.mol_one = None self.mol_two = None self.splitIntoComponents() self.updateDefParts() self.setReactiveMolecules()
[docs] def splitIntoComponents(self): """ Split the input set of all reactants into reactive molecules and a set of spectator molecules. """ # get reactive molecules mol_one = \ self.allreactants.atom[self.def_part_one[0]].getMolecule() mol_two = \ self.allreactants.atom[self.def_part_two[0]].getMolecule() mol_reactants = [mol_one] if mol_two.number != mol_one.number: mol_reactants.append(mol_two) self.reactant_st, self.reactant_new_to_orig, self.reactant_orig_to_new = \ get_structure_with_contiguous_molecules(mol_reactants) # get spectator molecules mol_spectators = [] for molecule in self.allreactants.molecule: if molecule.number != mol_one.number and \ molecule.number != mol_two.number: mol_spectators.append(molecule) if mol_spectators: self.spectator_st, self.spectator_new_to_orig, self.spectator_orig_to_new = \ get_structure_with_contiguous_molecules(mol_spectators) # set reorder map used to later reorder products plus # spectators back to original ordering self.map_to_orig = self.reactant_new_to_orig.copy() for key, value in self.spectator_new_to_orig.items(): self.map_to_orig[key + self.reactant_st.atom_total] = \ value
[docs] def updateDefParts(self): """ Update the indicies in the reaction channel definition parts to be consistent with the new reactive molecule ordering. """ self.def_part_one = [ self.reactant_orig_to_new[index] for index in self.def_part_one ] self.def_part_two = [ self.reactant_orig_to_new[index] for index in self.def_part_two ]
[docs] def setReactiveMolecules(self): """ Set the reactive molecule attributes. """ self.mol_one = \ self.reactant_st.atom[self.def_part_one[0]].getMolecule() self.mol_two = \ self.reactant_st.atom[self.def_part_two[0]].getMolecule() if self.mol_one.number == self.mol_two.number: self.mol_two = None
[docs]class Channels(object): """ Manage the enumeration of reaction channels. """ # use OPLS 2005 OPLS_VERSION = minimize.OPLS_2005 OPLS_ENERGY_KEY = mm.OPLS_POTENTIAL_ENERGY
[docs] def __init__(self, reactants, channeldefs, allow_adsorption_onto=None, dissociation_bond_length=Constants.DISSOCIATION_BOND_LENGTH, no_minimization=Constants.NO_MINIMIZATION, isolate_products=Constants.ISOLATE_PRODUCTS, no_reactive_h=Constants.NO_REACTIVE_H, unique=Constants.UNIQUE, mae_out_file=None, logger=None): """ Create an instance. :type reactants: schrodinger.structure.Structure :param reactants: the reactants from which channels will be enumerated :type channeldefs: ChannelDefinitions :param channeldefs: list of ChannelDefinitions :type allow_adsorption_onto: None or list :param allow_adsorption_onto: SMARTS patterns of arbitrary adsorption sites to support :type dissociation_bond_length: float :param dissociation_bond_length: final bond length to use for dissociative reaction channels :type no_minimization: bool :param no_minimization: specifies that product geometries should not be minimized :type isolate_products: bool :param isolate_products: specifies that products should be returned individually rather than as a whole :type no_reactive_h: bool :param no_reactive_h: specify that R-H bonds be considered inert :type unique: bool :param unique: provide only unique products, firstly do not calculate redundant products and secondly filter out duplicate final structures using SMILES :type mae_out_file: str :param mae_out_file: the full path, including filename, that will be used to write the output `*mae` file that will contain the final set of structures, i.e. reactants plus all sets of products :type logger: logging.getLogger :param logger: output logger """ self.reactants = reactants self.channeldefs = channeldefs self.allow_adsorption_onto = allow_adsorption_onto self.dissociation_bond_length = dissociation_bond_length self.no_minimization = no_minimization self.isolate_products = isolate_products self.no_reactive_h = no_reactive_h self.unique = unique if not mae_out_file: self.mae_out_file = os.path.basename(__file__).rstrip('.py') else: self.mae_out_file = mae_out_file self.logger = logger # A set of SMILES for the structures that have been processed self.unique_smiles = set() # Titles for the structures that have been filtered out due to # duplicate SMILES self.filtered_titles = [] self.not_minimized = [] self.rxn_representations = [] self.mae_writer = None self.fast3d_obj = None self.add_hydrogens = False self.run_stereo = False
[docs] def setUpFast3D(self): """ Set up the fast 3D object. """ optimize = True self.fast3d_obj = fast3d.Volumizer('default', optimize)
[docs] def checkInputParams(self): """ Check all input parameters. """ CheckInput().checkReactants(self.reactants, self.logger) if not self.channeldefs.allow_adsorption_onto: self.allow_adsorption_onto, matches = \ CheckInput().checkAndProcessAllowAdsorptionOnto( self.reactants, self.allow_adsorption_onto, self.logger) else: matches = set(self.channeldefs.matches) CheckInput().checkChannelDefs(self.channeldefs, self.logger) self.channeldefs.definitions = \ CheckInput().checkChannelDefsVsReactants( self.channeldefs.definitions, self.reactants, matches, self.no_reactive_h, self.unique, self.logger) NODEF_MSG = """ No valid reaction channels have been defined. You must provide at least a single valid reaction channel definition. This script will exit.""" if not self.channeldefs.definitions: if self.logger: self.logger.error(NODEF_MSG) sys.exit(1) self.dissociation_bond_length = \ CheckInput().checkDissociationBondLength( self.dissociation_bond_length, self.logger)
[docs] def updateChannelDefReprs(self): """ Update the representations of the reaction channel definitions. """ FIRST_SEP = ChannelDefinition.FIRST_SEP SECOND_SEP = ChannelDefinition.SECOND_SEP def get_repr_with_names(reactant): START = '(' END = ')' repr_with_names = [] for index in reactant: atom = self.reactants.atom[index] if atom.name: name = atom.name else: name = atom.element repr_str = name + START + str(index) + END repr_with_names.append(repr_str) return repr_with_names for channeldef in self.channeldefs.definitions: r_one, r_two = channeldef.listdef r_one = SECOND_SEP.join(get_repr_with_names(r_one)) r_two = SECOND_SEP.join(get_repr_with_names(r_two)) channeldef.representation = \ r_one + FIRST_SEP + SECOND_SEP + r_two
[docs] def printChannelDefs(self): """ Provide a formatted log of the final reaction channel definitions. """ HEADER = 'Reaction channel definitions' SEPARATOR = ':' defs = self.channeldefs.definitions index_max_char = len(str(len(defs))) index_width = index_max_char + len(SEPARATOR) definition_max_char = [ len(definition.__repr__()) for definition in defs ] definition_width = max(definition_max_char) + 1 if self.logger: self.logger.info(HEADER) self.logger.info('-' * len(HEADER)) self.logger.info('') for index, definition in enumerate(defs, 1): index_str = \ str(str(index) + SEPARATOR).ljust(index_width) definition_str = \ definition.__repr__().rjust(definition_width) self.logger.info(index_str + definition_str) self.logger.info('')
[docs] def initMaestroWriter(self): """ Initialize the Maestro writer for the output `*mae` file and put the reactants structure in there. """ self.mae_writer = structure.MaestroWriter(self.mae_out_file)
[docs] def termMaestroWriter(self): """ Terminate the Maestro writer for the output `*mae` file. """ self.mae_writer.close()
[docs] def processProducts(self, product_sts): """ Process the product structures. :type product_sts: list of schrodinger.structure.Structure :param product_sts: contains all product structures for a given reaction channel """ if self.unique: f_product_sts, titles = get_deduped_monomers(product_sts) self.filtered_titles.extend(titles) else: f_product_sts = list(product_sts) for product_st in f_product_sts: title = product_st.property[CheckInput.TITLE_KEY] # if the final structures are to be uniquified then do the # filtering here, since per reaction channel deduping is done # above this controls the deduping of products from different # reaction channels, only write the unique structures and collect # redundancies to be logged later. if self.unique: smiles = analyze.generate_smiles(product_st) if smiles not in self.unique_smiles: self.unique_smiles.add(smiles) else: self.filtered_titles.append(title) continue # by default before any minimization eliminate any intra- or # inter-molecular ring spears or concatenations, if we know # that this elimination protocol has failed then just proceed # with the ring speared structure but mark it with a structure # property, for cases see MATSCI-1758 if ringspear.check_for_spears(product_st): tmp_st = product_st.copy() try: self.fast3d_obj.run(tmp_st, self.add_hydrogens, self.run_stereo) product_st = tmp_st.copy() except Exception: product_st.property[FAILED_SPEAR_ELIM_KEY] = True # all product structure objects are copied from the reactant # structure object so if they can't be optimized then delete # some OPLS properties in case they exist if not self.no_minimization: if minimize_geometry(product_st): self.not_minimized.append(title) product_st.property.pop(self.OPLS_ENERGY_KEY, None) self.mae_writer.append(product_st)
[docs] def enumerateChannels(self): """ Main function to enumerate reaction channels. """ # loop over channel definitions and create the product # structures index = 1 for channeldef in self.channeldefs.definitions: # split the reactive system into one or two reactive # molecules and spectators components = Reactants(self.reactants, channeldef.listdef) # handle dissociations, i.e. only one reactive # molecule, or associations, single displacements, and # double displacements all of which involve two reactive # molecules if not components.mol_two: mol_one = ReactantMolecule(components.mol_one) mol_one.setReactiveInds(components.def_part_one + \ components.def_part_two) products = mol_one.dissociate(self.dissociation_bond_length) products.extendWithSpectator(components.spectator_st, components.map_to_orig) products.updateProperties(index, channeldef) self.rxn_representations.append(products.rxn_representation) products.isolateProducts(self.isolate_products, channeldef.flatdef) # loop over products, minimize and append to list self.processProducts(products.product_sts) index += 1 else: mol_one = ReactantMolecule(components.mol_one) mol_two = ReactantMolecule(components.mol_two) mol_one.setReactiveInds(components.def_part_one) mol_two.setReactiveInds(components.def_part_two) # if association or single displacement then add a # temporary hydrogen handle so that they can be handled # using the double displacment protocol if len(mol_one.reactive_inds_new) == 1: mol_one.addTempHydrogens() if len(mol_two.reactive_inds_new) == 1: mol_two.addTempHydrogens() # for each single or double displacement channel, i.e. # A-B + C-D, there are two possible products, i.e. # A-C + B-D or A-D + B-C, this is handled by do_which_types # which is affected by unique, if unique has not been specified # then we want both channels regardless and so we set both # to True if self.unique: do_which_types = channeldef.do_which_types else: do_which_types = [True, True] # do_which_types is only ever set for single and double # displacement channels, for association we want to skip # the swapping of fake hydrogens and so we pass True and False if mol_one.tmp_hydrogen_index and mol_two.tmp_hydrogen_index: do_which_types = [True, False] # reverse will indicate whether the definition should be # reversed (necessary for the second type) is_reverse = [False, True] # loop over sub-channel cases for atype, reverse in zip(do_which_types, is_reverse): if atype: products = mol_one.reactWith(mol_two, reverse) products.deleteTmpHydrogens(mol_one, mol_two) products.buildReorderList(mol_one, mol_two) products.combineAndReorder() products.extendWithSpectator(components.spectator_st, components.map_to_orig) products.updateProperties(index, channeldef, reverse) self.rxn_representations.append( products.rxn_representation) products.handleMetalBonding(channeldef.listdef, reverse) products.isolateProducts(self.isolate_products, channeldef.flatdef) # loop over products, minimize and append to list self.processProducts(products.product_sts) index += 1 # post-process those molecules for which temporary # hydrogens were added if mol_one.tmp_hydrogen_index: mol_one.delTempHydrogens() if mol_two.tmp_hydrogen_index: mol_two.delTempHydrogens()
[docs] def checkOutputFile(self): """ Check the output file. """ NO_PRODUCTS_MSG = """This calculation has resulted in zero relevant product structures. Please check your input and settings. This script will now exit.""" if structure.count_structures(self.mae_out_file) == 1: if self.logger: self.logger.error(NO_PRODUCTS_MSG) fileutils.force_remove(self.mae_out_file) sys.exit(1)
[docs] def printRxnReprs(self): """ Log a formatted print of all reaction channels in their reaction representation. """ BUFFER = 1 SEP = '.' # get some max field widths max_repr = 0 for rxn_repr in self.rxn_representations: length = len(rxn_repr) if length > max_repr: max_repr = length max_index = len(str(len(self.rxn_representations))) + len(SEP) HEADER1 = 'Reactions' HEADER2 = '-' * (len(HEADER1)) self.logger.info(HEADER1) self.logger.info(HEADER2) self.logger.info('') for index, rxn_repr in enumerate(self.rxn_representations, 1): index_repr = str(index) + SEP rxn_line = index_repr.ljust(max_index) + ' ' * BUFFER + \ rxn_repr.rjust(max_repr) self.logger.info(rxn_line) self.logger.info('')
[docs] def printFiltered(self): """ Log the structures that were filtered by SMILES. """ if self.filtered_titles and self.logger: FILTERED_MSG = """The following product structures will not be returned because they have non-unique SMILES, i.e. they are redundant conformers. If you wish to have these structures as well then do not pass the -unique flag. Here are the filtered structures: %s.""" self.logger.warning(FILTERED_MSG % self.filtered_titles)
[docs] def printNotMinimized(self): """ Log the structures that were not minimized. """ num_total_structs = structure.count_structures(self.mae_out_file) ALL_STRUCTURES = '[ <all structures> ]' if self.not_minimized: NOT_MINIMIZED_MSG = """The geometries for the following list of structures were not able to be minimized likely due to problems with the defined atom types: %s. Please try redefining those atom types, for example if your system contains metals then all bonds to those metals may need to be of zero-order. The latter can be done with the Edit > Build > Create Zero-Order Bonds to Metals option in Maestro or with the bond decrement button. Note that reactive bonds can only be single bonds.""" if self.logger: if len(self.not_minimized) == num_total_structs: self.logger.warning(NOT_MINIMIZED_MSG % ALL_STRUCTURES) else: self.logger.warning(NOT_MINIMIZED_MSG % self.not_minimized)
[docs] def orchestrate(self): """ Orchestrate all of the components of this main class. """ self.setUpFast3D() self.checkInputParams() self.updateChannelDefReprs() self.printChannelDefs() # set up writer self.initMaestroWriter() # minimize reactant structure so that total energies can be # compared with that of products, think of reactants as just # another set of products for the time being self.processProducts([self.reactants]) # do the enumeration self.enumerateChannels() # tear down writer self.termMaestroWriter() if self.logger: self.printRxnReprs() self.printFiltered() self.printNotMinimized() # now that the output Maestro file is closed ensure that we have # products, this check was implemented when it was found that # running a unique dissociation calculation on zero-order bonds # produced no products (unless products are isolated), just # run this check all of the time self.checkOutputFile()