Source code for schrodinger.structutils.markush_enumerate

'''
Support for "R-group" enumeration as in ENUM-108.
'''

import itertools
import math
import re
from collections import deque
from collections import namedtuple
from functools import cmp_to_key

import numpy

from schrodinger import structure
from schrodinger.structutils import rmsd
from schrodinger.utils import log

#------------------------------------------------------------------------------#

LOGGER_NAME = 'markush_enumerate'

#------------------------------------------------------------------------------#

GROUP_ID_PROP = 'i_rge_group_id'
BOND_RANK_PROP = 'i_rge_bond_rank'
ATTACHMENT_PROP = 'i_rge_attachment'

#------------------------------------------------------------------------------#


def _is_dummy(atom):

    return atom.atomic_number in (0, -2)


#------------------------------------------------------------------------------#


def _get_neighbors_by_explicit_rank(atom):
    '''
    Returns indices of the bonded atoms ordered using explicit bond ranks.
    If not all bonds have ranks, or some ranks are not unique, returns None.

    :return: List of bonded atom indices in a particular order.
    :rtype: list(int) or None
    '''

    seen = dict()  # map rank -> neighbor index

    for bond in atom.bond:
        try:
            rank = bond.property[BOND_RANK_PROP]
        except KeyError:
            return None

        if rank in seen:
            return None
        else:
            seen[rank] = int(bond.atom2)

    return [seen[i] for i in sorted(seen)]


#------------------------------------------------------------------------------#


def _get_bond_rank_by_bfs(atom1, atom2, groups):
    """
    Traverses structure in BFS order starting from atom `atom2`
    with `atom1` marked as "visited" from get-go. Records
    group IDs seen, along with their distance from atom1. Returns
    sorted list of (distance, group ID) tuples (with distance
    measured in the number of bonds), or fallback value that uses
    negative atomic numbers instead of the group IDs in case no
    groups were encountered.

    :param atom1: First of the two atoms that define the bond to be ranked.
    :type atom1: `schrodinger.structure._StructureAtom`

    :param atom2: Second of the two atoms that define the bond to be ranked.
    :type atom2: `schrodinger.structure._StructureAtom`

    :param groups: Dictionary that maps atom indices onto group IDs.
    :type groups: dict(int, int)

    :return: List of (distance, ID) tuples.
    :rtype: list(tuple(int, int))
    """

    queue = deque([(atom2, 1)])
    visited = {int(atom1)}

    bfs_atoms = []
    bfs_groups = []

    while queue:
        curr, dist = queue.popleft()
        curr_index = int(curr)

        if curr_index not in visited:
            visited.add(curr_index)

            try:
                bfs_groups.append((dist, groups[curr_index]))
            except KeyError:
                bfs_atoms.append((dist, -curr.atomic_number))

            for neighbor in curr.bonded_atoms:
                if int(neighbor) not in visited:
                    queue.append((neighbor, dist + 1))

    if bfs_groups:
        return sorted(bfs_groups)
    else:
        return sorted(bfs_atoms)


#------------------------------------------------------------------------------#


def _get_neighbors_by_bfs(atom, groups):
    '''
    Returns indices of the bonded atoms ordered using heuristic rules
    that try to prioritize bonds leading toward lower ranked nearby
    R-groups.

    :param groups: Dictionary that maps atom indices onto group IDs.
    :type groups: dict(int, int)

    :return: List of bonded atom indices in a particular order.
    :rtype: list(int)
    '''

    ranks = dict()  # neighbor atom index -> bond rank

    for b in atom.bond:
        ranks[int(b.atom2)] = (b.order,
                               _get_bond_rank_by_bfs(atom, b.atom2, groups))

    return sorted(ranks.keys(), key=lambda x: ranks[x])


#------------------------------------------------------------------------------#

# PlaceHolder.bonds -- ordered list of PlaceHolder.atom neighbors
# (indices of the atoms bonded to the PlaceHolder.atom in the "scaffold" CT)

PlaceHolder = namedtuple('PlaceHolder', ['group_id', 'atom', 'bonds'])

#------------------------------------------------------------------------------#


[docs]def get_placeholders(st, groups=None): ''' Gathers placeholders metadata. :param st: Structure. :type st: `schrodinger.structure.Structure` :param groups: Dictionary that maps placeholder atom indices onto their group IDs. If `None`, regard atoms that carry the "GROUP_ID_PROP" property as "placeholders". :type atoms: dict(int, int) or None :return: List of the placeholders records. :rtype: list(PlaceHolder) :raises: ValueError if something about `st` looks not right. ''' if groups is None: groups = dict() for atom in st.atom: try: group_id = atom.property[GROUP_ID_PROP] except KeyError: continue groups[int(atom)] = group_id for (atom, group_id) in groups.items(): if group_id <= 0: raise ValueError(f'non-positive placeholder ID for atom {atom}') # use explicit ranks (if available) to order the placeholders bonds outcome = [] done = True for (atom_index, group_id) in groups.items(): neighbors = _get_neighbors_by_explicit_rank(st.atom[atom_index]) outcome.append(PlaceHolder(group_id, atom_index, neighbors)) if neighbors is None: done = False if done: return outcome # resort to heuristics for i in range(len(outcome)): if outcome[i].bonds is None: p = outcome[i] neighbors = _get_neighbors_by_bfs(st.atom[p.atom], groups) outcome[i] = PlaceHolder(p.group_id, p.atom, neighbors) return sorted(outcome, key=lambda ph: ph.group_id)
#------------------------------------------------------------------------------#
[docs]def get_attachment_points(st): ''' Returns ordered list of attachment points in `st`. The order is either based upon atom labels ("ATTACHMENT_PROP", if available) or decided by the bond order and the peer atom atomic number. :return: List of atoms. :rtype: list(int) ''' def compare_atoms(a1, a2): try: return a1.property[ATTACHMENT_PROP] - a2.property[ATTACHMENT_PROP] except KeyError: o = a1.bond[1].order - a2.bond[1].order if o: return o else: return (next(a1.bonded_atoms).atomic_number - next(a2.bonded_atoms).atomic_number) attchpts = [] for atom in st.atom: if _is_dummy(atom) and atom.bond_total == 1: attchpts.append(atom) return [int(a) for a in sorted(attchpts, key=cmp_to_key(compare_atoms))]
#------------------------------------------------------------------------------# def _renumber_placeholder(ph, rmap, rbonds): ''' Apply atom renumbering map to the placeholder data. :param ph: Original placeholder. :type ph: `PlaceHolder` :param rmap: Atom renumbering map (keys: old indices, values: new indices). :type rmap: dict(int, int or None) :param rbonds: New bonds map (see `place_fragment`). :type rbonds: dict(int, int) :return: Renumbered placeholder data. :rtype: `PlaceHolder` ''' new_atom = rmap[ph.atom] new_bonds = [] for b in ph.bonds: _b = rmap[b] if _b is None: # peer atom was a placeholder and got replaced with a fragment new_bonds.append(rbonds[ph.atom]) else: new_bonds.append(_b) return PlaceHolder(ph.group_id, new_atom, new_bonds) #------------------------------------------------------------------------------# def _place_fragment(st, placeholder, frag_st, attchpts): ''' Replaces `placeholder` atom in `st` with `frag_st` structure. :param attchpts: Attachment points within `frag_st` (atom indices). :type attchpts: list(int) :return: Atom renumbering map and updated bonds map. The latter is a dictionary that maps (old) atom indices (B) from placeholder.bonds onto (new) indices (F): once the placeholder atom (P) is replaced by the fragment, all of the (P)-(B) bonds get replaced by the (F)-(B) bonds ((F) stands for a fragment atom here). :rtype: tuple(dict(int, int or None), dict(int, int)) ''' # transform the fragment using highest ranked placeholder bond frag_atom = frag_st.atom[attchpts[0]] rmsd.superimpose_bond( st, (placeholder.bonds[0], placeholder.atom), # Y-R in `st` frag_st, (frag_atom, next(frag_atom.bonded_atoms))) # D-X in `frag_st` offset = st.atom_total st.extend(frag_st) # bonds between `placeholder.atom` and atoms in `placeholder.bonds` # are going to be replaced by bonds with fragment's atoms; new_bonds # is to keep track of that new_bonds = dict() for b, a in zip(placeholder.bonds, attchpts): frag_atom = frag_st.atom[a] # "dummy" in frag_st (attachment point) frag_bond = frag_atom.bond[1] # has only one bond frag_atom2 = frag_bond.atom2 # scaffold bond to be replaced scaffold_bond = st.getBond(b, placeholder.atom) # make Y-X in `st` new_bonds[b] = int(frag_atom2) + offset st.addBond(b, new_bonds[b], scaffold_bond.order) # use fragment attachement point position for atom b in `st`: # for example C-R1-R2, when placing a fragment instead of R1 # (like D1-C-D2, where D1 and D2 are the attachment points), # use D2's positions for R2 in the outcome C-C-R2 st.atom[b].xyz = frag_atom.xyz # no need to delete old bonds -- they should # be gone once we delete the atom to_del = [i + offset for i in attchpts] to_del.append(placeholder.atom) rmap = st.deleteAtoms(to_del, renumber_map=True) new_bonds = {k: rmap[v] for (k, v) in new_bonds.items()} return rmap, new_bonds #------------------------------------------------------------------------------#
[docs]class Scaffold(object): ''' Holds reference to the "scaffold" structure. Knows how to join R-groups to the core. '''
[docs] def __init__(self, st, groups=None): ''' :param groups: Dictionary that maps atom indices onto "group IDs". If `None`, use "GROUP_ID_PROP" to identify the "placeholder" atoms. :type groups: dict(int, int) or None ''' self.st = st self.placeholders = get_placeholders(self.st, groups=groups)
@property def fragmentIDs(self): ''' Returns set of fragment IDs. ''' return {p.group_id for p in self.placeholders}
[docs] def populate(self, fragments): ''' Replaces placeholders with fragments. :param fragments: Indexable that maps fragment IDs onto structures. :type fragments: dict(int, `schrodinger.structure.Structure`) ''' outcome = self.st.copy() todo = list(reversed(self.placeholders)) while todo: ph = todo.pop() try: frag_st = fragments[ph.group_id] except KeyError: continue attchpts = get_attachment_points(frag_st) if len(attchpts) < len(ph.bonds) or not ph.bonds: continue rmap, new_bonds = _place_fragment(outcome, ph, frag_st, attchpts) # renumber remaining placeholders for i in range(len(todo)): todo[i] = _renumber_placeholder(todo[i], rmap, new_bonds) return outcome
#------------------------------------------------------------------------------#
[docs]def validate_fragment(st): ''' :param st: Structure. :type st: `schrodinger.structure.Structure` :raises: ValueError if there is no attachment points. ''' if not get_attachment_points(st): raise ValueError('no attachment points')
#------------------------------------------------------------------------------#
[docs]def validate_scaffold(st): ''' :param st: Structure. :type st: `schrodinger.structure.Structure` :raises: ValueError if there is no placeholders. ''' for atom in st.atom: if _is_dummy(atom) and GROUP_ID_PROP in atom.property: return raise ValueError('no placeholders')
#------------------------------------------------------------------------------# def _pick_angles(existing, num_points): ''' Picks angles to place `num_points` on unit circle kind-of uniformly (truly uniform placement requires optimization that would imply platform dependence). :param existing: List of angles (in radians) locating existing points. :type existing: list of float :param num_points: Number of points to add. :type num_points: int :return: List of the picked angles (radians). :rtype: list of float ''' assert num_points > 0 TWOPI = 2 * math.pi _fold_angle = lambda x: x - TWOPI * math.floor(x / TWOPI) curr = sorted(_fold_angle(x) for x in existing) if not curr: return [i * TWOPI / num_points for i in range(num_points)] # otherwise pick the "opening" if len(curr) == 1: x0 = curr[0] size = TWOPI else: x0 = curr[0] size = curr[1] - curr[0] for i in range(1, len(curr) - 1): _size = curr[i + 1] - curr[i] if _size > size: x0 = curr[i] size = _size _size = _fold_angle(curr[0] - curr[-1]) if _size > size: x0 = curr[-1] size = _size if size < math.pi: # recurse if opening is less than half-circle x1 = x0 + size / 2 if num_points == 1: return [x1] return [x1] + _pick_angles(curr + [x1], num_points - 1) else: # place all into the same opening otherwise return [ x0 + size * (i + 1) / (num_points + 1) for i in range(num_points) ] #------------------------------------------------------------------------------# def _add_dummies(st, atom_index, num_dummies, bond_length=1.23): ''' Adds "dummy" atoms bonded to the atom identified by the `atom_index` (the dummies are positioned in the XY plane). :param st: Structure. :type st: `schrodinger.structure.Structure` :param atom_index: Index of the atom to which a "dummy" is to be added. :type atom_index: int :param num_dummies: Number of dummies to add. :type num_dummies: int :param bond_length: Length of the host-dummy bonds. :type bond_length: float :return: Newly added atoms. :rtype: list of schrodinger.structure._StructureAtom :raises: ValueError on errors. ''' if num_dummies <= 0: return [] # project atoms' neighbors onto unit circle existing = [] atom1 = st.atom[atom_index] for atom2 in atom1.bonded_atoms: vec = [atom2.x - atom1.x, atom2.y - atom1.y, atom2.z - atom1.z] veclen = math.sqrt(vec[0]**2 + vec[1]**2 + vec[2]**2) + 1.0e-13 existing.append(math.atan2(vec[1] / veclen, vec[0] / veclen)) # place `num_dummies` points on circle around the `atom` outcome = [] for angle in _pick_angles(existing, num_dummies): x = atom1.x + bond_length * math.cos(angle) y = atom1.y + bond_length * math.sin(angle) dummy = st.addAtom('Du', x, y, atom1.z) st.addBond(atom_index, dummy, 1) outcome.append(dummy) return outcome #------------------------------------------------------------------------------#
[docs]def convert_fragment_from_v3000(st): ''' Converts fragment metadata from MDL V3000 conventions to the ones expected by this module. :param st: Structure. :type st: `schrodinger.structure.Structure` :raises: ValueError if something goes wrong. ''' attchpts = dict() # atom index -> list of attchpts for atom in st.atom: try: labels = [int(t) for t in atom.property.pop('s_sd_ATTCHPT').split()] if not labels: continue except KeyError: continue except ValueError: raise ValueError( f'unexpected s_sd_ATTCHPT property value at atom {int(atom)}') if -1 in labels: # "-1" is the same as "1 2" pos = labels.index(-1) labels[pos:pos + 1] = [1, 2] attchpts[int(atom)] = labels if not attchpts: raise ValueError('no attachment points defined') for (atom_index, labels) in attchpts.items(): dummies = _add_dummies(st, atom_index, len(labels)) for (dummy, label) in zip(dummies, labels): dummy.property[ATTACHMENT_PROP] = label
#------------------------------------------------------------------------------#
[docs]def convert_scaffold_from_v3000(st): ''' Converts scaffold metadata from MDL V3000 conventions to the ones expected by this module. :param st: Structure. :type st: `schrodinger.structure.Structure` ''' groups = dict() # atom index -> group ID for atom in st.atom: if _is_dummy(atom): try: RGROUPS = atom.property.pop('s_sd_RGROUPS') except KeyError: continue m = re.match(r'^\s*\(\s*\d+\s+(\d+)\s*\)\s*$', RGROUPS) if m: group_id = int(m.group(1)) groups[int(atom)] = group_id atom.property[GROUP_ID_PROP] = group_id # bond ranks for atom_index in groups.keys(): neighbors = _get_neighbors_by_bfs(st.atom[atom_index], groups) if len(neighbors) > 1: for (rank, atom2) in enumerate(neighbors, 1): bond = st.getBond(atom_index, atom2) bond.property[BOND_RANK_PROP] = rank
#------------------------------------------------------------------------------#
[docs]class FragmentReader(object): ''' Reads structures from file, skips "invalid" (in context of ENUM-108) fragments. '''
[docs] def __init__(self, filename, logger=None): self.filename = filename self.reader = enumerate(structure.StructureReader(self.filename), 1) if logger is not None: self.logger = logger else: self.logger = log.get_output_logger(LOGGER_NAME)
def __iter__(self): return self def __next__(self): while True: i, st = next(self.reader) try: validate_fragment(st) return st except ValueError: try: convert_fragment_from_v3000(st) validate_fragment(st) return st except ValueError as e: self.logger.warn('WARNING: %s (%d): %s.', self.filename, i, e)
#------------------------------------------------------------------------------#
[docs]def read_scaffold(filename): ''' Reads scaffold from file. :param filename: File name. :type filename: str :return: Scaffold instance. :rtype: `Scaffold` :raises: ValueError on errors. ''' st = structure.Structure.read(filename) try: validate_scaffold(st) except ValueError: convert_scaffold_from_v3000(st) validate_scaffold(st) return Scaffold(st)
#------------------------------------------------------------------------------#
[docs]class FragmentsIterator(object): ''' Iterator over Cartesian product of fragment collections. '''
[docs] def __init__(self, sources, random=None): ''' :param sources: List of [iterable over fragments, id1, id2, ...] lists (see `r_group_enumerate.parse_rgroups`). :type sources: list of list :param random: If None, enumerate sequentially. If integer, enumerate randomly using `random` as a seed. :type random: None or int ''' iters = [] self.ids = [] for s in sources: iters.append(s[0]) self.ids.append(s[1:]) if random is None: self.prng = None self.product = itertools.product(*iters) else: self.prng = numpy.random.RandomState(random) self.product = None self.fragments = [] for it in iters: self.fragments.append([frag for frag in it])
def __iter__(self): return self def __next__(self): ''' :return: Dictionary that maps fragment IDs onto structures. ''' if self.prng: comb = [self.prng.choice(frags) for frags in self.fragments] else: comb = next(self.product) outcome = dict() for (frag_st, ids) in zip(comb, self.ids): for group_id in ids: outcome[group_id] = frag_st return outcome
#------------------------------------------------------------------------------#
[docs]class VirtualLibrary(object): ''' Implements common intended use case logic. '''
[docs] def __init__(self, scaffold, sources, random=None, logger=None): ''' :param scaffold: Scaffold. :type scaffold: `Scaffold` :param sources: List of [iterable over fragments, id1, id2, ...] lists (see `r_group_enumerate.parse_rgroups`). Or list of [filename, id1, id2, ...] lists. :type sources: list of list :param random: If None, enumerate sequentially. If integer, enumerate randomly using `random` as a seed. :type random: None or int ''' self.scaffold = scaffold _sources = [] for group in sources: if type(group[0]) is str: _sources.append([FragmentReader(group[0], logger=logger)] + group[1:]) else: _sources.append(group) self.fragments_iter = FragmentsIterator(_sources, random=random) if logger is None: self.logger = log.get_output_logger(LOGGER_NAME) else: self.logger = logger
def __iter__(self): return self def __next__(self): frags = next(self.fragments_iter) return (self.scaffold.populate(frags), frags)
#------------------------------------------------------------------------------#