# Source code for schrodinger.application.canvas.cluster

"""

Canvas clustering functionality.

There are classes to perform custering and to support command line and
graphical interfaces to the clustering options.

Copyright Schrodinger, LLC. All rights reserved.

"""

# Contributors: Quentin McDonald

# TODO: add additional fingerprint generation options incl custom atom typing
# TODO: add support for clustering directly from distance matrices
# TODO: property clustering and combination property/fingerprint clustering
# TODO: add support for keeping intermediate files and running in "group" mode
# TODO: support for callbacks for statistics and dendrogram plots

import csv
import gc
import os
import os.path
import tempfile
import time
from past.utils import old_div
from textwrap import dedent

from schrodinger.infra import canvas
from schrodinger.utils import csv_unicode


def mktemp():
    """
    Create a closed temporary file and return its path.

    A thin wrapper around tempfile.mkstemp which immediately closes the
    OS-level handle so that callers only need to deal with the file name.

    :return: name of the newly created temporary file
    :rtype: str
    """
    handle, path = tempfile.mkstemp()
    os.close(handle)
    return path
############# Canvas classes begin here ##################################
class CanvasFingerprintCluster(object):
    """
    A class which handles clustering of canvas fingerprints.

    This maintains a list of the possible linkage types and keeps track
    of the current type of linkage specified.
    """

    # All hierarchical-clustering linkage methods supported by Canvas.
    LINKAGE_TYPES = [
        "Single",
        "Complete",
        "Average",
        "Centroid",
        "McQuitty",
        "Ward",
        "Weighted Centroid",
        "Flexible Beta",
        "Schrodinger",
    ]
[docs] def __init__(self, logger): """ Initialize the instance of the cluster class """ self._logger = logger # Create a mapping between "short" names (as might be used # in a command line application) and the full linkage names. self.SHORT_LINKAGE_TYPES = [] self.SHORT_TO_LONG_NAMES = {} for linkage in self.LINKAGE_TYPES: # Convert spaces to underscores short = linkage.lower().replace(" ", "_") self.SHORT_LINKAGE_TYPES.append(short) self.SHORT_TO_LONG_NAMES[short] = linkage # Create a table which associates linkage types with # the names: self._linkage_types = {} self._linkage_types["Single"] = \ canvas.ChmHierarchicalClustering.SingleLinkage self._linkage_types["Complete"] = \ canvas.ChmHierarchicalClustering.CompleteLinkage self._linkage_types["Average"] = \ canvas.ChmHierarchicalClustering.GroupAverageLinkage self._linkage_types["Centroid"] = \ canvas.ChmHierarchicalClustering.UnweightedCentroidLinkage self._linkage_types["McQuitty"] = \ canvas.ChmHierarchicalClustering.WeightedAverageLinkage self._linkage_types["Ward"] = \ canvas.ChmHierarchicalClustering.WardsMinimumVarianceLinkage self._linkage_types["Weighted Centroid"] = \ canvas.ChmHierarchicalClustering.WeightedCentroidLinkage self._linkage_types["Flexible Beta"] = \ canvas.ChmHierarchicalClustering.FlexibleBetaLinkage self._linkage_types["Schrodinger"] = \ canvas.ChmHierarchicalClustering.SchrodingerLinkage # Set the default to Average self.setLinkage("Average") self._distance_matrix_file = None self._out_file_name = None self._int_file_name = None self._stat_file_name = None self._grp_file_name = None self._num_clusters = None self._distance_matrix_time = 0.0 self._cluster_time = 0.0 self._group_time = 0.0
def __del__(self): """ Destructor: cleanup and delete the temporary files """ self._logger = None gc.collect() self._deleteIfExists(self._distance_matrix_file) self._deleteIfExists(self._out_file_name) self._deleteIfExists(self._int_file_name) self._deleteIfExists(self._stat_file_name) self._deleteIfExists(self._grp_file_name) def _deleteIfExists(self, filename): """ A utility function which is used by the destructor which checks if a file exists and removes it if it does. """ if filename and os.path.exists(filename): os.unlink(filename)
[docs] def getDescription(self): """ Returns a string representing a summary of the current linkage settings """ desc = "%s" % (self._current_linkage) return desc
[docs] def debug(self, output): """ Wrapper for debug logging, just to simplify logging """ self._logger.debug(output)
[docs] def setLinkage(self, linkage): """ Set the current linkage based on the linkage name """ # Convert to the long name if necessary: name = self.SHORT_TO_LONG_NAMES.get(linkage, linkage) if (name in self.LINKAGE_TYPES): self.debug("FPClu - setting linkage to %s" % name) self._current_linkage = name else: raise Exception("Unknown cluster linkage name: %s" % linkage)
[docs] def getCurrentLinkage(self): """ Returns the current linkage definition """ return self._current_linkage
def _createTempFile(self, temp_file_name): """ If temp_file_name exists then remove it. Create a new temporary file name and return it. We force the '.tmp' extension to prevent the file name from from accidentally ending in, e.g., "gz" and tricking readers into handling this as a compressed file (see SHARED-7807) """ if (temp_file_name and os.path.exists(temp_file_name)): try: os.unlink(temp_file_name) except: pass (fh, name) = tempfile.mkstemp(suffix='.tmp') os.close(fh) return name
    def clusterDM(self, dm_file_name):
        """
        Cluster the distance matrix file given in dm_file_name, using
        similarity settings encapsulated in dp_sim. The value returned is
        the cluster strain. The dm_file_name should point to a CSV file
        containing the matrix
        """
        self._distance_matrix_file = dm_file_name
        # Create a cluster object using the currently selected linkage:
        clusterHC = canvas.ChmHierarchicalClustering(
            self._linkage_types[self._current_linkage])
        # We need ostream objects for the files created during clustering:
        self._out_file_name = self._createTempFile(self._out_file_name)
        self._int_file_name = self._createTempFile(self._int_file_name)
        self._stat_file_name = self._createTempFile(self._stat_file_name)
        self.debug("About to do clustering to %s" % self._out_file_name)
        # Create a distance matrix reader:
        pairwise_in = canvas.ChmPairwiseFPIn(self._distance_matrix_file,
                                             canvas.CSV)
        # Actually do the clustering (timed):
        start = time.perf_counter()
        self._strain = clusterHC.cluster(
            pairwise_in, self._out_file_name, self._int_file_name,
            self._stat_file_name, canvas.ChmHierarchicalClustering.UseEither,
            True)
        # Release the Canvas objects before recording the elapsed time:
        pairwise_in = None
        clusterHC = None
        self._cluster_time = float(time.perf_counter() - start)
        # Initialize the clustering stats lists as we want to force them
        # to be re-read if necessary:
        self._num_cluster_list = []
        self._rsquared_list = []
        self._semi_partial_rsquared_list = []
        self._kelley_penalty_list = []
        self._is_kelley_min_list = []
        self._merge_distance_list = []
        self._separation_ratio_list = []
        # Initialize the grouping maps and statistics as we want to force
        # them to be re-read
        self._cluster_map = {}
        self._cluster_order_map = {}
        self._cluster_contents = {}
        self._distance_to_centroid = {}
        self._is_nearest_to_centroid = {}
        self._is_farthest_from_centroid = {}
        self._max_distance_from_centroid = {}
        self._average_distance_from_centroid = {}
        self._cluster_variance = {}
        return self._strain
    def generateDM(self, dm_file_name, fp_file, fp_gen, fp_sim):
        """
        Generate a distance matrix of the specified filename from the
        finger print file fp_file. The fp_gen and fp_sim objects
        encapsulate the current fingerprint and similarity settings
        """
        # Create a pairwise out object in order to write the distance
        # matrix file. How we do this depends on the number of bits
        # used for fingerprint generation:
        if fp_gen.getPrecision() == 32:
            pairwise_out = canvas.ChmPairwiseFPOut32(dm_file_name, False,
                                                     fp_sim.getAlpha(),
                                                     fp_sim.getBeta(), False,
                                                     False)
        else:
            pairwise_out = canvas.ChmPairwiseFPOut64(dm_file_name, False,
                                                     fp_sim.getAlpha(),
                                                     fp_sim.getBeta(), False,
                                                     False)
        # Set the similarity metric:
        pairwise_out.setMetric(fp_sim.getMetricStyle())
        pairwise_out.setHumanReadable(True)
        # Process the input file
        pairwise_out.process(fp_file)
        # Release the writer object.
        pairwise_out = None
    def clusterFP(self, fp_file, fp_gen, fp_sim):
        """
        Cluster the fingerprints contained in fp_file. The bitsize will be
        taken from the CanvasFingerpintGenerator(). The similarity metric
        will be taken from the CanvasFingerprintSimilarity object fp_sim
        This function returns the 'strain' reported by the clustering
        """
        # Distance matrix file only exists while we are running and
        # is not exposed to the user. Therefore use a temporary file
        # and remove any existing file before we create a new one
        self._distance_matrix_file = self._createTempFile(
            self._distance_matrix_file)
        # Create a pairwise out object in order to write the distance
        # matrix file. How we do this depends on the number of bits
        # used for fingerprint generation:
        pairwise_out = None
        if fp_gen.getPrecision() == 32:
            pairwise_out = canvas.ChmPairwiseFPOut32(
                self._distance_matrix_file, False, fp_sim.getAlpha(),
                fp_sim.getBeta(), False, False)
        else:
            pairwise_out = canvas.ChmPairwiseFPOut64(
                self._distance_matrix_file, False, fp_sim.getAlpha(),
                fp_sim.getBeta(), False, False)
        # Set the similarity metric:
        pairwise_out.setMetric(fp_sim.getMetricStyle())
        pairwise_out.setHumanReadable(True)
        self.debug("About to do distance matrix gen to %s" %
                   self._distance_matrix_file)
        # Process the input file (timed):
        start = time.perf_counter()
        pairwise_out.process(fp_file)
        self._distance_matrix_time = float(time.perf_counter() - start)
        # Release the writer, then hand the matrix over to clusterDM:
        pairwise_out = None
        return self.clusterDM(self._distance_matrix_file)
    def group(self, num_clusters):
        """
        Perform a grouping operation based on an existing clustering run.
        If the clustering has not actually been performed yet then an
        exception will be raised.
        """
        # Test intermediate file exists and have exception if not:
        if self._int_file_name and os.path.exists(self._int_file_name):
            grouper = canvas.ChmClusterGrouper()
            self._grp_file_name = self._createTempFile(self._grp_file_name)
            self.debug("About to group to %s" % self._grp_file_name)
            pairwise_in = canvas.ChmPairwiseFPIn(self._distance_matrix_file,
                                                 canvas.CSV)
            # Perform the grouping (timed):
            start = time.perf_counter()
            grouper.group(pairwise_in, self._int_file_name,
                          self._grp_file_name, num_clusters, False, True)
            pairwise_in = None
            self._group_time = float(time.perf_counter() - start)
            grouper = None
            # Initialize the grouping maps and statistics as we want to
            # force them to be re-read
            self._cluster_map = {}
            self._cluster_contents = {}
            self._distance_to_centroid = {}
            self._is_nearest_to_centroid = {}
            self._is_farthest_from_centroid = {}
            self._max_distance_from_centroid = {}
            self._average_distance_from_centroid = {}
            self._cluster_variance = {}
        else:
            raise Exception("Cannot perform grouping before clustering")
        return
[docs] def getMatrixTime(self): """ Returns the time required for distance matrix generation """ return self._distance_matrix_time
[docs] def getClusterTime(self): """ Returns the time required for clustering """ return self._cluster_time
[docs] def getGroupTime(self): """ Returns the time required for group creation """ return self._group_time
[docs] def getClusteringMap(self): """ Once grouping has been done this method may be called to return a dictionary where the keys represent the original fingerprint IDs (usually the position of the structure in the file or the entry ID) and the values are the cluster this structure belongs to """ if len(self._cluster_map) == 0: self._readGroupFile() return self._cluster_map
[docs] def getClusterContents(self): """ Once grouping has been done this method may be called to return a dictionary where the keys represent the cluster number and the values are a list of ID (usually position in the file or entry ids) """ if len(self._cluster_contents) == 0: self._readGroupFile() return self._cluster_contents
def _readGroupFile(self): """ A private method which reads the group file and extracts the cluster membership and per-cluster statistics """ self._cluster_map = {} self._cluster_contents = {} self._distance_to_centroid = {} self._is_nearest_to_centroid = {} self._is_farthest_from_centroid = {} self._max_distance_from_centroid = {} self._average_distance_from_centroid = {} self._cluster_variance = {} if (self._grp_file_name and os.path.exists(self._grp_file_name)): with csv_unicode.reader_open(self._grp_file_name) as grp_file: csv_reader = csv.DictReader(grp_file) for row in csv_reader: clu_num = int(row["ClusterNumber"]) item = row["ItemLabel"] if clu_num not in self._cluster_contents: self._cluster_contents[clu_num] = [] self._cluster_contents[clu_num].append(item) self._cluster_map[item] = int(row["ClusterNumber"]) self._distance_to_centroid[item] = float( row["DistanceToCentroid"]) if row["IsNearestToCentroid"] == "1": self._is_nearest_to_centroid[item] = True else: self._is_nearest_to_centroid[item] = False if row["IsFarthestFromCentroid"] == "1": self._is_farthest_from_centroid[item] = True else: self._is_farthest_from_centroid[item] = False self._max_distance_from_centroid[item] = float( row["MaxDistanceFromCentroid"]) self._average_distance_from_centroid[item] = float( row["AvgDistanceFromCentroid"]) self._cluster_variance[item] = float(row["ClusterVariance"]) else: raise Exception( "Grouping must be performed before the group informatin can be accessed. " )
[docs] def getDistanceToCentroid(self, item): """ For a given item in the most recent cluster grouping return the distance to the centroid of the cluster which contains this item """ if len(self._distance_to_centroid) == 0: self._readGroupFile() return self._distance_to_centroid[item]
[docs] def getIsNearestToCentroid(self, item): """ For a given item in the most recent cluster grouping return a boolean value which indicates whether the item is nearest the centroid """ if len(self._is_nearest_to_centroid) == 0: self._readGroupFile() return self._is_nearest_to_centroid[item]
[docs] def getIsFarthestFromCentroid(self, item): """ For a given item in the most recent cluster grouping return a boolean value which indicates whether the item is nearest the centroid """ if len(self._is_farthest_from_centroid) == 0: self._readGroupFile() return self._is_farthest_from_centroid[item]
[docs] def getMaxDistanceFromCentroid(self, item): """ For a given item in the most recent cluster grouping return the maximum distance to the centroid for any item in the cluster """ if len(self._max_distance_from_centroid) == 0: self._readGroupFile() return self._max_distance_from_centroid[item]
[docs] def getAverageDistanceFromCentroid(self, item): """ For a given item in the most recent cluster grouping return the average distance to the centroid for any item in the cluster """ if len(self._average_distance_from_centroid) == 0: self._readGroupFile() return self._average_distance_from_centroid[item]
[docs] def getClusterVariance(self, item): """ For a given item return the variance of the cluster which that item belongs to. """ if len(self._cluster_variance) == 0: self._readGroupFile() return self._cluster_variance[item]
[docs] def getBestNumberOfClusters(self): """ The cluster statistics file contains information about each clustering level. This function returns the number of clusters at which the Kelley function has a minimum """ best_cluster = 0 if len(self._is_kelley_min_list) == 0: self._readClusterStatistics() best_cluster = self._is_kelley_min_list.index(1) + 1 return best_cluster
    def _readClusterStatistics(self):
        """
        A private method which reads the cluster statistics from the stat
        file and fills up the internal lists
        """
        self._num_cluster_list = []
        self._rsquared_list = []
        self._semi_partial_rsquared_list = []
        self._kelley_penalty_list = []
        self._is_kelley_min_list = []
        self._merge_distance_list = []
        if (self._stat_file_name and os.path.exists(self._stat_file_name)):
            with csv_unicode.reader_open(self._stat_file_name) as stat_file:
                csv_reader = csv.DictReader(stat_file)
                for row in csv_reader:
                    self._num_cluster_list.append(
                        int(row["NumberOfClusters"]))
                    self._rsquared_list.append(float(row["R-Squared(RSQ)"]))
                    self._semi_partial_rsquared_list.append(
                        float(row["Semipartial R-Squared(SPRSQ)"]))
                    self._kelley_penalty_list.append(
                        float(row["Kelley Penalty"]))
                    # The next two columns may be empty strings; treat an
                    # empty value as 0 / 0.0.
                    kelley_min = row["IsKelleyGlobalMinimum"]
                    if kelley_min:
                        self._is_kelley_min_list.append(int(kelley_min))
                    else:
                        self._is_kelley_min_list.append(0)
                    merge_distance = row["MergeDistance"]
                    if merge_distance:
                        self._merge_distance_list.append(
                            float(merge_distance))
                    else:
                        self._merge_distance_list.append(0.0)
        else:
            raise Exception(
                "Clustering must be performed before best number of clusters may be accessed."
            )
[docs] def getNumberOfClustersList(self): """ Returns the number of clusters at each level """ if len(self._num_cluster_list) == 0: self._readClusterStatistics() return self._num_cluster_list
[docs] def getRSquaredList(self): """ Returns the r-squared value at each clustering level """ if len(self._rsquared_list) == 0: self._readClusterStatistics() return self._rsquared_list
[docs] def getSemiPartialRSquaredList(self): """ Returns the semi-partial R-squared value at each clustering level """ if len(self._semi_partial_rsquared_list) == 0: self._readClusterStatistics() return self._semi_partial_rsquared_list
[docs] def getKelleyPenaltyList(self): """ Returns the Kelley Penalty value at each clustering level """ if len(self._kelley_penalty_list) == 0: self._readClusterStatistics() return self._kelley_penalty_list
[docs] def getMergeDistanceList(self): """ Returns the merge distance value at each clustering level """ if len(self._merge_distance_list) == 0: self._readClusterStatistics() return self._merge_distance_list
[docs] def getSeparationRatioList(self): """ Returns the separation ratio - calculated from the merge distance of """ if len(self._merge_distance_list) == 0: self._readClusterStatistics() if len(self._separation_ratio_list) == 0: self._separation_ratio_list.append(1.0) for i in range(1, len(self._merge_distance_list)): a = self._merge_distance_list[i - 1] b = self._merge_distance_list[i] if a > 0.0001 and b > 0.00001: self._separation_ratio_list.append(old_div(a, b)) else: self._separation_ratio_list.append(1.0) return self._separation_ratio_list
def _getLine(self, x1, x2, y1, y2): """ A private function which takes the input parameters and returns [[x1,x2],[y1,y2]]. This is used in dendrogram generation """ x = [] y = [] x.append(x1) x.append(x2) y.append(y1) y.append(y2) line = [] line.append(x) line.append(y) return line
    def getDendrogramData(self):
        """
        Returns a tuple with 1) a list of line positions, each in the form
        [x1,x2][y1,y2] each one of which defines a line segment to be
        plotted in a dendrogram 2) a list of x-axis tick positions 3) a
        list of x-axis tick labels
        """
        if self._int_file_name and os.path.exists(self._int_file_name):
            # Calculate a grouping for a single cluster to get the
            # cluster order
            n = len(self.getNumberOfClustersList())
            self.group(n)
            # Read the cluster order out of the group file and
            # use that to associate cluster number with the X-axis
            # on the dendrogram:
            clu_index_map = {}
            x_axis_ticks = []
            x_axis_tick_labels = []
            with csv_unicode.reader_open(self._grp_file_name) as grp_file:
                csv_reader = csv.DictReader(grp_file)
                for row in csv_reader:
                    clu_index_map[row["ItemLabel"]] = int(row["ClusterNumber"])
                    x_axis_ticks.append(float(row["ClusterNumber"]))
                    x_axis_tick_labels.append(row["ItemLabel"])
            # Now read the .out file. The data is read into a map
            # representing the level. Each level will have
            # a "Distance" value and a L1 and C1 value, a L1 and L2
            # or a C1 C2 depending on whether two items are being formed
            # into a single cluster (L1+L2), a single item is joining
            # an existing cluster (L1+C1) or two clusters are joining
            # (C1+C2)
            levels = []
            cnt = 0
            level = {}
            with open(self._out_file_name) as fh:
                for row in fh:
                    # Each level occupies three lines in the .out file.
                    if cnt == 3:
                        levels.append(level)
                        level = {}
                        cnt = 0
                    lne = row.rstrip()
                    fields = lne.split(" ")
                    if fields[0] == "L":
                        # An item line: map the item label to its x position.
                        current_line = int(clu_index_map[fields[1]])
                        if "L1" in level:
                            level["L2"] = current_line
                        else:
                            level["L1"] = current_line
                    elif fields[0] == "C":
                        # A cluster line: record the referenced level index.
                        if "C1" in level:
                            level["C2"] = int(fields[1])
                        else:
                            level["C1"] = int(fields[1])
                    else:
                        level["Distance"] = (float(fields[1]))
                    cnt += 1
            levels.append(level)
            lines = []
            # Now draw generate the lines:
            mid_points = {}
            for i, lvl in enumerate(levels):
                d = lvl["Distance"]
                if "L1" in lvl:
                    l1 = lvl["L1"]
                    # Generate vertical line from 0.0 to the distance at x=l1
                    line = []
                    line = self._getLine(l1, l1, 0.0, d)
                    lines.append(line)
                if "L2" in lvl:
                    # Generate a vertical line from 0.0 to the distance
                    # at x=l2
                    l2 = lvl["L2"]
                    line = self._getLine(l2, l2, 0.0, d)
                    lines.append(line)
                    m = (l1 + l2) * 0.5
                    mid_points["%d" % i] = m
                    # In the special case of an L2 then we need to
                    # draw a horizontal line between l1 and l2 at
                    # d in order to indicate the first cluster linkage:
                    line = []
                    line = self._getLine(l1, l2, d, d)
                    lines.append(line)
                if "C1" in lvl:
                    # How we do the linking depends on if it's a single
                    # item to cluster merge or the merge of two clusters:
                    if "C2" in lvl:
                        # Two cluster merge
                        c1 = lvl["C1"]
                        d1 = levels[c1]["Distance"]
                        c2 = lvl["C2"]
                        d2 = levels[c2]["Distance"]
                        m1 = mid_points["%d" % c1]
                        m2 = mid_points["%d" % c2]
                        # Draw a vertical line at the midpoint of c1 to the
                        # current clustering distance:
                        line = []
                        line = self._getLine(m1, m1, d1, d)
                        lines.append(line)
                        # Draw a vertical line at the midpoint of c2 to
                        # the current clustering distance:
                        line = []
                        line = self._getLine(m2, m2, d2, d)
                        lines.append(line)
                        # Draw a horizontal line which connects the two
                        # points at the current distance
                        line = []
                        line = self._getLine(m1, m2, d, d)
                        lines.append(line)
                        # Calculate the midpoint for this level:
                        m = (m1 + m2) * 0.5
                        mid_points["%d" % i] = m
                    else:
                        c = lvl["C1"]
                        d0 = levels[c]["Distance"]
                        m = mid_points["%d" % c]
                        # Cluster linkage. Draw a horizontal line from the
                        # current line to the previous midpoint at d
                        line = []
                        line = self._getLine(l1, m, d, d)
                        lines.append(line)
                        # Draw a vertical line from d0 to d at the previous
                        # midpoint:
                        line = self._getLine(m, m, d0, d)
                        lines.append(line)
                        # Calculate a new midpoint for this level
                        m = (l1 + m) * 0.5
                        mid_points["%d" % i] = m
        else:
            raise Exception("Dendrogram data only available after clustering")
        return (lines, x_axis_ticks, x_axis_tick_labels)
[docs] def getDistanceMatrixFile(self): """ Returns the name of the distance matrix file used in the most recent clustering """ if self._distance_matrix_file is None: raise Exception( "Clustering must be performed before the distance matrix file can be accessed" ) return self._distance_matrix_file
[docs] def getClusterOrderMap(self, num_clusters): """ Returns a dictionary where the keys are the item labels and the values represent the index it would have in the grouping which places the items in cluster order """ if len(self._cluster_order_map) == 0: # Calculate a grouping to get the cluster order: self.group(num_clusters) # Read the cluster order out of the group file with csv_unicode.reader_open(self._grp_file_name) as grp_file: csv_reader = csv.DictReader(grp_file) for i, row in enumerate(csv_reader): self._cluster_order_map[row["ItemLabel"]] = i return self._cluster_order_map
############# Command line specific classes start here:
class CanvasFingerprintClusterCLI(CanvasFingerprintCluster):
    """
    A subclass of the canvas fingerprint cluster manager intended for use
    from a program with a command line interface.

    It provides methods for defining clustering options in an option
    parser and for applying those options once they have been parsed, so
    that command line applications share a standard interface for setting
    clustering options.
    """
[docs] def __init__(self, logger): super(CanvasFingerprintClusterCLI, self).__init__(logger)
[docs] def addOptions(self, parser): """ Add options for cluster linkage """ parser.add_argument("-clu_linkage", action="store", type=str, choices=self.SHORT_LINKAGE_TYPES, metavar="<linkage type>", default='average', help="Cluster linkage method") parser.add_argument("-clu_num_clusters", action="store", metavar="<num>", help="Number of clusters to create")
[docs] def parseOptions(self, options): """ Examine the options and set the internal state to reflect them """ self.setLinkage(options.clu_linkage)
    def getLinkageDescription(self):
        """
        Return a string which contains a description of the linkage
        methods available for cluster linkage
        """
        # NOTE(review): the exact line breaks of this help text were lost
        # in extraction; content reconstructed verbatim -- confirm layout.
        desc = """
        Linkage types used in Canvas Hierarchical Clustering
        --------------------------------------------------------
        Linkage is a term to describe how the distance between two clusters
        are defined. In the following definition, distance refers to a pair
        of objects, one from each cluster.
        single: Uses the closest pair, aka nearest neighbor technique.
        complete: Uses the most distant pair, aka farthest neighbor technique.
        average: Uses the average distance between all pairs.
        centroid: Uses the Euclidean distance between the centroid of the two
        clusters.
        mcquitty: Uses a simple average between two clusters
        ward: Uses inner squared distance (minimum-variance technique).
        weightedcentroid: Weighted center of mass distance, aka median.
        flexiblebeta: Another centroid-based variant due to Lance and Williams
        with beta=0.25.
        schrodinger: Uses the closest terminal distance between two clusters.
        """
        return dedent(desc)