Source code for schrodinger.application.combinatorial_diversity.splitter_utils

"""
This module provides functionality for splitting a large data set into
smaller chunks for scalable diversity selection via DiversitySelector.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import itertools
import math
from operator import itemgetter

import numpy

from schrodinger.application.combinatorial_diversity import diversity_selector
from schrodinger.infra import canvas


def compute_factor_scores(xcols, evectors):
    """
    Projects rows of autoscaled X variables onto a set of PCA eigenvectors,
    yielding the factor score of each row on each eigenvector.

    :param evectors: Eigenvectors from a PCA analysis. The jth eigenvector
        is stored in evectors[:, j].
    :type evectors: numpy.ndarray

    :param xcols: Columns of autoscaled X variables. The jth column is
        stored in xcols[j].
    :type xcols: numpy.ndarray

    :return: N PCA scores for each row in xcols. The shape of the returned
        vector is (xcols.shape[1], xcols.shape[0]), i.e., the shape of the
        transpose of xcols.
    :rtype: numpy.ndarray
    """
    # evectors @ xcols yields scores with one column per row of X values;
    # transpose so each row of the result holds the scores for one row of X.
    return (evectors @ xcols).T
def compute_sim_to_probes(fp_file, probe_rows):
    """
    Given a 32-bit fingerprint file and the 0-based row numbers for N
    diverse probe structures, this function computes columns of autoscaled
    Tanimoto similarities between the probes and all fingerprints in the
    file.

    :param fp_file: Input file of 32-bit fingerprints.
    :type fp_file: str

    :param probe_rows: List of 0-based fingerprint row numbers for N
        diverse probe structures.
    :type probe_rows: list(int)

    :return: N columns of autoscaled similarities.
    :rtype: numpy.ndarray
    """
    reader = canvas.ChmFPIn32(fp_file)
    # Fetch each probe fingerprint by position (setPos is 1-based).
    probe_fps = []
    for probe_row in probe_rows:
        reader.setPos(probe_row + 1)
        probe_fps.append(reader.next())
    total_rows = reader.getRowCount()
    num_probes = len(probe_rows)
    sim_cols = numpy.zeros((num_probes, total_rows), numpy.double)
    reader.rewind()
    # Compute raw Tanimoto similarities between every fingerprint and each
    # probe.
    row = 0
    while reader.hasNext():
        fp = reader.next()
        for col, probe_fp in enumerate(probe_fps):
            sim_cols[col][row] = fp.simTanimoto(probe_fp)
        row += 1
    # Autoscale each column to zero mean and unit standard deviation.
    for col in range(num_probes):
        sim_cols[col] = (sim_cols[col] - numpy.mean(sim_cols[col])) / numpy.std(sim_cols[col])
    return sim_cols
def create_sim_cormat(sim_cols):
    """
    Given N columns of autoscaled similarities, this function creates an
    NxN matrix of Pearson correlations among those columns.

    :param sim_cols: N columns of autoscaled similarities.
    :type sim_cols: numpy.ndarray

    :return: NxN correlation matrix.
    :rtype: numpy.ndarray
    """
    ncols, nrows = sim_cols.shape
    # Start from the identity since each column correlates perfectly with
    # itself; fill in the off-diagonal terms symmetrically.
    cormat = numpy.identity(ncols)
    for i in range(ncols):
        for j in range(i):
            # Columns are autoscaled, so the Pearson correlation reduces
            # to the dot product divided by the number of rows.
            pearson_r = numpy.dot(sim_cols[i], sim_cols[j]) / nrows
            cormat[i][j] = pearson_r
            cormat[j][i] = pearson_r
    return cormat
def diagonalize_symmat(symmat):
    """
    Diagonalizes a real, symmetric matrix and returns the eigenvalues and
    eigenvectors sorted by decreasing eigenvalue.

    :param symmat: Real, symmetric matrix. Not modified.
    :type symmat: numpy.ndarray

    :return: Reverse-sorted eigenvalues, followed by eigenvectors. The jth
        eigenvector is stored in the column slice [:, j] of the returned
        numpy.ndarray.
    :rtype: tuple(numpy.float64), numpy.ndarray
    """
    # numpy.linalg.eigh is the appropriate routine for symmetric matrices:
    # unlike numpy.linalg.eig, it guarantees real eigenvalues/eigenvectors,
    # so no numpy.real() cleanup of spurious imaginary round-off is needed,
    # and it is more accurate and efficient for this matrix class.
    evalues, evectors = numpy.linalg.eigh(symmat)
    # eigh returns eigenvalues in ascending order; reorder to descending.
    # A stable sort on the negated values preserves the original relative
    # order of any degenerate (equal) eigenvalues.
    order = numpy.argsort(-evalues, kind='stable')
    sorted_evalues = tuple(evalues[order])
    sorted_evectors = evectors[:, order]
    # There's no guarantee that different platforms/machines will yield
    # eigenvectors with consistent algebraic signs. To standardize results,
    # ensure that the largest-magnitude element of each eigenvector is
    # positive.
    n = symmat.shape[0]
    for j in range(n):
        col = sorted_evectors[:, j]
        # argmax returns the first index on ties, matching the tie-breaking
        # of max(..., key=math.fabs); copysign(1, x) yields +/-1.
        esign = math.copysign(1, col[numpy.argmax(numpy.abs(col))])
        sorted_evectors[:, j] *= esign
    return sorted_evalues, sorted_evectors
def get_all_orthant_strings(ndim):
    """
    Yields all possible orthant strings for the given number of dimensions.
    For example, if ndim = 2, this function would yield the 2-dimensional
    orthant strings '++', '+-', '-+', '--'. These correspond to the usual
    4 quadrants in xy space.

    :param ndim: Number of dimensions.
    :type ndim: int

    :yield: All possible orthant strings of length ndim.
    :ytype: str
    """
    # itertools.product enumerates every length-ndim combination of signs
    # in lexicographic order ('+' before '-').
    yield from map(''.join, itertools.product('+-', repeat=ndim))
def get_orthant_strings(scores, ndim):
    """
    Given PCA factor scores over the full set of eigenvectors and a desired
    number of dimensions in that factor space, this function yields strings
    containing '+' and '-' characters which indicate the orthant in which
    each row of scores resides. A value of '+' is assigned if score >= 0
    and a value of '-' is assigned if score is < 0. For example, if a given
    row consists of the following scores on 8 factors:

    [1.3289, -0.2439, -2.1774, 0.8391, 1.4632, -0.6268, 1.2238, -1.7802]

    and ndim = 4, the orthant string would be '+--+'.

    :param scores: PCA factor scores (see compute_factor_scores).
    :type scores: numpy.ndarray

    :param ndim: Number of factors to consider. This determines the number
        of characters in each orthant string.
    :type ndim: int

    :yield: Orthant string for each row in scores.
    :ytype: str
    """
    for row in scores:
        # Only the first ndim factors contribute to the orthant string.
        yield ''.join('-' if row[j] < 0.0 else '+' for j in range(ndim))
def partition_scores(scores, min_pop):
    """
    Given PCA factor scores over the full set of eigenvectors and a minimum
    required population, this function partitions the scores into distinct
    orthant pairs of nearly equal population, where the smallest population
    is guaranteed to be at least min_pop. This is achieved by making a
    series of calls to get_orthant_strings with progressively larger values
    of ndim, grouping the scores by orthant string, sorting by population
    size and then combining the highest and lowest populations, the 2nd
    highest and 2nd lowest populations, etc. These combined populations
    decrease as ndim is increased, and the largest value of ndim which
    allows min_pop to be satisfied is used. For example:

    1. Suppose ndim=4 is the largest dimension that satisfies min_pop
    2. Suppose a given row of scores yields the orthant string '-+-+'
    3. Suppose orthant '-+-+' is combined with orthant '+--+'
    4. That row of scores would be assigned to orthant pair '+--+|-+-+'

    :param scores: PCA factor scores (see compute_factor_scores).
    :type scores: numpy.ndarray

    :param min_pop: Minimum required population of any orthant pair.
    :type min_pop: int

    :return: Dictionary of orthant pair --> list of 0-based row numbers.
    :rtype: dict{str: list(int)}
    """
    nrows, ncols = scores.shape
    if nrows <= min_pop:
        # Too few rows to split; keep everything in a single pair.
        return {'+|-': list(range(nrows))}
    best_ndim = 1
    best_pairs = ['+', '-']
    for ndim in range(1, ncols + 1):
        # Tally the population of every possible orthant string at this
        # dimensionality. Zero-count orthants must be present so they can
        # be paired with the most populous ones.
        tallies = {orth: 0 for orth in get_all_orthant_strings(ndim)}
        for orth in get_orthant_strings(scores, ndim):
            tallies[orth] += 1
        by_size = sorted((count, orth) for orth, count in tallies.items())
        # Pair smallest with largest, 2nd smallest with 2nd largest, etc.
        half = len(by_size) // 2
        pairs = []
        feasible = True
        for lo in range(half):
            hi = -lo - 1
            if by_size[lo][0] + by_size[hi][0] < min_pop:
                # This dimensionality yields a pair that is too small, so
                # larger dimensionalities would too; stop searching.
                feasible = False
                break
            pairs.append(sorted([by_size[lo][1], by_size[hi][1]]))
        if not feasible:
            break
        best_ndim = ndim
        best_pairs = pairs
    # Group 0-based row numbers by orthant at the accepted dimensionality.
    members = {orth: [] for orth in get_all_orthant_strings(best_ndim)}
    for row, orth in enumerate(get_orthant_strings(scores, best_ndim)):
        members[orth].append(row)
    # Merge the member lists of each paired orthant under a single key.
    orthant_pair_dict = {}
    for pair in best_pairs:
        pair_string = f'{pair[0]}|{pair[1]}'
        orthant_pair_dict[pair_string] = sorted(members[pair[0]] + members[pair[1]])
    return orthant_pair_dict
def select_probes(fp_file, num_probes, rand_seed):
    """
    Selects the requested number of diverse probe structures from the
    provided 32-bit fingerprint file and returns the corresponding 0-based
    fingerprint row numbers.

    :param fp_file: Input file of 32-bit fingerprints and SMILES.
    :type fp_file: str

    :param num_probes: Number of diverse probe structures.
    :type num_probes: int

    :param rand_seed: Random seed for underlying diversity algorithm.
    :type rand_seed: int

    :return: List of 0-based row numbers for diverse probe structures.
    :rtype: list(int)
    """
    # Delegate the diversity selection to DiversitySelector and report the
    # rows it chose.
    probe_selector = diversity_selector.DiversitySelector(
        fp_file, rand_seed=rand_seed)
    probe_selector.select(num_probes)
    return probe_selector.subset_rows