Source code for schrodinger.application.pathfinder.multiroute

"""
Functions to support multi-route enumeration (AKA "simple reaction enumeration"
or "automated reaction enumeration").
"""

import contextlib
import functools
import hashlib
import itertools
import math
import os
import random
import sys
import tempfile
import time
import zlib

import networkx as nx
import numpy as np
import requests
from rdkit import Chem

from schrodinger import structure
from schrodinger.application.combinatorial_screen import fingerprint_utils
from schrodinger.application.pathfinder import aligner
from schrodinger.application.pathfinder import analysis
from schrodinger.application.pathfinder import filtering
from schrodinger.application.pathfinder import molio
from schrodinger.application.pathfinder import reaction
from schrodinger.application.pathfinder import synthesis
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import transform
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import log

logger = log.get_output_logger('pathfinder')

# Special reagent class for core hopping.
CORES = reaction.ReagentClass('cores')

ROUTE_ID_PROP = 'i_reaction_route_index'
ROUTE_PROP = 's_reaction_route'

# Maximum analysis depth to consider when the user doesn't specify a depth.
AUTO_MAX_DEPTH = 5

# Other defaults
DEFAULT_MAX_ROUTES = 100
DEFAULT_MAX_PER_ROUTE = 1000

# How many bytes to read at a time when downloading / hashing a file.
CHUNK_SIZE = 65536


class MultiRouteEnumerator:
    """
    A generator of products following the multiroute enumeration protocol.
    """
    def __init__(self,
                 mol,
                 reactions_dict,
                 *,
                 dedup=True,
                 depth=None,
                 descriptors=filtering.DEFAULT_DESCRIPTORS,
                 frozen_atoms=frozenset(),
                 libpath=None,
                 max_per_route=DEFAULT_MAX_PER_ROUTE,
                 max_routes=DEFAULT_MAX_ROUTES,
                 no_core_hopping=False,
                 product_property_filter_file=None,
                 product_smarts_filter_file=None,
                 ref_mols=None,
                 ch_dist_tol=1.0,
                 ch_ang_tol=15.0,
                 bond_reactions=None,
                 prefilter_reagents=None,
                 fp_dir=None,
                 fp_url=None,
                 fp_namespace=None,
                 forward=False,
                 prng=random,
                 route_prng=None,
                 **unused_args):
        """
        :param mol: input molecule to be used for the initial retrosynthetic
            or forward analysis
        :type mol: rdkit.Chem.Mol

        :param dedup: skip duplicate products (using SMILES for comparison)
        :type dedup: bool

        :param depth: analysis depth (if None, increasing depths will be
            attempted until enough routes are found)
        :type depth: int or NoneType

        :param descriptors: names of RDKit descriptors to compute for each
            product
        :type descriptors: list of str

        :param frozen_atoms: indexes (1-based) of atoms to keep in the product
        :type frozen_atoms: set of int

        :param libpath: directories to search for reactant files
        :type libpath: list of str

        :param max_per_route: maximum number of products per route
        :type max_per_route: int

        :param max_routes: maximum number of routes to sample
        :type max_routes: int

        :param no_core_hopping: don't use the special core hopping mode even
            when possible
        :type no_core_hopping: bool

        :param product_property_filter_file: name of JSON file with product
            property filters
        :type product_property_filter_file: str

        :param product_smarts_filter_file: name of .cflt file with SMARTS
            patterns
        :type product_smarts_filter_file: str

        :param ref_mols: reference molecules for similarity calculations
        :type ref_mols: list(Mol) or str

        :param ch_dist_tol: core-hopping distance tolerance in Angstroms
            (maximum allowed change in the distance between side chains,
            relative to the input structure)
        :type ch_dist_tol: float

        :param ch_ang_tol: core-hopping angle tolerance in degrees (maximum
            allowed change in bond vector angle for side chains, relative to
            the input structure)
        :type ch_ang_tol: float

        :param bond_reactions: dict specifying which reactions are allowed to
            break certain bonds. Keys are tuples of two ints (sorted atom
            indexes); values are sets of reaction names.
        :type bond_reactions: {(int, int): set(str)}

        :param prefilter_reagents: number of most similar molecules to return
        :type prefilter_reagents: int or NoneType

        :param fp_dir: directory to search for fingerprint files in addition
            to the CWD
        :type fp_dir: str or NoneType

        :param forward: use forward analysis mode (routes start from `mol`
            instead of ending there)
        :type forward: bool

        :param prng: pseudo-random number generator to be used for the
            enumeration, for picking the route and reactants to try at each
            iteration
        :type prng: random.Random

        :param route_prng: pseudo-random number generator to be used for
            selecting the subset of routes to use, based on max_routes. If
            not supplied, `prng` will be used.
        :type route_prng: random.Random or NoneType
        """
        self.mol = mol
        self.reactions_dict = reactions_dict or reaction.get_reactions()
        self.dedup = dedup
        self.depth = depth
        self.descriptors = descriptors
        self.forward = forward
        self.frozen_atoms = frozen_atoms
        self.libpath = libpath
        self.max_per_route = max_per_route
        self.max_routes = max_routes
        self.no_core_hopping = no_core_hopping
        self.product_property_filter_file = product_property_filter_file
        self.product_smarts_filter_file = product_smarts_filter_file
        self.ch_dist_tol = ch_dist_tol
        self.ch_ang_tol = ch_ang_tol
        self.bond_reactions = bond_reactions
        self.prefilter_reagents = prefilter_reagents
        self.fp_dir = fp_dir
        self.fp_url = fp_url
        self.fp_namespace = fp_namespace
        self.prng = prng
        self.route_prng = route_prng or prng
        if isinstance(ref_mols, str):
            self.ref_mols = list(molio.get_mol_reader(ref_mols))
            self.ref_mol = self.ref_mols[0]
        else:
            self.ref_mols = ref_mols
            self.ref_mol = ref_mols[0] if ref_mols else None
        if self.no_core_hopping:
            self.ref_measurements, self.core_atoms, self.core_neighbors = (
                {}, set(), set())
        else:
            self.ref_measurements, self.core_atoms, self.core_neighbors = \
                analyze_frozen_atoms(mol, frozen_atoms)
        self.frozen_atom_maps = []
    def generate_mols(self):
        """
        :rtype: generator of Mol
        """
        samples = self._generate_samples()
        unfiltered_products = meta_sample(samples, self.dedup, prng=self.prng)
        products = filtering.add_filters(
            unfiltered_products,
            ref_mols=self.ref_mols,
            property_filter_file=self.product_property_filter_file,
            smarts_filter_file=self.product_smarts_filter_file,
            descriptors=self.descriptors)
        products = self._apply_core_hopping_filters(products)
        return products
    def _apply_core_hopping_filters(self, products):
        """
        Generator to filter `products` to exclude those in which any
        measurement of distance and angles between side chains differs too
        much from the reference measurements. (If there are no reference
        measurements, all products pass.)

        :param products: products to filter
        :type products: iterator of rdkit.Chem.Mol

        :return: molecules meeting the geometric criteria
        :rtype: generator of rdkit.Chem.Mol
        """
        for mol in products:
            ok = True
            if self.ref_measurements:
                edge_atoms = self._get_edge_atoms(mol)  # 1-based
                st = st_from_mol(mol)
                for (r1, tgt_r1), (r2, tgt_r2) in itertools.combinations(
                        edge_atoms, 2):
                    path = Chem.GetShortestPath(mol, r1 - 1, r2 - 1)  # 0-based
                    if len(path) < 3:
                        continue
                    c1, c2 = path[1] + 1, path[-2] + 1
                    dist, angle = measure_vectors(st, r1, c1, r2, c2)
                    key = tuple(sorted([tgt_r1, tgt_r2]))
                    logger.debug('prod: %.2f, %.1f' % (dist, angle))
                    ref_dist, ref_angle = self.ref_measurements[key]
                    logger.debug('ref: %.2f, %.1f' % (ref_dist, ref_angle))
                    if (abs(dist - ref_dist) > self.ch_dist_tol or
                            abs(angle - ref_angle) > self.ch_ang_tol):
                        ok = False
                        break
            if ok:
                yield mol

    def _get_edge_atoms(self, mol):
        """
        Return a list of tuples (index, ref_index) for atoms which were
        labeled as attachment atoms and came from frozen components.

        :param mol: molecule
        :type mol: rdkit.Chem.Mol

        :return: list of (index, ref_index), both 1-based, where "index"
            refers to `mol` and "ref_index" is the index of the corresponding
            atom in the input molecule.
        :rtype: list of (int, int)
        """
        atoms = []
        route_idx = mol.GetIntProp(ROUTE_ID_PROP)
        for atom in mol.GetAtoms():
            try:
                label = atom.GetIntProp(synthesis.ATOM_LABEL_PROP)
            except KeyError:
                continue
            try:
                ref_idx = self.frozen_atom_maps[route_idx][label]
            except KeyError:
                # Not all frozen atoms are edge atoms, so skip.
                continue
            atoms.append((atom.GetIdx() + 1, ref_idx))
        return atoms

    def _getRetrosyntheticRoutes(self):
        """
        Perform a retrosynthetic analysis of `self.mol` and return routes.
        """
        logger.info('Performing retrosynthetic analysis...')
        if self.depth is not None:
            depth_range = [self.depth]
        else:
            depth_range = range(1, AUTO_MAX_DEPTH + 1)
        prev_nroutes = -1
        for curr_depth in depth_range:
            tree = analysis.retrosynthesize(self.mol,
                                            self.reactions_dict,
                                            max_depth=curr_depth,
                                            frozen_atoms=self.frozen_atoms,
                                            label_attachment_atoms=True,
                                            broken_bonds={})
            routes = [
                r for r in tree.getRoutes(bond_reactions=self.bond_reactions)
                if has_variable_reactants(r)
            ]
            nroutes = len(routes)
            logger.info(f'{nroutes} routes found with depth <= {curr_depth}.')
            if nroutes == prev_nroutes or nroutes >= self.max_routes:
                break
            prev_nroutes = nroutes
        return routes

    def _getSyntheticRoutes(self):
        """
        Perform a "forward analysis" of `self.mol` and return routes.
        """
        logger.info('Performing forward analysis...')
        transformer = analysis.Transformer(self.reactions_dict)
        transform_routes = transformer.apply(self.mol, self.depth)
        logger.info('Functional group transformations generated '
                    f'{len(transform_routes)} initial routes.')
        analyzer = analysis.ForwardAnalyzer(self.reactions_dict)
        routes = analyzer.analyzeRoutes(transform_routes, self.depth)
        logger.info(f'Generated {len(routes)} synthetic routes.')
        return routes

    def _generate_samples(self):
        """
        Analyze `self.mol`, generate all routes and pick up to max_routes at
        random, and finally turn each route into a RandomSampleIterator.

        :return: samples
        :rtype: list of RandomSampleIterator
        """
        if self.forward:
            routes = self._getSyntheticRoutes()
        else:
            routes = self._getRetrosyntheticRoutes()
        if len(routes) > self.max_routes:
            routes = self.route_prng.sample(routes, self.max_routes)
        logger.info(f'Will sample {len(routes)} routes.')
        samples = []
        # Zeroth element placeholder because routes are numbered from 1.
        self.frozen_atom_maps = [{}]
        for i, route in enumerate(routes, 1):
            logger.debug(
                f'Setting up route {i}: {route.getOneLineRepresentation()}')
            sources, atom_map = self._get_reagent_sources(route)
            self.frozen_atom_maps.append(atom_map)
            syn = synthesis.Synthesizer(route,
                                        allow_multiple_products=True,
                                        keep_labels=True)
            sample = synthesis.RandomSampleIterator(syn,
                                                    sources,
                                                    self.max_per_route,
                                                    prng=self.prng)
            sample.index = i
            samples.append(sample)
        return samples

    def _get_reagent_sources(self, route):
        """
        Find the reagent sources for a route.

        :param route: route to analyze
        :type route: schrodinger.application.pathfinder.route.RouteNode

        :return: reagent sources and frozen atom map, the latter being a dict
            with frozen atom labels as keys and input molecule atom indexes
            as values.
        :rtype: ([ReagentSource], {int: int})
        """
        sources = []
        frozen_atom_map = {}
        logger.debug(route.treeAsString())
        for index, sm in enumerate(route.getStartingNodes(), 1):
            if sm.reagent_class:
                if is_core_sm(sm, self.core_atoms):
                    reagent_class = CORES
                else:
                    reagent_class = sm.reagent_class
                filename = reagent_class.findReagentFile(self.libpath)
                mols = self._get_mol_supplier(filename)
                source = filename
            else:
                route.updateTargetIndexMap(sm.getTargetIndexMap(index))
                mols = sm.getLabeledMols(index)[:1]
                self._update_frozen_atom_map(mols[0], self.core_neighbors,
                                             frozen_atom_map)
                source = 'SMILES'
            size = len(mols)
            logger.debug(f'{index}: {source} ({size:,})')
            sources.append(
                synthesis.ReagentSource(mols,
                                        size,
                                        source,
                                        sm.reagent_class,
                                        index,
                                        has_frozen_atoms=bool(
                                            sm.frozen_atoms)))
        return sources, frozen_atom_map

    def _update_frozen_atom_map(self, mol, core_neighbors, atom_map):
        """
        Update `atom_map` by adding a key for each frozen atom that was
        labeled as an "attachment atom" by the retrosynthesis, with the label
        (which is the atom index in the original input molecule) as the
        value. As a side effect, also remove the attachment atom labels.

        :param mol: molecule to modify
        :type mol: rdkit.Chem.rdchem.Mol

        :param core_neighbors: indices of atoms directly bound to the core
        :type core_neighbors: set of int

        :param atom_map: {frozen_atom_label: target_atom_idx}
        :type atom_map: {int: int}
        """
        for atom in mol.GetAtoms():
            num = atom.GetAtomMapNum()
            if num:
                atom.SetAtomMapNum(0)
                if num in core_neighbors:
                    iso = atom.GetIsotope()
                    atom_map[iso] = num

    @functools.lru_cache(maxsize=256)
    def _get_mol_supplier(self, filename):
        """
        Read a reagent file. Calls are cached, so if the method is called
        again with the same arguments the return value is reused for speed.

        If a positive value is given for prefilter_reagents and a reference
        molecule is provided, the reactant file is filtered and the top
        `prefilter_reagents` molecules are returned, based on similarity to
        the reference.

        :param filename: filename
        :type filename: str

        :return: reactants
        :rtype: rdkit.Chem.rdmolfiles.SmilesMolSupplier or list of
            rdkit.Chem.rdchem.Mol
        """
        # The caching speeds up the setup of the job by at least an order of
        # magnitude. In practice, say we are using 100 routes: most of them
        # share the same reagent files, so each file might be read at least
        # 20 times. Currently we only have 82 reagent files, so even if a
        # ligand managed to use them all (which is unlikely), we are still
        # pretty far from filling the cache.
        logger.debug(f'reading {filename}')
        mols = molio.get_mol_reader(filename, skip_bad=False)
        if self.prefilter_reagents:
            size = self.prefilter_reagents
            if self.fp_namespace is not None:
                subdir = self.fp_namespace
            else:
                subdir = ''
            fp_path = get_fp_file(filename,
                                  self.fp_dir,
                                  url_base=self.fp_url,
                                  subdir=subdir)
            if fp_path:
                logger.info(
                    f'Using top {size} reagents by similarity: {filename}')
                query_smiles = Chem.MolToSmiles(self.ref_mol)
                query_fp = fingerprint_utils.smiles_to_fingerprint(
                    query_smiles)
                _, _, indexes, _ = fingerprint_utils.rank_reactants(fp_path,
                                                                    query_fp,
                                                                    size,
                                                                    alpha=1.0,
                                                                    beta=1.0)
                mols = [mols[i] for i in indexes]
            else:
                logger.warning(
                    f'Fingerprint file not found for: {filename}. '
                    'Will skip similarity prefiltering for this reagent '
                    'class.')
        return mols
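# Example (illustrative only, not part of the module API): a minimal sketch
# of driving the enumerator directly. The input SMILES and keyword values
# below are hypothetical; see MultiRouteEnumerator.__init__ for the full
# argument list.
#
#     from rdkit import Chem
#     from schrodinger.application.pathfinder import reaction
#
#     input_mol = Chem.MolFromSmiles('c1ccccc1C(=O)NC1CCNCC1')
#     enumerator = MultiRouteEnumerator(input_mol,
#                                       reaction.get_reactions(),
#                                       max_routes=10,
#                                       max_per_route=100)
#     for i, product in enumerate(enumerator.generate_mols()):
#         print(product.GetProp(ROUTE_PROP), Chem.MolToSmiles(product))
#         if i >= 99:
#             break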
def get_fp_file(reactant_file, cache_dir, url_base=None, subdir=''):
    """
    Get a fingerprint file for a given reactant file. First look in the cache
    dir; if not found, and a URL is supplied, try to download it from the
    server and write it to the cache dir.

    For a reactant file named foo.pfx, the fingerprint file must be named
    foo-<sha1>.fp, where <sha1> is the SHA1 hash of foo.pfx.

    :param reactant_file: reactant file for which we are looking for
        fingerprints
    :type reactant_file: str

    :param cache_dir: local directory where fingerprint files are/will be
        stored
    :type cache_dir: str

    :param url_base: base URL to try to download fingerprint files from
    :type url_base: str or NoneType

    :param subdir: optionally, subdirectory of cache_dir and URL to use
    :type subdir: str

    :return: path to fingerprint file, if found; else None
    :rtype: str or NoneType
    """
    if cache_dir is None:
        return None
    fp_basename = get_fp_basename(reactant_file)
    fp_dir = os.path.join(cache_dir, subdir)
    fp_path = os.path.join(fp_dir, fp_basename)
    # Try locally
    if os.path.isfile(fp_path):
        logger.info(f'Using cached {fp_path}')
        return fp_path
    # Try remote
    if url_base:
        if subdir:
            fp_url = f'{url_base}/{subdir}/{fp_basename}.gz'
        else:
            fp_url = f'{url_base}/{fp_basename}.gz'
        fileutils.mkdir_p(fp_dir)
        with get_lock(fp_path, max_wait=180):
            if os.path.isfile(fp_path):
                logger.info(f'Using cached {fp_path}')
                return fp_path
            else:
                try:
                    return download_and_decompress(fp_url, fp_path)
                except requests.exceptions.ConnectionError as e:
                    logger.error(e)
                    return None
    return None
def download_and_decompress(url, dest):
    """
    GET a gzip-compressed file from a URL and write it out, decompressed, to
    the local filesystem. The contents are first downloaded to a temporary
    file and then renamed atomically. A locking mechanism is employed to try
    to prevent concurrent downloads of the same file.

    :param url: URL to download
    :type url: str

    :param dest: destination filename
    :type dest: str

    :return: `dest` if successful, else None (e.g. in case of 404)
    :rtype: str or NoneType
    """
    logger.info(f'GET {url}')
    response = requests.get(url, stream=True, timeout=10)
    if response.status_code == 200:
        # wbits=31 means maximum window size, expect gzip header.
        decompressor = zlib.decompressobj(wbits=31)
        fp_dir = os.path.dirname(dest)
        with tempfile.NamedTemporaryFile(dir=fp_dir, delete=False) as fh:
            for chunk in response.iter_content(CHUNK_SIZE):
                fh.write(decompressor.decompress(chunk))
        try:
            os.rename(fh.name, dest)
        except FileExistsError:
            # On Windows, someone else wrote the file already (maybe thanks
            # to our imperfect locking scheme), but that's OK.
            pass
        return dest
    else:
        logger.info(f'GET failed: {response.status_code}')
        return None
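# Self-contained sketch of the wbits=31 trick used above (sample data is made
# up): 31 (= 16 + 15) tells zlib to expect a gzip header and use the maximum
# window size, so a streamed .gz payload can be inflated chunk by chunk
# without the gzip module.
#
#     import gzip
#     import zlib
#
#     payload = gzip.compress(b'hello fingerprints' * 1000)
#     decomp = zlib.decompressobj(wbits=31)
#     out = b''
#     for i in range(0, len(payload), 1024):
#         out += decomp.decompress(payload[i:i + 1024])
#     out += decomp.flush()
#     assert out == b'hello fingerprints' * 1000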
def get_fp_basename(reactant_file):
    """
    Return the basename of the fingerprint file corresponding to the given
    reactant file, following the convention that for a reactant file named
    foo.pfx, the fingerprint file must be named foo-<sha1>.fp, where <sha1>
    is the SHA1 hash of foo.pfx.

    :param reactant_file: reactant file
    :type reactant_file: str

    :return: basename of fingerprint file
    :rtype: str
    """
    base = fileutils.get_basename(reactant_file)
    checksum = get_sha1(reactant_file)
    return f'{base}-{checksum}.fp'
def get_sha1(filename):
    """
    Return the SHA1 hash of a file.

    :param filename: input file
    :type filename: str

    :return: hex SHA1 digest of file contents
    :rtype: str
    """
    h = hashlib.sha1()
    with open(filename, 'rb') as fh:
        while True:
            chunk = fh.read(CHUNK_SIZE)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()
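# Illustration of the naming convention enforced by the two helpers above
# (the path is hypothetical): for a reactant file 'amines.pfx', the
# fingerprint basename is '<stem>-<sha1 of file contents>.fp', which is the
# exact name get_fp_file() looks for in the cache directory.
#
#     basename = get_fp_basename('/some/path/amines.pfx')
#     assert basename == 'amines-' + get_sha1('/some/path/amines.pfx') + '.fp'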
@contextlib.contextmanager
def get_lock(basename, max_wait, interval=1.0):
    """
    Create a <basename>.lock file on entry and remove it on exit. If the file
    already exists, wait up to `max_wait` seconds for the lock to clear. If
    the timeout is exceeded, assume that the lock is stale and ignore it.

    This is a very rudimentary mechanism, but it is good enough for the
    purposes of this module, which is just to _try_ to prevent simultaneous
    downloads of the same file, and where occasional collisions don't hurt
    beyond the slight waste of bandwidth and temporary use of disk space.

    :param basename: basename of lock file
    :type basename: str

    :param max_wait: maximum wait in seconds
    :type max_wait: float

    :param interval: time to sleep between attempts, in seconds
    :type interval: float

    :return: context manager that removes the lock file on exit
    :rtype: contextlib._GeneratorContextManager
    """
    lockfile = basename + '.lock'
    try:
        deadline = os.stat(lockfile).st_mtime + max_wait
    except FileNotFoundError:
        deadline = None
    if deadline is not None:
        logger.info(f'Waiting for lock {lockfile} to clear...')
        while time.time() <= deadline:
            time.sleep(interval)
            try:
                deadline = os.stat(lockfile).st_mtime + max_wait
            except FileNotFoundError:
                break  # Lock is gone
        else:
            logger.info('Lock file is presumed stale '
                        f'(older than {max_wait} s).')
    with open(lockfile, 'wb') as fh:
        pass
    try:
        yield
    finally:
        fileutils.force_remove(lockfile)
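# Usage sketch for get_lock() (paths and URL are illustrative only): the lock
# guards the download of a single destination file, and the stale-lock
# timeout means a crashed writer cannot block other processes forever.
#
#     fp_path = '/tmp/fp_cache/amines-<sha1>.fp'   # hypothetical path
#     with get_lock(fp_path, max_wait=60):
#         if not os.path.isfile(fp_path):
#             download_and_decompress('https://example.invalid/amines.fp.gz',
#                                     fp_path)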
def has_variable_reactants(route):
    """
    Check if the route has at least one variable reactant.

    :param route: route to analyze
    :type route: schrodinger.application.pathfinder.route.RouteNode

    :return: does the route have at least one variable reactant?
    :rtype: bool
    """
    return any(sm.reagent_class for sm in route.getStartingNodes())
def is_core_sm(sm, core_atoms):
    """
    Check if a starting material node corresponds to a core.

    :param sm: starting material
    :type sm: schrodinger.application.pathfinder.route.ReagentNode

    :param core_atoms: core atom indices (1-based)
    :type core_atoms: set of int

    :rtype: bool
    """
    if core_atoms:
        mol = Chem.MolFromSmiles(sm.smiles_list[0])
        attachment_atoms = set()
        for atom in mol.GetAtoms():
            num = atom.GetAtomMapNum()
            if num:
                attachment_atoms.add(num)
        return attachment_atoms <= core_atoms
    return False
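# Minimal sketch of the test above, using a stand-in for the ReagentNode
# (SimpleNamespace) and made-up atom map numbers: the starting material is
# treated as a core only if every attachment label maps into core_atoms.
#
#     from types import SimpleNamespace
#
#     sm_stub = SimpleNamespace(smiles_list=['[CH3:3]C[CH3:7]'])
#     is_core_sm(sm_stub, core_atoms={3, 7, 9})   # True: {3, 7} <= {3, 7, 9}
#     is_core_sm(sm_stub, core_atoms={3})         # False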
def meta_sample(samples, dedup=True, prng=random):
    """
    A generator that, on each cycle, picks a random element of `samples` and
    yields the next element from said sample. It never stops unless all
    samples raise StopIteration. Each product gets annotated with properties
    representing the route that was used to make the molecule.

    :param samples: molecule samples
    :type samples: list of iterator of Mol

    :param dedup: skip duplicate products
    :type dedup: bool

    :param prng: pseudo-random number generator
    :type prng: random.Random

    :return: molecule generator
    :rtype: generator of rdkit.Chem.Mol
    """
    samples = samples[:]
    logger.debug(f'Combinations: {[s.ncombs for s in samples]}')
    seen = set()
    while samples:
        sample = prng.choice(samples)
        logger.debug(f'Trying sample {sample.index}')
        try:
            product = next(sample)
        except StopIteration:
            logger.info(f'Sample {sample.index} exhausted')
            samples.remove(sample)
            continue
        if dedup:
            smiles = Chem.MolToSmiles(product)
            if smiles in seen:
                continue
            seen.add(smiles)
        product.SetIntProp(ROUTE_ID_PROP, sample.index)
        product.SetProp(ROUTE_PROP, sample.route.getOneLineRepresentation())
        yield product
def measure_vectors(st, r1, c1, r2, c2):
    """
    Measure the distance and angle between two bond vectors. The distance is
    measured between atoms r1 and r2; the angle is between the c1-r1 and
    c2-r2 vectors.

    :param st: structure to measure
    :type st: schrodinger.structure.Structure

    :param r1: R-group attachment atom 1
    :type r1: int

    :param c1: core atom 1
    :type c1: int

    :param r2: R-group attachment atom 2
    :type r2: int

    :param c2: core atom 2
    :type c2: int

    :return: distance and angle
    :rtype: float, float
    """
    dist = st.measure(r1, r2)
    v1 = np.array(st.atom[r1].xyz) - np.array(st.atom[c1].xyz)
    v2 = np.array(st.atom[r2].xyz) - np.array(st.atom[c2].xyz)
    angle = math.degrees(transform.get_angle_between_vectors(v1, v2))
    return dist, angle
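# Pure-numpy illustration of the same geometry (it does not use the
# Schrodinger Structure API; the coordinates are chosen arbitrarily): the
# distance is |r2 - r1| and the angle is between the c1->r1 and c2->r2 bond
# vectors.
#
#     r1, c1 = np.array([0.0, 0.0, 0.0]), np.array([1.0, 0.0, 0.0])
#     r2, c2 = np.array([0.0, 3.0, 0.0]), np.array([1.0, 3.0, 0.0])
#     dist = np.linalg.norm(r2 - r1)                              # 3.0
#     v1, v2 = r1 - c1, r2 - c2
#     cos_a = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
#     angle = math.degrees(math.acos(np.clip(cos_a, -1.0, 1.0)))  # 0.0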
def st_from_mol(mol):
    """
    Convert a Mol into a Structure, with 3D coordinates but no added
    hydrogens. Missing stereochemistry is tolerated.

    :param mol: molecule
    :type mol: rdkit.Chem.rdchem.Mol

    :return: Structure
    :rtype: schrodinger.structure.Structure
    """
    st = rdkit_adapter.from_rdkit(mol)
    st.generate3dConformation(require_stereo=False)
    added_hydrogens = range(mol.GetNumAtoms() + 1, st.atom_total + 1)
    st.deleteAtoms(added_hydrogens)
    return st
def is_core(graph, free_component, frozen_components):
    """
    Check if the `free_component` subgraph should be considered a core,
    meaning that it is connected to more than one of the `frozen_components`.

    :param graph: molecular graph
    :type graph: networkx.classes.graph.Graph

    :param free_component: possible core atom indices
    :type free_component: set of int

    :param frozen_components: possible sidechain atom indexes
    :type frozen_components: list of set of int

    :return: is it a core?
    :rtype: bool
    """
    neighbors = set().union(*[graph.neighbors(n) for n in free_component])
    n = 0
    for comp in frozen_components:
        n += bool(neighbors & comp)
    return n > 1
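# Toy example of the core test above on a made-up 5-atom path graph: the
# middle atoms form a core only if they touch more than one frozen region.
#
#     g = nx.path_graph([1, 2, 3, 4, 5])   # 1-2-3-4-5
#     frozen = [{1}, {5}]
#     is_core(g, {2, 3, 4}, frozen)   # True: adjacent to both {1} and {5}
#     is_core(g, {2}, frozen)         # False: adjacent only to {1}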
def apply_similarity_filters(products, args):
    """
    Implement the -sim_keep_percent and -sim_discard_percent functionality.

    :param products: molecules to filter
    :type products: iterator of rdkit.Chem.Mol

    :param args: command-line arguments
    :type args: argparse.Namespace

    :return: filtered products and number of products to keep
    :rtype: generator of rdkit.Chem.Mol, int
    """
    nkeep = None
    if args.sim_keep_percent is not None:
        nkeep = args.max_products * args.sim_keep_percent / 100.0
        reverse = False
    elif args.sim_discard_percent is not None:
        nkeep = args.max_products * (1.0 - args.sim_discard_percent / 100.0)
        reverse = True
    if nkeep is not None:
        filtered_products = filtering.keep_top_n(
            products,
            nkeep,
            args.max_products,
            prop='r_reaction_similarity_1',
            reverse=reverse)
        return filtered_products, nkeep
    else:
        return products, args.max_products
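# Worked example of the nkeep arithmetic above (the numbers are made up):
# with max_products=1000 and -sim_keep_percent 20, nkeep = 1000 * 20 / 100 =
# 200; with -sim_discard_percent 20 instead, nkeep = 1000 * (1 - 20 / 100) =
# 800 and keep_top_n is called with reverse=True.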
def analyze_frozen_atoms(mol, frozen_atoms):
    """
    Examine the molecular graph of the input molecule to partition it, based
    on the set of frozen atoms, into free regions and frozen regions. Also
    determine which free region is the core, if any. A core in this context
    is a contiguous set of non-frozen atoms which is adjacent to two or more
    sets of frozen atoms (the side chains). Jobs with two or more cores will
    abort immediately.

    When there is a core, also measure the distances and angles between all
    the pairs of vectors leading from the core to the side chains. The
    resulting dict has pairs of atoms as keys, and (distance, angle) tuples
    as values.

    In addition to the set of core atoms, a set of "core neighbors" is also
    returned. These are the non-core atoms that are directly connected to the
    core.

    :param mol: input molecule
    :type mol: rdkit.Chem.rdchem.Mol

    :param frozen_atoms: frozen atom indices
    :type frozen_atoms: set of int

    :return: measurements, core atoms, core neighbors
    :rtype: dict {(int, int): (float, float)}, set of int, set of int
    """
    st = st_from_mol(mol)
    graph = analyze.create_nx_graph(st)
    frozen_graph = nx.subgraph(graph, frozen_atoms)
    frozen_components = list(nx.connected_components(frozen_graph))
    free_graph = graph.copy()
    free_graph.remove_nodes_from(n for n in graph if n in frozen_graph)
    free_components = list(nx.connected_components(free_graph))
    logger.info(f'Frozen regions: {frozen_components}')
    logger.info(f'Free regions: {free_components}')
    # Figure out which of these are cores...
    core_atoms = set()
    core_neighbors = set()
    measurements = {}
    for comp in free_components:
        if is_core(graph, comp, frozen_components):
            if core_atoms:
                raise ValueError('Too many cores.')
            core_atoms = comp
            core_neighbors = set().union(
                *[graph.neighbors(n) for n in core_atoms]) - core_atoms
    if core_atoms:
        logger.info(f'Core_atoms: {sorted(core_atoms)}')
        # Find attachment atoms
        attachment_atoms = []
        for comp in frozen_components:
            att = list(comp & core_neighbors)
            if len(att) != 1:
                raise ValueError(
                    'Core-hopping only supports R-groups attached via one '
                    'bond')
            attachment_atoms.append(att[0])
        for r1, r2 in itertools.combinations(attachment_atoms, 2):
            path = nx.shortest_path(graph, r1, r2)
            logger.debug(f'Core path: {path}')
            dist, angle = measure_vectors(st, path[0], path[1], path[-1],
                                          path[-2])
            key = tuple(sorted([r1, r2]))
            measurements[key] = (dist, angle)
        logger.info('Reference measurements:')
        logger.info(' atom1  atom2     dist    angle')
        for (a1, a2), (d, ang) in measurements.items():
            logger.info('%6d %6d %8.2f %8.1f' % (a1, a2, d, ang))
    return measurements, core_atoms, core_neighbors
def generate_mols(*a, **d):
    """
    A generator of products following the multiroute enumeration protocol.
    This is a thin functional wrapper around the MultiRouteEnumerator class.
    For arguments, see MultiRouteEnumerator.__init__.

    :rtype: generator of Mol
    """
    enumerator = MultiRouteEnumerator(*a, **d)
    return enumerator.generate_mols()
def write_products(products, filename, max_products, has_frozen_atoms=False):
    """
    Write out up to `max_products` from a mol iterator to a file.

    :param products: molecules to write
    :type products: iterator of Mol

    :param filename: filename
    :type filename: str

    :param max_products: maximum number of structures to write
    :type max_products: int

    :param has_frozen_atoms: True if the user specified frozen atoms
    :type has_frozen_atoms: bool
    """
    nproducts = 0
    backend = jobcontrol.get_backend()
    percent_done = 0
    align_products = (fileutils.is_maestro_file(filename) and
                      has_frozen_atoms)
    with get_output_writer(align_products, filename) as writer:
        mol_aligner = aligner.AlignedMolStructureGenerator()
        for product in products:
            logger.debug('%s %s' % (product.GetIntProp(ROUTE_ID_PROP),
                                    Chem.MolToSmiles(product)))
            output = product
            if align_products:
                output = mol_aligner.generateAlignedStructure(product)
            writer.append(output)
            nproducts += 1
            new_percent = int(nproducts / max_products * 100)
            if new_percent > percent_done:
                percent_done = new_percent
                logger.info(f'{percent_done}% done')
            if backend and nproducts % 100 == 0:
                backend.setJobProgress(steps=nproducts,
                                       totalsteps=max_products)
            if nproducts >= max_products:
                break
    logger.info(f'Wrote {nproducts:,} products to {filename}')
def get_output_writer(align_products, filename):
    """
    If we're generating Maestro structures (.mae, .mae.gz, .maegz), we need
    to generate structures, align them, and then write them out. If not, we
    can write products out with the MolWriter.

    :param align_products: flag indicating that products will be aligned
    :type align_products: bool

    :param filename: output filename
    :type filename: str
    """
    try:
        # We need to generate and align structures ourselves.
        if align_products:
            return structure.StructureWriter(filename)
        # Otherwise we can let the MolWriter handle structure generation.
        return molio.get_mol_writer(filename)
    except ValueError as e:
        sys.exit(e)