Source code for schrodinger.application.pathfinder.analysis

"""
A module to perform retrosynthetic analysis.

Examples::

    # retrosynthetic analysis
    reactions = reaction.read_reactions_file('reactions.json')
    retrosynthesis = analysis.retrosynthesize(amide, reactions)
    print(retrosynthesis.asString())
    for route in retrosynthesis.getRoutes():
        print(route.asString())
        print("Starting materials:")
        for sm in route.getStartingNodes():
            print("-", sm)
        route.write('route.json')

"""

import collections
import copy
import fnmatch

import more_itertools
from rdkit import Chem

from schrodinger.application.pathfinder import route as route_mod
from schrodinger.utils import log

logger = log.get_output_logger("pathfinder")

# Tag for "auxiliary transforms" which get special treatment during the
# retrosynthetic analysis.
TRANSFORM = 'transformation'

RetrosynthesisState = collections.namedtuple('RetrosynthesisState',
                                             'mols_analyzed depth done tree')
RetrosynthesisState.mols_analyzed.__doc__ = \
    "Number of molecules analyzed so far (int)"
RetrosynthesisState.depth.__doc__ = \
    "Depth of the last node to be analyzed (int)"
RetrosynthesisState.done.__doc__ = \
    "Flag indicating whether the analysis ran to completion (bool)"
RetrosynthesisState.tree.__doc__ = \
    "Retrosynthetic tree (RetrosynthesisNode)"


[docs]class Transformer: """ Applies transformations (single-reactant, single-product reactions such as functional group interconversions) to an input molecule and generates routes up to a certain depth. The transformations are tried in input order, which means that: 1) when two transfomations can be applied to different parts of the molecule, only one of the two possible route orders is returned; 2) when one transformation depends on another, it may only work if the second one appears later in the input order. These restrictions are meant as an optimization to reduce combinatorial explosions, with the understanding that the supported use case is "orthogonal" transformations. """
[docs] def __init__(self, rxns): """ :param rxns: reaction dictionary by name :type rxns: {str: reaction.Reaction} """ self._rxns = rxns self._transforms = [ rxn.inverse() for rxn in rxns.values() if is_transformation(rxn) ] logger.debug(f'Got {len(self._transforms)} transforms')
[docs] def apply(self, mol, max_depth): """ Generate routes up to the given depth. The first route will always be the zero-step route consisting of the starting material. :param mol: input molecule :type mol: rdkit.Chem.rdchem.Mol :param max_depth: maximum number of steps :type max_depth: int :return: list of routes :rtype: [schrodinger.application.pathfinder.route.RouteNode] """ smiles = Chem.MolToSmiles(mol) sm = route_mod.ReagentNode(mol) routes = {smiles: sm} self._apply(mol, max_depth, sm, routes) return list(routes.values())
def _apply(self, mol, max_depth, node, routes, start=0): if max_depth <= 0: return for i in range(start, len(self._transforms)): transform = self._transforms[i] for prods in transform.apply([mol]): prod = prods[0] smiles = Chem.MolToSmiles(prod) if smiles not in routes: new_node = route_mod.RouteNode(prod) rxn = self._rxns[transform.name] new_node.setReactionInstance(rxn, [node]) routes[smiles] = new_node self._apply(prod, max_depth - 1, new_node, routes, i)
[docs]class ForwardAnalyzer: """ Using "applicable" synthetic reactions and a list of routes, generate a new list of routes up to a certain depth. A reactions is synthetic if it has more than one reactant; it is "applicable" if any of the reactant templates match the target molecule of the route. We don't actually apply the reactions because we don't know what the "other reactant" would be yet; we just check which atoms would be matched. Multiple reactions can be applied as long as the atoms matched by each don't overlap (in other words, the reactions are orthogonal). This works best when the reaction SMARTS are written in such a way that only relevant atoms are matched. """
[docs] def __init__(self, rxns): """ :param rxns: reaction dictionary by name :type rxns: {str: reaction.Reaction} """ self._rxns = rxns self._syn_rxns = [ rxn.inverse() for rxn in rxns.values() if not is_transformation(rxn) ] logger.debug(f'Got {len(self._syn_rxns)} synthetic reactions')
[docs] def analyzeRoutes(self, routes, max_depth): """ For each input route, generate new routes using "applicable" synthetic reactions. :param routes: list of input routes :type routes: [schrodinger.application.pathfinder.route.RouteNode] :param max_depth: maximum number of steps. NOTE: this is a total depth including the steps already in the input route. For example, if an input route has two steps and max_depth=3, only 1 more step would be considered. :type max_depth: int :return: list of new routes. None of the input routes are returned, because transformation-only routes are not considered interesting for the purpose of this class. :rtype: [schrodinger.application.pathfinder.route.RouteNode] """ new_routes = {} for route in routes: self._analyzeOne(route, max_depth, new_routes) return list(new_routes.values())
def _analyzeOne(self, node, max_depth, new_routes): used_atoms = set() max_depth -= node.depth() self._analyze(node.mol, max_depth, 0, node, used_atoms, new_routes) def _analyze(self, mol, max_depth, start, node, used_atoms, new_routes): if max_depth <= 0: return for i in range(start, len(self._syn_rxns)): syn_rxn = self._syn_rxns[i] rxn = self._rxns[syn_rxn.name] for i, tpl in enumerate(syn_rxn.rxn.GetReactants()): for match in mol.GetSubstructMatches(tpl): atoms = set(match) if not used_atoms & atoms: new_node = self._makeNode(rxn, i, node) key = new_node.treeAsString() if key not in new_routes: new_routes[key] = new_node self._analyze(mol, max_depth - 1, i, new_node, used_atoms | atoms, new_routes) def _makeNode(self, rxn, prec_idx, prec_node): new_node = route_mod.RouteNode() precursors = [] for i, reagent_class in enumerate(rxn.rhs_classes): if i == prec_idx: precursors.append(prec_node) else: node = route_mod.ReagentNode(reagent_class=reagent_class) precursors.append(node) new_node.setReactionInstance(rxn, precursors) return new_node
def _validate_reactions_for_retrosynthesis(reactions): """ Check that the reactions are usable for retrosysnthesis. For example, check that they are all one-product reactions. :param reactions: reactions to check :type reactions: list of Reaction :raises: ValueError """ for rxn in reactions: numtargets = rxn.rxn.GetNumReactantTemplates() if numtargets != 1: raise ValueError(f"Bad reaction '{rxn}'. Retrosynthesis only " "supports one-product reactions.")
[docs]def retrosynthesize(*a, **d): """ Convenience wrapper for retrosynthesize_gen for backward compatibility. Takes takes the same arguments as retrosynthesize_gen and returns a RetrosynthesisNode. """ for state in retrosynthesize_gen(*a, **d): pass return state.tree
[docs]def retrosynthesize_gen(target_mol, reactions_dict, max_depth=1, exclude=None, require_bonds=None, frozen_atoms=None, broken_bonds=None, fragments=None, label_attachment_atoms=False): r""" Generate a retrosynthetic tree to the desired depth based on the target molecule. This function is a generator which yields after analyzing each molecule (node in the tree). The search is done breadth-first and the caller can break out at any time, getting a valid (but possibly incomplete) tree. :type target_mol: Mol :param reactions_dict: Reaction dictionary. :type reactions_dict: .reaction.ReactionDict or dict {str: Reaction} :param exclude: Set of tags or reaction names to exclude. Tags are matched exactly; names are matched using shell globbing (\*, ?, []). :type exclude: set of str :param require_bonds: If provided, only reactions which break at least one bond in this set will be used for the analysis. Each bond is a sorted tuple of two 1-based atom indexes. :type require_bonds: set of tuple of int :param frozen_atoms: If provided, reactions which break any bond between two atoms in this set are skipped. :type frozen_atoms: set of int :param broken_bonds: If provided, this dict will be filled with a bond: reactions mapping, where a bond is a pair of atom indexes, and the reactions a set of strings (reaction names) for all the bonds that were broken during the analysis. :type broken_bonds: {(int, int): set(str)} :param fragments: If provided, this set will be filled with tuples of atom indexes for the fragments identified during the analysis. "Fragment" here is understood as a subset of atoms in the original molecule that is also present in a precursor. :type fragments: set of tuple of int :param label_attachment_atoms: if true, add atom mapping numbers to "attachment atoms", which are atoms that were present in the target molecule and which were involved in any broken bond. :type label_attachment_atoms: bool :return: yields a RetrosynthesisState after analyzing each molecule. The current tree can be found in the .tree property of the state object. NOTE: the same (evolving) tree object is returned with each state! This is for efficiency reasons and because there's not a strong use case for keeping a "trajectory". :rtype: generator of RetrosynthesisState """ # sort reactions by name to get a predictable, reproducible result reactions = [reactions_dict[r] for r in sorted(reactions_dict)] _validate_reactions_for_retrosynthesis(reactions) if exclude: reactions = filter_reactions(reactions, exclude) if (frozen_atoms or require_bonds or broken_bonds is not None or fragments is not None): target_mol = _add_atom_ids(target_mol) if broken_bonds is None: # We'll set it even if the caller didn't, because it's used by # _retrosynthesize to switch on the bond/atom tracking. broken_bonds = {} if frozen_atoms: frozen_bonds = _get_target_bonds(target_mol, frozen_atoms) innerfrozen_atoms = _get_innerfrozen_atoms(target_mol, frozen_atoms) else: frozen_bonds = set() innerfrozen_atoms = set() mol = Chem.RemoveHs(target_mol) def get_mol_for_node(mol): if broken_bonds is not None: # broken_bonds implies that atoms were labeled tgt_bonds = _get_target_bonds(mol) mol_for_node = _strip_atom_ids(mol, frozen_atoms, innerfrozen_atoms) frozen = bool(tgt_bonds & frozen_bonds) else: mol_for_node = mol frozen = False tgt_bonds = None return mol_for_node, frozen, tgt_bonds mol_for_node, frozen, tgt_bonds = get_mol_for_node(mol) root = route_mod.RetroSynthesisNode(mol_for_node, reagent_class=None, frozen=frozen) queue = collections.deque([(root, mol, tgt_bonds, 0, set())]) n = 0 inverse_tags = getattr(reactions_dict, 'inverse_tags', None) while queue: n += 1 node, mol, tgt_bonds, depth, blacklist = queue.popleft() if depth >= max_depth: continue for rxn in reactions: new_blacklist = set(blacklist) if inverse_tags: if rxn.tags & blacklist: continue for tag in rxn.tags: new_blacklist |= inverse_tags[tag] for precursors in rxn.apply((mol,)): if broken_bonds is not None: prec_bonds = set() for precursor in precursors: prec_bonds |= _get_target_bonds(precursor) if fragments is not None: if fragment := _get_fragment_atoms(precursor): fragments.add(fragment) cur_changed_bonds = tgt_bonds - prec_bonds # if a bond is missing from the precursor, it is broken prec_bond_atoms = {bond[0] for bond in prec_bonds} cur_changed_atoms = {bond[0] for bond in cur_changed_bonds} cur_broken_bonds = cur_changed_atoms - prec_bond_atoms logger.debug( f'{max_depth}: {rxn} broken bonds: {cur_broken_bonds}') if (TRANSFORM not in rxn.tags and require_bonds and not (cur_broken_bonds & require_bonds)): # We did not break any of the required bonds, so skip. continue if cur_changed_bonds & frozen_bonds: # We broke or changed a frozen bond, so skip. continue for bond in cur_broken_bonds: broken_bonds.setdefault(bond, set()).add(rxn.name) if label_attachment_atoms: for precursor in precursors: _label_attachment_atoms(precursor, broken_bonds) else: cur_broken_bonds = set() precursor_nodes = [] for precursor, reagent_class in zip(precursors, rxn.rhs_classes): if reagent_class and reagent_class.reactive: next_depth = max_depth else: next_depth = depth + 1 mol_for_node, frozen, prec_tgt_bonds = get_mol_for_node( precursor) newnode = route_mod.RetroSynthesisNode(mol_for_node, reagent_class, frozen=frozen) if next_depth < max_depth: queue.append((newnode, precursor, prec_tgt_bonds, next_depth, new_blacklist)) precursor_nodes.append(newnode) node.addReactionInstance(rxn, precursor_nodes, cur_broken_bonds) yield RetrosynthesisState(n, depth + 1, not queue, root)
def _label_attachment_atoms(mol, broken_bonds): """ Label the "attachment atoms", those that were involved in a bond in the target but no longer are. The labels are added as atom mapping numbers and the value is the original atom index (1-based). :param mol: molecule to modify :type mol: rdkit.Chem.rdchem.Mol :param broken_bonds: bonds that have been broken so far :type broken_bonds: set of (int, int) """ bond_atoms = set(more_itertools.flatten(broken_bonds)) for atom in mol.GetAtoms(): iso = atom.GetIsotope() if iso and iso in bond_atoms: atom.SetAtomMapNum(iso) def _add_atom_ids(mol): """ Return a copy of mol in which a unique ID has been added to each atom (currently implemented as a hack using isotope information). :param mol: molecule to modify :type mol: rdkit.Chem.rdchem.Mol """ new_mol = copy.copy(mol) for i, atom in enumerate(new_mol.GetAtoms(), 1): # We don't label hydrogens because we will use an implicit hydrogen # representation, and RemoveHs won't remove hydrogen with isotope info. if atom.GetAtomicNum() != 1: atom.SetIsotope(i) return new_mol def _strip_atom_ids(mol, frozen_atoms, innerfrozen_atoms): """ Return a copy of mol from which the atom IDs have been removed (under the current hack, this means removing all isotope information from the molecule). Also, if a truthy `frozen_atoms` is provided, a boolean property is added to each atom specifying whether the atom is frozen or not, and the atom index of the atom in the target (user-input) molecule is stored as an integer property. :param mol: molecule to modify :type mol: rdkit.Chem.rdchem.Mol :param frozen_atoms: set of frozen atom indexes (1-based) :type frozen_atoms: set of int :param innerfrozen_atoms: set of innerfrozen atom indexes (1-based) :type innerfrozen_atoms: set of int :return: stripped molecule :rtype: rdkit.Chem.rdchem.Mol """ new_mol = copy.copy(mol) for atom in new_mol.GetAtoms(): if frozen_atoms: iso = atom.GetIsotope() atom.SetBoolProp(route_mod.FROZEN_PROP, iso in frozen_atoms) atom.SetBoolProp(route_mod.INNERFROZEN_PROP, iso in innerfrozen_atoms) atom.SetIntProp(route_mod.TARGET_ATOM_IDX_PROP, iso) atom.SetIsotope(0) return new_mol def _get_target_bonds(mol, atoms=None): """ Return a set of "target bonds" in the molecule. These are the bonds which existed in the original target molecule, which means that both of the atoms in the bond have a unique ID. (New atoms added by reactions don't have an ID.) :param mol: molecule :type mol: rdkit.Chem.rdchem.Mol :param atoms: if provided, only return bonds for which both atoms belong to this set of 1-based atom indexes. :type atoms: set of int :return: set of bonds; each bond is a sorted tuple of two 1-based atom indexes. :rtype: set of tuple of int """ bonds = set() for bond in mol.GetBonds(): a1 = bond.GetBeginAtom() a2 = bond.GetEndAtom() iso1 = a1.GetIsotope() iso2 = a2.GetIsotope() bond_order = bond.GetBondType() if iso1 and iso2: if atoms and not (iso1 in atoms and iso2 in atoms): continue atom_pair = tuple(sorted([iso1, iso2])) bonds.add((atom_pair, bond_order)) return bonds def _get_innerfrozen_atoms(mol, frozen_atoms): """ Find the "innerfrozen" atoms, which are the subset of frozen atoms that are not connected to any non-frozen atom. :param mol: molecule :type mol: rdkit.Chem.rdchem.Mol :param frozen_atoms: set of 1-based frozen atom indexes :type frozen_atoms: set of int :return: set of 1-based innerfrozen atom indexes :type atoms: set of int """ innerfrozen = set(frozen_atoms) for idx in frozen_atoms: atom = mol.GetAtomWithIdx(idx - 1) for nei in atom.GetNeighbors(): nei_idx = nei.GetIdx() + 1 if nei_idx not in frozen_atoms: innerfrozen.discard(idx) return innerfrozen def _get_fragment_atoms(mol): """ Return a tuple of atom indices in the "original target molecule" that are available in `mol` through atom labels (isotopes). Not all atoms in `mol` carry the labels since only some are related (can be mapped back) to the "original target molecule". :param mol: molecule :type mol: rdkit.Chem.rdchem.Mol :return: atom indexes (1-based) :rtype: tuple of int """ return tuple( sorted(a.GetIsotope() for a in mol.GetAtoms() if a.GetIsotope()))
[docs]def reaction_is_excluded(reaction, exclude): r""" Check if a reaction meets any of the exclusion criteria. :type reaction: Reaction :param exclude: Set of tags or reaction names to exclude. Tags are matched exactly; names are matched using shell globbing (\*, ?, []). :type exclude: set of str :return: True if reaction is meets any of the exclusion criteria :rtype: bool """ if reaction.tags & exclude: return True for patt in exclude: if fnmatch.fnmatch(reaction.name, patt): return True return False
[docs]def filter_reactions(reactions, exclude): r""" Return a shallow copy of a list of reactions, filtering out those matching any of the exclusion criteria. :type reactions: list of Reaction :param exclude: Set of tags or reaction names to exclude. Tags are matched exactly; names are matched using shell globbing (\*, ?, []). :type exclude: set of str :rtype: list of reaction """ return [r for r in reactions if not reaction_is_excluded(r, exclude)]
[docs]def is_transformation(rxn): """ Check if a reaction is a "transformation", meaning that it has a single reactant and a single product. :param rxn: reaction to test :type rxn: reaction.Reaction :return: True if rxn is a transformation, False otherwise :rtype: bool """ return len(rxn.lhs_classes) == 1 and len(rxn.rhs_classes) == 1