Source code for schrodinger.application.pathfinder.synthesis

"""
A module to support reaction enumeration.

Examples::

    # combinatorial enumeration from a route file
    route = route.read_route('route.json')
    synthesizer = Synthesizer(route)
    # Assuming acids and amines are iterables with the reactants...
    for product in synthesizer.synthesizeCombinations([acids, amines]):
        print Chem.MolToSmiles(product)

"""

import collections
import copy
import csv
import functools
import itertools
import json
import os
import random
import re
from functools import reduce
from operator import mul

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem.Descriptors import MolWt

from schrodinger import structure
from schrodinger.application.pathfinder import molio
from schrodinger.application.pathfinder import reaction as reaction_mod
from schrodinger.application.pathfinder import route as route_mod
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import log

logger = log.get_output_logger("pathfinder")

TITLE_PROP = 's_m_title'
ATOM_LABEL_PROP = 'i_reaction_atom_label'
PRODUCT_PROP = 'b_reaction_product'

H = Chem.MolFromSmarts('[H]')


[docs]class SystematicIterator: """ Generate all combinations (or a slice thereof) of products that can be synthesized by a Synthesizer using the given sets of starting materials. """
[docs] def __init__(self, synthesizer, reagent_sources, start=0, stop=None, dedup=True): """ :param synthesizer: Synthesizer :type synthesizer: Synthesizer :param reagent_sources: List of reagent sources. :type reagent_sources: iterable of ReagentSource or iterable of list of Mol. :param start: Start yielding from this reagent combination index (counting from zero). :type start: int :param stop: Stop yielding when this product reagent combination index is reached. None means unlimited. :type stop: int or NoneType :param dedup: skip duplicate products :type dedup: bool """ def get_iter(): nonlocal reagent_sources try: check_frozen = any(r.has_frozen_atoms for r in reagent_sources) reagent_sources = [r.mols for r in reagent_sources] except AttributeError: check_frozen = False ranges = [range(len(source)) for source in reagent_sources] cartesian_product = itertools.product(*ranges) combinations = itertools.islice(cartesian_product, start, stop) prods = set() self.ntries = 0 for idxs in combinations: self.ntries += 1 starting_materials = [ source[i] for source, i in zip(reagent_sources, idxs) ] if not all(starting_materials): continue # skip false mols which come from parsing errors for product in synthesizer.run(starting_materials, check_frozen): smiles = Chem.MolToSmiles(product) if not dedup: yield product elif smiles not in prods: prods.add(smiles) yield product self.ntries = 0 self.route = synthesizer.route self._iter = get_iter()
def __iter__(self): return self._iter def __next__(self): return next(self._iter)
[docs]class RandomSampleIterator: """ Generate a random sample of unique products by repeatedly choosing a random reagent from each reagent source and keeping track of which products have been seen so far. Therefore, it is most efficient when only a fraction of the possible space of reagent combinations is sampled, because otherwise there may be many unproductive duplicates. Also, since it keeps the set of products seen so far, it uses memory proportional to 'size', so it is best if that number is not too large (thousands is fine). """
[docs] def __init__(self, synthesizer, reagent_sources, size, max_tries=None, dedup=True, prng=random): """ :param synthesizer: Synthesizer :type synthesizer: Synthesizer :param reagent_sources: List of reagent sources :type reagent_sources: iterable of ReagentSource or iterable of list of Mol. :param: size: desired sample size (number of products). :type size: int :param max_tries: maximum number of random attempts to make before giving up, even if not enough products have been synthesized. The default is a function of 'size' and the number of reactant combinations. :type max_tries: int :param dedup: skip duplicate products :type dedup: bool :param prng: pseudo-random number generator :type prng: random.Random """ def get_iter(): if self.ncombs == 0: return prods = set() nprods = 0 while nprods < size and self.ntries < self.max_tries: starting_materials = self._pickReagents() if starting_materials is None: break # Couldn't find any good combination. self.npass_filters += 1 for product in synthesizer.run(starting_materials, check_frozen): smiles = Chem.MolToSmiles(product) if not dedup: nprods += 1 yield product elif smiles not in prods: nprods += 1 prods.add(smiles) yield product self.prng = prng self.ntries = 0 self.npass_filters = 0 self.tries_per_source = [0] * len(reagent_sources) self.route = synthesizer.route try: self.mol_lists = [r.mols for r in reagent_sources] self.filter_lists = [r.filters for r in reagent_sources] check_frozen = any(r.has_frozen_atoms for r in reagent_sources) except AttributeError as e: self.mol_lists = reagent_sources self.filter_lists = [[] for s in reagent_sources] check_frozen = False self.ncombs = reduce(mul, (len(r) for r in self.mol_lists), 1) self.max_tries = max_tries if max_tries is None: self.max_tries = min(size * 100, self.ncombs * 2) self._iter = get_iter()
def __iter__(self): return self._iter def __next__(self): return next(self._iter) def _pickReagents(self): """ Find a random combination of reactants that pass the filters. If none can be found within the alloted number of attempts, return None. :return: reactants :rtype: list of Mol or None """ ret = [] self.ntries += 1 for i, mols, filts in zip(itertools.count(), self.mol_lists, self.filter_lists): while self.ntries < self.max_tries: mol = self._pickMol(mols, filts) self.tries_per_source[i] += 1 if mol: ret.append(mol) break self.ntries += 1 else: return None return ret def _pickMol(self, mols, filters): """ Pick a random molecule from a list and apply the filters. If any of the filters don't pass, return None. :param mols: molecules to pick from :type mols: list of Mol :param filters: filters to apply. Each is a callable taking an iterable of Mol and yielding Mol. :type filters: list of callable :returns: selected Mol or None :rtype: Mol or None """ mol = self.prng.choice(mols) if not mol: return None # false mols which come from parsing errors passed = [mol] for filt in filters: passed = list(filt(passed)) if not passed: return None return passed[0]
[docs]class Synthesizer(object): """ A Synthesizer is a "machine" that given a RouteNode, knows how to apply all the reactions in the RouteNode in the right order to a list of starting materials to give the final products. """
[docs] def __init__(self, route, id_property=TITLE_PROP, allow_multiple_products=False, ncycles=None, keep_labels=False, failed_reaction_handle=None): """ Create a synthesizer from a route. The value of id_property is propagated from the starting materials to the product, to make it possible to keep track of where a product came from. :param route: synthetic route to apply :type route: RouteNode :param id_property: name of the property that gets propagated to the product for identification purposes :type id_property: str :param allow_multiple_products: if True, the synthesis won't fail if a reaction generates more than one product :type allow_multiple_products: bool :param ncycles: how many times to try to apply each reaction; when multiple substitution or polymerization is desired, use ncycles > 1. The default is to use the ncycles specified by each reaction node, or 1 if the node doesn't specify it either. :type ncycles: int or None :param keep_labels: keep the frozen atom labels? If so, they are added to the i_reaction_atom_label property. :type keep_labels: bool """ self._sm_counter = itertools.count() self.instructions = [] self._compile(route) self.id_property = id_property self.route = route self.ncycles = ncycles self.allow_multiple_products = allow_multiple_products self.keep_labels = keep_labels self.extra_props = collections.defaultdict(dict) self.reaction_failure_tracker = None if failed_reaction_handle is not None: self.reaction_failure_tracker = ReactionFailureTracker( failed_reaction_handle)
def _compile(self, route): """ Turn the route tree into a list of instructions for the stack-based machine implemented in the run() method. """ if route.isStartingMaterial(): self.instructions.append(next(self._sm_counter)) else: rxn_inst = route.reaction_instance for precursor in route.precursors: self._compile(precursor) self.instructions.append((rxn_inst.reaction.inverse(), route.ncycles, route.fallback_reagent))
[docs] def run(self, starting_materials, check_frozen=False): """ Return the final product of the RouteNode given a list of starting materials. :type starting_materials: iterable of Mol :param check_frozen: if True, only return products for which all frozen atoms from the reactants are still present in the product. This also clears the frozen atom info (currently encoded as isotopes) from the atoms. :type check_frozen: bool :return: list of products (may be empty) :rtype: list of Mol """ reaction_mod.debug_mols('Synthesize from ', starting_materials) self.has_nulls = any(is_null(mol) for mol in starting_materials) stack = [] for step in self.instructions: if isinstance(step, tuple): reaction, rxn_ncycles, fallback_reagent = step ncycles = self.ncycles or rxn_ncycles or 1 reactants_lists = self._popReactants(reaction, stack) products = [] for reactants in itertools.product(*reactants_lists): products += self._multiApplyReaction( reaction, reactants, ncycles, fallback_reagent) reaction_mod.debug_mols("pushing products: ", products) stack.append(products) if not products: break else: stack.append([starting_materials[step]]) products = stack.pop() if check_frozen: products = self._checkFrozen(products, starting_materials) products = uniquify_mols(products) self._setProductProperties(products, starting_materials) return products
def _popReactants(self, reaction, stack): n = reaction.rxn.GetNumReactantTemplates() reactants = stack[-n:] del stack[-n:] return reactants def _multiApplyReaction(self, reaction, reactants, ncycles, fallback_reagent): """ Apply a reaction up to `ncycles` times and return the products from the last cycle that yielded products. Each cycle takes each of the products from the previous cycle and tries to reuse it as each of the reactants. For example, if the first cycle was OCC(C)O + CBr -> COCC(C)O + OCC(C)OC the second cycle would try to do COCC(C)O + CBr -> COCC(C)OC OCC(C)OC + CBr -> COCC(C)OC COCC(C)O + OCC(C)O + -> fail OCC(C)OC + OCC(C)O + -> fail (Multiple paths lead to the same polysubstituted product, but they get deduplicated.) When the reaction fails to generate any products, if fallback_reagent is not None, the reactant with that index (1-based) is considered to be the product. This is used when a route has an optional step (try the reaction, but don't mind if it fails). """ products = self._applyReaction(reaction, reactants) products = uniquify_mols(products) for cycle in range(ncycles - 1): new_products = [] for product in products: for reactant_idx in range(len(reactants)): curr_reactants = list(reactants) curr_reactants[reactant_idx] = product new_products += self._applyReaction(reaction, curr_reactants) if new_products: # Uniquify here to reduce the combinatorial explosion due to # many paths leading to the same products. products = uniquify_mols(new_products) else: break if not products: # When products were not generated because the reaction didn't # apply, turn the "fallback reactant", if any, into the product. # We deliberately don't do this when the reaction is failed due # to generating multiple products when allow_multiple_products is # False. if fallback_reagent: # Copy because reactant is owned by caller. product = copy.deepcopy(reactants[fallback_reagent - 1]) products = [product] logger.debug(" optional %s failed" % reaction) else: logger.debug(" %s failed" % reaction) elif len(products) > 1 and not self.allow_multiple_products: logger.debug(" %s produces multiple products" % reaction) reaction_mod.debug_mols(" reactants: ", reactants) reaction_mod.debug_mols(" products: ", products) products = [] # synthesis failed due to selectivity issues if not products and self.reaction_failure_tracker is not None: self.reaction_failure_tracker.writeFailure(reaction, reactants) return products def _applyReaction(self, reaction, reactants): products_lists = reaction.apply(reactants) # flatten products_lists because synthetic reactions have a # a single product on the RHS. products = [p[0] for p in products_lists] if not products and self.has_nulls: products = self._applyNull(reactants) for product in products: product.SetBoolProp(PRODUCT_PROP, True) return products def _applyNull(self, reactants): """ Apply a "reaction" using special rules for null reagents. The reaction is considered successful in any of these cases: - all the reactants but one are null; - exactly one reactant is a product from a previous step (i.e., not a starting material); or - all the reactants are null. The qualifying reactant becomes the product of the reaction. (In the last case, we just pick the first null, under the assumption that all nulls are the same.) """ nulls = [] products = [] starting_materials = [] for mol in reactants: if is_null(mol): nulls.append(mol) elif mol.HasProp(PRODUCT_PROP) and mol.GetProp(PRODUCT_PROP): products.append(mol) else: starting_materials.append(mol) non_nulls = starting_materials + products if nulls and len(non_nulls) == 1: ret = non_nulls elif len(products) == 1: ret = products elif len(reactants) == len(nulls): ret = nulls[:1] else: ret = [] # Copy because starting materials are owned by the caller and we # may modify them. return copy.deepcopy(ret)
[docs] def synthesizeCombinations(self, reagent_sources, **kwargs): """ Create a SystematicIterator using the Synthesizer and the other arguments. See SystematicIterator for details. (This method is redundant but was kept for backward compatibility.) """ return SystematicIterator(self, reagent_sources, **kwargs)
[docs] def synthesizeRandomSample(self, reagent_sources, size, **kwargs): """ Crate a RandomSampleIterator using the Synthesizer and the other arguments. See RandomSampleIterator for details. (This method is redundant but was kept for backward compatibility.) """ return RandomSampleIterator(self, reagent_sources, size, **kwargs)
def _appendExtraProperties(self, product): product_key = self._MolToKey(product) if product_key in self.extra_props: for prop_name, prop_value in self.extra_props[product_key].items(): product.SetProp(prop_name, prop_value) def _setExtraProductProperties(self, product, prop_name, prop_value): product_key = self._MolToKey(product) self.extra_props[product_key][prop_name] = prop_value def _MolToKey(self, product): return id(product) def _setProductProperties(self, products, starting_materials): for product_idx, product in enumerate(products, 1): self._clearProductProperties(product) reagent_ids = [self._getReagentId(sm) for sm in starting_materials] for i, reagent_id in enumerate(reagent_ids, 1): prop = 's_reaction_reagent_%d' % i product.SetProp(prop, reagent_id) title = ' + '.join(reagent_ids) title += f' [{product_idx}/{len(products)}]' product.SetProp(TITLE_PROP, title) product.SetProp('_Name', title) # used by RDKit file writers product.SetIntProp('i_reaction_num_reagents', len(starting_materials)) product.SetIntProp('i_reaction_num_products', len(products)) self._addReagentProperties(product, starting_materials) # add extra product properties self._appendExtraProperties(product) def _addReagentProperties(self, product, starting_materials): for sm_idx, sm in enumerate(starting_materials, 1): try: # Reactants labelled by ReagentNode.getLabeledMols() have a # CXSMILES property based on the original reactant before # applying the labels. cxsmiles = sm.GetProp(route_mod.CXSMILES_PROP) except KeyError: cxsmiles = Chem.MolToCXSmiles(sm) product.SetProp(f's_reaction_reagent_{sm_idx}_cxsmiles', cxsmiles) props = rdkit_adapter.translate_rdkit_props_dict( sm.GetPropsAsDict()) for name, value in props.items(): typechar = name[0] new_name = '%s_reaction_reagent_%d_%s' % (typechar, sm_idx, name) set_prop(product, new_name, value) def _checkFrozen(self, products, starting_materials): """ Check that: 1) All frozen atoms present in the starting materials are present in the product. 2) All bonds between frozen atoms in the starting materials are present in the product. 3) No new bonds invovling innerfrozen atoms are present in the product. :param products: products to check :type products: iterable of Mol :param starting_materials: mols used to synthesize the products :type products: iterable of Mol :return: products that respect the frozen atom rules above :rtype: generator of Mol """ sm_bonds = set() sm_atoms = set() for mol in starting_materials: for atom in mol.GetAtoms(): iso = atom.GetIsotope() if iso: sm_atoms.add(iso) for iso1, iso2 in get_bond_labels(mol): if iso1 and iso2: sm_bonds.add(tuple(sorted([iso1, iso2]))) for i, product in enumerate(products, 1): product_bonds = set() product_atoms = set() forbidden_bonds = set() frozen_info = {} for iso1, iso2 in get_bond_labels(product): innerfrozen1 = iso1 and iso1 > route_mod.INNERFROZEN_OFFSET innerfrozen2 = iso2 and iso2 > route_mod.INNERFROZEN_OFFSET if iso1 and iso2: product_bonds.add(tuple(sorted([iso1, iso2]))) elif innerfrozen1 or innerfrozen2: forbidden_bonds.add(iso1 or iso2) for atom in product.GetAtoms(): iso = atom.GetIsotope() if iso: product_atoms.add(iso) atom.SetIsotope(0) if self.keep_labels: atom.SetIntProp(ATOM_LABEL_PROP, iso) if iso > route_mod.FROZEN_FACTOR: frozen_attr = route_mod._decode_frozen_props(iso) source_index = frozen_attr['source_index'] reagent_index = frozen_attr['reagent_index'] alignment_key = str((reagent_index, source_index)) # If the route has different reagent indices, map the # (reagent_index, source_index) combination to the target # index. if self.route.target_atom_index_map: alignment_key = self.route.target_atom_index_map[ alignment_key] atom_prop = {'alignment_key': alignment_key} frozen_info[atom.GetIdx() + 1] = atom_prop missing_bonds = sm_bonds - product_bonds missing_atoms = sm_atoms - product_atoms if missing_bonds: logger.debug( "Skipping product %d due to missing frozen bonds: %s", i, missing_bonds) if missing_atoms: logger.debug( "Skipping product %d due to missing frozen atoms: %s", i, missing_atoms) if forbidden_bonds: logger.debug( "Skipping product %d due to new bonds to innerfrozen " "atoms: %s", i, forbidden_bonds) if not (missing_atoms or missing_bonds or forbidden_bonds): if frozen_info: self._setExtraProductProperties(product, 's_reaction_frozen_atoms', json.dumps(frozen_info)) yield product def _clearProductProperties(self, mol): for prop in mol.GetPropNames(): if re.match(r'^._reaction_', prop): mol.ClearProp(prop) def _getReagentId(self, mol): return reaction_mod.get_title(mol, self.id_property) or '<none>'
[docs]def get_bond_labels(mol): """ Yield the "labels" (isotopes) from the two atoms in each bond """ for bond in mol.GetBonds(): a1 = bond.GetBeginAtom() a2 = bond.GetEndAtom() yield a1.GetIsotope(), a2.GetIsotope()
[docs]class ReagentSource: """ A ReagentSource is just a struct-like object with the following fields: - mols: a generator or list of Mol objects; - size: (int) the number of molecules in mols; - filename: (str) file where the mols came from (or as a special case, "SMILES" if they came from the SMILES list hard-coded into the route); - reagent_class: (str) name of the reagent class. - index: the 1-based index that identifies the starting material in a route. - filters: a list of filters to apply to this reagent source. Each filter is a callable that takes an iterable of Mol and generating Mol. - has_frozen_atoms: if true, the molecules have frozen atoms """
[docs] def __init__(self, mols, size, filename, reagent_class, index, has_frozen_atoms=False): self.mols = mols self.size = size self.filename = filename self.reagent_class = str(reagent_class) self.index = index self.filters = [] self.has_frozen_atoms = has_frozen_atoms
[docs]def get_reagent_sources(route, r_dict=None, libpath=None): """ Return a list of reagent sources given a route and, optionally, a dictionary mapping reagent index to filename/SMILES (e.g,, {1: 'foo.smi', 2: 'CCO'}). The special filename '' (empty string) means use SMILES associated with the node. Values in r_dict take precedence, followed by filename associated with the node, reagent class associated with the node, and SMILES associated with the node. :type r_dict: dict {int: str} :param libpath: list of directories to prepend to the standard reagent library search path :type libpath: list of str :return: reagent sources :rtype: list of ReagentSource :raises: ValueError when no file is found in the search path for a given reagent class or there is neither reagent class nor SMILES. """ sources = [] r_dict = r_dict or {} for index, sm in enumerate(route.getStartingNodes(), 1): source = sm.getReagentSource(r_dict.get(index), libpath) if source != route_mod.SMILES: mols = molio.get_mol_reader(source, skip_bad=False) try: size = len(mols) except TypeError: size = structure.count_structures(source) else: route.updateTargetIndexMap(sm.getTargetIndexMap(index)) mols = sm.getLabeledMols(index) size = len(mols) sources.append( ReagentSource(mols, size, source, sm.reagent_class, index, has_frozen_atoms=bool(sm.frozen_atoms))) return sources
[docs]def get_one_step_route(rxn): """ Create a route for a one-step synthesis given a Reaction. :param rxn: reaction object :type rxn: Reaction :return: new route object :rtype: RouteNode """ route = route_mod.RouteNode() precursors = [] for cls in rxn.rhs_classes: prec = route_mod.ReagentNode(reagent_class=cls) precursors.append(prec) route.setReactionInstance(rxn, precursors) return route
[docs]def uniquify_mols(mols): """ Return a list of unique molecules from 'mols', using the canonical SMILES as the comparison key. :param mols: molecules to uniquify :type mols: iterable of Mol :return: unique molecules :rtype: list of Mol """ unique_mols = [] seen = set() for mol in mols: smiles = Chem.MolToSmiles(mol) if smiles in seen: continue seen.add(smiles) unique_mols.append(mol) return unique_mols
[docs]def desalt_mol(mol): """ Desalt molecule (keep first largest fragment). :param mol: molecule to desalt :type mol: rdkit.Chem.Mol :return: desalted molecule (original is not modified) :rtype: rdkit.Chem.Mol """ frags = Chem.GetMolFrags(mol, asMols=True, sanitizeFrags=True) if len(frags) <= 1: return mol else: desalted = max(frags, key=lambda f: (f.GetNumHeavyAtoms(), MolWt(f))) for name in mol.GetPropNames(includePrivate=True): desalted.SetProp(name, mol.GetProp(name)) return desalted
[docs]@functools.lru_cache() def get_neutralization_patterns(): """ Return the default neturalization patterns. :return: list of (query, replacement) :rtype: [(Mol, Mol)] """ # Patterns taken from # https://www.rdkit.org/docs/Cookbook.html#neutralizing-charged-molecules patts = ( # (SMARTS query, SMILES replacement) # Imidazoles ('[n+;H]', 'n'), # Amines ('[N+;!H0]', 'N'), # Carboxylic acids and alcohols ('[$([O-]);!$([O-][#7])]', 'O'), # Thiols ('[S-;X1]', 'S'), # Sulfonamides ('[$([N-;X2]S(=O)=O)]', 'N'), # Enamines ('[$([N-;X2][C,N]=C)]', 'N'), # Tetrazoles ('[n-]', '[nH]'), # Sulfoxides ('[$([S-]=O)]', 'S'), # Amides ('[$([N-]C=O)]', 'N'), ) return [(Chem.MolFromSmarts(smarts), Chem.MolFromSmiles(smiles, False)) for smarts, smiles in patts]
[docs]def neutralize_mol(mol): """ Neutralize a molecule. :param mol: molecule to neutralize :type mol: rdkit.Chem.Mol :return: neutralized molecule (original is not modified) :rtype: rdkit.Chem.Mol """ replaced = False for reactant, product in get_neutralization_patterns(): while mol.HasSubstructMatch(reactant): replaced = True mol = AllChem.ReplaceSubstructs(mol, reactant, product)[0] if replaced: # This is needed to make sure the substructure search in # create_reagent_library works properly. Chem.SanitizeMol(mol) return mol
[docs]def create_reagent_library(name, input_file, smarts, id_property=TITLE_PROP, properties=None, require_single_match=False, pfx=False, add=False, dedup=True, desalt=False, neutralize=False): """ Create a reagent library file for one reagent class by searching through an input file using the provided SMARTS patterns. Writes a file called <name>.csv or <name>.pfx, depending on the value of the `pfx` flag. A structure is selected for the library when it matches ANY of the specified patterns. :param name: name of the reagent class. :type name: str :param input_file: name of structure file to process :type input_file: str :param smarts: SMARTS pattern (or patterns) to search :type smarts: str or list of str :param id_property: name of structure property to store as the title of each reagent. :type id_property: str :param properties: properties to store in reaction file. None means "use all properties from the first structure". :type properties: (list of str) or None :param require_single_match: skip input structures matching the SMARTS pattern multiple times :type require_single_match: bool :param pfx: write pfx library file :type pfx: bool :param add: add new structures to output file if it exists, instead of clobbering :type add: bool :param dedup: skip duplicate compunds, identified by canonical SMILES :type dedup: bool :param desalt: keep first largest molecule. This is done _before_ doing the the SMARTS query. :type desalt: bool :param neutralize: neutralize the reactants. This is done _before_ doing the the SMARTS query. :type neutralize: bool :return: output filename :rtype: str """ def _write_mol(mol): if dedup: smiles = Chem.MolToSmiles(mol) if smiles in seen: return seen.add(smiles) try: title = mol.GetProp(id_property) except KeyError: try: title = mol.GetProp('_Name') except KeyError: title = f'{name}_{writer.written_count}' mol.SetProp('_Name', title) writer.append(mol) seen = set() if isinstance(smarts, str): smarts = [smarts] patts = [Chem.MolFromSmarts(s) for s in smarts] explicit_h = any(patt.HasSubstructMatch(H) for patt in patts) output_file = name output_file += molio.PFX if pfx else '.csv' writer_class = molio.PfxMolWriter if pfx else molio.CsvMolWriter if add and os.path.isfile(output_file): tmpout = '.tmp' + output_file writer = writer_class(tmpout, properties=properties) with molio.get_mol_reader(output_file, random_access=False) as reader: for mol in reader: _write_mol(mol) old = writer.written_count else: writer = writer_class(output_file, properties=properties) tmpout = None with molio.get_mol_reader(input_file, random_access=False) as reader: for mol in reader: if desalt: mol = desalt_mol(mol) if neutralize: mol = neutralize_mol(mol) match_mol = Chem.AddHs(mol) if explicit_h else mol for patt in patts: if require_single_match: keep = (1 == len( match_mol.GetSubstructMatches(patt, maxMatches=2))) else: keep = match_mol.HasSubstructMatch(patt) if keep: _write_mol(mol) break writer.close() total = writer.written_count if tmpout: fileutils.force_rename(tmpout, output_file) new = total - old logger.info(f'Wrote {total} structures ({new} new, {old} old) ' f'to {output_file}') else: logger.info(f'Wrote {total} structures to {output_file}') return output_file
[docs]def set_prop(mol, name, value): """ Set a property for a Mol object, with the property type deduced from the name, which should follow m2io conventions. :param mol: molecule to modify :type mol: rdkit.Chem.Mol :param name: property name following m2io convention :type name: str :param value: property value :type value: int, bool, float, or str, depending on `name` """ if name[0] == 'b': mol.SetBoolProp(name, value) elif name[0] == 'i': mol.SetIntProp(name, value) elif name[0] == 'r': mol.SetDoubleProp(name, value) else: mol.SetProp(name, value)
[docs]def is_null(mol): """ Check if a molecule is considered a null reagent. The convention is that null reagents are molecules consisting only of a dummy atom. :param mol: molecule to test :type mol: rdkit.Chem.Mol :return: is the molecule a null reagent? :rtype: bool """ return mol.GetNumAtoms() == 1 and mol.GetAtomWithIdx(0).GetAtomicNum() == 0
[docs]class ReactionFailureTracker: """ Helper class to track failed reactions, and print them out to a csv for easy debugging. It currently also tracks total failures recorded, and is intended to be a convenience class to track and report other statistics. """
[docs] def __init__(self, output_handle): """ :param output_handle: file handle for output :type output_handle: _io.TextIOWrapper """ self.num_failures = 0 self.writer = csv.writer(output_handle, delimiter=',')
[docs] def writeFailure(self, reaction, reactants): """ Write failures into the output file handle provided at construction :param reaction: reaction type :type reaction: reaction.Reaction :param reactants: list of reactants :type reactants: list(rdkit.Chem.Mol) """ reactant_smiles = [Chem.MolToSmiles(reactant) for reactant in reactants] self.writer.writerow([str(reaction)] + reactant_smiles) self.num_failures += 1