Source code for schrodinger.application.steps.enumerators

import functools
import itertools
import math
import os
import random
from functools import reduce
from operator import mul

from rdkit import Chem
from rdkit.Chem import AllChem

from schrodinger import stepper
from schrodinger.models import json
from schrodinger.stepper import sideinputs
from schrodinger.analysis.transformations import TransformsRepository
from schrodinger.application.pathfinder import analysis
from schrodinger.application.pathfinder import filtering
from schrodinger.application.pathfinder import reaction
from schrodinger.application.pathfinder import synthesis
from schrodinger.application.pathfinder import route
from schrodinger.models import parameters

from . import filters
from . import utils
from .basesteps import LoggerMixin
from .basesteps import MolMapStep
from .dataclasses import FileInMixin
from .dataclasses import MolOutMixin
from .dataclasses import MolInMixin
from .dataclasses import MolMolMixin
from .dataclasses import MolToSmilesSerializer

# default R-group smarts for decorator step settings
DEFAULT_RGROUP_ATOM_SMARTS = '#6,#7,#8,#9,#16,#17,#35,#53'


[docs]def get_transformer_reactions(file, logger=None, lead_in=''): """ Load the valid transformer reactions from file. :param file: the json file to load the transformer reactions from :type file: str :param logger: the logger to write errors :param logger: logging.Logger or NoneType :param lead_in: the lead in text for error logging :type lead_in: str :return: list of reaction smarts: :rtype: list of ChemicalReaction """ repo = TransformsRepository() with open(file) as fh: repo.load(fh) reactions = [] lead_in = lead_in + ': ' if lead_in else lead_in for sma in repo.getSmarts(): try: reactions.append(AllChem.ReactionFromSmarts(sma)) except ValueError as e: if logger: logger.error(f'{lead_in}{str(e)}') return reactions
[docs]class Transformer(LoggerMixin, FileInMixin, MolOutMixin, stepper.MapStep): """ Enumerates unique sanitized products formed using the transformation from the input json file applied to compounds in the `compounds_file` of SMILES only file. """
[docs] class Settings(parameters.CompoundParam): compounds_file: stepper.StepperFile = None
[docs] def validateSettings(self): return utils.validate_file(self, 'compounds_file', required=True)
[docs] def mapFunction(self, file): reactions = get_transformer_reactions(file, logger=self.logger, lead_in=self.getStepId()) self.logger.info(f'{self.getStepId()}: {len(reactions)} reactions') self._seen_smiles = set() serializer = MolToSmilesSerializer() for mol in serializer.deserialize(str(self.settings.compounds_file)): for reaction_ in reactions: for product, in reaction_.RunReactants((mol,)): if Chem.SanitizeMol(product, catchErrors=True): continue # ignore products that can't be sanitized smiles = Chem.MolToSmiles(product) if smiles not in self._seen_smiles: self._seen_smiles.add(smiles) yield product
def _auto_max_tries(max_products, total, tries_per_product=100, recovery=0.95): """ Determine a reasonable number of maximum number of combinations to try. :param max_products: requested maximum number of products :type max_products: int :param total: the total number of reactant combinations :type total: int :param tries_per_product: number of tries per each generated product, on average. Can be seen as an "inverse success rate". :type tries_per_product: int :param recovery: fraction of products which we would like to recover on average when using random sampling in the limiting case where max_products == total :type tries_per_product: float :return: max tries :rtype: int """ # from python/scripts/combinatorial_synthesis.py assert 0.0 < recovery < 1.0 # This is based on the observation that if we roll an N-sided die X*N # times, we'll see 1 - exp(-X) of the faces at least once. fac = -math.log(1.0 - recovery) return min( min(max_products, total) * tries_per_product, math.ceil(total * fac))
[docs]class Synthesizer(MolMolMixin, stepper.Chain): """ Enumerates unique sanitized molecules from a combinatorial synthesis using routes based on the input molecules using the default reaction dictionary and reagent library. If the maximum number of products is less than the total number of combinations the route synthesis will be done by random sampling, which may yield fewer products than requested. Otherwise a systematic set of unique products will be yielded. The settings contain: - `core_smarts`: the SMARTS that the products should have and needs to be part of the input molecule. - `depth`: the maximum depth of the retrosynthetic routes to use. - `reagent_lib`: an optional directory to prepend to the standard reagent library search path - `max_products`: the maximum number of products try to synthesize for each input molecule per route. Use 0 to force an exhaustive synthesis. - `seed`: seed for random number generator. If `None`, the random number generator will not be seeded. - `yield_input`: whether the input molecule should be returned first """
[docs] class Settings(parameters.CompoundParam): core_smarts: str = None depth: int = 2 reagent_lib: stepper.StepperFolder = None # optional max_products: int = 100 seed: int = None yield_input: bool = True deduplicate_routes: bool = False
[docs] def buildChain(self): settings = self.settings if self.settings.yield_input: fork = sideinputs.ForkStep(step=MolMolMixin) self.addStep(fork) self.addStep( RouteEnumerator(core_smarts=settings.core_smarts, depth=settings.depth)) if self.settings.deduplicate_routes: self.addStep(RouteDeduplicator()) self.addStep( RouteEvaluator(max_products=settings.max_products, reagent_lib=settings.reagent_lib, seed=settings.seed)) if self.settings.yield_input: self.addStep(sideinputs.JoinStep(fork))
[docs]class RouteSerializer(stepper.Serializer): """ A serializer for RouteNodes """ DataType = route.RouteNode
[docs] def toString(self, route: route.RouteNode) -> str: return json.dumps(route.getTreeData(self_contained=True))
[docs] def fromString(self, route_str: str) -> route.RouteNode: return route.parse_route_data(json.loads(route_str))
[docs]class RouteDeduplicator(stepper.ReduceStep): Input = Output = route.RouteNode InputSerializer = OutputSerializer = RouteSerializer
[docs] def setUp(self): super().setUp() self._seen_routes = set()
[docs] def reduceFunction(self, routes): for rt in routes: rt_str = rt.getOneLineRepresentation() if rt_str in self._seen_routes: continue self._seen_routes.add(rt_str) yield rt
[docs]class RouteEnumerator(LoggerMixin, MolInMixin, stepper.MapStep): """ Mol -> (RouteEnumerator) -> RouteNode Takes a Mol and enumerates the retrosynthetic routes that can produce it. """ Output = route.RouteNode OutputSerializer = RouteSerializer
[docs] class Settings(parameters.CompoundParam): core_smarts: str = None depth: int = 2
[docs] def validateSettings(self): issues = utils.validate_core_smarts(self, self.settings.core_smarts) if not reaction.get_reactions(): issues.append(stepper.SettingsError(self, 'no reactions defined')) return issues
[docs] def setUp(self): super().setUp() self._reactions = reaction.get_reactions() self._core_smarts = Chem.MolFromSmarts(self.settings.core_smarts)
[docs] def mapFunction(self, mol): frozen_heavy_atoms = self._getFrozenHeavyAtoms(mol) if frozen_heavy_atoms: tree = analysis.retrosynthesize(mol, self._reactions, max_depth=self.settings.depth, frozen_atoms=frozen_heavy_atoms) yield from self._getRoutes(tree)
def _getFrozenHeavyAtoms(self, mol): """ The 1-based index of frozen heavy atoms to use in the retrosynthesis. :param mol: :type mol: Chem.Mol :return: the set of frozen heavy atoms to use :rtype: Set[int] """ mol_H = Chem.AddHs(mol) return [ idx + 1 for idx in mol_H.GetSubstructMatch(self._core_smarts) if mol_H.GetAtomWithIdx(idx).GetAtomicNum() != 1 ] def _getRoutes(self, tree): """ :param tree: the retrosynthesis result for a molecule :type tree: schrodinger.application.pathfinder.route.RetroSynthesisNode :return: routes that have not been synthesized before :rtype: generator of schrodinger.application.pathfinder.route.RouteNode """ routes = tree.getRoutes() for route in routes: route_str = route.getOneLineRepresentation() self.logger.debug(f'{self.getStepId()} ROUTE: {route_str}') yield route
[docs]class RouteEvaluator(MolOutMixin, stepper.MapStep): """ RouteNode -> (RouteEvaluator) -> Mol Takes a route and produces all products from it. """ InputSerializer = RouteSerializer Input = route.RouteNode
[docs] class Settings(parameters.CompoundParam): max_products: int = 100 seed: int = None reagent_lib: stepper.StepperFolder = None # optional
[docs] def validateSettings(self): issues = [] issues += utils.validate_folder(self, 'reagent_lib') return issues
[docs] def setUp(self): super().setUp() reagent_lib = self.settings.reagent_lib self._reagent_lib = [reagent_lib] if reagent_lib else None
def _getProducts(self, route): """ :param route: the route to get the products for :type route: schrodinger.application.pathfinder.route.RouteNode :return: if the maximum number of products requested in the settings is less than the total number of products this route generates a random sample of unique products, otherwise all products :rtype: Iterable[Chem.Mol] """ settings = self.settings max_products = settings.max_products synthesizer = synthesis.Synthesizer(route) reagent_sources = synthesis.get_reagent_sources( route, libpath=self._reagent_lib) if max_products: total = reduce(mul, [src.size for src in reagent_sources]) if max_products < total: return synthesis.RandomSampleIterator( synthesizer, reagent_sources, max_products, max_tries=_auto_max_tries(max_products, total), prng=random.Random(settings.seed) if settings.seed else random) return synthesis.SystematicIterator(synthesizer, reagent_sources)
[docs] def mapFunction(self, route): yield from self._getProducts(route)
[docs]class Decorator(MolMapStep): """ Enumerates unique sanitized molecules formed by replacing a hydrogen on a C, N, or O atom in the ligand with an R-group that was attached to an Ar. The `rgroup_atom_smarts` setting allows for filtering of which reagents in the `rgroup_file` are used for the decoration reaction. The default value of '#6,#7,#8,#9,#16,#17,#35,#53' is used if the `rgroup_atom_smarts` is an empty string or None. The settings is a `filters.ProfileSettings` instance whose `property_ranges` are used to determine with R-groups are allowed to react. The `property_ranges` are optional. seealso:: filters.ProfileSettings Example R-group reagents:: C[Ar] N[Ar] Example of a Decorator definition in a yaml file:: Decorator: rgroup_atom_smarts: '*' # allow all unique rgroup reagents rgroup_file: rgroups_small.smi core_smarts: c1ccccc1 property_ranges: MolWt: [250, 500] RingCount: [0, 5] NumAromaticRings: [0, 3] NumAliphaticRings: [0, 5] NumSpiroAtoms: [0, 0] """ LEAVING_MOL_WT = filtering.DESCRIPTORS_DICT['MolWt']( Chem.MolFromSmiles('[ArH]'))
[docs] class Settings(filters.ProfileSettings): rgroup_file: stepper.StepperFile core_smarts: str = None rgroup_atom_smarts: str = DEFAULT_RGROUP_ATOM_SMARTS
[docs] def validate(self, step): issues = utils.validate_core_smarts(step, self.core_smarts) rgroup_smarts_mol = self.getReagentMol() if not rgroup_smarts_mol: issues.append( stepper.SettingsError(step, 'invalid rgroup_atom_smarts')) file_issues = utils.validate_file(step, 'rgroup_file', required=True) issues += file_issues if not file_issues and rgroup_smarts_mol and not self.getRGroups(): issues.append( stepper.SettingsError(step, 'no matching R-groups')) if self.property_ranges: issues += super().validate(step) return issues
[docs] def getReagentSmarts(self): atoms = self.rgroup_atom_smarts or '*' return f'[Ar]-[{atoms}:1]'
[docs] def getReagentMol(self): return Chem.MolFromSmarts(self.getReagentSmarts())
[docs] def getReactionSmarts(self): return f'{self.getReagentSmarts()}.[#6,#7,#8;h,H1,H2,H3:3]>>[*:1]-[*:3]'
[docs] def getRGroups(self): rgroup_mol = self.getReagentMol() if not rgroup_mol: return [] rgroups = [] if os.path.isfile(self.rgroup_file): supplier = Chem.SmilesMolSupplier(self.rgroup_file, titleLine=0, nameColumn=-1) rgroups = [ m for m in supplier if m is not None and m.HasSubstructMatch(rgroup_mol) ] return synthesis.uniquify_mols(rgroups)
[docs] def validateSettings(self): return self.settings.validate(self)
[docs] def setUp(self): super().setUp() self._seen_smiles = set() self._core_smarts = Chem.MolFromSmarts(self.settings.core_smarts) reaction_smarts = self.settings.getReactionSmarts() self.logger.debug( f'{self.getStepId()}: reaction smarts: {reaction_smarts}') self._reaction = AllChem.ReactionFromSmarts(reaction_smarts) self._rgroups = self.settings.getRGroups() self.logger.info( f'{self.getStepId()}: {len(self._rgroups)} unique R-group(s)')
def _getRGroupsForMol(self, mol): """ Generator yielding R-group reagents that will not violate the required property ranges for the decorated mol. :param mol: the molecule to get the appropriate R-group reagents for :return: collections.abc.Iterable[Chem.Mol] """ for reagent in self._rgroups: for prop, (prop_min, prop_max) in self.settings.property_ranges.items(): mol_prop = filtering.DESCRIPTORS_DICT[prop](mol) reagent_prop = filtering.DESCRIPTORS_DICT[prop](reagent) if prop in ('MolWt', 'MVCorrMW'): reagent_prop -= self.LEAVING_MOL_WT total_prop = mol_prop + reagent_prop if total_prop < prop_min or total_prop > prop_max: break else: yield reagent
[docs] def mapFunction(self, mol): for rgroup in self._getRGroupsForMol(mol): reactants = (rgroup, mol) for product, in self._reaction.RunReactants(reactants): if Chem.SanitizeMol(product, catchErrors=True): continue # silently ignore products that can't be sanitized smiles = Chem.MolToSmiles(product) if smiles not in self._seen_smiles: if product.HasSubstructMatch(self._core_smarts): self._seen_smiles.add(smiles) yield product
[docs]class FragmenterMixin: """ A mixin providing trimmer functionality for fragmenters. """ BOND_SMARTS = NotImplementedError
[docs] def getBreakableBonds(self, mol): """ Return the bond indices in the molecule that are allowed to be broken. :param mol: the molecule to fragment :type mol: Chem.Mol :return: a generator of bonds to break in the molecule :rtype: generator of int """ return (mol.GetBondBetweenAtoms(x, y).GetIdx( ) for x, y in itertools.chain( *[mol.GetSubstructMatches(smarts) for smarts in self.BOND_SMARTS]))
[docs] def fragmentToMolecule(self, fragment): """ Return the molecule version of the fragment. :param fragment: the fragment :type fragment: Chem.Mol :return: the molecule version of the fragment :rtype: Chem.Mol """ return utils.fragment_to_molecule(fragment)
[docs] def isFragmentableMol(self, mol): """ Use this method to determine whether the molecule may be fragmented. :param mol: the molecule to check :type mol: Chem.Mol :return: whether the molecule can/should be further fragmented :rtype: bool """ return True
[docs] def isAcceptableFragment(self, mol): """ Use this method to determine which fragments are acceptable. :param mol: the molecule to check :type mol: Chem.Mol :return: whether the fragment is acceptable :rtype: bool """ return True
def _trimmer(self, mol, seen_smiles=None): """ Recursively trim mol to generate unique fragment molecules. :param mol: Mol to be fragmented. :type mol: Chem.Mol :param seen_smiles: SMILES of molecules that should not be returned. Use NoneType if not :type seen_smiles: set(str) or NoneType :return: generator of trimmed molecules :rtype: generator of Chem.Mol """ seen_smiles = seen_smiles or set() for bond in self.getBreakableBonds(mol): fragments = Chem.FragmentOnBonds(mol, [bond], dummyLabels=[(0, 0)]) for fragment in Chem.GetMolFrags(fragments, asMols=True): molecule = self.fragmentToMolecule(fragment) smiles = Chem.MolToSmiles(molecule) if smiles not in seen_smiles: seen_smiles.add(smiles) if self.isAcceptableFragment(molecule): yield molecule if self.isFragmentableMol(molecule): for frag in self._trimmer(molecule, seen_smiles): yield frag
[docs] def trimmer(self, mol, max_fragments=500): """ Recursively trim the specified mol to generate maximally max_fragments fragment molecules. Returns the input molecule first if it is an acceptable fragment. :param mol: Mol to be fragmented. :type mol: Chem.Mol :param max_fragments: the total number of fragment molecules to generate :type max_fragments: int :return: generator of trimmed molecules :rtype: generator of Chem.Mol """ yield_count = 0 if self.isAcceptableFragment(mol): yield_count += 1 yield mol if self.isFragmentableMol(mol): for fragment_mol in self._trimmer( mol, seen_smiles={Chem.MolToSmiles(mol)}): yield fragment_mol yield_count += 1 if yield_count >= max_fragments: break
[docs]class Fragmenter(filters.MaxMolWtMixin, FragmenterMixin, MolMapStep): """ Recursively fragment molecules. Unless it is filtered due to it's molecular weight or core smarts, this step returns the original molecule first, followed by unique fragment molecules that contain the SMARTS substructure defined in settings and have a molecular weight less than or equal to the optional max_mol_wt setting. The number of molecules returned is limited by max_out in the settings. Fragmentation will only take place for bonds defined in `BOND_SMARTS`. """ BOND_SMARTS = tuple( Chem.MolFromSmarts(smarts) for smarts in [ '[#6;!R]-[!R]', '[!R]-[R]', '[R]!@&-[R]', '[C;R1,R2;r3,r4,r5,r6]-&@[C,N,O,S;R1;r3,r4,r5,r6]' ])
[docs] class Settings(filters.MaxMolWtSettings): core_smarts: str = None max_out: int = 500
[docs] def validateSettings(self): issues = utils.validate_core_smarts(self, self.settings.core_smarts) if self.settings.max_out <= 1: issues.append( stepper.SettingsError(self, 'max_out should be larger than 1')) max_mol_wt_issue = self.validateMaxMolWtSettings() if max_mol_wt_issue: issues.append(max_mol_wt_issue) return issues
[docs] def setUp(self): super().setUp() self._core_smarts = Chem.MolFromSmarts(self.settings.core_smarts) self._need_Hs = None
[docs] @functools.lru_cache() def hasCoreSmarts(self, mol): if self._need_Hs: mol = Chem.AddHs(mol) return mol.HasSubstructMatch(self._core_smarts)
[docs] def isAcceptableFragment(self, mol): return self.hasAcceptableMolWt(mol) and self.hasCoreSmarts(mol)
[docs] def isFragmentableMol(self, mol): # only fragments with the core smarts can be further fragmented return self.hasCoreSmarts(mol)
[docs] def getBreakableBonds(self, mol): # only breakable bonds that are not in the core may be broken all_breakable_bonds = set(super().getBreakableBonds(mol)) breakable_core_bonds = set(self.getBreakableCoreBonds(mol)) return sorted(all_breakable_bonds - breakable_core_bonds)
[docs] def getBreakableCoreBonds(self, mol): """ Return the set of breakable core bond indices in the molecule. :param mol: the molecule to fragment :type mol: Chem.Mol :return: a generator of breakable core bond indices in the molecule :rtype: generator of int """ if self._need_Hs: mol = Chem.AddHs(mol) core_atoms = mol.GetSubstructMatch(self._core_smarts) for x, y in itertools.combinations(core_atoms, 2): if mol.GetBondBetweenAtoms(x, y) is not None: yield mol.GetBondBetweenAtoms(x, y).GetIdx()
[docs] def mapFunction(self, mol): # determine whether we need to add Hs to get core smarts matching if self._need_Hs is None: self._need_Hs = utils.need_Hs(mol, self._core_smarts) if self._need_Hs is None: # wasn't able to match return yield from self.trimmer(mol, max_fragments=self.settings.max_out)