'''
Support for "R-group" enumeration as in ENUM-108.
'''
import itertools
import math
import re
from collections import deque
from collections import namedtuple
from functools import cmp_to_key
import numpy
from schrodinger import structure
from schrodinger.structutils import rmsd
from schrodinger.utils import log
#------------------------------------------------------------------------------#
LOGGER_NAME = 'markush_enumerate'
#------------------------------------------------------------------------------#
GROUP_ID_PROP = 'i_rge_group_id'
BOND_RANK_PROP = 'i_rge_bond_rank'
ATTACHMENT_PROP = 'i_rge_attachment'
#------------------------------------------------------------------------------#
def _is_dummy(atom):
return atom.atomic_number in (0, -2)
#------------------------------------------------------------------------------#
def _get_neighbors_by_explicit_rank(atom):
'''
Returns indices of the bonded atoms ordered using explicit bond ranks.
If not all bonds have ranks, or some ranks are not unique, returns None.
:return: List of bonded atom indices in a particular order.
:rtype: list(int) or None
'''
seen = dict() # map rank -> neighbor index
for bond in atom.bond:
try:
rank = bond.property[BOND_RANK_PROP]
except KeyError:
return None
if rank in seen:
return None
else:
seen[rank] = int(bond.atom2)
return [seen[i] for i in sorted(seen)]
#------------------------------------------------------------------------------#
def _get_bond_rank_by_bfs(atom1, atom2, groups):
"""
Traverses structure in BFS order starting from atom `atom2`
with `atom1` marked as "visited" from get-go. Records
group IDs seen, along with their distance from atom1. Returns
sorted list of (distance, group ID) tuples (with distance
measured in the number of bonds), or fallback value that uses
negative atomic numbers instead of the group IDs in case no
groups were encountered.
:param atom1: First of the two atoms that define the bond to be ranked.
:type atom1: `schrodinger.structure._StructureAtom`
:param atom2: Second of the two atoms that define the bond to be ranked.
:type atom2: `schrodinger.structure._StructureAtom`
:param groups: Dictionary that maps atom indices onto group IDs.
:type groups: dict(int, int)
:return: List of (distance, ID) tuples.
:rtype: list(tuple(int, int))
"""
queue = deque([(atom2, 1)])
visited = {int(atom1)}
bfs_atoms = []
bfs_groups = []
while queue:
curr, dist = queue.popleft()
curr_index = int(curr)
if curr_index not in visited:
visited.add(curr_index)
try:
bfs_groups.append((dist, groups[curr_index]))
except KeyError:
bfs_atoms.append((dist, -curr.atomic_number))
for neighbor in curr.bonded_atoms:
if int(neighbor) not in visited:
queue.append((neighbor, dist + 1))
if bfs_groups:
return sorted(bfs_groups)
else:
return sorted(bfs_atoms)
#------------------------------------------------------------------------------#
def _get_neighbors_by_bfs(atom, groups):
'''
Returns indices of the bonded atoms ordered using heuristic rules
that try to prioritize bonds leading toward lower ranked nearby
R-groups.
:param groups: Dictionary that maps atom indices onto group IDs.
:type groups: dict(int, int)
:return: List of bonded atom indices in a particular order.
:rtype: list(int)
'''
ranks = dict() # neighbor atom index -> bond rank
for b in atom.bond:
ranks[int(b.atom2)] = (b.order,
_get_bond_rank_by_bfs(atom, b.atom2, groups))
return sorted(ranks.keys(), key=lambda x: ranks[x])
#------------------------------------------------------------------------------#
# PlaceHolder.bonds -- ordered list of PlaceHolder.atom neighbors
# (indices of the atoms bonded to the PlaceHolder.atom in the "scaffold" CT)
PlaceHolder = namedtuple('PlaceHolder', ['group_id', 'atom', 'bonds'])
#------------------------------------------------------------------------------#
[docs]def get_placeholders(st, groups=None):
'''
Gathers placeholders metadata.
:param st: Structure.
:type st: `schrodinger.structure.Structure`
:param groups: Dictionary that maps placeholder atom indices onto
their group IDs. If `None`, regard atoms that carry the
"GROUP_ID_PROP" property as "placeholders".
:type atoms: dict(int, int) or None
:return: List of the placeholders records.
:rtype: list(PlaceHolder)
:raises: ValueError if something about `st` looks not right.
'''
if groups is None:
groups = dict()
for atom in st.atom:
try:
group_id = atom.property[GROUP_ID_PROP]
except KeyError:
continue
groups[int(atom)] = group_id
for (atom, group_id) in groups.items():
if group_id <= 0:
raise ValueError(f'non-positive placeholder ID for atom {atom}')
# use explicit ranks (if available) to order the placeholders bonds
outcome = []
done = True
for (atom_index, group_id) in groups.items():
neighbors = _get_neighbors_by_explicit_rank(st.atom[atom_index])
outcome.append(PlaceHolder(group_id, atom_index, neighbors))
if neighbors is None:
done = False
if done:
return outcome
# resort to heuristics
for i in range(len(outcome)):
if outcome[i].bonds is None:
p = outcome[i]
neighbors = _get_neighbors_by_bfs(st.atom[p.atom], groups)
outcome[i] = PlaceHolder(p.group_id, p.atom, neighbors)
return sorted(outcome, key=lambda ph: ph.group_id)
#------------------------------------------------------------------------------#
[docs]def get_attachment_points(st):
'''
Returns ordered list of attachment points in `st`. The order is either
based upon atom labels ("ATTACHMENT_PROP", if available) or decided
by the bond order and the peer atom atomic number.
:return: List of atoms.
:rtype: list(int)
'''
def compare_atoms(a1, a2):
try:
return a1.property[ATTACHMENT_PROP] - a2.property[ATTACHMENT_PROP]
except KeyError:
o = a1.bond[1].order - a2.bond[1].order
if o:
return o
else:
return (next(a1.bonded_atoms).atomic_number -
next(a2.bonded_atoms).atomic_number)
attchpts = []
for atom in st.atom:
if _is_dummy(atom) and atom.bond_total == 1:
attchpts.append(atom)
return [int(a) for a in sorted(attchpts, key=cmp_to_key(compare_atoms))]
#------------------------------------------------------------------------------#
def _renumber_placeholder(ph, rmap, rbonds):
'''
Apply atom renumbering map to the placeholder data.
:param ph: Original placeholder.
:type ph: `PlaceHolder`
:param rmap: Atom renumbering map (keys: old indices, values: new indices).
:type rmap: dict(int, int or None)
:param rbonds: New bonds map (see `place_fragment`).
:type rbonds: dict(int, int)
:return: Renumbered placeholder data.
:rtype: `PlaceHolder`
'''
new_atom = rmap[ph.atom]
new_bonds = []
for b in ph.bonds:
_b = rmap[b]
if _b is None:
# peer atom was a placeholder and got replaced with a fragment
new_bonds.append(rbonds[ph.atom])
else:
new_bonds.append(_b)
return PlaceHolder(ph.group_id, new_atom, new_bonds)
#------------------------------------------------------------------------------#
def _place_fragment(st, placeholder, frag_st, attchpts):
'''
Replaces `placeholder` atom in `st` with `frag_st` structure.
:param attchpts: Attachment points within `frag_st` (atom indices).
:type attchpts: list(int)
:return: Atom renumbering map and updated bonds map. The latter is
a dictionary that maps (old) atom indices (B) from
placeholder.bonds onto (new) indices (F): once the placeholder
atom (P) is replaced by the fragment, all of the (P)-(B) bonds get
replaced by the (F)-(B) bonds ((F) stands for a fragment atom here).
:rtype: tuple(dict(int, int or None), dict(int, int))
'''
# transform the fragment using highest ranked placeholder bond
frag_atom = frag_st.atom[attchpts[0]]
rmsd.superimpose_bond(
st,
(placeholder.bonds[0], placeholder.atom), # Y-R in `st`
frag_st,
(frag_atom, next(frag_atom.bonded_atoms))) # D-X in `frag_st`
offset = st.atom_total
st.extend(frag_st)
# bonds between `placeholder.atom` and atoms in `placeholder.bonds`
# are going to be replaced by bonds with fragment's atoms; new_bonds
# is to keep track of that
new_bonds = dict()
for b, a in zip(placeholder.bonds, attchpts):
frag_atom = frag_st.atom[a] # "dummy" in frag_st (attachment point)
frag_bond = frag_atom.bond[1] # has only one bond
frag_atom2 = frag_bond.atom2
# scaffold bond to be replaced
scaffold_bond = st.getBond(b, placeholder.atom)
# make Y-X in `st`
new_bonds[b] = int(frag_atom2) + offset
st.addBond(b, new_bonds[b], scaffold_bond.order)
# use fragment attachement point position for atom b in `st`:
# for example C-R1-R2, when placing a fragment instead of R1
# (like D1-C-D2, where D1 and D2 are the attachment points),
# use D2's positions for R2 in the outcome C-C-R2
st.atom[b].xyz = frag_atom.xyz
# no need to delete old bonds -- they should
# be gone once we delete the atom
to_del = [i + offset for i in attchpts]
to_del.append(placeholder.atom)
rmap = st.deleteAtoms(to_del, renumber_map=True)
new_bonds = {k: rmap[v] for (k, v) in new_bonds.items()}
return rmap, new_bonds
#------------------------------------------------------------------------------#
[docs]class Scaffold(object):
'''
Holds reference to the "scaffold" structure. Knows how to join
R-groups to the core.
'''
[docs] def __init__(self, st, groups=None):
'''
:param groups: Dictionary that maps atom indices onto "group IDs".
If `None`, use "GROUP_ID_PROP" to identify the "placeholder"
atoms.
:type groups: dict(int, int) or None
'''
self.st = st
self.placeholders = get_placeholders(self.st, groups=groups)
@property
def fragmentIDs(self):
'''
Returns set of fragment IDs.
'''
return {p.group_id for p in self.placeholders}
[docs] def populate(self, fragments):
'''
Replaces placeholders with fragments.
:param fragments: Indexable that maps fragment IDs onto structures.
:type fragments: dict(int, `schrodinger.structure.Structure`)
'''
outcome = self.st.copy()
todo = list(reversed(self.placeholders))
while todo:
ph = todo.pop()
try:
frag_st = fragments[ph.group_id]
except KeyError:
continue
attchpts = get_attachment_points(frag_st)
if len(attchpts) < len(ph.bonds) or not ph.bonds:
continue
rmap, new_bonds = _place_fragment(outcome, ph, frag_st, attchpts)
# renumber remaining placeholders
for i in range(len(todo)):
todo[i] = _renumber_placeholder(todo[i], rmap, new_bonds)
return outcome
#------------------------------------------------------------------------------#
[docs]def validate_fragment(st):
'''
:param st: Structure.
:type st: `schrodinger.structure.Structure`
:raises: ValueError if there is no attachment points.
'''
if not get_attachment_points(st):
raise ValueError('no attachment points')
#------------------------------------------------------------------------------#
[docs]def validate_scaffold(st):
'''
:param st: Structure.
:type st: `schrodinger.structure.Structure`
:raises: ValueError if there is no placeholders.
'''
for atom in st.atom:
if _is_dummy(atom) and GROUP_ID_PROP in atom.property:
return
raise ValueError('no placeholders')
#------------------------------------------------------------------------------#
def _pick_angles(existing, num_points):
'''
Picks angles to place `num_points` on unit circle kind-of
uniformly (truly uniform placement requires optimization
that would imply platform dependence).
:param existing: List of angles (in radians) locating existing points.
:type existing: list of float
:param num_points: Number of points to add.
:type num_points: int
:return: List of the picked angles (radians).
:rtype: list of float
'''
assert num_points > 0
TWOPI = 2 * math.pi
_fold_angle = lambda x: x - TWOPI * math.floor(x / TWOPI)
curr = sorted(_fold_angle(x) for x in existing)
if not curr:
return [i * TWOPI / num_points for i in range(num_points)]
# otherwise pick the "opening"
if len(curr) == 1:
x0 = curr[0]
size = TWOPI
else:
x0 = curr[0]
size = curr[1] - curr[0]
for i in range(1, len(curr) - 1):
_size = curr[i + 1] - curr[i]
if _size > size:
x0 = curr[i]
size = _size
_size = _fold_angle(curr[0] - curr[-1])
if _size > size:
x0 = curr[-1]
size = _size
if size < math.pi:
# recurse if opening is less than half-circle
x1 = x0 + size / 2
if num_points == 1:
return [x1]
return [x1] + _pick_angles(curr + [x1], num_points - 1)
else:
# place all into the same opening otherwise
return [
x0 + size * (i + 1) / (num_points + 1) for i in range(num_points)
]
#------------------------------------------------------------------------------#
def _add_dummies(st, atom_index, num_dummies, bond_length=1.23):
'''
Adds "dummy" atoms bonded to the atom identified by the `atom_index`
(the dummies are positioned in the XY plane).
:param st: Structure.
:type st: `schrodinger.structure.Structure`
:param atom_index: Index of the atom to which a "dummy" is to be added.
:type atom_index: int
:param num_dummies: Number of dummies to add.
:type num_dummies: int
:param bond_length: Length of the host-dummy bonds.
:type bond_length: float
:return: Newly added atoms.
:rtype: list of schrodinger.structure._StructureAtom
:raises: ValueError on errors.
'''
if num_dummies <= 0:
return []
# project atoms' neighbors onto unit circle
existing = []
atom1 = st.atom[atom_index]
for atom2 in atom1.bonded_atoms:
vec = [atom2.x - atom1.x, atom2.y - atom1.y, atom2.z - atom1.z]
veclen = math.sqrt(vec[0]**2 + vec[1]**2 + vec[2]**2) + 1.0e-13
existing.append(math.atan2(vec[1] / veclen, vec[0] / veclen))
# place `num_dummies` points on circle around the `atom`
outcome = []
for angle in _pick_angles(existing, num_dummies):
x = atom1.x + bond_length * math.cos(angle)
y = atom1.y + bond_length * math.sin(angle)
dummy = st.addAtom('Du', x, y, atom1.z)
st.addBond(atom_index, dummy, 1)
outcome.append(dummy)
return outcome
#------------------------------------------------------------------------------#
[docs]def convert_fragment_from_v3000(st):
'''
Converts fragment metadata from MDL V3000 conventions to
the ones expected by this module.
:param st: Structure.
:type st: `schrodinger.structure.Structure`
:raises: ValueError if something goes wrong.
'''
attchpts = dict() # atom index -> list of attchpts
for atom in st.atom:
try:
labels = [int(t) for t in atom.property.pop('s_sd_ATTCHPT').split()]
if not labels:
continue
except KeyError:
continue
except ValueError:
raise ValueError(
f'unexpected s_sd_ATTCHPT property value at atom {int(atom)}')
if -1 in labels:
# "-1" is the same as "1 2"
pos = labels.index(-1)
labels[pos:pos + 1] = [1, 2]
attchpts[int(atom)] = labels
if not attchpts:
raise ValueError('no attachment points defined')
for (atom_index, labels) in attchpts.items():
dummies = _add_dummies(st, atom_index, len(labels))
for (dummy, label) in zip(dummies, labels):
dummy.property[ATTACHMENT_PROP] = label
#------------------------------------------------------------------------------#
[docs]def convert_scaffold_from_v3000(st):
'''
Converts scaffold metadata from MDL V3000 conventions to
the ones expected by this module.
:param st: Structure.
:type st: `schrodinger.structure.Structure`
'''
groups = dict() # atom index -> group ID
for atom in st.atom:
if _is_dummy(atom):
try:
RGROUPS = atom.property.pop('s_sd_RGROUPS')
except KeyError:
continue
m = re.match(r'^\s*\(\s*\d+\s+(\d+)\s*\)\s*$', RGROUPS)
if m:
group_id = int(m.group(1))
groups[int(atom)] = group_id
atom.property[GROUP_ID_PROP] = group_id
# bond ranks
for atom_index in groups.keys():
neighbors = _get_neighbors_by_bfs(st.atom[atom_index], groups)
if len(neighbors) > 1:
for (rank, atom2) in enumerate(neighbors, 1):
bond = st.getBond(atom_index, atom2)
bond.property[BOND_RANK_PROP] = rank
#------------------------------------------------------------------------------#
[docs]class FragmentReader(object):
'''
Reads structures from file, skips "invalid"
(in context of ENUM-108) fragments.
'''
[docs] def __init__(self, filename, logger=None):
self.filename = filename
self.reader = enumerate(structure.StructureReader(self.filename), 1)
if logger is not None:
self.logger = logger
else:
self.logger = log.get_output_logger(LOGGER_NAME)
def __iter__(self):
return self
def __next__(self):
while True:
i, st = next(self.reader)
try:
validate_fragment(st)
return st
except ValueError:
try:
convert_fragment_from_v3000(st)
validate_fragment(st)
return st
except ValueError as e:
self.logger.warn('WARNING: %s (%d): %s.', self.filename, i,
e)
#------------------------------------------------------------------------------#
[docs]def read_scaffold(filename):
'''
Reads scaffold from file.
:param filename: File name.
:type filename: str
:return: Scaffold instance.
:rtype: `Scaffold`
:raises: ValueError on errors.
'''
st = structure.Structure.read(filename)
try:
validate_scaffold(st)
except ValueError:
convert_scaffold_from_v3000(st)
validate_scaffold(st)
return Scaffold(st)
#------------------------------------------------------------------------------#
[docs]class FragmentsIterator(object):
'''
Iterator over Cartesian product of fragment collections.
'''
[docs] def __init__(self, sources, random=None):
'''
:param sources: List of [iterable over fragments, id1, id2, ...] lists
(see `r_group_enumerate.parse_rgroups`).
:type sources: list of list
:param random: If None, enumerate sequentially. If integer, enumerate
randomly using `random` as a seed.
:type random: None or int
'''
iters = []
self.ids = []
for s in sources:
iters.append(s[0])
self.ids.append(s[1:])
if random is None:
self.prng = None
self.product = itertools.product(*iters)
else:
self.prng = numpy.random.RandomState(random)
self.product = None
self.fragments = []
for it in iters:
self.fragments.append([frag for frag in it])
def __iter__(self):
return self
def __next__(self):
'''
:return: Dictionary that maps fragment IDs onto structures.
'''
if self.prng:
comb = [self.prng.choice(frags) for frags in self.fragments]
else:
comb = next(self.product)
outcome = dict()
for (frag_st, ids) in zip(comb, self.ids):
for group_id in ids:
outcome[group_id] = frag_st
return outcome
#------------------------------------------------------------------------------#
[docs]class VirtualLibrary(object):
'''
Implements common intended use case logic.
'''
[docs] def __init__(self, scaffold, sources, random=None, logger=None):
'''
:param scaffold: Scaffold.
:type scaffold: `Scaffold`
:param sources: List of [iterable over fragments, id1, id2, ...] lists
(see `r_group_enumerate.parse_rgroups`). Or list of
[filename, id1, id2, ...] lists.
:type sources: list of list
:param random: If None, enumerate sequentially. If integer, enumerate
randomly using `random` as a seed.
:type random: None or int
'''
self.scaffold = scaffold
_sources = []
for group in sources:
if type(group[0]) is str:
_sources.append([FragmentReader(group[0], logger=logger)] +
group[1:])
else:
_sources.append(group)
self.fragments_iter = FragmentsIterator(_sources, random=random)
if logger is None:
self.logger = log.get_output_logger(LOGGER_NAME)
else:
self.logger = logger
def __iter__(self):
return self
def __next__(self):
frags = next(self.fragments_iter)
return (self.scaffold.populate(frags), frags)
#------------------------------------------------------------------------------#