Source code for schrodinger.application.canvas.r_group

#Name: Cheminformatics:R-Group Analysis...
#Command: pythonrun r_group_analysis.show_panel
"""
Detect r-groups from CanvasMCS, CombiGlide, or SMARTS

Description: A module that contains function to detect r-groups. The core is
determined either from CombiGlide results, a user-submitted SMARTS pattern,
or a Canvas Maximum Common Substructure.


Copyright Schrodinger, LLC. All rights reserved.
"""

import itertools
import os
import shutil
import tempfile
from collections import OrderedDict
from collections import defaultdict
from past.utils import old_div

import numpy

import schrodinger.application.canvas.r_group_dee as rgadee
import schrodinger.job.util as jobutil
import schrodinger.utils.subprocess as subprocess
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.infra import mmlist
from schrodinger.infra.canvas import ChmFingerprints
from schrodinger.infra.canvas import ChmMol
from schrodinger.infra.canvas import createElementsToken
from schrodinger.structure import STEREO_FROM_ANNOTATION_AND_GEOM
from schrodinger.structutils import analyze
from schrodinger.structutils import build
from schrodinger.structutils import smiles as smiles_mod
from schrodinger.structutils.rings import find_rings
from schrodinger.utils import fileutils
from schrodinger.utils import log

logger = log.get_output_logger("schrodinger.application.canvas.r_group")
try:
    from schrodinger.maestro import maestro
except ImportError:
    maestro = None
canvasmcs_process = None
canvas = None

NULL_GROUP = 'null'
HYDROGEN_GROUP = 'hydrogen'
ATTACHMENT_PAIRS_TO_SKIP = [(NULL_GROUP, NULL_GROUP),
                            (NULL_GROUP, HYDROGEN_GROUP),
                            (HYDROGEN_GROUP, NULL_GROUP),
                            (HYDROGEN_GROUP, HYDROGEN_GROUP)]
(GOOD_MATCH, NO_MATCHES, DUPLICATE_MATCH, NO_HEAVY_ATOM_MATCH) = list(range(4))


[docs]class Data: """ Class to store intermediary and finals data structures for R-group analysis. Methods starting with add append information on a CT by CT basis to data structures. """
[docs] def __init__(self): #List of file offsets indexed by ct position in input file self.ct_offsets = [] #Core of the ligands stored as a ct self.core_ct = None #R-groups at each position stored as cts self.r_group_cts = [] #List of dictionaries, one dictionary for each r_group position. Each #dictionary keyed on (unique smiles, (grow_atom,to_atom)) tuple. #Grow_atom, to_atom are indices into the atoms into the SMILES pattern #of the methyl. Value is a list of CT indeces, starting at 1, in which #the r-group represented by the SMILES appears at the given position self.r_group_position = [] #Used in SMARTS and MCS methods. Dictionary indexed on SMARTS atom #number. Value is maximum number of r_groups attached to that atom. #Atom with no attachments do not have entries. self.r_groups_per_atom = {} #Used in SMARTS and MCS methods. List of lists. Each list has the #indexes of those atoms in the core of that ct. List element 0 #refers to CT 1. self.core_atoms = {} # list of bonds in the core self.core_bonds = {} #. Each list represents the set of smallest #rings per CT. self.rings = {} #List of lists. Indexed on ct position. List element 0 refers to CT 1 #Value is a list of tuples, in order, of #(core_atom_index,side_atom_index) to which the r_groups are attached self.attachment_points = {} #Dictionary keyed on numeric property which exists in all CT's self.property_values = {} #List of numeric properties which exist in all CT's self.property_list = [] self.r_groups_at_core = [] #R-groups at each position stored as cts self.functional_rings = {} self.bonds_in_rings = {} self.sa_best_energy = None # SimulatedAnnealing best E if SA was called self.sa_cpu_time = None # SimulatedAnnealing best E if SA was called
[docs] def set_prop_list(self, proplist): """ Create list (self.property_list) of every non string property in a ct Checks for existing properties and prevents duplicates """ for prop in proplist: if prop[0] == 's': continue if prop not in self.property_list: self.property_list.append(prop)
[docs] def get_prop_list(self): return self.property_list
[docs] def add_property_value(self, prop, value): """ Add property value to self.property_values dict Done in order of cts """ if prop in self.property_values: new_vals = self.property_values[prop] new_vals.append(value) self.property_values[prop] = new_vals else: self.property_values[prop] = [value]
[docs] def get_prop_values(self): return self.property_values
[docs] def add_bonds_in_rings(self, lol, index): self.bonds_in_rings[index] = lol
[docs] def get_bonds_in_rings(self): return self.bonds_in_rings
[docs] def add_core_ct(self, ct): self.core_ct = ct
[docs] def add_rg_ct(self, position, ct): self.r_group_cts[position].append(ct)
[docs] def get_rg_ct(self): return self.r_group_cts
[docs] def get_attachment_points(self): return self.attachment_points
[docs] def add_attachment_points(self, ct_data, index): self.attachment_points[index] = ct_data
[docs] def print_attachment_points(self): for i, ct in enumerate(list(self.attachment_points)): logger.info(i) for bond in self.attachment_points[ct]: try: logger.info("%i <--> %i :: %i" % (int(bond[0]), int(bond[1]), int(bond[2]))) except: logger.info(bond)
[docs] def print_attachment_points_ct(self, ct_index): points = "" logger.info("Attachment points") logger.info(self.attachment_points) for bond in self.attachment_points[ct_index]: try: points += "%i <--> %i :: %i" % (int(bond[0]), int( bond[1]), int(bond[2])) except: points += bond points += "\n" return points
[docs] def add_rings(self, rings, i): self.rings[i] = rings
[docs] def get_rings(self): return self.rings
[docs] def add_functional_rings(self, rings, ct_index): self.functional_rings[ct_index] = rings
[docs] def get_functional_rings(self): return self.functional_rings
[docs] def add_core(self, core, i): self.core_atoms[i] = core
[docs] def get_core(self, index): return self.core_atoms[index]
[docs] def get_cores(self): return self.core_atoms
[docs] def add_core_bonds(self, bonds, i): self.core_bonds[i] = bonds
[docs] def get_core_bonds(self, index): return self.core_bonds[index]
[docs] def add_ct_offset(self, file_offset): self.ct_offsets.append(file_offset)
[docs] def get_ct_offsets(self): return self.ct_offsets
[docs] def add_position(self): self.r_group_position.append({})
[docs] def add_r_group(self, position_number, smiles, ct_index): try: previous = self.r_group_position[position_number][smiles] except: self.r_group_position[position_number][smiles] = [ct_index] return self.r_group_position[position_number][smiles].append(ct_index)
[docs] def get_r_groups(self): return self.r_group_position
[docs] def add_r_groups_to_atom(self, atom_index, value): self.r_groups_per_atom[atom_index] = value
[docs] def get_r_group_for_atom(self, atom): try: return self.r_groups_per_atom[atom] except: return 0
[docs] def get_r_groups_per_atom(self): return self.r_groups_per_atom
[docs]def add_methyl_ring(ct, principal_bond, bond_list, attachment_positions, output, grow=True): """ Add capping methyls to ring like r_groups :type ct: schrodinger.structure.Structure object :param ct: structure that you are adding a capping group to :type principal_bond: list :param principal_bond: [ _StructureAtom of to_atom, _StructureAtom of from_atom, bond_order ] :type bond_list: list :param bond_list: This contains an outer list of all bonds (specified in in the same way as principal_bond) which contain rings that will need to be capped :type attachment_positions: list :param attachment_positions: list of the index of bonds (from the data.r_group_attachment_bonds ), which need to be capped with rings :type output: bool :type output: If true, function performs as it would in the writing output files. If false, function performs as it would in determining r-group attachments :return: Returns a tuple of structure of the fragment, to_atom index and from-atom index """ if output: added_atoms = [] for atom in ct.atom: atom.growname = "" atom.property['i_rga_index'] = int(atom) stereo_handle = mm.mmstereo_new(ct.handle) mm.mmstereo_add_stereo_information_to_ct(ct.handle, stereo_handle) mm.mmstereo_delete(stereo_handle) props_to_rewrite = {} for prop in ct.property: if prop.startswith('s_st_Chirality_'): props_to_rewrite[prop] = ct.property[prop] capping_dictionary = {} from_atom = principal_bond[0].index #set the atom on the fragment, used to indentify the molecule #when all the bodns are broken ct.atom[principal_bond[1].index].growname = "rpc1" #capping dictionary has keys that are all the core atoms #the values correspond to a list of bonds that are in the fragment #the list may be one or multivalued for bond in bond_list: try: capping_dictionary[bond[0].index].append(bond[1].index) except: capping_dictionary[bond[0].index] = [bond[1].index] #Determine if there are any core_to_core bonds in the ring #to be closed before output #iterates through all core atoms to test if any are bonded to each other. core_atoms = [bond[0].index for bond in bond_list] core_to_core_bonds = [] for index in core_atoms: atom = ct.atom[index] for bond in atom.bond: if bond.atom2.index in core_atoms: if bond.atom2.index < index: bonds = (bond.atom2.index, index, bond.order) else: bonds = (index, bond.atom2.index, bond.order) if bonds not in core_to_core_bonds: core_to_core_bonds.append(bonds) #the s_cg_attachment property is used to indicate non principal #bonds on the fragments, with the number corresponding to the #integer of the r-group position for n, bond in enumerate(bond_list): if bond != principal_bond: #use pos to indicate color of arrow. this integer corresponds #to the colors used for each r_group position pos = str(attachment_positions[n] + 1) core_atom = ct.atom[bond[0].index] attachment_atom = ct.atom[bond[1].index] #If it's a to_atom you set it as negative else as positive try: core_atom.property["s_cg_attachment"] = \ core_atom.property["s_cg_attachment"] \ + "," + str(-1 * int(pos)) except: core_atom.property["s_cg_attachment"] = str(-1 * int(pos)) try: attachment_atom.property["s_cg_attachment"] = \ attachment_atom.property["s_cg_attachment"] \ + "," + pos except: attachment_atom.property["s_cg_attachment"] = pos #this list contains all the atoms that need to be added again #once broken. these atoms include the atoms on the core which #are one bond away from any atom in the fragment atoms_to_add = [] #iterates through all the core atoms which have attachments broken_bonds = [] for core_atom_index in list(capping_dictionary): core_atom = ct.atom[core_atom_index] try: attach = core_atom.property["s_cg_attachment"] except: attach = "" atoms_to_add.append( TempAtom(core_atom.x, core_atom.y, core_atom.z, core_atom.index, core_atom.atom_type, attach)) #set the from atom if this is the principal bond if core_atom_index == principal_bond[0].index: atoms_to_add[-1].set_from_atom() for bonded_atom in capping_dictionary[core_atom_index]: for b in ct.atom[core_atom_index].bond: #probably irrational if b.atom2.index == bonded_atom: current_value = ct.atom[bonded_atom].property.pop( "s_user_attachment_bond_order", "") if current_value == "": ct.atom[bonded_atom].property[ "s_user_attachment_bond_order"] = "%s,%s" % (str( len(atoms_to_add)), str(b.order)) else: current_value = current_value + ( "::%s,%s" % (str(len(atoms_to_add)), str(b.order))) ct.atom[bonded_atom].property[ "s_user_attachment_bond_order"] = current_value #this if statement probably no longer necessary #since we only ever have one attachment between #core and r-group ct.deleteBond(core_atom_index, bonded_atom) broken_bonds.append((core_atom_index, bonded_atom)) #extract only the fragment, which is indentified by the growname for mol in ct.molecule: for a in mol.atom: if a.growname == "rpc1": fragment = mol.extractStructure() break # return fragment here without growing core atoms if not grow: build.add_hydrogens(fragment) return fragment #attachments is a list of atoms on the fragment which should attach #to the core #this should not include the principal bond #this allows filtering out other attachments which existed in #the entire structure #but are not actually attached to the core attachments = [] renumbered_map = {} for atom in fragment.atom: try: prop = atom.property["s_cg_attachment"] #case for only one attachments try: attachments.append(int(prop)) #case for more than one attachment except: proplist = prop.split(",") for val in proplist: attachments.append(int(val)) except: #case for no attachments pass prop = atom.property["i_rga_index"] renumbered_map[prop] = int(atom) if props_to_rewrite: logger.debug("Rewriting the following property values:") logger.debug(props_to_rewrite) logger.debug("On the following attachents: %s" % attachments) logger.debug("Using the following renumbering map %s" % renumbered_map) for prop, value in props_to_rewrite.copy().items(): atoms = value.split('_') rs_value = atoms.pop(1) for atom in atoms: if int(atom) not in renumbered_map: del props_to_rewrite[prop] break for i, value in enumerate(props_to_rewrite.values(), start=1): atoms = value.split('_') rs_value = atoms.pop(1) new_atoms = [] for atom in atoms: new_atoms.append(renumbered_map[int(atom)]) new_atoms.insert(1, rs_value) fragment.property['s_st_Chirality_%s' % i] = "_".join( map(str, new_atoms)) old_indeces = {} #Re-add atoms here which are part of the core, but also connected to #some piece of the fragment for k, new_atom in enumerate(atoms_to_add): #Store the property from the original structure attachment_prop = new_atom.get_attach() try: #Check to see if the corresponding attachment actually #exists in the fragment file if -int(attachment_prop) not in attachments: if new_atom.is_from_atom(): #We always have to add the from_atom, but remove #the property because it may also connect to an #unrelated atom attachment_prop = "" else: #Skip adding this atom, because it doesn't connect #to any fragment atoms continue except: if attachment_prop != "": multi_attachments = attachment_prop.split(",") #check for 'valid attachments' which means the atoms #correspond to connections that exist in the fragment valid_attachments = [] for a in multi_attachments: if -int(a) in attachments: valid_attachments.append(-int(a)) if not valid_attachments: #There are no valid attachments here. This means #that we can ignore adding this atom unless it is #the from_atom. If it is the from_atom, we remove the #s_cg_attachment property. if new_atom.is_from_atom(): attachment_prop = "" else: continue else: #This accounts for the case where some, but not necessarily #all attachments are valid. Reconstruct the s_cg_attachment #property for the core atom with only values that correspond #to those in the fragment. attachment_prop = "" for i, val in enumerate(valid_attachments): if i == 0: attachment_prop += str("-") + str(val) else: attachment_prop += "," attachment_prop += str("-") + str(val) #Add an atom to the fragment with values determined before split. fragment.addAtoms(1) added_atom = fragment.atom[len(fragment.atom)] added_atom.x = new_atom.get_x() added_atom.y = new_atom.get_y() added_atom.z = new_atom.get_z() added_atom.atom_type = new_atom.get_type() added_atom.property["i_cg_old_index"] = new_atom.get_index() added_atom.property["s_cg_attachment"] = attachment_prop if new_atom.is_from_atom(): added_atom.growname = "rpc2" from_atom = len(fragment.atom) #account for any cases needing multiple bonds for atom in fragment.atom: try: bond_order_indicator = \ atom.property["s_user_attachment_bond_order"] except: bond_order_indicator = "" if bond_order_indicator != "": separate = bond_order_indicator.split("::") for element in separate: el = element.split(",") if int(el[0]) == k + 1: fragment.addBond(fragment.atom[len(fragment.atom)], atom, int(el[1])) if output: added_atoms.append(len(fragment.atom)) for atom in fragment.atom: try: old_indeces[atom.index] = atom.property["i_cg_old_index"] except: continue rev_old_ind = {} for key in list(old_indeces): rev_old_ind[old_indeces[key]] = key start_atoms = [b[0] for b in core_to_core_bonds] for key in list(old_indeces): if old_indeces[key] in start_atoms: bond_index = start_atoms.index(old_indeces[key]) try: fragment.addBond(rev_old_ind[core_to_core_bonds[bond_index][0]], rev_old_ind[core_to_core_bonds[bond_index][1]], core_to_core_bonds[bond_index][2]) except KeyError: pass build.add_hydrogens(fragment) #Uncomment this line for debugging, the total should add #to zero, because of matching - and + numbers. #total = [] for a in fragment.atom: a.property.pop("s_user_attachment_bond_order", 0) a.property.pop("i_cg_old_index", 0) if a.growname == "rpc1": to_atom = a.index #Uncomment the following lines for debugging - #try: # if a.property["s_cg_attachment"] != "": # total.append(a.property["s_cg_attachment"]) #except: # pass #try: # total = [ int(x) for x in total ] # if sum(total) != 0: # orig_ct.write("temp_ct.mae") # out = open('data.pk','wb') # pickle.dump(principal_bond, out) # pickle.dump(bond_list, out, -1) # pickle.dump(attachment_positions, out, -1) # out.close() # logger.info("bad sum %s" % total) # sys.exit(1) #logger.info("capping_dictionary %s" % capping_dictionary) #logger.info("core_to_core %s" % core_to_core_bonds) #logger.info("old_indeces %s" % old_indeces) #except ValueError: #logger.info("atoms_to_add %s" % [ x.get_attach() for x in atoms_to_add ]) #logger.info("attachements %s" % attachments) #try: # logger.info("valid_attachments %s" % valid_attachments) #except: # pass #logger.infro("total" % total) #orig_ct.write("temp_ct.mae") #out = open('data.pk','wb') #pickle.dump(principal_bond, out) #pickle.dump(bond_list, out, -1) #pickle.dump(attachment_positions, out, -1) #out.close() #sys.exit(1) # pass #logger.info(sum(total)) return (fragment, from_atom, to_atom)
[docs]class TempAtom():
[docs] def __init__(self, x, y, z, index, atomtype, attach=""): self.x = x self.y = y self.z = z self.atomtype = atomtype self.index = index self.from_atom = False self.attach = attach
[docs] def get_index(self): return self.index
[docs] def get_x(self): return self.x
[docs] def get_y(self): return self.y
[docs] def get_z(self): return self.z
[docs] def get_type(self): return self.atomtype
[docs] def set_from_atom(self): self.from_atom = True
[docs] def is_from_atom(self): return self.from_atom
[docs] def get_attach(self): return self.attach
[docs]def add_methyl_standard(ct, core_atom, side_atom, output, grow=True): """ Breaks bond between core and side atom Grows Carbon off of side-atom, maintaining bond order of core_atom - side_atom bond :type ct: schrodinger.structure.Structure object :param ct: structure that you are adding a capping group to :type core_atom: int :param core_atom: index of the core atom :type side_atom: int :param side_atom: index of the attachment atom :type output: bool :type output: If true, function performs as it would in the writing output files. If false, function performs as it would in determining r-group attachments :return: Returns a tuple of structure of the fragment, to_atom index and from-atom index """ for atom in ct.atom: atom.growname = "" atom.property['i_rga_index'] = int(atom) stereo_handle = mm.mmstereo_new(ct.handle) mm.mmstereo_add_stereo_information_to_ct(ct.handle, stereo_handle) mm.mmstereo_delete(stereo_handle) props_to_rewrite = {} for prop in ct.property: if prop.startswith('s_st_Chirality_'): props_to_rewrite[prop] = ct.property[prop] #This is the case with a null r-group #Grow a dummy atom if not side_atom: ct.addAtoms(1) factor = 0.1 last_atom = len(ct.atom) xyz = [coord + factor for coord in ct.atom[last_atom - 1].xyz] ct.atom[last_atom].xyz = xyz ct.atom[last_atom].atom_type = 61 ct.atom[last_atom].property['i_rga_index'] = last_atom ct.addBond(ct.atom[core_atom], last_atom, 1) side_atom = last_atom bond_order = 0 core_xyz = ct.atom[core_atom].xyz ct.atom[side_atom].growname = "rpc1" #Find bond order for b in ct.atom[core_atom].bond: if b.atom2.index == side_atom: bond_order = b.order break ct.deleteBond(core_atom, side_atom) for mol in ct.molecule: for a in mol.atom: if a.index == side_atom: fragment = mol.extractStructure() break if not grow: build.add_hydrogens(fragment) return fragment fragment.addAtoms(1) fragment.atom[len(fragment.atom)].atom_type = 4 - int(bond_order) fragment.atom[len(fragment.atom)].xyz = core_xyz fragment.atom[len(fragment.atom)].growname = "rpc2" renumbered_map = {} for atom in fragment.atom: if atom.growname == "rpc1": to_atom = atom.index if atom.growname == "rpc2": prop = int(core_atom) else: prop = atom.property["i_rga_index"] renumbered_map[prop] = int(atom) if props_to_rewrite: logger.debug("Rewriting the following property values:") logger.debug(props_to_rewrite) logger.debug("Using the following renumbering map %s" % renumbered_map) for prop, value in props_to_rewrite.copy().items(): atoms = value.split('_') rs_value = atoms.pop(1) for atom in atoms: if int(atom) not in renumbered_map: del props_to_rewrite[prop] break for i, value in enumerate(props_to_rewrite.values(), start=1): atoms = value.split('_') rs_value = atoms.pop(1) new_atoms = [] for atom in atoms: new_atoms.append(renumbered_map[int(atom)]) new_atoms.insert(1, rs_value) fragment.property['s_st_Chirality_%s' % i] = "_".join( map(str, new_atoms)) from_atom = len(fragment.atom) fragment.addBond(to_atom, from_atom, bond_order) build.add_hydrogens(fragment) return (fragment, from_atom, to_atom)
[docs]def add_to_data(data, fragment, from_atom, to_atom, to_atom_chirality, j, ct_index): if fragment == "null": data.add_r_group(j, ('null', (-1, to_atom), 0), ct_index) return smiles_gen = smiles_mod.SmilesGenerator( stereo=smiles_mod.STEREO_FROM_ANNOTATION_AND_GEOM) smiles, mapping = smiles_gen.getSmilesAndMap(fragment) from_atom_index, to_atom_index = checkMultipleMappings( fragment, smiles, from_atom, to_atom) data.add_r_group(j, (smiles, (from_atom_index, to_atom_index), to_atom_chirality), ct_index)
[docs]def checkMultipleMappings(fragment, smiles, from_atom, to_atom): """ This function checks whether a given fragment may match SMILES string multiple times. In that case a 'canonical' match is selected. In no mapping is found we return (-1, -1) as in previous version of add_to_data function. We assume that calling functions know how to deal with this case. :param fragment: r-group structure :type fragment: `structure.Structure` :parame smiles: SMILES string :type smiles: str :param from_atom: atom index of 'from' atom in fragment. This is a core atom that was replaced with a cap group. :type from_atom: int :param to_atom: atom index of 'to' atom in fragment. This is atom in r-group attached to core. :type to_atom: int :return: tuple that contains positions of 'from' and 'to' atoms in SMILES mapping. These values are 0-indexed. If no mapping is found we return (-1, -1). :rtype: tuple """ matches = analyze.evaluate_smarts_canvas(fragment, smiles, uniqueFilter=False) from_to_atom = [] for mapping in matches: try: from_atom_index = mapping.index(from_atom) to_atom_index = mapping.index(to_atom) except ValueError: pass else: from_to_atom.append((from_atom_index, to_atom_index)) if from_to_atom: # Cannonicalize the result if there are multiple matches. return sorted(from_to_atom, reverse=True)[0] else: return (-1, -1)
[docs]def determine_attachments(core, ct, ct_index, data): r_groups_per_atom = data.get_r_groups_per_atom() rings = data.get_rings() f_rings = data.get_functional_rings() for atom in ct.atom: atom.property.pop("i_cg_attachment", 0) ct_rings = rings[ct_index] functional_rings = f_rings[ct_index] r_group_attachment_bonds = determine_attachment_bonds( ct, core, r_groups_per_atom) data.add_attachment_points(r_group_attachment_bonds, ct_index) bonds_in_rings = determine_rings_in_attachments(functional_rings, r_group_attachment_bonds) data.add_bonds_in_rings(bonds_in_rings, ct_index) in_ring = False for r_group, bond in enumerate(r_group_attachment_bonds): for ring_bond in bonds_in_rings: if r_group in ring_bond: #Find other rings which also are bonded to this attachment #point bonds_with_connected_rings = get_connected_ring_bonds( bonds_in_rings, ring_bond) #Return a list of #the actual [ to_atom, from_atom, bond_order ] of each #of the ring bonds ring_bonds = [r_group_attachment_bonds[n] \ for n in bonds_with_connected_rings] (fragment, from_atom, to_atom) = add_methyl_ring(ct.copy(), bond, ring_bonds, bonds_with_connected_rings, False) to_atom_chirality = get_atom_chirality(fragment, to_atom) add_to_data(data, fragment, from_atom, to_atom, to_atom_chirality, r_group, ct_index) #If we find a fragment associated with this r-group in a ring, #break out of this function in_ring = True break if not in_ring: if type(bond[1]) is not type(None): (fragment, from_atom, to_atom) = add_methyl_standard(ct.copy(), bond[0].index, bond[1].index, False) to_atom_chirality = get_atom_chirality(fragment, to_atom) add_to_data(data, fragment, from_atom, to_atom, to_atom_chirality, r_group, ct_index) #null rgroup else: from_atom = bond[0].index to_atom = -1 add_to_data(data, 'null', from_atom, to_atom, 0, r_group, ct_index) in_ring = False
[docs]def get_atom_chirality(ct, atom): handle = mm.mmstereo_new(ct) chirality = mm.mmstereo_atom_stereo(handle, atom) mm.mmstereo_delete(handle) return chirality
[docs]def determine_rings_in_attachments(functional_rings, r_group_attachment_bonds): #Determine which attachment bonds are part of rings bonds_in_rings = [] #Outer list maps to each functional ring #Inner list is an index of the attachment bond in that ring structure for f_ring in functional_rings: bond_indexes = [] #This index is relative the r_group_attachment_bonds data structure for i, bond in enumerate(r_group_attachment_bonds): if bond_in_ring(f_ring, bond): bond_indexes.append(i) bonds_in_rings.append(bond_indexes) return bonds_in_rings
[docs]def determine_attachment_bonds(ct, core, r_groups_per_atom): """ :return list of lists. Inner element is the [core_atom, bonded_atom, bond_order ] of each attachment point """ r_group_attachment_bonds = [] smarts_indexes_with_groups = list(r_groups_per_atom) #Identify the bonds, and their bond orders #Which connect the core to the r-groups for smarts_index in smarts_indexes_with_groups: number_groups_on_atom = r_groups_per_atom[smarts_index] core_atom_index = core[smarts_index] core_atom = ct.atom[core_atom_index] number_groups_found = 0 number_bonded_hydrogens = 0 hydrogens = [] bonds = [] #Iterate through bonded atoms, looking for heavy atoms #and hydrogens for bond in core_atom.bond: bonded_atom = bond.atom2 if (bonded_atom.index not in core) \ and (bonded_atom.element != 'H'): #Found a heavy atom attachment r_group_attachment_bonds.append( [core_atom, bonded_atom, bond.order]) number_groups_found += 1 elif (bonded_atom.index not in core) \ and (bonded_atom.element == 'H'): #Found a hydrogen attachment number_bonded_hydrogens += 1 hydrogens.append(bonded_atom) #If an position has more attachment than heavy atoms, first try to #add hydrogen r-groups, up to the number of existing hydrogens num_hydrogens = len(hydrogens) index = 0 while (number_groups_found < number_groups_on_atom) and \ (num_hydrogens != 0): r_group_attachment_bonds.append([core_atom, hydrogens[index], 1]) number_groups_found += 1 num_hydrogens -= 1 index += 1 #If a position has more attachments than heavy atoms and hydrogens, #then append null positions. This are marked by a bond order of 0 #and a bonded_atom of None if number_groups_found < number_groups_on_atom: num_missing_groups = number_groups_on_atom - number_groups_found for i in range(0, num_missing_groups): r_group_attachment_bonds.append([core_atom, None, 0]) return r_group_attachment_bonds
[docs]def conglomerate_rings(rings, core): """ This function takes the mmssr and combines rings if they are entirely part of the core structure. Since we know in RGA we are never breaking the core structure, this is a safe operation. :param rings: output from a mmsssr calculation on the entire output structure :param core: list of atoms in the core structure """ set_core = set(core) def merge_rings_in_core(rings): output_rings = [] for i, ring1 in enumerate(rings): set_ring1 = set(ring1) found_new_rings = False for j, ring2 in enumerate(rings[i:]): set_ring2 = set(ring2) if not set_ring1.isdisjoint(set_ring2): combined_rings = set_ring1 | set_ring2 if set_core.issuperset(combined_rings): found_new_rings = True output_rings.append(list(combined_rings)) if not found_new_rings: output_rings.append(ring1) if output_rings.sort() != rings.sort(): merge_rings_in_core(output_rings) return output_rings return merge_rings_in_core(rings[:])
[docs]def bond_in_ring(ring, bond): if type(bond[1]) is type(None): return False if bond[1].index not in ring: return False if bond[0].index not in ring: return False return True
[docs]def get_connected_ring_bonds(ct_rings, ring): """ This function returns any other rings that are also connected to any attachment bonds that are part of the ring fed to the function. :param ct_rings: Outer list is a list of each ring that is stored in the data class. Inner list is an index of the attachment bond that is present in that ring :type ct_rings: list :param ring: List of attachment bonds that are present in the ring :type ring: list """ #Recursively determine all bonds relevant to a given ring like r-group #copy was not being made before relevant_rings = ring[:] for attachment_bond in ring: #This is the ring index of in the master ring structure for the ct for ring_index in ct_rings: #ring_index contains all the attachment_bond relevant to each ring if attachment_bond in ring_index: for additional_bond in ring_index: if additional_bond not in relevant_rings: relevant_rings.append(additional_bond) if set(relevant_rings) == set(ring): return relevant_rings else: return get_connected_ring_bonds(ct_rings, relevant_rings)
[docs]class RGroupException(Exception): """ This is a special exception, which is thrown when for some reason r-group calculation can not be completed. When this exception is caught remaining function calls in calculate() function are not made. """
[docs] @classmethod def fromSmartsError(cls, err): """ This function checks whether an exception err was due to invalid SMARTS pattern. :param err: exception :type err: Exception :return: exception with specific message :rtype: `RGroupException` """ if str(err).startswith("Invalid smarts"): msg = "Invalid SMARTS Pattern." else: msg = "SMARTS matching failed: %s" % str(err) return RGroupException(msg)
[docs]class RGroupFinder(): """ This class is used to find optimal core alignments and determines r-groups for input structures. This is a base class that uses 'original' algorithm that tries to minimize the number of r-groups. """
[docs] def __init__(self, input_file, smarts, temp_dir=None, thread=None, use_mm=1, use_fp_sim=False, sa_seed=None, t_factor=None, tmax_mult=None): """ This function creates an instance of RGroupFinder object. :param input_file: name of the input structure file :type input_file: str :param smarts: list of cores SMARTS patterns :type smarts: list :param temp_dir: name of temporary directory :type temp_dir: str :param thread: calculation thread (optional) :type thread: calculation_thread :param use_mm: type of multiple matching algorithm (1-SA and 2-DEE) :type use_mm: int :param use_fp_sim: True to use fingerprint similarity between pairs of 'states' :type use_fp_sim: bool :param sa_seed: SimualtedAnnealing random generator seed :type sa_seed: int :param t_factor: SimulatedAnnealing temperature factor :type tmax_mult: float :param tmax_mult: SimualtedAnnealing parameter to set starting temperature :type tmax_mult: float """ self.input_file = input_file self.smarts = smarts self.temp_dir = temp_dir self.thread = thread self.use_mm = use_mm self.use_fp_sim = use_fp_sim self.dee_msg = "" self.sa_seed = sa_seed self.t_factor = t_factor self.tmax_mult = tmax_mult self._resetData()
def _resetData(self): """ This function resets data objects used during calculation to their initial values. """ # This is the main data object, where calculation results are # stored. It is set to None if calculation fails for any reason. self.data = Data() # This variable is used to store any error messages if calculation # fails for some reason. self.msg = "" # Dictionary of 'core states', where key is the molecule index and # value contains all core matches (each one is a list of atom indices) and # a list of attachments for each core match. When using base algorithm a # list of attachments is empty. self.core_states = {} # Dictionary of 'bond states', where key is the molecule index and # value contains lists of bond indices for each core match. self.bond_states = {} # Indices of molecule that produced multiple core matches self.mult_matches = set() # Indices of molecule that had no core matches self.no_matches = set() # Indices of molecule that had no heavy atoms in the core self.no_heavy_atoms = set() # Indices of duplicate molecules self.duplicate_matches = set() # List of unique SMILES corresponding to core matches self.unique_smiles = set() # ATTENTION!!! #This variable counts number of molecules. Got to be a better way. self.mol_cnt = 0
[docs] def calculate(self): """ This function is called to compute optimal molecule alignment and determine attached r-groups. It defines the order of calls that need to be made as in 'Template Method' design pattern. When calculation is done here self.data object can be retrieved to get results. """ if self.use_mm == 0: logger.debug("Running SMARTS pattern Core/R Group " "Identification Method.") elif self.use_mm == 1: logger.debug("Running SMARTS pattern Core/R Group Identification " "Method - SA Algorithm.") elif self.use_mm == 2: logger.debug("Running SMARTS pattern Core/R Group Identification " "Method - Backtracking Algorithm.") try: # reset data before the calculation begins self._resetData() # find cores matching given SMARTS pattern for each molecule self._findCoreMatches() # optimize molecule alignments self._optimizeAlignment() # finalize calculation self._finalizeCalculation() # generate message self._generateMessage() except RGroupException: # if RGroupException is caught set self.data to None. self.data = None
def _findCoreMatches(self): """ In this function we loop over all molecule and find all possible core matches. """ reader = structure.MaestroReader(self.input_file) if len(self.smarts) == 1: iter_SMARTS = itertools.repeat(self.smarts[0]) else: iter_SMARTS = iter(self.smarts) count = itertools.count() # determine mappings and construct data structures for each core that will be # used to construct 'energy' matrix self.mol_cnt = 0 for i, ct_SMARTS, ct in zip(count, iter_SMARTS, reader): self.mol_cnt += 1 self.data.set_prop_list(list(ct.property)) self.data.add_ct_offset(reader.last_position) build.add_hydrogens(ct) if ct.atom_total < 255: logger.debug('Matching SMARTS=%s for struct=%s' % (ct_SMARTS, i)) try: (core_matches, bond_matches) = evaluate_smarts_ex(ct, ct_SMARTS.strip()) except (RuntimeError, ValueError) as err: raise RGroupException.fromSmartsError(err) else: #molecule is too big core_matches = [] bond_matches = [] # if 'good' match is found store core and bond matches for # future use. retval = self._checkAndTallyCoreMatches(i, ct, core_matches, bond_matches) (good_match, core_matches, bond_matches) = retval if good_match: attachments = [] for core in core_matches: core_attach = self._findAttachments(ct, core) attachments.append(core_attach) self.core_states[i] = [core_matches, attachments] self.bond_states[i] = bond_matches # end of loop over molecules if self.mol_cnt == 1: msg = "Only a single structure was supplied. R-Group Analysis needs " msg += "more than one structure for analysis." raise RGroupException(msg) def _checkAndTallyCoreMatches(self, i, ct, core_matches, bond_matches): """ Verify that a valid match was found by checking that core atoms contains any heavy atoms, that current molecule is not a duplicate etc. Also update the class attributes to reflect the results of the verification. :param i: index of current molecule :type i: int :param ct: current molecule ct :type ct: `schrodinger.structure.Structure` :param core_matches: list of lists of core atom indices for each match found :type core_matches: list :param bond_matches: list of lists of core bonds for each match found :type bond_matches: list :return: A tuple of: - A bool indicating whether this is a 'good' match - The updated core_matches list - The updated bond_matches list :rtype: tuple """ retval = self._checkCoreMatches(ct, core_matches, bond_matches) (match_state, core_matches, bond_matches, smiles) = retval if match_state == GOOD_MATCH: if len(core_matches) > 1: self.mult_matches.add(i) else: self.no_matches.add(i) if match_state == DUPLICATE_MATCH: self.duplicate_matches.add(i) else: self.unique_smiles.add(smiles) if match_state == NO_HEAVY_ATOM_MATCH: self.no_heavy_atoms.add(i) return (match_state == GOOD_MATCH, core_matches, bond_matches) def _checkCoreMatches(self, ct, core_matches, bond_matches): """ Verifies that a valid match was found by checking that core atoms contains any heavy atoms, that current molecule is not a duplicate etc. :param ct: current molecule ct :type ct: `schrodinger.structure.Structure` :param core_matches: list of lists of core atom indices for each match found :type core_matches: list :param bond_matches: list of lists of core bonds for each match found :type bond_matches: list :return: A tuple of: (1) A match state constant that is one of GOOD_MATCH, NO_MATCHES, DUPLICATE_MATCH, or NO_HEAVY_ATOM_MATCH. (2) The updated core_matches list (3) The updated bond_matches list (4) The smiles string for the input molecule :rtype: tuple """ stereo = smiles_mod.STEREO_FROM_ANNOTATION_AND_GEOM smiles_gen = smiles_mod.SmilesGenerator(stereo) smiles = smiles_gen.getSmiles(ct) if smiles in self.unique_smiles: # This molecule is a duplicate return (DUPLICATE_MATCH, core_matches, bond_matches, smiles) elif not (core_matches): # SMARTS pattern did not match in the structure return (NO_MATCHES, core_matches, bond_matches, smiles) else: # verify that core matches contain 'heavy' atoms retval = self._checkCoreHeavyAtoms(ct, core_matches, bond_matches) return retval + (smiles,) def _checkCoreHeavyAtoms(self, ct, core_matches, bond_matches): """ This function verifies that core match has heavy atoms. :param ct: current molecule ct :type ct: `schrodinger.structure.Structure` :param core_matches: list of lists of core atom indices for each match found :type core_matches: list :param bond_matches: list of lists of core bonds for each match found :type bond_matches: list :return: A tuple of: (1) A match state constant that is one of GOOD_MATCH, NO_MATCHES, DUPLICATE_MATCH, or NO_HEAVY_ATOM_MATCH (2) The updated core_matches list (3) The updated bond_matches list :rtype: tuple """ new_core_matches = [] new_bond_matches = [] for core, bond in zip(core_matches, bond_matches): unique_atoms = {ct.atom[a].element for a in core} if unique_atoms != {'H'}: new_core_matches.append(core) new_bond_matches.append(bond) # After eliminating non-heavy atoms, check to see if there are still # multiple core matches if not new_core_matches: match_state = NO_HEAVY_ATOM_MATCH else: match_state = GOOD_MATCH return (match_state, new_core_matches, new_bond_matches) def _generateMessage(self): """ This function checks calculation results and generates appropriate message. :raise RGroupException: raises this exception if only one structure matched the core definition or no matches were found. """ logger.debug( "Following data structure is a list of dictionaries " "describing R-group positions. Each dictionary corresponds to " "r-group position. The keys of the dictionaries are methyl-capped " "unique smiles. The corresponding value is a list of ct's in " "which this unique smiles appears at the position " "corresponding to the dictionary.") for position in self.data.get_r_groups(): logger.debug(position) self.no_matches -= self.duplicate_matches if self.no_matches: self.msg = ("The following input structures were not matched " "at all by the SMARTS pattern and were omitted:") unmatched_mols = sorted(self.no_matches - self.no_heavy_atoms) unmatched_mols_to_str = [str(i + 1) for i in unmatched_mols] self.msg += ", ".join(unmatched_mols_to_str) + "\n" if self.no_heavy_atoms: self.msg += "The following input structures had matches " self.msg += "that had no heavy atoms:" no_heavy_atoms_to_str = [ str(i + 1) for i in sorted(self.no_heavy_atoms) ] self.msg += ", ".join(no_heavy_atoms_to_str) + "\n" if self.duplicate_matches: self.msg += ("The following structures were duplicates of existing " "structures and were omitted:") for ct_num in self.duplicate_matches: self.msg += " %i" % (ct_num + 1) if len(self.core_states) == 1: self.msg = "Only one structure matched the core definition." raise RGroupException(self.msg) if self.msg == "": self.msg = None if len(self.no_matches) == self.mol_cnt: if len(self.no_heavy_atoms) != 0: self.msg = "The core contained no heavy atoms." else: self.msg = "No input structures matched the core definition." raise RGroupException(self.msg) return def _findAttachments(self, ct, core): """ Find attachments for a given core. :param ct: current molecule ct :type ct: `schrodinger.structure.Structure` :param core: core atom indices :type core: list :return: list of attached groups ('hydrogen', 'null' or SMILES) :rtype: list """ # ATTENTION!!! This function could probably be refactored even further. smiles_gen = smiles_mod.SmilesGenerator( stereo=smiles_mod.STEREO_FROM_ANNOTATION_AND_GEOM) temp_r_groups_per_atom = {} smarts_indexes_with_groups = [] attachments = {} for smarts_index, core_atom_index in enumerate(core): num_r_groups = 0 isHydrogen = False core_atom = ct.atom[core_atom_index] attachments[smarts_index] = [] for bonded_atom in core_atom.bonded_atoms: if (bonded_atom.index not in core) and (bonded_atom.element != 'H'): num_r_groups += 1 elif (bonded_atom.element == 'H'): isHydrogen = True temp_r_groups_per_atom[smarts_index] = num_r_groups if num_r_groups == 0: if isHydrogen: attachments[smarts_index].append("hydrogen") else: attachments[smarts_index].append("null") else: for i in range(num_r_groups): smarts_indexes_with_groups.append(smarts_index) ct_rings = find_rings(ct) functional_rings = conglomerate_rings(ct_rings, core) r_group_attachment_bonds = determine_attachment_bonds( ct, core, temp_r_groups_per_atom) bonds_in_rings = determine_rings_in_attachments( functional_rings, r_group_attachment_bonds) for r_group, bond in enumerate(r_group_attachment_bonds): if type(bond[1]) is type(None): continue idx = smarts_indexes_with_groups[r_group] in_ring = False for ring_bond in bonds_in_rings: if r_group in ring_bond: bonds_with_connected_rings = get_connected_ring_bonds( bonds_in_rings, ring_bond) ring_bonds = [ r_group_attachment_bonds[n] for n in bonds_with_connected_rings ] fragment = add_methyl_ring(ct.copy(), bond, ring_bonds, bonds_with_connected_rings, False, grow=False) smiles = smiles_gen.getSmiles(fragment) attachments[idx].append(smiles) in_ring = True break if not in_ring: fragment = add_methyl_standard(ct.copy(), bond[0].index, bond[1].index, False, grow=False) smiles = smiles_gen.getSmiles(fragment) attachments[idx].append(smiles) return list(attachments.values()) def _optimizeAlignment(self): """ This function finds best core matches for molecules that have multiple matches. Indices of best core matches are appended to self.core_states data. """ # total number of molecules that had multiple matches num_mult_matches = len(self.mult_matches) if num_mult_matches > 0: best_matches = self._findBestMatches() # put best matches into core_states dictionary here cnt = 0 for k in self.core_states.keys(): if k in self.mult_matches: self.core_states[k].append(best_matches[cnt]) cnt += 1 else: self.core_states[k].append(0) def _calculateStatesTally(self): """ This function calculates number of 'states' (matches) for each multiply matched molecule as well as the grand total of all states. :return: list consisting of number of states for each molecule and a total number of states :rtype: list, int """ # total number of 'states' for all multiply matched molecules total_states = 0 # list of number of 'states' for each multiply matched molecule num_states = [] for i, t in self.core_states.items(): if i in self.mult_matches: cores, attachments = t num_states.append(len(cores)) total_states += len(cores) return num_states, total_states def _findBestMatches(self): """ Find best matching core for each multiply matched molecule. :return: list that contains indices of 'best matching' cores for each molecule :rtype: list """ dee_matrix = self._calculateEnergyMatrix() # solve optimization problem if self.use_mm == 1: # simulated annealing sa = rgadee.SimulatedAnnealing(dee_matrix, sa_seed=self.sa_seed, t_factor=self.t_factor, tmax_mult=self.tmax_mult) sa.run() best_matches = sa.getBestMatch() self.data.sa_best_energy = sa.best_energy self.data.sa_cpu_time = sa.cpu_time elif self.use_mm == 2: # backtracking and dead end elimination dee_matrix.eliminatePairs() dee_matrix.eliminateSingles() dee_backtrack = rgadee.DEE_Backtracking(dee_matrix) self.dee_msg = dee_backtrack.minimize() best_matches = dee_backtrack.getBestMatch() if len(best_matches) != len(self.mult_matches): msg = "Backtracking algorithm failed to find solution." raise RGroupException(msg) return best_matches def _calculateEnergyMatrix(self): """ This function computes 'energy' matrix that is used in the optimization procedure that finds best matching cores for each molecule. :return: energy matrix :rtype: `DEE_EnergyMatrix` """ # precalculate FP similarities for attachments r_group_sim = self._calculateRGroupSimilarity() # compute similarity matrix num_states, total_states = self._calculateStatesTally() uij = self._calculateSimilarityMatrix(total_states, r_group_sim) return rgadee.DEE_EnergyMatrix(num_states, uij) def _calculateRGroupSimilarity(self): """ This function calculates Tanimoto fingerprint similarities for every attachment pair defined using SMILES. :return: dictionary of similarity scores keyd on the pair of SMILES string :rtype: dict """ # find SMILES and FPs for unique attachments r_group_bitset = self._findUniqueRGroups() # compute similarities for each pair of unique attachments sim_func = self._calculatePairSimilarityGeneric if self.use_fp_sim: sim_func = self._calculatePairSimilarityFP r_group_sim = self._calculatePairSimilarity(r_group_bitset, sim_func) return r_group_sim def _findUniqueRGroups(self): """ This function iterates over core 'states'. For each attached group, which is defined by a SMILES string, it calculates its fingerprint. SMILES string is used as a key, so that we only store FPs for unique attachments. :return: dictionary, where SMILES is key and FP is value :rtype: dict """ r_group_bitset = {} # get list of unique attachment strings, where each element is either # SMILES string or 'hydrogen' or 'null'. In the end throw away 'null' # since it is not used in calculation core_states_attachments = [] list( map(core_states_attachments.extend, [v[1] for v in self.core_states.values()])) all_core_states_attachments = [] list(map(all_core_states_attachments.extend, core_states_attachments)) unique_core_states_attachments = set( [v[0] for v in all_core_states_attachments]) unique_core_states_attachments.discard(NULL_GROUP) # loop over unique attachments and generate their FPs for group_key in unique_core_states_attachments: group_smiles = group_key if group_key == HYDROGEN_GROUP: group_smiles = '[H]' mol = ChmMol.fromSMILES(str(group_smiles)) bs = ChmFingerprints.getRGACodes(mol) r_group_bitset[group_key] = bs return r_group_bitset def _calculatePairSimilarity(self, r_group_bitset, sim_func): """ This function calculates similarities for each pair of unique r-group attachments. :param r_group_bitset: dictionary of FPs for r-groups keyed on SMILES :type r_group_bitset: dict :param sim_func: function used to compute similarity between pair of attachments. This function can be either _calculatePairSimilarityFP or _calculatePairSimilarityGeneric. :type sim_func: function :return: dictionary of pair similarities keyed on a pair of SMILES :rtype: dict """ r_group_sim = {} r_group_keys = list(r_group_bitset) num_keys = len(r_group_keys) for i in range(num_keys): key1 = r_group_keys[i] # special cases: same keys or one key is a null r_group_sim[(key1, key1)] = -1.0 r_group_sim[(NULL_GROUP, key1)] = 0.0 r_group_sim[(key1, NULL_GROUP)] = 0.0 for j in range(i + 1, num_keys): key2 = r_group_keys[j] sim = sim_func(key1, key2, r_group_bitset) r_group_sim[(key1, key2)] = sim r_group_sim[(key2, key1)] = sim for p, sim in r_group_sim.items(): logger.debug("r-group pair: %s similarity: %10.6f" % (p, sim)) return r_group_sim def _calculatePairSimilarityFP(self, key1, key2, r_group_bitset): """ This function calculates Tanimoto FP similarities for a pair of unique r-group attachments. :param key1: first r-group key, which can be SMILES or 'hydrogen' :type key1: str :param key2: second r-group key, which can be SMILES or 'hydrogen' :type key2: str :param r_group_bitset: dict of FPs for r-groups. :type r_group_bitset: dict :return: similarity between two groups :rtype: float """ bs1 = r_group_bitset[key1] bs2 = r_group_bitset[key2] sim = -bs1.simTanimoto(bs2) return sim def _calculatePairSimilarityGeneric(self, key1, key2, r_group_bitset): """ This function calculates similarities for a pair of unique r-group attachments using simple sheme. :param key1: first r-group key, which can be SMILES or 'hydrogen' :type key1: str :param key2: second r-group key, which can be SMILES or 'hydrogen' :type key2: str :param r_group_bitset: dict of FPs for r-groups. This variable is not used by this function, but we need it for compatibility. :type r_group_bitset: dict :return: similarity between two groups :rtype: float """ if key1 == HYDROGEN_GROUP or key2 == HYDROGEN_GROUP: sim = -0.1 elif key1 == key2: sim = -1.0 else: sim = -0.5 return sim def _calculateStatePairScore(self, a1, a2, r_group_sim): """ This function calculates pairwise score for a pair of core states. For each state we have lists of groups at each attachment point. These groups can be 'null', 'hydrogen' or be identified by a SMILES string. It is possible to have multiple groups attached to the same point. :param a1: list of attachment groups for 1st state :type a1: list :param a2: list of attachment groups for 2nd state :type a2: list :param r_group_sim: dictionary of Tanimoto similarities :type r_group_sim: dict :return: pairwise score :rtype: double """ sim = 0.0 nsim = 0 for group1, group2 in zip(a1, a2): # multiple r-groups at attachment point if len(group1) > 1 or len(group2) > 1: if len(group1) == len(group2) and set(group1) == set(group2): sim += -1.0 else: sim += -0.5 nsim += 1 continue # single r-groups at attachment point p = (group1[0], group2[0]) if p in ATTACHMENT_PAIRS_TO_SKIP: continue sim += r_group_sim[p] nsim += 1 if nsim > 0: sim = old_div(sim, nsim) return sim def _calculateSimilarityMatrix(self, ns, r_group_sim): """ This function computes similarity matrix, which contains pairwise scores between core states of molecules that have multiple core matches. :param ns: total number of core states (matrix dimensions) :type ns: int :param r_group_sim: pairwise Tanimoto similarity between pairs of attachments :type r_group_sim: dict :return: ns * ns similarity matrix :rtype: """ uij = numpy.zeros((ns, ns)) ni = 0 cnt = 0 for i, ti in self.core_states.items(): if i in self.mult_matches: ci, ai = ti nj = 0 for j, tj in self.core_states.items(): cj, aj = tj if j > i: for i1, a1 in enumerate(ai): for i2, a2 in enumerate(aj): sim = self._calculateStatePairScore( a1, a2, r_group_sim) if j in self.mult_matches: uij[ni + i1][nj + i2] = sim uij[nj + i2][ni + i1] = sim else: uij[ni + i1][ni + i1] += sim if j in self.mult_matches: nj += len(cj) ni += len(ci) return uij def _finalizeCalculation(self): """ This function is used to add r-group data into Data() object after best matching core has been found. This includes adding core bonds, adding properties and adding information about attched groups. """ # save core data for best matches self._saveBestMatchData() # save properties and attachment data self._savePropertyAndAttachmentData() def _saveBestMatchData(self): """ This function iterates over input structures and stores core and bond data for 'best matches' in the Data() structure. """ reader = structure.MaestroReader(self.input_file) if len(self.smarts) == 1: iter_SMARTS = itertools.repeat(self.smarts[0]) else: iter_SMARTS = iter(self.smarts) count = itertools.count() for i, ct_SMARTS, ct in zip(count, iter_SMARTS, reader): if i in self.core_states: build.add_hydrogens(ct) core_matches, attachments, best_match = self.core_states[i] core = [core_matches[best_match]] self.data.add_core(core, i) bond_match = [self.bond_states[i][best_match]] self.data.add_core_bonds(bond_match, i) add_ct_data(self.data, core[0], ct, i) def _savePropertyAndAttachmentData(self): """ Iterate over input structures and save property and attachment groups data in the main Data() structure. """ #Add an x R-Group positions for each core atom with x attachments r_groups_per_atom = self.data.get_r_groups_per_atom() for key in r_groups_per_atom: for i in range(0, r_groups_per_atom[key]): self.data.add_position() #Determine attachments and store properties cores = self.data.get_cores() reader = structure.MaestroReader(self.input_file) for ct_index, ct in enumerate(reader): if ct_index in self.core_states: props = self.data.get_prop_list() for p in props: #Don't add r_group family properties, since we'll just wipe them #later if p.startswith("i_rga_R") and p.endswith("_Family"): continue if p in ct.property: self.data.add_property_value(p, ct.property[p]) else: #Add a None for the non-existence of a property, to mark #this for the GUI based tables self.data.add_property_value(p, None) #Check to see if the SMARTS pattern matches in #multiple ways core = cores[ct_index][0] build.add_hydrogens(ct) determine_attachments(core, ct, ct_index, self.data) if self.thread and self.thread.terminate_thread: self.thread.exit() raise RGroupException(None) # write debug output f_rings = self.data.get_functional_rings() logger.debug("Functional Rings:") logger.debug(f_rings) logger.debug("R-Groups Per Atom Dictionary:") logger.debug(r_groups_per_atom)
[docs]def get_rgroup_DEE(input_file, SMARTS, temp_dir=None, thread=None, use_mm=1, use_fp_sim=False, sa_seed=None, t_factor=None, tmax_mult=None): finder = RGroupFinder(input_file, SMARTS, temp_dir, thread, use_mm, use_fp_sim, sa_seed=sa_seed, t_factor=t_factor, tmax_mult=tmax_mult) finder.calculate() return (finder.data, finder.msg)
[docs]def get_rgroup_SMARTS(input_file, SMARTS, temp_dir=None, thread=None): """ Takes input_file and SMARTS_pattern and defines r-groups input_file - input .mae file SMARTS_pattern defining core """ logger.debug("Running SMARTS pattern Core/R Group Identification Method.") data = Data() r_group_positions = [] reader = structure.MaestroReader(input_file) if len(SMARTS) == 1: iter_SMARTS = itertools.repeat(SMARTS[0]) else: iter_SMARTS = iter(SMARTS) count = itertools.count() mult_matches = set() no_matches = set() no_heavy_atoms = set() duplicate_matches = set() length = 0 run_alg = False smiles_gen = smiles_mod.SmilesGenerator( stereo=smiles_mod.STEREO_FROM_ANNOTATION_AND_GEOM) smiles_set = set() #CODE BLOCK 1 for i, ct_SMARTS, ct in zip(count, iter_SMARTS, reader): #Add information to dataset data.set_prop_list(list(ct.property)) data.add_ct_offset(reader.last_position) run_alg = True build.add_hydrogens(ct) if ct.atom_total < 300: logger.debug('Matching SMARTS=%s for struct=%s' % (ct_SMARTS, i)) try: (core_matches, bond_matches) = evaluate_smarts_ex(ct, ct_SMARTS.strip()) except ValueError as err: if str(err).startswith("Invalid smarts"): msg = "Invalid SMARTS Pattern." return (None, msg) else: #molecule is too big core_matches = [] smiles = smiles_gen.getSmiles(ct) if smiles in smiles_set: duplicate_matches.add(i) no_matches.add(i) run_alg = False else: smiles_set.add(smiles) #SMARTS pattern did not match in the structure if len(core_matches) == 0: no_matches.add(i) run_alg = False elif len(core_matches) >= 1: #Check SMARTS match to see if it contains heavy atoms remove_bond_matches = [] for ib, core in enumerate(core_matches[:]): unique_atoms = set([ct.atom[a].element for a in core]) if unique_atoms == set(['H']): core_matches.remove(core) remove_bond_matches.append(ib) if len(remove_bond_matches) > 0: for ib in reversed(remove_bond_matches): bond_matches.pop(ib) #Match contains no heavy atoms if len(core_matches) == 0: no_heavy_atoms.add(i) no_matches.add(i) run_alg = False #After eliminating non-heavy atoms, there are still multiple #ways SMARTS patterns match the core elif len(core_matches) > 1: mult_matches.add(i) if i == 0: #Save the first struct matches in case all structures #match first_struct_matches = core_matches[:] if run_alg: core = core_matches[0] data.add_core(core_matches, i) data.add_core_bonds(bond_matches, i) if i not in mult_matches: add_ct_data(data, core, ct, i) if thread and thread.terminate_thread: thread.exit() return (None, None) length = i if length == 0: msg = "Only a single structure was supplied. R-Group Analysis needs " msg += "more than one structure for analysis." return (None, msg) if len(mult_matches) == length + 1: #If all matches are multiple matches on all structures, #assume the first match is the canonical match. ct = next(structure.MaestroReader(input_file, index=1)) data.r_groups_per_atom = {} core = find_best_core_match(ct, first_struct_matches) if core is None: core = first_struct_matches[0] data.add_core([core], 0) add_ct_data(data, core, ct, 0) mult_matches.remove(0) r_groups_per_atom = data.get_r_groups_per_atom() rings = data.get_rings() f_rings = data.get_functional_rings() cores = data.get_cores() logger.debug("Functional Rings:") logger.debug(f_rings) logger.debug("R-Groups Per Atom Dictionary:") logger.debug(r_groups_per_atom) #Add an x R-Group positions for each core atom with x attachments for key in r_groups_per_atom: for i in range(0, r_groups_per_atom[key]): data.add_position() reader = structure.MaestroReader(input_file) #Determine attachments for non multiply matched structures for ct_index, ct in enumerate(reader): if (ct_index not in no_matches): #All property addition occurs here, even for multiply matched #structures props = data.get_prop_list() for p in props: #Don't add r_group family properties, since we'll just wipe them #later if p.startswith("i_rga_R") and p.endswith("_Family"): continue if p in ct.property: data.add_property_value(p, ct.property[p]) else: #Add a None for the non-existence of a property, to mark #this for the GUI based tables data.add_property_value(p, None) #Check to see if the SMARTS pattern matches in #multiple ways if ct_index in mult_matches: continue core = cores[ct_index][0] build.add_hydrogens(ct) determine_attachments(core, ct, ct_index, data) if thread and thread.terminate_thread: thread.exit() return (None, None) r_groups_per_atom = data.get_r_groups_per_atom() if not r_groups_per_atom: no_rgroups = True else: no_rgroups = False reader = structure.MaestroReader(input_file) #Determine attachments for multiply matched structures if mult_matches: smiles_gen = smiles_mod.SmilesGenerator( stereo=smiles_mod.STEREO_FROM_ANNOTATION_AND_GEOM) for ct_index, ct in enumerate(reader): if ct_index == 0: first_ct = ct if (ct_index not in mult_matches) or (ct_index in no_matches): continue build.add_hydrogens(ct) for atom in ct.atom: atom.property.pop("i_cg_attachment", 0) r_group_attachment_bonds = [] new_pos = defaultdict(int) new_groups = OrderedDict() hydr_num = OrderedDict() null_num = OrderedDict() for core_index, core in enumerate(cores[ct_index]): for atom in ct.atom: atom.property.pop("i_cg_attachment", 0) r_group_attachment_bonds = [] temp_r_groups_per_atom = r_groups_per_atom.copy() smarts_indexes_with_groups = [] for smarts_index, core_atom_index in enumerate(core): num_r_groups = 0 core_atom = ct.atom[core_atom_index] for bonded_atom in core_atom.bonded_atoms: if (bonded_atom.index not in core) and \ (bonded_atom.element != 'H'): num_r_groups += 1 if smarts_index not in temp_r_groups_per_atom and num_r_groups: new_pos[core_index] += num_r_groups temp_r_groups_per_atom[smarts_index] = num_r_groups elif num_r_groups and num_r_groups > temp_r_groups_per_atom[ smarts_index]: new_pos[ core_index] += num_r_groups - temp_r_groups_per_atom[ smarts_index] temp_r_groups_per_atom[smarts_index] = num_r_groups ct_rings = find_rings(ct) functional_rings = conglomerate_rings(ct_rings, core) new_groups[core_index] = 0 hydr_num[core_index] = 0 null_num[core_index] = 0 logger.debug("CT %i" % ct_index) logger.debug(smarts_indexes_with_groups) logger.debug(core) r_group_attachment_bonds = determine_attachment_bonds( ct, core, temp_r_groups_per_atom) for bond in r_group_attachment_bonds: attachment_atom = bond[0] if type(attachment_atom) == type(None): null_num[core_index] += 1 elif attachment_atom.element == 'H': hydr_num[core_index] += 1 bonds_in_rings = determine_rings_in_attachments( functional_rings, r_group_attachment_bonds) in_ring = False for r_group, bond in enumerate(r_group_attachment_bonds): try: other_frags = data.get_r_groups()[r_group] except IndexError: #case where no fragments have been collected for this #position, where this position was not defined by #structures which do not match multiply other_frags = {} #We are only interested in the smiles for the sake of this #comparison, because we know the attachment atoms. #frag[0] represents the just the smiles of the data #structure other_frags = set([frag[0] for frag in list(other_frags)]) #Don't create fragments if there is no reason to compare #them to existing fragments if not other_frags or type(bond[1]) is type(None): new_groups[core_index] += 1 continue in_ring = False for ring_bond in bonds_in_rings: if r_group in ring_bond: bonds_with_connected_rings = \ get_connected_ring_bonds(bonds_in_rings, ring_bond) ring_bonds = [r_group_attachment_bonds[n] \ for n in bonds_with_connected_rings] (fragment, from_atom, to_atom) = add_methyl_ring( ct.copy(), bond, ring_bonds, bonds_with_connected_rings, False) smiles = smiles_gen.getSmiles(fragment) if smiles not in other_frags: new_groups[core_index] += 1 in_ring = True break if not in_ring: (fragment, from_atom, to_atom) = add_methyl_standard(ct.copy(), bond[0].index, bond[1].index, False) smiles = smiles_gen.getSmiles(fragment) if smiles not in other_frags: new_groups[core_index] += 1 if thread and thread.terminate_thread: thread.exit() return (None, None) logger.debug("Multiple match of ct %s" % ct_index) logger.debug("Found new groups: %s" % new_groups) logger.debug("Found hydrogens: %s" % hydr_num) logger.debug("Found null groups: %s" % null_num) best_core_match = set() cores_list = list(range(0, len(cores[ct_index]))) #Find core that doesn't produce new positions if len(new_pos) == len(cores_list): minval = min(new_pos.values()) for key, val in new_pos.items(): if val == minval: best_core_match.add(key) elif len(new_pos) < len(cores_list): best_core_match = set(cores_list) - set(list(new_pos)) if len(best_core_match) > 1: for key in list(new_groups): if key not in best_core_match: del new_groups[key] minval = min(new_groups.values()) if minval: for key, val in new_groups.items(): if val != minval: best_core_match.remove(key) if len(best_core_match) > 1: for key in list(null_num): if key not in best_core_match: del null_num[key] minval = min(null_num.values()) if minval: for key, val in null_num.items(): if val != minval: best_core_match.remove(key) if len(best_core_match) > 1: for key in list(hydr_num): if key not in best_core_match: del hydr_num[key] minval = min(hydr_num.values()) if minval: for key, val in hydr_num.items(): if val != minval: best_core_match.remove(key) best_core_match = list(best_core_match) core = None if len(best_core_match) > 1: curr_best_cores = [cores[ct_index][i] for i in best_core_match] first_core = cores[0][0] core = find_best_core_match(ct, curr_best_cores, ref_ct=first_ct, ref_core=first_core) if core is None: core = cores[ct_index][best_core_match[0]] data.add_core([core], ct_index) add_ct_data(data, core, ct, ct_index) r_groups_per_atom = data.get_r_groups_per_atom() if not r_groups_per_atom: no_rgroups = True else: no_rgroups = False data.r_group_position = [] for key in r_groups_per_atom: for i in range(0, r_groups_per_atom[key]): data.add_position() cores = data.get_cores() reader = structure.MaestroReader(input_file) for ct_index, ct in enumerate(reader): build.add_hydrogens(ct) if (ct_index not in no_matches): core = cores[ct_index][0] determine_attachments(core, ct, ct_index, data) if thread and thread.terminate_thread: thread.exit() return (None, None) logger.debug( "Following data structure is a list of dictionaries " "describing R-group positions. Each dictionary corresponds to " "r-group position. The keys of the dictionaries are methyl-capped " "uniqued smiles. The corresponding value is a list of ct's in " "which this unique smiles appears at the position " "corrresponding to the dictionary.") for position in data.get_r_groups(): logger.debug(position) msg = "" for ct_num in duplicate_matches: no_matches.discard(ct_num) if len(no_matches) != 0: msg = ("The following input structures were not matched " "at all by the SMARTS pattern and were omitted:") for num in no_matches: if num not in no_heavy_atoms: msg = msg + " %i," % (num + 1) msg = msg[:-1] msg = msg + "\n" if no_heavy_atoms: msg += "The following input structures had matches " msg += "that had no heavy atoms:" for num in no_heavy_atoms: msg += " %i," % (num + 1) if duplicate_matches: msg += ("The following structures were duplicates of existing " "structures and were omitted:") for ct_num in duplicate_matches: msg += " %i" % (ct_num + 1) if length == 0: msg = "Only one structure matched the core definition." return (None, msg) if msg == "": msg = None if len(no_matches) == length + 1: if len(no_heavy_atoms) != 0: msg = "The core contained no heavy atoms." else: msg = "No input structures matched the core definition." return (None, msg) #if no_rgroups: #msg = ( "No r-groups could be determined. The most likely " #"cause is the core structure found encapsulates the " #"entire molecule for all input structures." ) #return (None, msg) return (data, msg)
[docs]def get_rgroup_MCS(input_file, MCS_index, temp_dir=None, thread=None, use_mm=0, use_fp_sim=False, sa_seed=None, t_factor=None, tmax_mult=None): """ Determines the core and r-groups using the canvasMCS method. :param input_file: Absolute path to mae file to run the canvasMCS utility on :type input_file: string :param MCS_index: atomtyping scheme to use in canvasMCS :type MCS_index: int/str :param temp_dir: name of temporary directory :type temp_dir: str :param thread: calculation thread (optional) :type thread: calculation_thread :param use_mm: type of multiple matching algorithm (0-Original method, 1-SA and 2-DEE) :type use_mm: int :param use_fp_sim: True to use fingerprint similarity between pairs of 'states' :type use_fp_sim: bool :param sa_seed: SimulatedAnnealing random number generator seed. 'None' will use the SimulatedAnnealing defaults. :type sa_seed: int :param t_factor: SimulatedAnnealing temperature factor. 'None' will use the SimulatedAnnealing defaults. :type tmax_mult: float :param tmax_mult: SimulatedAnnealing parameter to set starting temperature. 'None' will use the SimulatedAnnealing defaults. :type tmax_mult: float """ try: # run MCS and get a list of matching SMARTS patterns patterns = getMCSMatches(input_file, MCS_index, temp_dir, thread) except RGroupException as e: # if RGroupException is caught set self.data to None. data = None msg = str(e) return (data, msg) if thread and thread.terminate_thread: return (None, None) if use_mm == 0: (data, msg) = get_rgroup_SMARTS(input_file, patterns, temp_dir=temp_dir, thread=thread) else: (data, msg) = get_rgroup_DEE(input_file, patterns, temp_dir=temp_dir, thread=thread, use_mm=use_mm, use_fp_sim=use_fp_sim, sa_seed=sa_seed, t_factor=t_factor, tmax_mult=tmax_mult) if data: data.MCS_index = MCS_index logger.debug( "Following data structure is a list of dictionaries " "describing R-group positions. Each dictionary corresponds to " "r-group position. The keys of the dictionaries are methyl-capped " "uniqued smiles. The corresponding value is a list of ct's in " "which this unique smiles appears at the position " "corrresponding to the dictionary.") for position in data.get_r_groups(): logger.debug(position) else: logger.debug("No data was returned.") return (data, msg)
[docs]def getMCSMatches(input_file, MCS_index, temp_dir=None, thread=None): """ This function is used to setup and run MCS calculation on a given input file. It returns a list of matching SMARTS patterns. :param input_file: Absolute path to mae file to run the canvasMCS utility on :type input_file: string :param MCS_index: atomtyping scheme to use in canvasMCS :type MCS_index: int/str :param temp_dir: name of temporary directory :type temp_dir: str :param thread: calculation thread (optional) :type thread: calculation_thread :return: list of matching SMARTS patterns :rtype: list """ global canvasmcs_process logger.debug("Running MCS pattern Core and R Group Identification Method.") if not jobutil.hunt("canvas"): msg = "The Canvas product needs to be installed to use this feature." raise RGroupException(msg) if not temp_dir: if maestro: schrod_tmp = maestro.get_temp_location() else: schrod_tmp = fileutils.get_directory_path(fileutils.TEMP) topdir = tempfile.mkdtemp(prefix="rga-tmp", dir=schrod_tmp) temp_dir = tempfile.mkdtemp(dir=topdir) csv_temp_name = os.path.join(temp_dir, 'canvasMCS.csv') schrodinger = os.environ['SCHRODINGER'] srun = os.path.join(schrodinger, "run") canvascmd = os.path.join(schrodinger, "utilities", "canvasMCS") elements_license = "" elements_license = createElementsToken() cmd = [ "canvasMCS", "-imae", input_file, "-ocsv", csv_temp_name, "-atomtype", str(MCS_index), "-showall", "0", "-elements", elements_license ] csv_out_name = os.path.join(temp_dir, 'canvasMCS.out') with open(csv_out_name, 'wb') as csv_out_handle: canvasmcs_process = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=csv_out_handle, universal_newlines=True) null, stderr = canvasmcs_process.communicate() val = canvasmcs_process.returncode canvasmcs_process = None if thread and thread.terminate_thread: raise RGroupException(None) if val == 17: msg = "This feature requires a CANVAS_ELEMENTS license" raise RGroupException(msg) if val != 0: msg = ("CanvasMCS exited prematurely. This could be because " "the input molecules were too dissimilar or too numerous, or " "because the chosen atom-typing scheme was too non-specific.") with open(csv_out_name) as f: msg += "\n\n" msg += f.read() raise RGroupException(msg) with open(csv_temp_name) as f: patterns = [] for i, line in enumerate(f): if i != 0: line = line.strip() end_el = line.split(",") smarts_pattern = end_el[-1] patterns.append(smarts_pattern) with open(csv_out_name) as f: for line in f.readlines(): if line.startswith("Failed to read in Maestro file"): patterns.insert(int(line.split()[-1]) - 1, False) return patterns
[docs]def find_best_core_match(ct, cores, ref_ct=None, ref_core=None): # This is very useful when running RGA on the fragment. # The core with minimum number of bridge atom (or atom # directly connect to core) are preferred. best_core = None nbridge_core = {} for core_index, structure_match in enumerate(cores): if 'i_fep_mapping_bridge' in ct.atom[1].property: nbridge = sum([ ct.atom[atom_index].property['i_fep_mapping_bridge'] for atom_index in structure_match ]) else: nbridge = 0 nbridge_core.setdefault(nbridge, []) nbridge_core[nbridge].append(core_index) # sorting cores by number of bridge atom nbridges = list(nbridge_core) nbridges.sort() if ref_core and ref_ct and len(nbridge_core[nbridges[0]]) > 1: # if we still have multiple matching cores, try RMSD of those cores core_by_rmsd = {} ref_xyz = numpy.array([ref_ct.atom[atom_index].xyz \ for atom_index in ref_core]) for core_index in nbridge_core[nbridges[0]]: core = cores[core_index] xyz = numpy.array([ct.atom[atom_index].xyz \ for atom_index in core]) d = ref_xyz - xyz rmsd = numpy.sqrt(old_div(numpy.sum((d**2).flat), len(d))) core_by_rmsd[rmsd] = core rmsd_vals = list(core_by_rmsd) rmsd_vals.sort() best_core = core_by_rmsd[rmsd_vals[0]] else: best_core = cores[nbridge_core[nbridges[0]][0]] return best_core
[docs]def write_core_only(structure, attachment_points, core_indexes, core_bonds, temp_dir=None): null_bonds = [] base_coord = [1.0, 1.0, 1.0] delta = 0.5 #determine which bonds are aromatic aromatic_atoms = set() for ring in structure.ring: if ring.isAromatic(): for atom in ring.getAtomIndices(): aromatic_atoms.add(atom) aromatic_bonds = set() for atom in structure.atom: try: del atom.property["i_m_Hcount"] except KeyError: pass atom.property["i_rga_core_index"] = int(atom) if int(atom) in core_indexes and int(atom) in aromatic_atoms: for bonded_atom in atom.bonded_atoms: if int(bonded_atom) in core_indexes and int( bonded_atom) in aromatic_atoms: #mmlewis needs bonds specified in only one direction, #unlike maestro files if int(atom) < int(bonded_atom): aromatic_bonds.add((int(atom), int(bonded_atom))) else: aromatic_bonds.add((int(bonded_atom), int(atom))) #append 0 since mmlewis_convert_aromatic_bonds expects the mmlist #length is greater than the number of aromatic atoms hydrogens_to_add = [] for position, bond in enumerate(attachment_points): if bond[1]: bond_order = structure.getBond(bond[0], bond[1]).order coordinates = bond[1].xyz old_index = structure.atom[ bond[1].index].property["i_rga_core_index"] hydrogens_to_add.append((coordinates, old_index, bond[0].index)) structure.deleteBond(bond[0].index, bond[1].index) else: null_bonds.append([bond[0], bond[1], position]) continue try: prev = structure.atom[bond[0].index].property["s_cg_from"] new = "%s:%i" % (prev, position + 1) structure.atom[bond[0].index].property["s_cg_from"] = new except: structure.atom[bond[0].index].property["s_cg_from"] \ = str(position + 1) core_aromatic_atoms = set() #Only check for atoms that are still in rings when you #break apart the core, removing atoms that are no #longer part of ring systems when you break the core for ring in structure.ring: for atom in ring.getAtomIndices(): core_aromatic_atoms.add(atom) for bond in aromatic_bonds.copy(): if bond[0] not in core_aromatic_atoms or \ bond[1] not in core_aromatic_atoms: aromatic_bonds.remove(bond) #set the bond order to be one to overcome limitation in #mmlewis see EV:129295 for atoms in aromatic_bonds: bond = structure.getBond(atoms[0], atoms[1]) bond.order = 1 atom_list1 = [atom[0] for atom in aromatic_bonds] atom_list2 = [atom[1] for atom in aromatic_bonds] atom_list1.append(0) atom_list2.append(0) atom_mmlist1 = mmlist.mmlist(atom_list1) atom_mmlist2 = mmlist.mmlist(atom_list2) #converts core structure to canonical stereochemistry for determination of #unique-core data structures EV:128695 core = structure for attachment, atom in enumerate(hydrogens_to_add): xyz, i_rga_core_index, core_atom = atom core.addAtoms(1) last_atom = len(core.atom) core.atom[last_atom].xyz = xyz core.atom[last_atom].property["i_rga_core_index"] = i_rga_core_index core.atom[last_atom].element = "H" core.addBond(core_atom, last_atom, 1) mm.mmlewis_convert_aromatic_bonds(structure.handle, atom_mmlist1, atom_mmlist2) for n_n, bond in enumerate(null_bonds): core.addAtoms(1) last_atom = len(core.atom) core.atom[last_atom].x = bond[0].x + delta * n_n core.atom[last_atom].y = bond[0].y + delta * n_n core.atom[last_atom].z = bond[0].z + delta * n_n core.addBond(bond[0].index, last_atom, 1) core.atom[last_atom].atom_type = 41 try: prev = core.atom[bond[0].index].property["s_cg_from"] new = "%s:%i" % (prev, bond[2] + 1) core.atom[bond[0].index].property["s_cg_from"] = new except: core.atom[bond[0].index].property["s_cg_from"] = str(bond[2] + 1) #build.add_hydrogens(core) for mol in structure.molecule: for a in mol.atom: if a.index == attachment_points[0][0].index: core = mol.extractStructure() break for atom in core.atom: #color the core appropriately for bond in atom.bond: try: a1 = bond.atom1.property['i_rga_core_index'] a2 = bond.atom2.property['i_rga_core_index'] if (a1, a2) in core_bonds or (a2, a1) in core_bonds: bond.property['i_m_color'] = 65 except: pass try: positions = atom.property["s_cg_from"] except: continue pos_on_atoms = positions.split(":") hydrogens = [] for bonded_atom in atom.bonded_atoms: if bonded_atom.element == "H": hydrogens.append(bonded_atom.index) for j, pos in enumerate(pos_on_atoms): core.atom[hydrogens[j]].property["s_cg_to"] = pos core_ct = core.copy() core_ct.property["i_cg_num_groups"] = len(attachment_points) core_ct.append(os.path.join(temp_dir, "core.mae")) return
[docs]def add_ct_data(data, core, ct, ct_index): """ Add to the determine the rings and r-groups per atom for each ct and add the following data structures: data.add_functional_rings data.add_rings data.add_r_groups_to_atom :type core: list :param core: list of atoms belonging to the core :type ct: schrodinger.structure.Structure object :param ct: structure object corresponding to the core atoms """ #Find mmsssr rings = find_rings(ct) #Combine rings that are entirely part of the core, #leave other rings be functional_rings = conglomerate_rings(rings, core) data.add_functional_rings(functional_rings, ct_index) data.add_rings(rings, ct_index) #Find the number of rgroups on each core atom for smarts_index, core_atom_index in enumerate(core): num_r_groups = 0 core_atom = ct.atom[core_atom_index] for bonded_atom in core_atom.bonded_atoms: #Skip non heavy atoms if (bonded_atom.index not in core) and \ (bonded_atom.element != 'H'): num_r_groups += 1 #Update data structure if it is ever less than the number of groups #found during this function if data.get_r_group_for_atom(smarts_index) < num_r_groups \ and num_r_groups != 0: data.add_r_groups_to_atom(smarts_index, num_r_groups)
def _is_copyable_ct_property(name): ''' Returns `True` if it is OK to copy CT property with name `name` from one CT to another. :param name: Property name. :type name: str ''' return (name != 's_m_title' and name != 'i_m_ct_stereo_status' and not name.startswith('s_st_')) def _copy_ct_properties(src, dst, copyable=_is_copyable_ct_property): ''' Adds CT-level properties from `src` to `dst` except those for whose names `copyable` returns `False`. :param src: Source structure. :type src: `schrodinder.structure.Structure` :param dst: Destination structure. :type dst: `schrodinder.structure.Structure` :param copyable: Callable to identify the properties to copy. :type copyable: (str) -> bool ''' for (name, value) in src.property.items(): if copyable(name): dst.property[name] = value
[docs]def write_output(data, input_file, smarts, cli=False, temp_dir=None, thread=None, settings=None): """ This function writes RGA results to output files. :param data: data object that contains calculation results :type data: Data :param input_file: name of input file used in calculation :type input_file: str :param smarts: True when MCS/SMARTS method was used and False otherwise :type smarts: bool :param cli: True if RGA was run from CL without GUI (this is a guess!) :type cli: bool :param temp_dir: name of temporary directory :type temp_dir: str :param thread: thread used to run RGA calculation :type thread: calculation_thread :rtype: Settings :return: Settings data object, which is used in RGA GUI """ logger.debug("Writing Core and R-Groups output files") ct_r_group = [] ct_from_index = [] ct_to_atom_chirality = [] ct_to_index = [] #list of paths to output files output_files = [] unique_r_groups = [] #list of integers containing number of structures in each file output_length = [] cores = data.get_cores() r_groups = data.get_r_groups() r_groups_per_atom = data.get_r_groups_per_atom() ct_bonds_in_rings = data.get_bonds_in_rings() ct_offsets = data.get_ct_offsets() core_attachment_points = data.get_attachment_points() #Define and populate this data structure #fragment_cores is a list of lists #The outer list loops over each input structure #Each inner list is indexed on r_group position and the valu #is which r_gropu in the corresponding fragment file appears there #Both input structures and r_groups are 0 indexed fragment_cores = [] unique_cores = OrderedDict() for input_structure in range(0, len(ct_offsets)): row = [] for r_group in range(0, len(r_groups)): row.append(-1) fragment_cores.append(row) if not cli: if not os.path.exists(input_file): shutil.copy(input_file, temp_dir) for a, r in enumerate(r_groups): fname = os.path.basename(input_file[:-4]) + "_rga-%i.mae" % (a + 1) if not cli: out_name = os.path.join(temp_dir, fname) else: out_name = os.path.join(os.getcwd(), fname) if os.path.exists(out_name): os.remove(out_name) output_files.append(out_name) output_length.append(0) unique_r_groups.append([]) #Initialize and fill array of arrays recording each r_group #at each position of each ct #Initialize for input_structure in range(0, len(ct_offsets)): r_group_list = [] for r_group in range(0, len(r_groups)): r_group_list.append("") ct_r_group.append(r_group_list[:]) ct_from_index.append(r_group_list[:]) ct_to_atom_chirality.append(r_group_list[:]) for i, position in enumerate(r_groups): for smart in list(position): for j in position[smart]: ct_r_group[j][i] = smart[0] ct_from_index[j][i] = smart[1][0] ct_to_atom_chirality[j][i] = smart[2] #Data structure representing those input structures that #need to be read again and broken in order to write #output files #Elements are a list of form: # [CTOffset,CTIndex,[new_groups(see below)]] unique_r_group_indicator = [] for b, ct in enumerate(ct_offsets): new_groups = [] #List representing r-group position by number #at which this ct has a r-group not yet seen at #that position smiles = ct_r_group[b] from_atoms = ct_from_index[b] to_atom_chirality = ct_to_atom_chirality[b] for c in range(0, len(smiles)): if not smiles[c]: continue identifier = (smiles[c], (from_atoms[c]), to_atom_chirality[c]) if identifier not in unique_r_groups[c]: unique_r_groups[c].append(identifier) new_groups.append(c) if len(new_groups) != 0: unique_r_group_indicator.append((ct, b, new_groups)) #There are two loops for writing output. The first loop handles the core #and the second handles the fragment files. #The core loop looks at all input_files and r_group_positions. reader = structure.MaestroReader(input_file) skipped_cores = [] new_core_attach_points = [] for ct_index, ct in enumerate(reader): if thread and thread.terminate_thread: return build.add_hydrogens(ct) try: core = cores[ct_index] bonds = data.get_core_bonds(ct_index) except: skipped_cores.append(ct_index) continue if not core: skipped_cores.append(ct_index) continue # create a vector of core attachment points v_attach_points = [] for bond in core_attachment_points[ct_index]: v_attach_points.append(bond[0].index) new_core_attach_points.append(v_attach_points) #Only write core file if running from the GUI if not cli: write_core_only(ct.copy(), core_attachment_points[ct_index], core[0], bonds[0], temp_dir=temp_dir) _ur_handle = mm.mmct_ct_m2io_get_unrequested_handle(ct) mm.mmct_ct_open_additional_data(ct, 1) ad_handle = mm.mmct_ct_m2io_get_additional_data_handle(ct) mm.m2io_delete_named_block(_ur_handle, "m_attachment") mm.m2io_open_block(ad_handle, "m_attachment") mm.m2io_set_index_dimension(ad_handle, len(output_files)) intfields = [ "i_m_atom1", "i_m_atom2", "i_m_num_reagents", "i_cgch_minlinker", "i_cgch_maxlinker" ] strfields = [ "s_m_attachment_name", "s_m_reagent_path", "s_m_functional_group" ] #Write the attachment blocks for combgen or smarts core_filename = os.path.join(temp_dir, "core-comdef.mae") stereo_handle = mm.mmstereo_new(ct.handle) mm.mmstereo_add_stereo_information_to_ct(ct.handle, stereo_handle) mm.mmstereo_delete(stereo_handle) if not smarts: core = cores[ct_index][0] bonds = data.get_core_bonds(ct_index)[0] new_positions = unique_r_group_indicator[0][2] relevant_bonds = core_attachment_points[ct_index] for position in new_positions: out_name = os.path.basename(output_files[position]) bond = relevant_bonds[position] core_atom = ct.atom[bond[0].index] r_groups_per_position = len(r_groups[position]) mm.m2io_put_int_indexed(ad_handle, position + 1, intfields, [ bond[0].index, bond[1].index, r_groups_per_position, 0, 0 ]) mm.m2io_put_string_indexed( ad_handle, position + 1, strfields, [str(position + 1), out_name, ""]) else: """ Need to create a dummy molecule with the core Defined either through MCS or an input SMARTS pattern That has every R-Group position filled by a non-H atom Even if we need to create dummy atoms in these positions So that displaying this dummy molecule will show all of the Necessary arrows """ attachment_bonds = [] relevant_bonds = core_attachment_points[ct_index] ct_r_groups = ct_r_group[ct_index] core = cores[ct_index][0] bonds = data.get_core_bonds(ct_index)[0] factor = 0.1 for i, smiles in enumerate(ct_r_groups): out_name = os.path.basename(output_files[i]) bond = relevant_bonds[i] key = core.index(bond[0].index) r_groups_for_atom = len(r_groups[i]) if smiles != "" and smiles != "null": attachment_bonds.append([ i, bond[0].index, bond[1].index, r_groups_for_atom, out_name ]) else: ct.addAtoms(1) last_atom_index = len(ct.atom) last_atom = ct.atom[last_atom_index] offset_atom = ct.atom[last_atom_index - 1] xyz = [coord + factor for coord in offset_atom.xyz] last_atom.xyz = xyz last_atom.atom_type = 61 ct.addBond(bond[0].index, last_atom_index, 1) attachment_bonds.append([ i, bond[0].index, last_atom_index, r_groups_for_atom, out_name ]) for at in attachment_bonds: at_ind, atom1, atom2, num_rea, out_file = at at_ind += 1 at_name = str(at_ind) mm.m2io_put_int_indexed(ad_handle, at_ind, intfields, [atom1, atom2, num_rea, 0, 0]) mm.m2io_put_string_indexed(ad_handle, at_ind, strfields, [at_name, out_file, ""]) for atom in core: for bond in ct.atom[atom].bond: #if bond.atom2.index in core: a1 = bond.atom1.index a2 = bond.atom2.index if (a1, a2) in bonds or (a2, a1) in bonds: bond.property['i_m_color'] = 65 mm.m2io_close_block(ad_handle) ct.append(core_filename) #Second loop for writing the fragment files. Only looks at unique #input files and positions. offsets = [ct[0] for ct in unique_r_group_indicator] reader = structure.MaestroReader(input_file) fragment_output_order = [] for i in range(0, len(output_files)): fragment_output_order.append([]) for c, position in enumerate(offsets): ct = reader.read(position=position) if thread and thread.terminate_thread: return build.add_hydrogens(ct) ct_index = unique_r_group_indicator[c][1] try: core = cores[ct_index][0] except: continue #Skipped file relevant_bonds = core_attachment_points[ct_index] new_positions = unique_r_group_indicator[c][2] #Case for combgen files if not smarts: for index in new_positions: bond = relevant_bonds[index] (fragment, f, t) = add_methyl_standard(ct.copy(), bond[0].index, bond[1].index, True) fragment_smiles = analyze.generate_smiles(fragment) fragment_output_order[index].append(fragment_smiles) fragment.title = "R-group %i (%s)" % (output_length[index] + 1, ct.title) _copy_ct_properties(ct, fragment) fragment.append(os.path.join(temp_dir, output_files[index])) output_length[index] += 1 #Case for MCS/SMARTS analysis else: bonds_in_ring = ct_bonds_in_rings[ct_index] for index in new_positions: ring_bond = False primary_bond = relevant_bonds[index] #Skips attempted writing out of Null groups #which would throw an error try: frag_to_atom = primary_bond[1].index except: frag_to_atom = False for ring in bonds_in_ring: if index in ring: relevant_ring_bonds = get_connected_ring_bonds( bonds_in_ring, ring) logger.debug("Adding Methyl Ring in Output") (fragment, from_atom, to_atom) = add_methyl_ring( ct.copy(), primary_bond, [relevant_bonds[n] for n in relevant_ring_bonds], relevant_ring_bonds, True) ring_bond = True break if not ring_bond: (fragment, f, t) = add_methyl_standard(ct.copy(), primary_bond[0].index, frag_to_atom, True) try: fragment_smiles = analyze.generate_smiles(fragment) except RuntimeError as exception: if str(exception).startswith("invalid atomic number: -2"): fragment_smiles = "Null" fragment_output_order[index].append(fragment_smiles) fragment.title = "R-group %i (%s)" % (output_length[index] + 1, ct.title) _copy_ct_properties(ct, fragment) fragment.append(os.path.join(temp_dir, output_files[index])) output_length[index] += 1 #Don't bother calculating a to_atom_list on cli, since we don't need to #superimpose 2d structures to_atom_list_map = {} if not cli: for i, pos in enumerate(r_groups): for j, SMARTS in enumerate(pos): indexes = pos[SMARTS] index_in_output = unique_r_groups[i].index( (SMARTS[0], SMARTS[1][0], SMARTS[2])) for ct_num in indexes: fragment_cores[ct_num][i] = index_in_output for core in reversed(skipped_cores): fragment_cores.pop(core) #Internal check on the fragment_cores datastructure for i, core in enumerate(fragment_cores): for j, pos in enumerate(core): if pos == -1: raise Exception("\nInput file: %s\nCan't find fragment," "fragment_cores=%s" % (input_file, fragment_cores)) #determine unique_cores if thread and thread.terminate_thread: return smiles_gen = smiles_mod.SmilesGenerator( stereo=smiles_mod.STEREO_FROM_ANNOTATION_AND_GEOM) cores_file = os.path.join(temp_dir, "core.mae") unique_cores_file = os.path.join(temp_dir, "unique_cores.mae") for i, struct in enumerate(structure.StructureReader(cores_file)): smiles = smiles_gen.getSmiles(struct) cores_set = set(list(unique_cores)) if smiles not in cores_set: struct.title = "Core %i" % i struct.append(unique_cores_file) unique_cores[smiles] = [i] else: unique_cores[smiles].append(i) family_props = {} for family in range(0, len(fragment_cores[0])): #family_list is dict with the key being the fragment cores excluding #the one of the family in question #the values are the indicies which fit each family family_list = {} possible_families = {} family_index = 1 for i, struct in enumerate(fragment_cores): other_values = struct[:] index = other_values.pop(family) found = False for key, other_list in possible_families.items(): if other_values == other_list: family_list[key].append(i) found = True break if not found: family_list[family_index] = [i] possible_families[family_index] = other_values family_index += 1 for i, vals in enumerate(sorted(family_list.values(), key=lambda x: len(x), reverse=True), start=1): for input_struct in vals: if family == 0: family_props[input_struct] = [i] else: family_props[input_struct].append(i) number_of_groups = len(fragment_cores[0]) if len(unique_cores) > 1: for core_index, input_structs in enumerate(unique_cores.values()): for input_struct in input_structs: fragment_cores[input_struct].insert(0, core_index) core_file = os.path.join(temp_dir, "core-comdef.mae") reader = structure.MaestroReader(core_file) new_fname = os.path.join( temp_dir, os.path.basename(input_file)[:-4] + ".new" + ".mae") core_rgroup_atom_number_list = [] original_core_numbers = sorted(list(data.get_cores())) for i, ct in enumerate(reader): if thread and thread.terminate_thread: return original_core_number = original_core_numbers[i] for n, property_value in enumerate(family_props[i], start=1): property_name = "i_rga_R%i_Family" % n ct.property[property_name] = property_value data.add_property_value(property_name, property_value) indexes, to_atoms = determine_rgroup_indexes(ct, number_of_groups) to_atom_list_map[original_core_number] = to_atoms core_rgroup_atom_number_list.append(indexes) ct.append(new_fname) new_input = os.path.join(temp_dir, "core-comdef.mae") shutil.move(new_fname, new_input) core_dict = data.get_cores() original_ct_list = sorted(list(core_dict)) new_core_dict = {} if cli: core_rgroup_atom_number_list = [] else: for ct_num, core in core_dict.items(): final_ct_num = original_ct_list.index(ct_num) attachment_atoms = to_atom_list_map[ct_num] core[0].extend(attachment_atoms) #renumber to 0 based canvas indicies new_core_dict[final_ct_num] = [x - 1 for x in core[0]] # if settings object was passed copy data into it if settings is not None: settings.set_temp_dir(os.path.dirname(output_files[0])) settings.set_fragment_cores(fragment_cores) settings.set_property_values(data.get_prop_values()) settings.set_output_files(output_files) settings.set_output_lengths(output_length) settings.set_unique_cores(unique_cores) settings.set_core_atoms(new_core_dict) settings.set_rgroup_mapping(core_rgroup_atom_number_list) settings.set_core_attachment_points(new_core_attach_points) settings.set_input_file(input_file) return
[docs]def determine_rgroup_indexes(ct, number_of_groups): r_group_indexes = [] _ur_handle = mm.mmct_ct_m2io_get_unrequested_handle(int(ct)) mm.m2io_goto_next_block(_ur_handle, "m_attachment") m_attachment_size = mm.m2io_get_index_dimension(_ur_handle) bond_pairs = [] for attachment_index in range(1, m_attachment_size + 1): bp = mm.m2io_get_int_indexed(_ur_handle, attachment_index, ['i_m_atom1', 'i_m_atom2']) bond_pairs.append(bp) mm.m2io_leave_block(_ur_handle) structure_rgroup_atoms = [] ct_copy = ct.copy() for rgroup in bond_pairs: core_atom = rgroup[0] rgroup_atom = rgroup[1] ct_copy.deleteBond(core_atom, rgroup_atom) rgroup_atoms = [x[1] for x in bond_pairs] rgroup_mapping = {} for mol in ct_copy.molecule: atoms = [int(atom) for atom in mol.atom] for rgroup_index, rgroup_atom in enumerate(rgroup_atoms): if rgroup_atom in atoms: rgroup_mapping[rgroup_index] = atoms structure_rgroup_atoms = [] for group in range(0, number_of_groups): structure_rgroup_atoms.append(rgroup_mapping[group]) return structure_rgroup_atoms, rgroup_atoms
[docs]def evaluate_smarts_ex(structure, smarts, stereo=STEREO_FROM_ANNOTATION_AND_GEOM, start_index=1, uniqueFilter=False, allowRelativeStereo=False, rigorousValidationOfSource=False, hydrogensInterchangeable=True): """ Evaluate SMARTS patterns using the Canvas libraries. Returns a list of lists of ints. Each list of ints is a list of atom indices matching the SMARTS pattern. :type structure: `Structure` or `schrodinger.application.canvas.base.ChmMol` :param structure: Structure to search for matching substructures. :type smarts: str :param smarts: SMARTS string used to match substructures. :type stereo: enum :param stereo: Specify how to determine the stereochemistry of a ChmMol from a Structure. Can be STEREO_FROM_GEOMETRY, STEREO_FROM_ANNOTATION, STEREO_FROM_ANNOTATION_AND_GEOM, or NO_STEREO. See `schrodinger.structutils.smiles.SmilesGenerator.__init__` for descriptions of these options. :type start_index: int :param start_index: Specify the start index of the atom indices returned. Defaults to Structure mode, which is 1-based. Pass in 0 for 0-based indices. :rtype: list :return: Each value is a list of atom indices matching the SMARTS pattern. """ import_canvas() smarts = smarts.strip() if isinstance(structure, canvas.ChmMol): mol = structure else: mol = canvas.ChmMol() adaptor = canvas.ChmMmctAdaptor() adaptor.create(structure.handle, mol, smiles_mod._translate_stereo_enum(stereo)) q = canvas.ChmQuery(smarts, True) m = canvas.ChmQueryMatcher(uniqueFilter, allowRelativeStereo, rigorousValidationOfSource, hydrogensInterchangeable) atom_matches = [] bond_matches = [] for match in m.matchEx(q, mol): atom_matches.append( [a.getMolIndex() + start_index for a in match.getMatchedAtoms()]) bond_matches.append([(a.atom1().getMolIndex() + start_index, a.atom2().getMolIndex() + start_index) for a in match.getMatchedBonds()]) return (atom_matches, bond_matches)
[docs]def import_canvas(): """ Perform a lazy import of canvas libraries to speed up loading time. """ global canvas if not canvas: from schrodinger.infra import canvas