Source code for schrodinger.application.phase.shape_screen_reporter.tasks

import os
from typing import Set
from typing import List
from typing import Tuple
from typing import Optional

from hit_analysis_gui_dir import label_property_utils

from schrodinger import structure
from schrodinger.models import parameters
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import fileutils
from schrodinger.infra import phase

from schrodinger.application.phase.shape_screen_reporter import prop_utils
from schrodinger.application.phase.shape_screen_reporter import task_utils

# Order value passed to `tasks.postprocessor` for the database-file check in
# CreateScreeningDBTask; the label-property postprocessor there is registered
# at ORDER_DB_CHECK + 1.
ORDER_DB_CHECK = 0


class _AbstractShapeScreenReporterTaskMixin(parameters.CompoundParamMixin):
    """
    Mixin that assembles the command line for a shape_screen_reporter run.

    This class is meant to be mixed into a Task subclass. A concrete class
    has to set `MODE` (one of the shape_screen_reporter modes) and implement
    `getDBFile`; the path it returns is placed last on the command line built
    by `makeCmd`.

    Additional command options can be contributed by overriding
    `_getCmdOptions`.
    """

    CMD = "shape_screen_reporter"
    MODE = NotImplemented

    def makeCmd(self):
        """
        Build the command as a list: executable, mode, `-NOJOBID` when this
        object is not a job task, the subclass options, then the DB file.
        """
        args = [self.CMD, self.MODE]
        args += [] if jobtasks.is_jobtask(self) else ['-NOJOBID']
        args += self._getCmdOptions()
        args += [self.getDBFile()]
        return args

    def _getCmdOptions(self):
        """
        Return extra command options; the base implementation adds none.
        """
        return list()

    def getDBFile(self) -> str:
        """
        Return the path to the screening database file.

        Must be implemented by the concrete class.
        """
        raise NotImplementedError


class CreateScreeningDBTask(_AbstractShapeScreenReporterTaskMixin,
                            jobtasks.CmdJobTask):
    """
    Task to create a virtual screening database. Adds a label property to the
    newly created DB in a postprocessor.
    """
    MODE = "create"

    class Input(parameters.CompoundParam):
        # Hits structure file; passed to the command with `-hits`.
        hits_file: tasks.TaskFile
        # Optional query structure file; passed with `-query` when set.
        query_file: tasks.TaskFile = None
        # Property names; comma-joined and passed with `-props` when non-empty.
        props: List[str]
        # Property recorded as the DB label; if set it must appear in `props`.
        label_prop: str

    @tasks.preprocessor(tasks.AFTER_TASKDIR)
    def _checkInput(self):
        """
        Validate the label property and the hits file.

        :return: `(False, message)` for the first failed check, otherwise None.
        """
        if self.input.label_prop and (self.input.label_prop
                                      not in self.input.props):
            return False, ("Expected the specified label property to be one of "
                           "the included properties")
        if not self.input.hits_file:
            return False, "Hits file must be specified"
        if not os.path.isfile(self.input.hits_file):
            return False, (f"Specified hits file {self.input.hits_file} "
                           "does not exist")
        is_st_file = fileutils.get_structure_file_format(
            self.input.hits_file) is not None
        if not is_st_file:
            # Trailing space keeps the concatenated literals from fusing into
            # "structurefile".
            return False, ("Expected the specified hit file to be a structure "
                           "file")

    @tasks.preprocessor(tasks.AFTER_TASKDIR + 5)
    def _checkQueryFile(self):
        """
        Validate the optional query file; nothing is checked when it is None.

        :return: `(False, message)` for the first failed check, otherwise None.
        """
        if self.input.query_file is None:
            return
        if not os.path.isfile(self.input.query_file):
            # Trailing space keeps the message from reading "doesnot exist".
            return False, (f"Specified query file {self.input.query_file} does "
                           "not exist")
        is_st_file = fileutils.get_structure_file_format(
            self.input.query_file) is not None
        if not is_st_file:
            return False, ("Expected the specified query file to be a "
                           "structure file")

    def _getCmdOptions(self):
        """
        Return the `-hits` option plus `-query` and `-props` when those inputs
        are set.
        """
        opts = ["-hits", self.input.hits_file]
        if self.input.query_file:
            opts.extend(['-query', self.input.query_file])
        if self.input.props:
            opts.extend(['-props', ','.join(self.input.props)])
        return opts

    @tasks.postprocessor(ORDER_DB_CHECK)
    def _checkDB(self):
        """
        Confirm the database file is listed among the task's output files.

        :return: `(False, message)` when the file is missing, otherwise None.
        """
        db_file = self.getDBFile()
        # Check that DB file exists in the (auto-populated) output files
        if db_file not in self.output.output_files:
            # Must be an f-string so the message shows the actual path rather
            # than the literal text "{db_file}".
            return False, f"Did not find expected database file {db_file}"

    @tasks.postprocessor(ORDER_DB_CHECK + 1)
    def _writeLabelPropertyToDB(self):
        """
        Write the specified label property to the output DB file.

        Registered one order step above `_checkDB`, presumably so that it runs
        once the DB file has been confirmed.
        """
        label_property_utils.set_label_property(self.input.label_prop,
                                                self.getDBFile())

    def getDBFile(self) -> str:
        """
        Return the task filename of the database, `<task name>.vsdb`.

        Overrides `_AbstractShapeScreenReporterTaskMixin.getDBFile`.
        """
        return self.getTaskFilename(f"{self.name}.vsdb")


class FilterDBTask(tasks.ComboSubprocessTask):
    """
    Task that filters the hits of a virtual screening database and writes the
    surviving structures to a hits file.
    """

    class Input(parameters.CompoundParam):
        # Screening database that the reporter is opened on.
        screening_db_file: tasks.TaskFile
        # Each tuple is comma-joined into one property filter string.
        property_filters: List[Tuple]
        # Selected pharmacophore features; empty means no feature filtering.
        sel_features: Set[str]
        # Accepted range is [0.0, 0.2]; 0 means no diverse filtering.
        diverse_fraction: float

    @tasks.preprocessor
    def _checkAtLeastOneFilter(self):
        """
        Fail preprocessing when none of the three filter inputs is set.
        """
        task_input = self.input
        if not (task_input.property_filters or task_input.sel_features or
                task_input.diverse_fraction):
            return False, 'No valid filters are provided.'

    @tasks.preprocessor
    def _checkDiverseFraction(self):
        """
        Fail preprocessing when the diverse fraction lies outside [0, 0.2].

        The backend supports the range (0, 0.2]; this task additionally
        accepts 0, which it treats as "skip diverse filtering".
        """
        fraction = self.input.diverse_fraction
        out_of_range = fraction < 0 or fraction > 0.2
        if out_of_range:
            msg = ('The diverse fraction is expected to be in the range '
                   f'[0, 0.2], but instead got {self.input.diverse_fraction}')
            return False, msg

    @tasks.preprocessor(tasks.AFTER_TASKDIR)
    def _checkDB(self):
        """
        Fail preprocessing when the input database file does not exist.
        """
        db_file = self.input.screening_db_file
        if os.path.isfile(db_file):
            return None
        return False, f"Did not find expected database file {db_file}"

    def mainFunction(self):
        """
        Open a shape screen reporter on the input database, run all filters,
        and write the structures of the remaining rows to the hits file.
        """
        reporter = phase.PhpShapeScreenReporter(self.input.screening_db_file)
        kept_rows = self._runAllFiltering(reporter)
        hit_structs = task_utils.get_structures(reporter, kept_rows)
        hits_path = self.getTaskFilename(f'{self.name}.maegz')
        with structure.StructureWriter(hits_path) as writer:
            writer.extend(hit_structs)

    def getDBFile(self) -> str:
        """
        Return the task filename built from the input screening DB file.
        """
        # NOTE(review): this class does not derive from
        # _AbstractShapeScreenReporterTaskMixin, so this is not an override of
        # the mixin's getDBFile -- confirm whether callers still rely on it.
        return self.getTaskFilename(self.input.screening_db_file)

    def getHitsFile(self) -> Optional[str]:
        """
        Return the path to the output hits file, or None while the task
        directory does not exist yet.
        """
        if os.path.exists(self.getTaskDir()):
            return self.getTaskFilename(f'{self.name}.maegz')
        return None

    def _runAllFiltering(self,
                         reporter: phase.PhpShapeScreenReporter) -> List[int]:
        """
        Return the row numbers of the hits that pass every requested filter.

        Property and diverse filtering run first; the pharmacophore feature
        filter is then applied to their result when features were selected.
        With very broad criteria the result may be the full set of hits.

        :param reporter: Shape screen reporter to use for filtering.
        """
        prop_query = self._getPropFilter()
        fraction = self._getDiverseFraction()
        kept_rows = task_utils.run_filters(reporter,
                                           property_filter=prop_query,
                                           diverse_fraction=fraction)
        if not self.input.sel_features:
            return kept_rows
        return self._runPharmaFeatureFilter(reporter, kept_rows)

    def _getPropFilter(self) -> Optional[phase.PhpDbPropertyQuery]:
        """
        Return the property filter built from the input tuples, or None when
        no property filters were given.
        """
        filter_strings = [
            ','.join(map(str, entry)) for entry in self.input.property_filters
        ]
        if not filter_strings:
            return None
        return prop_utils.read_property_filters(filter_strings)

    def _getDiverseFraction(self) -> Optional[float]:
        """
        Return the diverse fraction, or None when it is zero.
        """
        fraction = self.input.diverse_fraction
        return None if fraction == 0 else fraction

    def _runPharmaFeatureFilter(self, reporter: phase.PhpShapeScreenReporter,
                                row_nums: List[int]) -> List[int]:
        """
        Return the rows among `row_nums` that match the selected
        pharmacophore features.

        :param reporter: Shape screen reporter to use for filtering
        :param row_nums: Subset of rows to filter
        :return: the row numbers of the entries that meet the filter
            requirements.
        """
        features = self.input.sel_features
        hypothesis = task_utils.create_user_hypo(reporter, list(features))
        # The number of selected features is passed as the minimum site count.
        matches = reporter.screenSitesInPlace(hypothesis, len(features),
                                              row_nums)
        return [match.row_number for match in matches]