Source code for schrodinger.pipeline.stages.combine

# -*- coding: utf-8 -*-
"""
Core stages for Pipeline mechanism.

CombineStage
  Stage for combining multiple structure PipeIO sets into one PipeIO set
  and labelling each ligand by the input set from which it came.

DataFusionMergeStage
  Stage for merging results in a Data Fusion workflow.

Copyright Schrodinger, LLC. All rights reserved.

"""

import sys
from past.utils import old_div

import numpy

from schrodinger import structure
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage

# Properties written out by the scoring methods:
PHASE_SHAPE_SCORE_PROP = "r_phase_Shape_Sim"
CANVAS_2D_SCORE_PROP = "r_canvas_FP_Similarity"
GLIDE_SCORE_PROP = "r_i_docking_score"

# Properties that are written by the DataFusionMergeStage:
ZSCORE_FINGERPRINT_PROP = "r_datafusion_Z-score_Fingerprint"
ZSCORE_SHAPE_PROP = "r_datafusion_Z-score_Shape"
ZSCORE_DOCKING_PROP = "r_datafusion_Z-score_Docking"
ZSCORE_AVERAGE_PROP = "r_datafusion_Z-score_Average"


[docs]class CombineStage(stage.Stage): """ Stage for combining multiple Structures objects into one. Source object can be labeled by supplying LABEL keywords The keywords specific to this stage are... LABELFIELD An optional property added to each ouput structure that holds the label for the input set whence it came. LABELS List of labels for the input sets. The stage takes up to ten input structure files sets and generates one output structure file set. """
[docs] def __init__(self, *args, **kwargs): """ See class docstring. """ specs = """ LABELFIELD = string(default="") LABELS = string_list(default=None) """ stage.Stage.__init__(self, specs=specs, *args, **kwargs) for i in range(1, 10): # Expects up to 10 input sets # Type of an object to expect self.addExpectedInput(i, "structures", i <= 1) self.addExpectedOutput(1, "structures", True)
[docs] def operate(self): """ Combine all the input files from all input sets into one set, optionally labelling each structure according to the set from which it originated. Raises a RuntimeError if there is a problem reading an input file or writing an output file. """ labelfield = self['LABELFIELD'] labels = self['LABELS'] inputs = {} # key: input number, value: list of files counts = {} # key: input number, value: number of structures (or None) i = 0 while True: i += 1 inputobj = self.getInput(i) if not inputobj: break # No more inputs ligfiles = inputobj.getFiles() # may return None (not calculated) input_count = inputobj.getCount() inputs[i] = ligfiles counts[i] = input_count if labels and labelfield: if len(labels) != len(inputs): msg = "ERROR: Number of LABELS (%i) != number of INPUTs (%i)" % \ (len(labels), len(inputs)) self.exit(msg) # Exit the stage do_label = True else: do_label = False writer = structure.MultiFileStructureWriter(self.getOutputName(1), '.maegz') for i, ligfiles in inputs.items(): if do_label: label = labels[i - 1] # 0-indexed self.info("Labeling ligands with: %s" % label) self.info("Combining ligand files...") input_count = counts[i] # May be None dp = pipeutils.DotPrinter(input_count) for ligfile in ligfiles: for st in structure.StructureReader(ligfile): dp.dot() if do_label: st.property[labelfield] = label writer.append(st) writer.close() st_num = writer.getNumStructures() combined_files = writer.getFiles() self.info("Number of structures processed: %i" % st_num) msg = "\nCombined files: %s" % ' '.join(combined_files) self.info(msg) sys.stdout.flush() self.setOutput(1, pipeio.Structures(combined_files, st_num)) return
[docs]class DataFusionMergeStage(stage.Stage): """ This stage takes in results from a Glide docking job, Phase Shape job, and a Canvas 2D fingerprint job and combines them based on the Z-score for each method. This stage is used by Data Fusion workflow (data_fusion_backend.py). Z-score = the number of standard deviations above or below the mean of the distribution of scores for that method. Final score = For each compound, the sum of z-scores for the 3 methods. Compounds will be sorted by the final score in the output. STAGE Merge Results Input 1: Docking PV file Input 2: Phase Shape output file Input 3: Canvas 2D Fingerprints output file Output: Merged Maestro file of compounds sorted by the final score. Properties included in the output: 1. Glide score 2. Phase Shape score 3. Similarity score 4. Consensus/final score (average of z-scores) WARNING: This stage assumes that only one ligand with the same UNIQUEFIELD exists in each input set. """
[docs] def __init__(self, *args, **kwargs): """ Creates the stage instance, and passes the <args> and <kwargs> to the stage.Stage's constructor. """ specs = """ TOP_N = integer(default=2) # Use only the top N scores for each ligand when combining Z-scores to compute the average. UNIQUEFIELD = string(default="s_m_title") # Property to uniquely identify compounds by. NREPORT = integer() # Number of best compounds to report """ stage.Stage.__init__(self, specs=specs, *args, **kwargs) # Expects up to 3 input sets (each is required): self.addExpectedInput(1, "structures", True) self.addExpectedInput(2, "structures", True) self.addExpectedInput(3, "structures", True) # Output pin #1 must be of type "structures" and is always produced: self.addExpectedOutput(1, "structures", True)
[docs] def operate(self): """ The only overridden & required method in this class. Called by the Pipeline to run this stage's main code. """ hits_files = [] glide_results_files = self.getInput(1).getFiles() shape_results_files = self.getInput(2).getFiles() canvas_results_files = self.getInput(3).getFiles() jobname = self.genFileName() merged_file = self.genOutputFileName(1, extension='.maegz') top_n = self["TOP_N"] uniquefield = self["UNIQUEFIELD"] try: nreport = int(self["NREPORT"]) except ValueError: msg = "ERROR: Invalid NREPORT value specified (must be an integer)" raise RuntimeError(msg) if top_n < 1 or top_n > 3: raise RuntimeError( "Invalid TOP_N value specified (must be between 1 and 3)") merged_writer = structure.StructureWriter(merged_file) num_read = 0 num_written = 0 self.info("Reading Phase Shape results...") shape_scores_by_compound = {} reader = structure.MultiFileStructureReader(shape_results_files, skip_receptors=False) for st_num, st in enumerate(reader, start=1): # Get compound code: try: compound_id = pipeutils.read_unique_field(st, uniquefield) except KeyError: err = "No field %s in structure #%i (file %s)!" % \ (uniquefield, reader._curr_file_st_num, reader._curr_file) raise RuntimeError(err) # Get the Phase Shape score: try: phase_score = st.property[PHASE_SHAPE_SCORE_PROP] except KeyError: err = "No field %s in structure #%i (file %s)!" % \ (PHASE_SHAPE_SCORE_PROP, reader._curr_file_st_num, reader._curr_file) raise RuntimeError(err) # FIXME what if multiple ligands exist with this ID???? shape_scores_by_compound[compound_id] = phase_score self.info("Number of unique compounds in Phase Shape results: %i" % len(shape_scores_by_compound)) self.info("Reading Canvas Fingerprint results...") canvas_scores_by_compound = {} reader = structure.MultiFileStructureReader(canvas_results_files, skip_receptors=False) for st_num, st in enumerate(reader, start=1): # Get compound code: try: compound_id = pipeutils.read_unique_field(st, uniquefield) except KeyError: err = "No field %s in structure #%i (file %s)!" % \ (uniquefield, reader._curr_file_st_num, reader._curr_file) raise RuntimeError(err) # Get the similarity score: try: canvas_score = st.property[CANVAS_2D_SCORE_PROP] except KeyError: err = "No field %s in structure #%i (file %s)!" % \ (CANVAS_2D_SCORE_PROP, reader._curr_file_st_num, reader._curr_file) raise RuntimeError(err) # FIXME what if multiple ligands exist with this ID???? canvas_scores_by_compound[compound_id] = canvas_score self.info("Number of unique compounds in Canvas results: %i" % len(canvas_scores_by_compound)) self.info("Reading Glide Docking results...") glide_scores_by_compound = {} reader = structure.MultiFileStructureReader(glide_results_files, skip_receptors=True) for st_num, st in enumerate(reader, start=1): # Get compound code: try: compound_id = pipeutils.read_unique_field(st, uniquefield) except KeyError: err = "No field %s in structure #%i (file %s)!" % \ (uniquefield, reader._curr_file_st_num, reader._curr_file) raise RuntimeError(err) # Get the docking score: try: glide_score = st.property[GLIDE_SCORE_PROP] except KeyError: err = "No field %s in structure #%i (file %s)!" % \ (GLIDE_SCORE_PROP, reader._curr_file_st_num, reader._curr_file) raise RuntimeError(err) # FIXME what if multiple ligands exist with this ID???? glide_scores_by_compound[compound_id] = glide_score self.info("Number of unique compounds in Glide results: %i" % len(glide_scores_by_compound)) # Compound IDs of ligands that had no docked poses: ligands_with_no_poses = (set(shape_scores_by_compound.keys()) \ | set(canvas_scores_by_compound.keys())) \ - set(glide_scores_by_compound.keys()) if ligands_with_no_poses: self.warning("WARNING: %i ligands had no docked poses" % \ len(ligands_with_no_poses)) # A set of all compound IDs: all_compound_ids = (set(shape_scores_by_compound.keys()) \ | set(canvas_scores_by_compound.keys())) \ | set(glide_scores_by_compound.keys()) self.info("Calculating z-scores...") # FIXME what to do for <None> scores? shape_zscores_by_compound = self.calcZScores( shape_scores_by_compound) # FIXME canvas_zscores_by_compound = self.calcZScores( canvas_scores_by_compound) # FIXME glide_zscores_by_compound = self.calcZScores(glide_scores_by_compound, True) # FIXME make sure we include ligands that have no Glide poses in merged output!!!!!!!!!! # Calculate the consensus scores self.info("Calculating consensus scores...") consensus_scores = {} for compound_id in all_compound_ids: shape_zscore = shape_zscores_by_compound.get(compound_id) canvas_zscore = canvas_zscores_by_compound.get(compound_id) glide_zscore = glide_zscores_by_compound.get(compound_id) # Use only scores from methods which produced hits, and only # from methods that produced 2 or more hits (otherwise the # z-score can't be calculated and is set to <None>): scores = [] if shape_zscore is not None: scores.append(shape_zscore) if canvas_zscore is not None: scores.append(canvas_zscore) if glide_zscore is not None: scores.append(glide_zscore) # Consensus score is ave of the best "top_n" (default=2) Z-scores: consensus_score = self.calcConsensusScore(scores, top_n) consensus_scores[compound_id] = consensus_score self.info("Scored %d compounds" % len(consensus_scores)) # Keep only the best number of compounds that was requested: decorated_list = [ (score, compound) for compound, score in consensus_scores.items() ] decorated_list.sort(reverse=True) # Higher scores first keep_compounds = set() for score, compound in decorated_list[:nreport]: keep_compounds.add(compound) self.info("Keeping the best %d compounds" % len(keep_compounds)) # No read the structures in this order (as available): # 1. Glide poses # 2. Shape results # FIXME current line #1 is supported written_compounds = set() def merge_compound(st): # Get compound code: compound_id = pipeutils.read_unique_field(st, uniquefield) if compound_id in keep_compounds and compound_id not in written_compounds: zscore = shape_zscores_by_compound.get(compound_id) if zscore is not None: st.property[ZSCORE_SHAPE_PROP] = zscore zscore = canvas_zscores_by_compound.get(compound_id) if zscore is not None: st.property[ZSCORE_FINGERPRINT_PROP] = zscore zscore = glide_zscores_by_compound.get(compound_id) if zscore is not None: st.property[ZSCORE_DOCKING_PROP] = zscore st.property[ZSCORE_AVERAGE_PROP] = consensus_scores[compound_id] merged_writer.append(st) written_compounds.add(compound_id) return True return False # It's preferable to have write the Glide poses out first: self.info("Writing merged from glide poses file...") reader = structure.MultiFileStructureReader(glide_results_files, skip_receptors=True) num_written = 0 for st in reader: if merge_compound(st): num_written += 1 self.info(" wrote %i compounds" % num_written) # It may be that not all ligands have Glide poses, so read the other # files also: if len(written_compounds) < len(keep_compounds): self.info("Writing merged from shape results file...") num_written = 0 reader = structure.MultiFileStructureReader(shape_results_files) for st in reader: if merge_compound(st): num_written += 1 self.info(" wrote %i compounds" % num_written) if len(written_compounds) < len(keep_compounds): self.info("Writing merged from canvas results file...") reader = structure.MultiFileStructureReader(canvas_results_files) num_written = 0 for st in reader: if merge_compound(st): num_written += 1 self.info(" wrote %i compounds" % num_written) merged_writer.close() total_num_written = len(written_compounds) self.info("Number of merged compounds: %i" % total_num_written) if total_num_written == 0: self.error("ERROR: No hits are present in the output") sys.exit(1) self.info("Merged file: %s" % merged_file) # Set the output #1: self.setOutput(1, pipeio.Structures([merged_file], total_num_written))
[docs] def calcZScores(self, scores_by_compound, more_negative_is_better=False): """ Takes in a dictionary where keys are compound IDs, and values are scores, and returns a dict of z-scores (keys are compound IDs also). Z-score is calculated by:: z-score = (score-average) / std-deviations If the number of compounds is 1, then the z-score will be <None>. :param scores_by_compound: Dictionary of scores (e.g. Glide score) keyed by compound ID. :type scores_by_compound: dict :param more_negative_is_better: If set to True, more negative scores are considered to be better, and will result in higher Z-scores. :type more_negative_is_better: bool :return: Z-Scores, in a dictionary keyed by compound ID. :rtype: dict """ if more_negative_is_better: # Invert the scores PYAPP-6534 scores_by_compound = { compound: -score for compound, score in scores_by_compound.items() } ave = numpy.average(list(scores_by_compound.values())) std = numpy.std(list(scores_by_compound.values())) zscores_dict = {} for compound, score in scores_by_compound.items(): if std == 0: # There was only one ligand zscore = 0 print( "WARNING: z-score could not be calculated because there is only one ligand (setting to zero)" ) else: zscore = old_div((score - float(ave)), float(std)) zscores_dict[compound] = zscore return zscores_dict
[docs] def calcConsensusScore(self, scores, top_n): """ Given a list of scores, select the best <top_n> of them, and calculate their average. """ # Keep the top_n "best" (highest) Z-scores: top_scores = sorted(scores, reverse=True)[:top_n] ave = numpy.average(top_scores) return ave
#EOF