
"""
This module contains the DiversitySelector class, which combines greedy
and stochastic approaches in an optimization algorithm that chooses a
diverse subset of compounds from a larger pool, with optional biasing of
the selections to satisfy a set of property filters.

The objective function to be minimized is the average nearest neighbor
similarity within the subset plus the average fraction of property filters
failed. Minimization is achieved by starting with a random subset of
compounds and then repeatedly attempting to replace the subset member
exhibiting the highest nearest neighbor similarity with a randomly chosen
member of the pool. Replacements that decrease both the average nearest
neighbor similarity and average filter score are always made, as are
replacements that decrease one quantity and leave the other unchanged.
Replacements that increase either of the quantities are accepted or
rejected in accordance with a Monte Carlo test whose probability of being
satisfied decreases from 50% to 1% over the course of the optimization.

For further details, see: Bioorg. Med. Chem. 2012, 20, 5379-5387.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import heapq
import math
import os
import random
from collections import defaultdict

from schrodinger.infra import canvas

OPT_CYCLES = 10
CONVRG_TOL = 0.001
CONVRG_CYCLES = 3
MC_TOL = 0.001
LOG_MC_PROB_INITIAL = math.log(0.5)
LOG_MC_PROB_FINAL = math.log(0.01)
RAND_SEED = 1
MAX_NN_CACHE = 10
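

# The constants above encode a linearly cooled Monte Carlo schedule: an
# uphill move of MC_TOL in the objective function is accepted with
# probability exp(-MC_TOL / temp), which works out to 50% at the initial
# temperature and 1% at the final one. The helper below is an illustrative
# sketch of that schedule; it mirrors the arithmetic in
# DiversitySelector.select() but is not used elsewhere in this module.
# With the defaults, list(_mc_acceptance_schedule()) decreases from 0.5 on
# the first cycle to 0.01 on the last.
def _mc_acceptance_schedule(mc_tol=MC_TOL, opt_cycles=OPT_CYCLES):
    """
    Yield, for each optimization cycle, the probability of accepting an
    uphill move of mc_tol in the objective function.
    """
    tmax = -mc_tol / LOG_MC_PROB_INITIAL  # Temperature where P = 0.5.
    tmin = -mc_tol / LOG_MC_PROB_FINAL  # Temperature where P = 0.01.
    tstep = (tmax - tmin) / (opt_cycles - 1) if opt_cycles > 1 else 0
    temp = tmax
    for _ in range(opt_cycles):
        yield math.exp(-mc_tol / temp)
        temp -= tstep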


class PropertyFilter:
    """
    Simple object that holds the name of a property and the minimum and
    maximum allowed values of that property.
    """

    def __init__(self, name, min_value, max_value):
        """
        Constructor taking the name of a numeric property and limits on the
        value of that property.

        :param name: Property name
        :type name: str

        :param min_value: Minimum allowed value
        :type min_value: float

        :param max_value: Maximum allowed value
        :type max_value: float
        """
        if max_value < min_value:
            mesg = (f'Maximum value of {name} cannot be less than '
                    'minimum value')
            raise ValueError(mesg)
        self.name = name
        self.min_value = min_value
        self.max_value = max_value


def compute_filter_score(filters, prop_values, filter_columns):
    """
    Computes the fraction of property filters failed for a given compound.

    :param filters: List of property filters.
    :type filters: list(PropertyFilter) or NoneType

    :param prop_values: Values of all properties for the compound.
    :type prop_values: list(str)

    :param filter_columns: 0-based indices into prop_values for just the
        properties in filters.
    :type filter_columns: list(int)

    :return: Fraction of filters failed.
    :rtype: float
    """
    if not filters:
        return 0.0
    dscore = 1.0 / len(filters)
    filter_score = 0.0
    for i, filter in enumerate(filters):
        prop_value = float(prop_values[filter_columns[i]])
        if prop_value < filter.min_value or prop_value > filter.max_value:
            filter_score += dscore
    return filter_score
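

# A minimal, self-contained sketch of the fraction-of-filters-failed
# convention implemented by compute_filter_score(). The property names and
# values below are hypothetical, and the helper itself is illustrative
# rather than part of this module's API.
def _example_filter_score():
    """
    Return the filter score of a hypothetical compound that fails one of
    two filters.
    """
    filters = [
        PropertyFilter('r_user_MW', 200.0, 500.0),  # Hypothetical property.
        PropertyFilter('r_user_LogP', -1.0, 5.0)  # Hypothetical property.
    ]
    # Extra data as it would be read from a fingerprint row: SMILES first,
    # then the two filter properties, all stored as strings.
    prop_values = ['CCO', '46.07', '-0.14']
    filter_columns = [1, 2]
    # The molecular weight fails the [200, 500] window and the logP passes
    # the [-1, 5] window, so the score is 1 failure / 2 filters = 0.5.
    return compute_filter_score(filters, prop_values, filter_columns)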


def get_filter_columns(fpin, filters):
    """
    Given a 32-bit fingerprint file connection and a list of property
    filters, this function returns 0-based indices of the filter properties
    into the full list of fingerprint properties. Raises a KeyError if any
    of the properties aren't found.

    :param fpin: Fingerprint file connection.
    :type fpin: canvas.ChmFPIn32

    :param filters: Property filters.
    :type filters: list(PropertyFilter)

    :return: Filter property indices.
    :rtype: list(int)
    """
    if not filters:
        return []
    filter_columns = []
    prop_names = fpin.getExtraColumnNames()
    for filter in filters:
        try:
            filter_columns.append(prop_names.index(filter.name))
        except ValueError:
            raise KeyError(f'Filter property "{filter.name}" not found')
    return filter_columns


def get_fp_domain(fpin, fp_domain=None):
    """
    Given a 32-bit fingerprint file connection and an optional list of
    0-based row numbers that define the domain of fingerprints to use, this
    function returns a 0-based list of all row numbers, or the sorted
    unique rows from the supplied list. Raises a ValueError if any row
    numbers in fp_domain are outside the legal range.

    :param fpin: Fingerprint file connection.
    :type fpin: canvas.ChmFPIn32

    :param fp_domain: 0-based fingerprint row numbers.
    :type fp_domain: list(int) or NoneType

    :return: All row numbers, or the supplied row numbers, after sorting
        and removing duplicates.
    :rtype: list(int)
    """
    row_count = fpin.getRowCount()
    if fp_domain:
        user_domain = sorted(set(fp_domain))
        if user_domain[0] < 0 or user_domain[-1] >= row_count:
            raise ValueError(f'fp_domain must lie in [0, {row_count})')
        return user_domain
    else:
        return list(range(row_count))


def get_smiles_column(fpin):
    """
    Given a 32-bit fingerprint file connection, this function returns the
    0-based index of the first column whose name contains "SMILES". Raises
    a KeyError if no such column is found.

    :param fpin: Fingerprint file connection.
    :type fpin: canvas.ChmFPIn32

    :return: Zero-based SMILES column index.
    :rtype: int
    """
    for i, prop in enumerate(fpin.getExtraColumnNames()):
        if 'SMILES' in prop:
            return i
    raise KeyError('No SMILES property found in fingerprint file')
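

# get_smiles_column() and get_filter_columns() depend only on the
# connection's getExtraColumnNames() method, so their behavior can be
# sketched with a duck-typed stand-in. The column names are hypothetical,
# and this helper is illustrative rather than part of this module's API.
def _example_column_lookup():
    """
    Return the SMILES column index and filter column indices for a fake
    fingerprint file connection.
    """

    class _FakeFPIn:

        def getExtraColumnNames(self):
            return ['SMILES', 'r_user_MW', 'r_user_LogP']

    fpin = _FakeFPIn()
    smiles_column = get_smiles_column(fpin)  # 0: first column with 'SMILES'.
    filters = [PropertyFilter('r_user_LogP', -1.0, 5.0)]
    filter_columns = get_filter_columns(fpin, filters)  # [2]
    return smiles_column, filter_columns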


class DiversitySelector:
    """
    Given a file of 32-bit fingerprints, this class combines greedy and
    stochastic approaches to select a diverse subset of structures and,
    optionally, to bias the selections to favor compounds that satisfy a
    set of property filters.
    """

    def __init__(self,
                 fp_file,
                 opt_cycles=OPT_CYCLES,
                 convrg_tol=CONVRG_TOL,
                 convrg_cycles=CONVRG_CYCLES,
                 mc_tol=MC_TOL,
                 rand_seed=RAND_SEED,
                 filters=None,
                 fp_domain=None,
                 logger=None):
        """
        Constructor taking the name of a 32-bit fingerprint file and
        options for optimizing diversity and, optionally, properties.

        :param fp_file: Input file of 32-bit fingerprints and SMILES.
        :type fp_file: str

        :param opt_cycles: Maximum number of optimization cycles. For a
            subset of N compounds, an optimization cycle consists of N
            passes, each of which involves an attempt to replace the
            compound with the highest nearest neighbor similarity.
        :type opt_cycles: int

        :param convrg_tol: Convergence tolerance on the absolute change in
            the objective function. If the change is less than this value,
            the convergence tolerance is satisfied.
        :type convrg_tol: float

        :param convrg_cycles: Number of consecutive cycles over which the
            convergence tolerance must be satisfied in order to halt the
            optimization.
        :type convrg_cycles: int

        :param mc_tol: Monte Carlo criterion. An increase of mc_tol in the
            objective function will be accepted with a probability of 50%
            in the first cycle and 1% in the last cycle.
        :type mc_tol: float

        :param rand_seed: Random seed for initial subset selection and
            Monte Carlo tests.
        :type rand_seed: int

        :param filters: List of property filters that selected compounds
            should preferentially satisfy. All properties must be present
            in fp_file. If omitted, properties will not be optimized.
        :type filters: list(PropertyFilter) or NoneType

        :param fp_domain: List of 0-based row numbers in fp_file from which
            selections should be made. If omitted, all rows will be
            considered.
        :type fp_domain: list(int)

        :param logger: Logger for output of INFO level progress messages.
            Feedback can be helpful when large subsets are selected, as a
            given optimization cycle may take minutes or longer if the
            subset is significantly larger than 1000.
        :type logger: logging.Logger or NoneType
        """
        if not os.path.isfile(fp_file):
            raise FileNotFoundError(f'Fingerprint file "{fp_file}" not found')
        # Allow opt_cycles of 0 for comparison to pure random selection.
        if opt_cycles < 0:
            raise ValueError(
                "Number of optimization cycles cannot be negative")
        self.opt_cycles = opt_cycles
        if convrg_tol <= 0:
            raise ValueError("Convergence tolerance must be greater than 0")
        self.convrg_tol = convrg_tol
        if convrg_cycles < 1:
            mesg = ("Number of consecutive cycles required for convergence "
                    "must be 1 or greater")
            raise ValueError(mesg)
        self.convrg_cycles = convrg_cycles
        if mc_tol <= 0:
            raise ValueError("Monte Carlo criterion must be greater than 0")
        self.mc_tol = mc_tol
        self.rand_seed = rand_seed
        self._random = random.Random(rand_seed)
        self._fpin = canvas.ChmFPIn32(fp_file)
        self._smiles_column = get_smiles_column(self._fpin)
        self._filter_columns = get_filter_columns(self._fpin, filters)
        self._filters = filters
        self._fp_domain = get_fp_domain(self._fpin, fp_domain)
        self.row_count = len(self._fp_domain)
        self._logger = logger
        self.subset_rows = []
        self.subset_titles = []
        self.subset_smiles = []

    def select(self, num_select):
        """
        Selects the indicated number of optimized compounds and stores the
        subset data in the following member variables:

        self.subset_rows - 0-based row numbers within fingerprint file
        self.subset_titles - Compound titles
        self.subset_smiles - Compound SMILES

        :param num_select: Desired number of compounds with optimized
            diversity and, optionally, optimized properties. Note that
            computational effort scales quadratically with this number,
            and values significantly larger than 1000 may lead to very
            long run times.
        :type num_select: int
        """
        if num_select >= self.row_count:
            mesg = ("Subset size must be less than the number of compounds "
                    f"({self.row_count})")
            raise ValueError(mesg)
        self._random.seed(self.rand_seed)
        self._info(f"\nAssigning initial scores for {num_select} compounds")
        self._select_initial_subset(num_select)
        self._build_nn_cache()
        self._score_subset()
        sim_nn = self._nn_sim_avg
        filter = self._fscore_avg
        total_score = sim_nn + filter
        mesg = "Initial scores: Sim_NN = %.6f, Filter = %.6f\n"
        self._info(mesg % (sim_nn, filter))
        tmax = -self.mc_tol / LOG_MC_PROB_INITIAL
        tmin = -self.mc_tol / LOG_MC_PROB_FINAL
        if self.opt_cycles > 1:
            tstep = (tmax - tmin) / (self.opt_cycles - 1)
        else:
            tstep = 0
        self._temp = tmax
        total_score_prev = total_score
        convrg_count = 0
        for cycle in range(self.opt_cycles):
            num_replace = 0
            for ipass in range(num_select):
                num_replace += self._replace_worst()
            sim_nn = self._nn_sim_avg
            filter = self._fscore_avg
            total_score = sim_nn + filter
            score_diff = total_score - total_score_prev
            mesg = "Cycle %d: Sim_NN = %.6f, Filter = %.6f, Change = %.6f"
            self._info(mesg % (cycle + 1, sim_nn, filter, score_diff))
            self._info(f"Number of replacements = {num_replace}")
            if self._nn_row_max is None:
                self._info("No further improvement of the scores is possible")
                break
            elif math.fabs(score_diff) < self.convrg_tol:
                convrg_count += 1
                if convrg_count == self.convrg_cycles:
                    mesg = (f"Convergence tolerance of {self.convrg_tol} "
                            f"satisfied for {self.convrg_cycles} consecutive "
                            "cycles")
                    self._info(mesg)
                    break
            else:
                convrg_count = 0
            total_score_prev = total_score
            self._temp -= tstep
        self.subset_titles, self.subset_smiles = self._read_extra_data()

    def _build_nn_cache(self):
        """
        Builds a cache of nearest neighbors for the current subset. This
        reduces the effort required to maintain the nearest neighbor list
        as members of the diverse subset are replaced.
        """
        self._nn_sim_row_cache = defaultdict(list)
        # Accumulate cache from upper triangle of subset similarity matrix.
        for i in range(len(self.subset_rows)):
            row_i = self.subset_rows[i]
            fp_i = self._subset_fp_and_filter[row_i][0]
            nn_sim_row_cache_i = self._nn_sim_row_cache[row_i]
            for j in range(i):
                row_j = self.subset_rows[j]
                nn_sim_row_cache_j = self._nn_sim_row_cache[row_j]
                sim_ij = fp_i.simTanimoto(
                    self._subset_fp_and_filter[row_j][0])
                nn_i = [sim_ij, row_j]
                if len(nn_sim_row_cache_i) == MAX_NN_CACHE:
                    heapq.heappushpop(nn_sim_row_cache_i, nn_i)
                else:
                    heapq.heappush(nn_sim_row_cache_i, nn_i)
                nn_j = [sim_ij, row_i]
                if len(nn_sim_row_cache_j) == MAX_NN_CACHE:
                    heapq.heappushpop(nn_sim_row_cache_j, nn_j)
                else:
                    heapq.heappush(nn_sim_row_cache_j, nn_j)
        # Sort nearest neighbor similarities and store associated row
        # numbers as sets.
        self._nn_row_cache = defaultdict(set)
        for row in self.subset_rows:
            self._nn_sim_row_cache[row].sort(reverse=True)
            row_cache = self._nn_row_cache[row]
            for sim, row_i in self._nn_sim_row_cache[row]:
                row_cache.add(row_i)

    def _get_fp_and_filter_score(self, row):
        """
        Returns the fingerprint and filter score for the indicated row in
        the fingerprint file.

        :param row: Zero-based fingerprint row.
        :type row: int

        :return: Tuple of fingerprint and fraction of filters failed.
        :rtype: canvas.ChmSparseBitset, float
        """
        self._fpin.setPos(row + 1)
        fp, title, props = self._fpin.nextExtra()
        filter_score = compute_filter_score(self._filters, props,
                                            self._filter_columns)
        return fp, filter_score

    def _info(self, mesg):
        """
        Writes the provided message to the logger at the INFO level if the
        logger exists.

        :param mesg: The message to write.
        :type mesg: str
        """
        if self._logger:
            self._logger.info(mesg)

    def _read_extra_data(self):
        """
        Reads and returns compound titles and SMILES for the current subset.

        :return: Lists of titles and SMILES.
        :rtype: list(str), list(str)
        """
        subset_titles = []
        subset_smiles = []
        for row in self.subset_rows:
            self._fpin.setPos(row + 1)
            fp, title, props = self._fpin.nextExtra()
            subset_titles.append(title)
            subset_smiles.append(props[self._smiles_column])
        return subset_titles, subset_smiles

    def _replace_worst(self):
        """
        Attempts to replace the member of the current subset which has the
        highest nearest neighbor similarity. If successful, all associated
        member data are updated and a value of 1 is returned. Otherwise,
        no action is taken and a value of 0 is returned.

        :return: 1 if replacement is made, 0 if not.
        :rtype: int
        """
        worst_row = self._nn_row_max
        if worst_row is None:
            # This happens when the most recent replacement yielded a
            # subset for which no improvement is possible (e.g., all
            # similarities within the subset are 0.0).
            return 0
        # Randomly choose a candidate row to replace worst_row. Note that
        # the subset size is required to be less than the row count, so
        # this loop won't be infinite.
        while True:
            candidate_pos = self._random.randint(0, self.row_count - 1)
            candidate_row = self._fp_domain[candidate_pos]
            if candidate_row not in self._subset_fp_and_filter:
                break
        fp_candidate, filter_candidate = self._get_fp_and_filter_score(
            candidate_row)
        dfilter = filter_candidate - self._subset_fp_and_filter[worst_row][1]
        # Accumulate changes that would occur in the average nearest
        # neighbor similarity if candidate_row replaced worst_row.
        dsim = -self._nn_sim_row_cache[worst_row][0][0]
        sim_candidate_max = 0.0  # Nearest neighbor similarity to candidate.
        for i, fp_and_score in self._subset_fp_and_filter.items():
            if i == worst_row:
                continue
            sim_to_i = fp_candidate.simTanimoto(fp_and_score[0])
            if sim_to_i > sim_candidate_max:
                sim_candidate_max = sim_to_i
            nn_sim_row_cache_i = self._nn_sim_row_cache[i]
            nn_sim_i, nn_row_i = nn_sim_row_cache_i[0]
            if sim_to_i > nn_sim_i:
                # Candidate would be the new nearest neighbor of i.
                dsim += sim_to_i - nn_sim_i
            elif nn_row_i == worst_row:
                # worst_row is the nearest neighbor of i. Since we'd be
                # losing worst_row, the second ranked nearest neighbor
                # of i becomes its nearest neighbor. Note that the cache
                # makes this operation much faster.
                dsim += nn_sim_row_cache_i[1][0] - nn_sim_i
        norm_factor = 1.0 / len(self.subset_rows)
        dsim = (dsim + sim_candidate_max) * norm_factor
        dfilter *= norm_factor
        # Use a Pareto-like approach, where a preferred move is one that
        # decreases either nearest neighbor similarity or filter score
        # without increasing the other.
        if dsim < 0.0 and dfilter <= 0.0:
            replace = True
        elif dsim <= 0.0 and dfilter < 0.0:
            replace = True
        else:
            # It's not a strictly downhill move. Use Monte Carlo test on
            # the total increase.
            etest = max(dsim, 0.0) + max(dfilter, 0.0)
            replace = self._random.random() < math.exp(-etest / self._temp)
        if replace:
            self._update_subset(candidate_row)
        return int(replace)

    def _reverse_insort(self, sorted_list, value):
        """
        Analogous to bisect.insort except that it does an insertion that
        maintains a list sorted by descending value.

        :param sorted_list: List that's sorted by descending value.
        :type sorted_list: list(float)

        :param value: Value to be inserted.
        :type value: float
        """
        left = 0
        right = len(sorted_list)
        while left < right:
            middle = (left + right) // 2
            if value > sorted_list[middle]:
                right = middle
            else:
                left = middle + 1
        sorted_list.insert(left, value)

    def _score_subset(self):
        """
        Computes the average nearest neighbor similarity and average filter
        score for the current subset, and identifies the subset member with
        the highest nearest neighbor similarity. On return, the following
        member data are updated:

        self._nn_sim_avg - Average nearest neighbor similarity
        self._fscore_avg - Average fraction of property filters failed
        self._nn_row_max - Row of highest nearest neighbor similarity
        """
        self._nn_sim_avg = 0.0
        self._fscore_avg = 0.0
        self._nn_row_max = None
        sim_max = 0.0
        fscore_sim_max = 0.0  # Used to break ties in sim_max.
        # Loop in order of increasing row number for stability in the
        # determination of self._nn_row_max.
        for row in self.subset_rows:
            nn_sim = self._nn_sim_row_cache[row][0][0]
            self._nn_sim_avg += nn_sim
            fscore = self._subset_fp_and_filter[row][1]
            replace = nn_sim > sim_max
            if not replace:
                replace = nn_sim == sim_max and fscore > fscore_sim_max
            if replace:
                sim_max = nn_sim
                fscore_sim_max = fscore
                self._nn_row_max = row
            self._fscore_avg += fscore
        norm_factor = 1.0 / len(self.subset_rows)
        self._nn_sim_avg *= norm_factor
        self._fscore_avg *= norm_factor

    def _select_initial_subset(self, num_select):
        """
        Selects the initial random subset of compounds and stores their
        fingerprints and filter scores in self._subset_fp_and_filter.

        :param num_select: The size of the subset to select.
        :type num_select: int
        """
        self._subset_fp_and_filter = {}
        for i in self._random.sample(range(self.row_count), num_select):
            # row is either i, or the ith user-supplied row number.
            row = self._fp_domain[i]
            fp, filter_score = self._get_fp_and_filter_score(row)
            self._subset_fp_and_filter[row] = [fp, filter_score]
        self.subset_rows = sorted(self._subset_fp_and_filter)

    def _update_nn_cache(self):
        """
        Rebuilds any nearest neighbors cache whose length is less than 2.
        """
        for i in self.subset_rows:
            nn_sim_row_cache_i = self._nn_sim_row_cache[i]
            if len(nn_sim_row_cache_i) > 1:
                continue
            nn_sim_row_cache_i.clear()
            fp_i = self._subset_fp_and_filter[i][0]
            for j in self.subset_rows:
                if j == i:
                    continue
                sim_ij = fp_i.simTanimoto(self._subset_fp_and_filter[j][0])
                nn_i = [sim_ij, j]
                if len(nn_sim_row_cache_i) == MAX_NN_CACHE:
                    heapq.heappushpop(nn_sim_row_cache_i, nn_i)
                else:
                    heapq.heappush(nn_sim_row_cache_i, nn_i)
            nn_sim_row_cache_i.sort(reverse=True)
            nn_row_cache_i = self._nn_row_cache[i]
            nn_row_cache_i.clear()
            for sim, j in nn_sim_row_cache_i:
                nn_row_cache_i.add(j)

    def _update_subset(self, row_new):
        """
        Replaces self._nn_row_max with row_new and updates all member data.

        :param row_new: The row that replaces self._nn_row_max.
        :type row_new: int
        """
        row_old = self._nn_row_max
        del self._subset_fp_and_filter[row_old]
        del self._nn_sim_row_cache[row_old]
        del self._nn_row_cache[row_old]
        fp_new, filter_new = self._get_fp_and_filter_score(row_new)
        for i, fp_and_score in self._subset_fp_and_filter.items():
            sim_to_i = fp_new.simTanimoto(fp_and_score[0])
            nn_sim_row_cache_i = self._nn_sim_row_cache[i]
            nn_row_cache_i = self._nn_row_cache[i]
            # Remove row_old from cache if present.
            if row_old in nn_row_cache_i:
                nn_row_cache_i.remove(row_old)
                for j in range(len(nn_sim_row_cache_i)):
                    if nn_sim_row_cache_i[j][1] == row_old:
                        del nn_sim_row_cache_i[j]
                        break
            # Add row_new to cache only if sim_to_i is larger than the
            # smallest similarity already in the cache.
            if sim_to_i > nn_sim_row_cache_i[-1][0]:
                nn_i = [sim_to_i, row_new]
                # We can't use heapq to add to cache because it's already
                # sorted by descending value and is therefore not a heap.
                self._reverse_insort(nn_sim_row_cache_i, nn_i)
                sim_last, row_last = nn_sim_row_cache_i.pop()
                nn_row_cache_i.remove(row_last)
                nn_row_cache_i.add(row_new)
        self._subset_fp_and_filter[row_new] = [fp_new, filter_new]
        self.subset_rows = sorted(self._subset_fp_and_filter)
        self._update_nn_cache()
        self._score_subset()
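

# A minimal end-to-end usage sketch. The file name "pool.fp" is
# hypothetical: it must be an existing 32-bit fingerprint file that holds
# more than the requested number of compounds and includes a SMILES column
# (plus any filter properties, if filters are supplied).
if __name__ == '__main__':
    import logging

    logging.basicConfig(level=logging.INFO)
    selector = DiversitySelector('pool.fp',  # Hypothetical input file.
                                 logger=logging.getLogger(__name__))
    selector.select(100)
    for row, title, smiles in zip(selector.subset_rows,
                                  selector.subset_titles,
                                  selector.subset_smiles):
        print(row, title, smiles)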