Source code for schrodinger.application.phase.shape_screen_reporter.task_utils

"""
Performs task-based work for generating and using shape screen report
databases.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import logging
import sys
import time
import zipfile
from collections import defaultdict

from schrodinger import structure
from schrodinger.application import report2d
from schrodinger.application.pathfinder import molio
from schrodinger.application.phase.shape_screen_reporter import option_utils
from schrodinger.application.phase.shape_screen_reporter import prop_utils
from schrodinger.infra import canvas2d
from schrodinger.infra import mm
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtPrintSupport
from schrodinger.Qt.QtCore import QRect
from schrodinger.structutils import filter
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils

CLUSTER_FEATURES_PROP = 's_phase_Cluster_Features'
CLUSTER_SCAFFOLD_PROP = 's_phase_Cluster_Scaffold'


[docs]def check_user_query(reporter, user_features, user_tol=None, user_rules=None): """ Returns a non-empty error message if any of the supplied query data are illegal. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param user_features: Feature names with 1-based numbering, e.g., ['A1', 'D5', 'H9', 'R12'] :type subset: list(str) :param user_tol: Dictionary of <feature>:<tol> pairs, where <feature> consists of a feature name and <tol> is the matching tolerance, e.g., {'A1': 1.5, 'R12', 2.5}. :type user_tol: dict{str, float} or NoneType :param user_rules: Dictionary of <feature>:<permitted> pairs, where <feature> consists of a feature name and <permitted> contains the first character of <feature>, followed by any other types <feature> is allowed to match, e.g., {'A1':'AN', 'R12':'RH'} :type user_rules: dict{str, str} or NoneType :return: Non-empty error message if any data are invalid :rtype: str """ query_features = set( [site.getSiteName() for site in reporter.getSites([0])[0]]) for feature in user_features: mesg = option_utils.check_feature_string(feature, expect_number=True) if mesg: return mesg if feature not in query_features: return f'Feature "{feature}" not found in shape query' user_features_set = set(user_features) if user_tol: for feature, tol in user_tol.items(): mesg = option_utils.check_feature_string(feature, expect_number=True) if mesg: return mesg if feature not in user_features_set: return f'Tolerance feature "{feature}" not found in supplied list' if tol <= 0.0: return f'Illegal tolerance: {tol}. Must be > 0.' if user_rules: for feature, permitted in user_rules.items(): mesg = option_utils.check_feature_string(feature, expect_number=True) if mesg: return mesg if feature not in user_features_set: return f'Rules feature "{feature}" not found in supplied list' mesg = option_utils.check_permitted_features_string(permitted) if mesg: return mesg return ''
[docs]def cluster_hits(args, logger=None): """ Clusters hits in a shape screen report database according to the query features they match or the largest Bemis-Murcko scaffold they contain. :param args: argparse.Namespace with command line arguments :type args: argparse.Namespace :param logger: Logger for info level output :type logger: logging.Logger :return: Dictionary of cluster descripion --> cluster size, where the description is a string of matched features or the scaffold SMILES/SMARTS. :rtype: dict{string: int} """ if args.by == option_utils.ClusterBy.features.name: return cluster_hits_by_features(args, logger) else: return cluster_hits_by_scaffold(args, logger)
[docs]def cluster_hits_by_features(args, logger=None): """ Clusters hits in a shape screen report database according to the query features they match. Any created hypotheses are copied into the archive <jobname>_hypos.zip to simplify file transfer. :param args: argparse.Namespace with command line arguments :type args: argparse.Namespace :param logger: Logger for info level output :type logger: logging.Logger :return: Dictionary of matched features --> cluster size :rtype: dict{string: int} """ if logger: feature_freq = defaultdict(int) t1 = time.process_time() reporter = phase.PhpShapeScreenReporter(args.vsdb) site_names = [site.getSiteName() for site in reporter.getSites([0])[0]] if logger: logger.info(f'\nQuery features: {",".join(site_names)}') tol_dict, _ = option_utils.parse_feature_tol(args.tol) rules_dict, _ = option_utils.parse_feature_rules(args.rules) query_hypo = create_cluster_hypo(reporter, user_tol=tol_dict, user_rules=rules_dict) if logger: total_hits = reporter.getHitCount() logger.info(f'Clustering {total_hits} hits') clusters = reporter.clusterBySitesMatched(query_hypo) if logger: logger.info(f'Total number of clusters: {len(clusters)}\n') features_to_size = {} clusters_retained = 0 hits_retained = 0 writer = structure.StructureWriter(args.hits) if args.hits else None hypo_files = [] for i, cluster in enumerate(clusters, start=1): # cluster[1] holds row numbers of hits. cluster_size = len(cluster[1]) if cluster_size < args.min_hits: break # cluster[0] holds a string like '11000010100'. site_numbers = [j for j, c in enumerate(cluster[0]) if c == '1'] if len(site_numbers) < args.min_features: continue cluster_features = ",".join(site_names[j] for j in site_numbers) features_to_size[cluster_features] = cluster_size if logger: s = (f'Cluster {i}: {cluster_size} hits matching ' f'{cluster_features}') logger.info(s) for j in site_numbers: feature_freq[site_names[j]] += cluster_size clusters_retained += 1 hits_retained += cluster_size if writer: for st in get_structures(reporter, cluster[1]): st.property[CLUSTER_FEATURES_PROP] = cluster_features writer.append(st) if args.hypo: hypo = create_cluster_hypo(reporter, subset=site_numbers, user_tol=tol_dict, user_rules=rules_dict) filename = f'{args.hypo}_{hypo.getHypoID()}.phypo' hypo.save(filename, True) hypo_files.append(filename) if writer: writer.close() if hypo_files: zip_file = get_jobname(args.vsdb, 'cluster') + '_hypos.zip' with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zfile: for hypo_file in hypo_files: zfile.write(hypo_file) if logger: logger.info(f'\nNumber of clusters retained: {clusters_retained}') logger.info(f'Number of hits retained: {hits_retained}') logger.info('\nFrequency Matched:') for site_name in site_names: logger.info(f'{site_name}: {feature_freq[site_name]}') t2 = time.process_time() logger.info('\nCPU time = %.2f sec' % (t2 - t1)) return features_to_size
[docs]def cluster_hits_by_scaffold(args, logger=None): """ Clusters hits in a shape screen report database according to the largest Bemis-Murcko scaffold they contain. :param args: argparse.Namespace with command line arguments :type args: argparse.Namespace :param logger: Logger for info level output :type logger: logging.Logger :return: Dictionary of scaffold SMILES/SMARTS --> cluster size :rtype: dict{string: int} """ if logger: t1 = time.process_time() reporter = phase.PhpShapeScreenReporter(args.vsdb) if logger: total_hits = reporter.getHitCount() logger.info(f'Clustering {total_hits} hits') clusters = reporter.clusterByScaffold(args.fuzzy, args.group_missing) if logger: logger.info(f'Total number of clusters: {len(clusters)}\n') scaffold_to_size = {} clusters_retained = 0 hits_retained = 0 writer = structure.StructureWriter(args.hits) if args.hits else None for i, cluster in enumerate(clusters, start=1): scaffold, cluster_rows = cluster[0], cluster[1] cluster_size = len(cluster_rows) if cluster_size < args.min_hits: break if scaffold: msg = f'built on "{scaffold}"' else: msg = 'producing no scaffold' scaffold_to_size[scaffold] = cluster_size if logger: logger.info(f'Cluster {i}: {cluster_size} hits {msg}') clusters_retained += 1 hits_retained += cluster_size if writer: for st in get_structures(reporter, cluster_rows): st.property[CLUSTER_SCAFFOLD_PROP] = scaffold writer.append(st) if logger: logger.info(f'\nNumber of clusters retained: {clusters_retained}') logger.info(f'Number of hits retained: {hits_retained}') t2 = time.process_time() logger.info('\nCPU time = %.2f sec' % (t2 - t1)) return scaffold_to_size
[docs]def create_cluster_hypo(reporter, subset=None, user_tol=None, user_rules=None): """ Constructs a pharmacophore hypothesis from the shape query for use in the "cluster" task. All shape query sites are used to perform the initial clustering, whereas a subset of sites are used the create the hypothesis for a particular cluster. Positional tolerances and feature-matching rules in the hypothesis are modified according to dictionaries that contain any user-specified <feature>:<tol> and <feature>:<permitted> pairs, where <feature> consists of only a feature type, with no number. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param subset: Zero-based list of query site numbers. All sites are used by default. :type subset: list(int) or NoneType :param user_tol: Dictionary of <feature>:<tol> pairs, where <feature> consists of only a feature type and <tol> is the matching tolerance, e.g., {'A': 1.5, 'R', 2.5}. :type user_tol: dict{str, float} or NoneType :param user_rules: Dictionary of <feature>:<permitted> pairs, where <feature> consists of only a feature type and <permitted> contains <feature>, followed by any other types <feature> is allowed to match, e.g., {'A':'AN', 'H':'HR', 'R':'RH'} :type user_rules: dict{str, str} or NoneType :return: The requested pharmacophore hypothesis :rtype: phase.PhpHypoAdaptor """ hypo_sites = reporter.getSites([0])[0] if subset: hypo_sites = [hypo_sites[i] for i in subset] hypo_id = "_".join(site.getSiteName() for site in hypo_sites) ref_st = reporter.getStructures([0])[0] hypo = phase.PhpHypoAdaptor(hypo_id, hypo_sites, reporter.getFd(), ref_st) feature_tol = phase.PhpDeltaHypo() feature_rules = phase.PhpFeatureRules() for site in hypo_sites: site_number = site.getSiteNumber() site_type = site.getSiteType() if user_tol and site_type in user_tol: d = user_tol[site_type] else: d = phase.PHASE_DEFAULT_DELTA_DIST feature_tol.addSiteData(site_number, site_type, d) if user_rules and site_type in user_rules: permitted = user_rules[site_type] else: permitted = site_type feature_rules.addRule(site_number, permitted, "") hypo.addTol(feature_tol) hypo.addRules(feature_rules) return hypo
[docs]def create_database(args, logger=None): """ Creates a new shape screen report database. :param args: argparser.Namespace with command line arguments :type args: argparser.Namespace :param logger: Logger for info level output :type logger: logging.Logger """ if logger: t1 = time.process_time() logger.info(f'\nCreating database "{args.vsdb}"') fd = phase.read_feature_definitions(args.fd) if args.fd else [] overwrite = True verbose = False if not logger else logger.isEnabledFor(logging.INFO) if args.query: shape_query = next(structure.StructureReader(args.query)) reporter = phase.PhpShapeScreenReporter(args.hits, shape_query, fd, args.vsdb, overwrite, verbose) else: reporter = phase.PhpShapeScreenReporter(args.hits, fd, args.vsdb, overwrite, verbose) # Create table of physicochemical properties + any user-designated # properties. props = prop_utils.DEFAULT_VSDB if args.epik: props.append(prop_utils.ION_CLASS_VSDB) if args.props: trim = True props += list(phase.tokenizeDelimitedString(args.props, ',', trim)) reporter.createPropertyTable(props) if logger: t2 = time.process_time() logger.info(f'Number of hits stored = {reporter.getHitCount()}') logger.info('CPU time = %.2f sec' % (t2 - t1))
[docs]def create_user_hypo(reporter, user_features, user_tol=None, user_rules=None): """ Constructs a pharmacophore hypothesis from the shape query based on a list of feature names supplied by the user. Positional tolerances and feature-matching rules in the hypothesis are modified according to dictionaries that contain any user-specified <feature>:<tol> and <feature>:<permitted> pairs, where <feature> consists of a feature type followed by a 1-based feature number. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param user_features: Feature names with 1-based numbering, e.g., ['A1', 'D5', 'H9', 'R12'] :type user_features: list(str) :param user_tol: Dictionary of <feature>:<tol> pairs, where <feature> consists of a feature name and <tol> is the matching tolerance, e.g., {'A1': 1.5, 'R12', 2.5}. :type user_tol: dict{str, float} or NoneType :param user_rules: Dictionary of <feature>:<permitted> pairs, where <feature> consists of a feature name and <permitted> contains the first character of <feature>, followed by any other types <feature> is allowed to match, e.g., {'A1':'AN', 'R12':'RH'} :type user_rules: dict{str, str} or NoneType :return: The requested pharmacophore hypothesis :rtype: phase.PhpHypoAdaptor :raise: ValueError if any of the supplied feature names is illegal or not found in the shape query, or if any of the tolerances or permitted feature strings is illegal. """ mesg = check_user_query(reporter, user_features, user_tol, user_rules) if mesg: raise ValueError(mesg) site_numbers = sorted( [int(site_name[1:]) - 1 for site_name in user_features]) query_sites = reporter.getSites([0])[0] hypo_sites = [query_sites[i] for i in site_numbers] hypo_id = "_".join(site.getSiteName() for site in hypo_sites) ref_st = reporter.getStructures([0])[0] hypo = phase.PhpHypoAdaptor(hypo_id, hypo_sites, reporter.getFd(), ref_st) feature_tol = phase.PhpDeltaHypo() feature_rules = phase.PhpFeatureRules() for site in hypo_sites: site_number = site.getSiteNumber() site_name = site.getSiteName() site_type = site_name[0] if user_tol and site_name in user_tol: d = user_tol[site_name] else: d = phase.PHASE_DEFAULT_DELTA_DIST feature_tol.addSiteData(site_number, site_type, d) if user_rules and site_name in user_rules: permitted = option_utils.sanitize_permitted(site_type, user_rules[site_name]) else: permitted = site_type feature_rules.addRule(site_number, permitted, "") hypo.addTol(feature_tol) hypo.addRules(feature_rules) return hypo
[docs]def export_hits_or_hypo(args, logger=None): """ Exports hits and/or hypothesis for a subset of query features. :param args: argparser.Namespace with command line arguments :type args: argparser.Namespace :param logger: Logger for info level output :type logger: logging.Logger :raise: ValueError if any of the feature names, feature tolerances or feature rules are illegal. """ if logger: t1 = time.process_time() reporter = phase.PhpShapeScreenReporter(args.vsdb) hypo = setup_screen(reporter, args.features, args.tol, args.rules) if args.hits: if logger: mesg = (f'\nScreening {reporter.getHitCount()} structures for ' f'matches to {args.features}') logger.info(mesg) results = reporter.screenSitesInPlace(hypo, hypo.getSiteCount()) if logger: logger.info(f'Writing {len(results)} hits to {args.hits}') rows = [result.row_number for result in results] with structure.StructureWriter(args.hits) as writer: for i, st in enumerate(get_structures(reporter, rows)): st.property[phase.PHASE_SHAPE_SIM] = results[i].shape_sim writer.append(st) if args.hypo: if logger: logger.info(f'\nSaving hypothesis to {args.hypo}') hypo.save(args.hypo, True) if logger: mesg = hypo.getSummary() t2 = time.process_time() mesg += 'CPU time = %.2f sec' % (t2 - t1) logger.info(mesg) elif logger: t2 = time.process_time() logger.info('\nCPU time = %.2f sec' % (t2 - t1))
[docs]def filter_hits(args, logger=None): """ Filters hits according to property and/or SMARTS filters. :param args: argparser.Namespace with command line arguments :type args: argparser.Namespace :param logger: Logger for info level output :type logger: logging.Logger """ reporter = phase.PhpShapeScreenReporter(args.vsdb) property_filter = None smarts_filter = None if args.props: property_filter = prop_utils.read_property_filters(args.props, args.dp) if args.smarts: smarts_filter = filter.SmartsFilter(filename=args.smarts, delimiter=args.ds) rows = run_filters(reporter, property_filter, smarts_filter, args.diverse, logger) if logger: if not rows: logger.info('\nNo hits survived') return logger.info(f'\nWriting {len(rows)} surviving hits to {args.hits}') ftype = fileutils.get_structure_file_format(args.hits) if ftype in (fileutils.MAESTRO, fileutils.SD): with structure.StructureWriter(args.hits) as writer: for st in get_structures(reporter, rows): writer.append(st) else: ptable = reporter.getPropertyTable() if args.all_props: props = None else: props = list(ptable.getColumnNames()[1:]) props.remove(mm.M2IO_DATA_CT_TITLE) with molio.CsvMolWriter(args.hits, properties=props) as writer: for st in get_structures(reporter, rows): writer.append(rdkit_adapter.to_rdkit(st, implicitH=True))
[docs]def get_feature_overlap_mappings(base_features_hit, base_features_query, result): """ Constructs a list of FeatureOverlapMapping objects from the base features of the hit, the base features of the query and the PhpShapeScreenResult from screening the hit against the query. :param base_features_hit: All base features in the hit :type base_features_hit: list(phase.PhpBaseFeature) :param base_features_query: All base features in the query :type base_features_query: list(phase.PhpBaseFeature) :param result: The result from screening the hit against the query :type result: phase.PhpShapeScreenResult :return: List of FeatureOverlapMapping objects :rtype: list(canvas2d.FeatureOverlapMapping) """ mappings = [] for i, site_pair in enumerate(result.matched_site_numbers): mapping = canvas2d.FeatureOverlapMapping() bf_hit = base_features_hit[site_pair[0]] mapping.hit_feature = f'{bf_hit.getFeatureID()}{site_pair[0] + 1}' bf_query = base_features_query[site_pair[1]] mapping.query_feature = f'{bf_query.getFeatureID()}{site_pair[1] + 1}' mapping.fractional_overlap = result.matched_site_overlaps[i] natoms_hit = bf_hit.getNumberOfAtoms() hit_atoms = [] for j in range(natoms_hit): hit_atoms.append(bf_hit.getAtomIndex(j)) mapping.hit_atoms = hit_atoms natoms_query = bf_query.getNumberOfAtoms() query_atoms = [] for j in range(natoms_query): query_atoms.append(bf_query.getAtomIndex(j)) mapping.query_atoms = query_atoms mappings.append(mapping) return mappings
[docs]def get_jobname(dbfile, task): """ Determines the job name from SCHRODINGER_JOBNAME or from the base name of the shape screen report database and the task. :param dbfile: Shape screen report database file :type dbfile: str :param task: The task being performed :type task: str :return: Job name :rtype: str """ file_task = fileutils.get_basename(dbfile) + "_" + task return jobcontrol.get_jobname(file_task)
[docs]def get_structures(reporter, rows, max_in_memory=1000): """ Generator that yields structures from a shape screen report database while limiting the number of structures held in memory. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param rows: The rows to read. Row 0 corresponds to the shape query, while rows 1, 2, etc. correspond to the hits. Rows may be in any order. :type rows: list(int) :param max_in_memory: The maximum number of structures that will be held in memory while reading :type max_in_memory: int :yield: The next structure in the list of rows :ytype: structure.Structure """ start = 0 while start < len(rows): end = start + max_in_memory yield from reporter.getStructures(rows[start:end]) start = end
[docs]def run_diversity_filter(reporter, diverse_fraction, rows=None, logger=None): """ Returns row numbers for a diverse fraction of structures in a shape screen report database. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param diverse_fraction: Fraction on the interval (0.0, 0.2]. :type diverse_fraction: float :param rows: The rows to screen. Row 0 corresponds to the shape query, while rows 1, 2, etc. correspond to the hits. Rows may be in any order. If omitted, rows for all hits are screened. :type rows: list(int) :param logger: Logger for info level output :type logger: logging.Logger :return: Row numbers for the diverse structures :rtype: list(int) """ to_screen = rows or list(range(1, reporter.getHitCount() + 1)) if logger: logger.info(f'\nApplying diversity filter to {len(to_screen)} hits') t1 = time.process_time() matches = list(reporter.selectDiverseFraction(to_screen, diverse_fraction)) if logger: logger.info(f'Number of surviving hits: {len(matches)}') t2 = time.process_time() logger.info('CPU time = %.2f sec' % (t2 - t1)) return matches
[docs]def run_filters(reporter, property_filter=None, smarts_filter=None, diverse_fraction=None, logger=None): """ Returns row numbers of structures in a shape screen report database which satisfy the provided property, SMARTS and diversity filters. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param property_filter: Property filter to be satisfied :type property_filter: phase.PhpDbPropertyQuery or NoneType :param smarts_filter: SMARTS filter to be satisfied :type smarts_filter: filter.SmartsFilter or NoneType :param diverse_fraction: Optional diverse fraction of rows to retain after applying other filters. Must lie on (0.0, 0.2]. :type diverse_fraction: float or NoneType :param logger: Logger for info level output :type logger: logging.Logger or NoneType :return: Row numbers that satisfy the filter :rtype: list(int) """ if property_filter: matches = run_property_filter(reporter, property_filter, logger) else: matches = list(range(1, reporter.getHitCount() + 1)) if smarts_filter and matches: matches = run_smarts_filter(reporter, smarts_filter, matches, logger) if diverse_fraction and matches: matches = run_diversity_filter(reporter, diverse_fraction, matches, logger) return matches
[docs]def run_property_filter(reporter, property_filter, logger=None): """ Returns row numbers of structures in a shape screen report database which satisfy the provided property filter. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param property_filter: Property filter to be satisfied :type property_filter: phase.PhpDbPropertyQuery :param logger: Logger for info level output :type logger: logging.Logger :return: Row numbers that satisfy the filter :rtype: list(int) """ if logger: hit_count = reporter.getHitCount() logger.info(f'\nApplying property filter to {hit_count} hits') t1 = time.process_time() ptable = reporter.getPropertyTable() matches = list(ptable.runQuery(property_filter)) if logger: logger.info(f'Number of surviving hits: {len(matches)}') t2 = time.process_time() logger.info('CPU time = %.2f sec' % (t2 - t1)) return matches
[docs]def run_smarts_filter(reporter, smarts_filter, rows=None, logger=None): """ Returns row numbers of structures in a shape screen report database which satisfy the provided SMARTS filter. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param smarts_filter: SMARTS filter to be satisfied :type smarts_filter: filter.SmartsFilter :param rows: The rows to screen. Row 0 corresponds to the shape query, while rows 1, 2, etc. correspond to the hits. Rows may be in any order. If omitted, rows for all hits are screened. :type rows: list(int) :param logger: Logger for info level output :type logger: logging.Logger :return: Row numbers that satisfy the filter :rtype: list(int) """ to_screen = rows if rows else list(range(1, reporter.getHitCount() + 1)) if logger: logger.info(f'\nApplying SMARTS filter to {len(to_screen)} hits') t1 = time.process_time() matches = [] for i, st in enumerate(get_structures(reporter, to_screen)): if smarts_filter.checkStructure(st): matches.append(to_screen[i]) if logger: logger.info(f'Number of surviving hits: {len(matches)}') t2 = time.process_time() logger.info('CPU time = %.2f sec' % (t2 - t1)) return matches
[docs]def setup_screen(reporter, features, tol, rules): """ Creates a pharmacophore hypothesis from user-supplied strings of features, tolerances and feature-matching rules. :param reporter: Connection to a shape screen report database :type reporter: phase.PhpShapeScreenReporter :param features: String containing a comma-delimited list of query features to match. If None, all features in the query will be used. :type features: str or NoneType :param tol: String containing a dictionary-like specification of matching tolerances :type tol: str :param rules: String containing a dictionary-like specification of feature type matching rules :type rules: str :return: The pharmacophore hypothesis with which to perform the screen :rtype: phase.PhpHypoAdaptor :raise: ValueError if any of the supplied feature names is illegal or not found in the shape query, or if any of the tolerances or permitted feature strings is illegal. """ if features: feature_list = features.split(',') else: query_sites = reporter.getSites([0])[0] feature_list = [site.getSiteName() for site in query_sites] tol_dict, mesg = option_utils.parse_feature_tol(tol, True) if mesg: raise ValueError(mesg) rules_dict, mesg = option_utils.parse_feature_rules(rules, True) if mesg: raise ValueError(mesg) return create_user_hypo(reporter, feature_list, tol_dict, rules_dict)