"""
Performs task-based work for generating and using shape screen report
databases.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import logging
import sys
import time
import zipfile
from collections import defaultdict
from schrodinger import structure
from schrodinger.application import report2d
from schrodinger.application.pathfinder import molio
from schrodinger.application.phase.shape_screen_reporter import option_utils
from schrodinger.application.phase.shape_screen_reporter import prop_utils
from schrodinger.infra import canvas2d
from schrodinger.infra import mm
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtPrintSupport
from schrodinger.Qt.QtCore import QRect
from schrodinger.structutils import filter
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
CLUSTER_FEATURES_PROP = 's_phase_Cluster_Features'
CLUSTER_SCAFFOLD_PROP = 's_phase_Cluster_Scaffold'
[docs]def check_user_query(reporter, user_features, user_tol=None, user_rules=None):
"""
Returns a non-empty error message if any of the supplied query data
are illegal.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param user_features: Feature names with 1-based numbering, e.g.,
['A1', 'D5', 'H9', 'R12']
:type subset: list(str)
:param user_tol: Dictionary of <feature>:<tol> pairs, where <feature>
consists of a feature name and <tol> is the matching tolerance,
e.g., {'A1': 1.5, 'R12', 2.5}.
:type user_tol: dict{str, float} or NoneType
:param user_rules: Dictionary of <feature>:<permitted> pairs, where
<feature> consists of a feature name and <permitted> contains
the first character of <feature>, followed by any other types
<feature> is allowed to match, e.g., {'A1':'AN', 'R12':'RH'}
:type user_rules: dict{str, str} or NoneType
:return: Non-empty error message if any data are invalid
:rtype: str
"""
query_features = set(
[site.getSiteName() for site in reporter.getSites([0])[0]])
for feature in user_features:
mesg = option_utils.check_feature_string(feature, expect_number=True)
if mesg:
return mesg
if feature not in query_features:
return f'Feature "{feature}" not found in shape query'
user_features_set = set(user_features)
if user_tol:
for feature, tol in user_tol.items():
mesg = option_utils.check_feature_string(feature,
expect_number=True)
if mesg:
return mesg
if feature not in user_features_set:
return f'Tolerance feature "{feature}" not found in supplied list'
if tol <= 0.0:
return f'Illegal tolerance: {tol}. Must be > 0.'
if user_rules:
for feature, permitted in user_rules.items():
mesg = option_utils.check_feature_string(feature,
expect_number=True)
if mesg:
return mesg
if feature not in user_features_set:
return f'Rules feature "{feature}" not found in supplied list'
mesg = option_utils.check_permitted_features_string(permitted)
if mesg:
return mesg
return ''
[docs]def cluster_hits(args, logger=None):
"""
Clusters hits in a shape screen report database according to the query
features they match or the largest Bemis-Murcko scaffold they contain.
:param args: argparse.Namespace with command line arguments
:type args: argparse.Namespace
:param logger: Logger for info level output
:type logger: logging.Logger
:return: Dictionary of cluster descripion --> cluster size, where the
description is a string of matched features or the scaffold
SMILES/SMARTS.
:rtype: dict{string: int}
"""
if args.by == option_utils.ClusterBy.features.name:
return cluster_hits_by_features(args, logger)
else:
return cluster_hits_by_scaffold(args, logger)
[docs]def cluster_hits_by_features(args, logger=None):
"""
Clusters hits in a shape screen report database according to the query
features they match. Any created hypotheses are copied into the archive
<jobname>_hypos.zip to simplify file transfer.
:param args: argparse.Namespace with command line arguments
:type args: argparse.Namespace
:param logger: Logger for info level output
:type logger: logging.Logger
:return: Dictionary of matched features --> cluster size
:rtype: dict{string: int}
"""
if logger:
feature_freq = defaultdict(int)
t1 = time.process_time()
reporter = phase.PhpShapeScreenReporter(args.vsdb)
site_names = [site.getSiteName() for site in reporter.getSites([0])[0]]
if logger:
logger.info(f'\nQuery features: {",".join(site_names)}')
tol_dict, _ = option_utils.parse_feature_tol(args.tol)
rules_dict, _ = option_utils.parse_feature_rules(args.rules)
query_hypo = create_cluster_hypo(reporter,
user_tol=tol_dict,
user_rules=rules_dict)
if logger:
total_hits = reporter.getHitCount()
logger.info(f'Clustering {total_hits} hits')
clusters = reporter.clusterBySitesMatched(query_hypo)
if logger:
logger.info(f'Total number of clusters: {len(clusters)}\n')
features_to_size = {}
clusters_retained = 0
hits_retained = 0
writer = structure.StructureWriter(args.hits) if args.hits else None
hypo_files = []
for i, cluster in enumerate(clusters, start=1):
# cluster[1] holds row numbers of hits.
cluster_size = len(cluster[1])
if cluster_size < args.min_hits:
break
# cluster[0] holds a string like '11000010100'.
site_numbers = [j for j, c in enumerate(cluster[0]) if c == '1']
if len(site_numbers) < args.min_features:
continue
cluster_features = ",".join(site_names[j] for j in site_numbers)
features_to_size[cluster_features] = cluster_size
if logger:
s = (f'Cluster {i}: {cluster_size} hits matching '
f'{cluster_features}')
logger.info(s)
for j in site_numbers:
feature_freq[site_names[j]] += cluster_size
clusters_retained += 1
hits_retained += cluster_size
if writer:
for st in get_structures(reporter, cluster[1]):
st.property[CLUSTER_FEATURES_PROP] = cluster_features
writer.append(st)
if args.hypo:
hypo = create_cluster_hypo(reporter,
subset=site_numbers,
user_tol=tol_dict,
user_rules=rules_dict)
filename = f'{args.hypo}_{hypo.getHypoID()}.phypo'
hypo.save(filename, True)
hypo_files.append(filename)
if writer:
writer.close()
if hypo_files:
zip_file = get_jobname(args.vsdb, 'cluster') + '_hypos.zip'
with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zfile:
for hypo_file in hypo_files:
zfile.write(hypo_file)
if logger:
logger.info(f'\nNumber of clusters retained: {clusters_retained}')
logger.info(f'Number of hits retained: {hits_retained}')
logger.info('\nFrequency Matched:')
for site_name in site_names:
logger.info(f'{site_name}: {feature_freq[site_name]}')
t2 = time.process_time()
logger.info('\nCPU time = %.2f sec' % (t2 - t1))
return features_to_size
[docs]def cluster_hits_by_scaffold(args, logger=None):
"""
Clusters hits in a shape screen report database according to the largest
Bemis-Murcko scaffold they contain.
:param args: argparse.Namespace with command line arguments
:type args: argparse.Namespace
:param logger: Logger for info level output
:type logger: logging.Logger
:return: Dictionary of scaffold SMILES/SMARTS --> cluster size
:rtype: dict{string: int}
"""
if logger:
t1 = time.process_time()
reporter = phase.PhpShapeScreenReporter(args.vsdb)
if logger:
total_hits = reporter.getHitCount()
logger.info(f'Clustering {total_hits} hits')
clusters = reporter.clusterByScaffold(args.fuzzy, args.group_missing)
if logger:
logger.info(f'Total number of clusters: {len(clusters)}\n')
scaffold_to_size = {}
clusters_retained = 0
hits_retained = 0
writer = structure.StructureWriter(args.hits) if args.hits else None
for i, cluster in enumerate(clusters, start=1):
scaffold, cluster_rows = cluster[0], cluster[1]
cluster_size = len(cluster_rows)
if cluster_size < args.min_hits:
break
if scaffold:
msg = f'built on "{scaffold}"'
else:
msg = 'producing no scaffold'
scaffold_to_size[scaffold] = cluster_size
if logger:
logger.info(f'Cluster {i}: {cluster_size} hits {msg}')
clusters_retained += 1
hits_retained += cluster_size
if writer:
for st in get_structures(reporter, cluster_rows):
st.property[CLUSTER_SCAFFOLD_PROP] = scaffold
writer.append(st)
if logger:
logger.info(f'\nNumber of clusters retained: {clusters_retained}')
logger.info(f'Number of hits retained: {hits_retained}')
t2 = time.process_time()
logger.info('\nCPU time = %.2f sec' % (t2 - t1))
return scaffold_to_size
[docs]def create_cluster_hypo(reporter, subset=None, user_tol=None, user_rules=None):
"""
Constructs a pharmacophore hypothesis from the shape query for use
in the "cluster" task. All shape query sites are used to perform the
initial clustering, whereas a subset of sites are used the create
the hypothesis for a particular cluster. Positional tolerances and
feature-matching rules in the hypothesis are modified according to
dictionaries that contain any user-specified <feature>:<tol> and
<feature>:<permitted> pairs, where <feature> consists of only a
feature type, with no number.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param subset: Zero-based list of query site numbers. All sites are
used by default.
:type subset: list(int) or NoneType
:param user_tol: Dictionary of <feature>:<tol> pairs, where <feature>
consists of only a feature type and <tol> is the matching
tolerance, e.g., {'A': 1.5, 'R', 2.5}.
:type user_tol: dict{str, float} or NoneType
:param user_rules: Dictionary of <feature>:<permitted> pairs, where
<feature> consists of only a feature type and <permitted>
contains <feature>, followed by any other types <feature>
is allowed to match, e.g., {'A':'AN', 'H':'HR', 'R':'RH'}
:type user_rules: dict{str, str} or NoneType
:return: The requested pharmacophore hypothesis
:rtype: phase.PhpHypoAdaptor
"""
hypo_sites = reporter.getSites([0])[0]
if subset:
hypo_sites = [hypo_sites[i] for i in subset]
hypo_id = "_".join(site.getSiteName() for site in hypo_sites)
ref_st = reporter.getStructures([0])[0]
hypo = phase.PhpHypoAdaptor(hypo_id, hypo_sites, reporter.getFd(), ref_st)
feature_tol = phase.PhpDeltaHypo()
feature_rules = phase.PhpFeatureRules()
for site in hypo_sites:
site_number = site.getSiteNumber()
site_type = site.getSiteType()
if user_tol and site_type in user_tol:
d = user_tol[site_type]
else:
d = phase.PHASE_DEFAULT_DELTA_DIST
feature_tol.addSiteData(site_number, site_type, d)
if user_rules and site_type in user_rules:
permitted = user_rules[site_type]
else:
permitted = site_type
feature_rules.addRule(site_number, permitted, "")
hypo.addTol(feature_tol)
hypo.addRules(feature_rules)
return hypo
[docs]def create_database(args, logger=None):
"""
Creates a new shape screen report database.
:param args: argparser.Namespace with command line arguments
:type args: argparser.Namespace
:param logger: Logger for info level output
:type logger: logging.Logger
"""
if logger:
t1 = time.process_time()
logger.info(f'\nCreating database "{args.vsdb}"')
fd = phase.read_feature_definitions(args.fd) if args.fd else []
overwrite = True
verbose = False if not logger else logger.isEnabledFor(logging.INFO)
if args.query:
shape_query = next(structure.StructureReader(args.query))
reporter = phase.PhpShapeScreenReporter(args.hits, shape_query, fd,
args.vsdb, overwrite, verbose)
else:
reporter = phase.PhpShapeScreenReporter(args.hits, fd, args.vsdb,
overwrite, verbose)
# Create table of physicochemical properties + any user-designated
# properties.
props = prop_utils.DEFAULT_VSDB
if args.epik:
props.append(prop_utils.ION_CLASS_VSDB)
if args.props:
trim = True
props += list(phase.tokenizeDelimitedString(args.props, ',', trim))
reporter.createPropertyTable(props)
if logger:
t2 = time.process_time()
logger.info(f'Number of hits stored = {reporter.getHitCount()}')
logger.info('CPU time = %.2f sec' % (t2 - t1))
[docs]def create_user_hypo(reporter, user_features, user_tol=None, user_rules=None):
"""
Constructs a pharmacophore hypothesis from the shape query based on
a list of feature names supplied by the user. Positional tolerances
and feature-matching rules in the hypothesis are modified according
to dictionaries that contain any user-specified <feature>:<tol> and
<feature>:<permitted> pairs, where <feature> consists of a feature
type followed by a 1-based feature number.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param user_features: Feature names with 1-based numbering, e.g.,
['A1', 'D5', 'H9', 'R12']
:type user_features: list(str)
:param user_tol: Dictionary of <feature>:<tol> pairs, where <feature>
consists of a feature name and <tol> is the matching tolerance,
e.g., {'A1': 1.5, 'R12', 2.5}.
:type user_tol: dict{str, float} or NoneType
:param user_rules: Dictionary of <feature>:<permitted> pairs, where
<feature> consists of a feature name and <permitted> contains
the first character of <feature>, followed by any other types
<feature> is allowed to match, e.g., {'A1':'AN', 'R12':'RH'}
:type user_rules: dict{str, str} or NoneType
:return: The requested pharmacophore hypothesis
:rtype: phase.PhpHypoAdaptor
:raise: ValueError if any of the supplied feature names is illegal
or not found in the shape query, or if any of the tolerances
or permitted feature strings is illegal.
"""
mesg = check_user_query(reporter, user_features, user_tol, user_rules)
if mesg:
raise ValueError(mesg)
site_numbers = sorted(
[int(site_name[1:]) - 1 for site_name in user_features])
query_sites = reporter.getSites([0])[0]
hypo_sites = [query_sites[i] for i in site_numbers]
hypo_id = "_".join(site.getSiteName() for site in hypo_sites)
ref_st = reporter.getStructures([0])[0]
hypo = phase.PhpHypoAdaptor(hypo_id, hypo_sites, reporter.getFd(), ref_st)
feature_tol = phase.PhpDeltaHypo()
feature_rules = phase.PhpFeatureRules()
for site in hypo_sites:
site_number = site.getSiteNumber()
site_name = site.getSiteName()
site_type = site_name[0]
if user_tol and site_name in user_tol:
d = user_tol[site_name]
else:
d = phase.PHASE_DEFAULT_DELTA_DIST
feature_tol.addSiteData(site_number, site_type, d)
if user_rules and site_name in user_rules:
permitted = option_utils.sanitize_permitted(site_type,
user_rules[site_name])
else:
permitted = site_type
feature_rules.addRule(site_number, permitted, "")
hypo.addTol(feature_tol)
hypo.addRules(feature_rules)
return hypo
[docs]def export_hits_or_hypo(args, logger=None):
"""
Exports hits and/or hypothesis for a subset of query features.
:param args: argparser.Namespace with command line arguments
:type args: argparser.Namespace
:param logger: Logger for info level output
:type logger: logging.Logger
:raise: ValueError if any of the feature names, feature tolerances or
feature rules are illegal.
"""
if logger:
t1 = time.process_time()
reporter = phase.PhpShapeScreenReporter(args.vsdb)
hypo = setup_screen(reporter, args.features, args.tol, args.rules)
if args.hits:
if logger:
mesg = (f'\nScreening {reporter.getHitCount()} structures for '
f'matches to {args.features}')
logger.info(mesg)
results = reporter.screenSitesInPlace(hypo, hypo.getSiteCount())
if logger:
logger.info(f'Writing {len(results)} hits to {args.hits}')
rows = [result.row_number for result in results]
with structure.StructureWriter(args.hits) as writer:
for i, st in enumerate(get_structures(reporter, rows)):
st.property[phase.PHASE_SHAPE_SIM] = results[i].shape_sim
writer.append(st)
if args.hypo:
if logger:
logger.info(f'\nSaving hypothesis to {args.hypo}')
hypo.save(args.hypo, True)
if logger:
mesg = hypo.getSummary()
t2 = time.process_time()
mesg += 'CPU time = %.2f sec' % (t2 - t1)
logger.info(mesg)
elif logger:
t2 = time.process_time()
logger.info('\nCPU time = %.2f sec' % (t2 - t1))
[docs]def filter_hits(args, logger=None):
"""
Filters hits according to property and/or SMARTS filters.
:param args: argparser.Namespace with command line arguments
:type args: argparser.Namespace
:param logger: Logger for info level output
:type logger: logging.Logger
"""
reporter = phase.PhpShapeScreenReporter(args.vsdb)
property_filter = None
smarts_filter = None
if args.props:
property_filter = prop_utils.read_property_filters(args.props, args.dp)
if args.smarts:
smarts_filter = filter.SmartsFilter(filename=args.smarts,
delimiter=args.ds)
rows = run_filters(reporter, property_filter, smarts_filter, args.diverse,
logger)
if logger:
if not rows:
logger.info('\nNo hits survived')
return
logger.info(f'\nWriting {len(rows)} surviving hits to {args.hits}')
ftype = fileutils.get_structure_file_format(args.hits)
if ftype in (fileutils.MAESTRO, fileutils.SD):
with structure.StructureWriter(args.hits) as writer:
for st in get_structures(reporter, rows):
writer.append(st)
else:
ptable = reporter.getPropertyTable()
if args.all_props:
props = None
else:
props = list(ptable.getColumnNames()[1:])
props.remove(mm.M2IO_DATA_CT_TITLE)
with molio.CsvMolWriter(args.hits, properties=props) as writer:
for st in get_structures(reporter, rows):
writer.append(rdkit_adapter.to_rdkit(st, implicitH=True))
[docs]def get_feature_overlap_mappings(base_features_hit, base_features_query,
result):
"""
Constructs a list of FeatureOverlapMapping objects from the base
features of the hit, the base features of the query and the
PhpShapeScreenResult from screening the hit against the query.
:param base_features_hit: All base features in the hit
:type base_features_hit: list(phase.PhpBaseFeature)
:param base_features_query: All base features in the query
:type base_features_query: list(phase.PhpBaseFeature)
:param result: The result from screening the hit against the query
:type result: phase.PhpShapeScreenResult
:return: List of FeatureOverlapMapping objects
:rtype: list(canvas2d.FeatureOverlapMapping)
"""
mappings = []
for i, site_pair in enumerate(result.matched_site_numbers):
mapping = canvas2d.FeatureOverlapMapping()
bf_hit = base_features_hit[site_pair[0]]
mapping.hit_feature = f'{bf_hit.getFeatureID()}{site_pair[0] + 1}'
bf_query = base_features_query[site_pair[1]]
mapping.query_feature = f'{bf_query.getFeatureID()}{site_pair[1] + 1}'
mapping.fractional_overlap = result.matched_site_overlaps[i]
natoms_hit = bf_hit.getNumberOfAtoms()
hit_atoms = []
for j in range(natoms_hit):
hit_atoms.append(bf_hit.getAtomIndex(j))
mapping.hit_atoms = hit_atoms
natoms_query = bf_query.getNumberOfAtoms()
query_atoms = []
for j in range(natoms_query):
query_atoms.append(bf_query.getAtomIndex(j))
mapping.query_atoms = query_atoms
mappings.append(mapping)
return mappings
[docs]def get_jobname(dbfile, task):
"""
Determines the job name from SCHRODINGER_JOBNAME or from the base name
of the shape screen report database and the task.
:param dbfile: Shape screen report database file
:type dbfile: str
:param task: The task being performed
:type task: str
:return: Job name
:rtype: str
"""
file_task = fileutils.get_basename(dbfile) + "_" + task
return jobcontrol.get_jobname(file_task)
[docs]def get_structures(reporter, rows, max_in_memory=1000):
"""
Generator that yields structures from a shape screen report database
while limiting the number of structures held in memory.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param rows: The rows to read. Row 0 corresponds to the shape query,
while rows 1, 2, etc. correspond to the hits. Rows may be in
any order.
:type rows: list(int)
:param max_in_memory: The maximum number of structures that will be
held in memory while reading
:type max_in_memory: int
:yield: The next structure in the list of rows
:ytype: structure.Structure
"""
start = 0
while start < len(rows):
end = start + max_in_memory
yield from reporter.getStructures(rows[start:end])
start = end
[docs]def print_images(images,
ncols,
pdf_file,
page_size=option_utils.PageSize.Letter):
"""
Prints a list of images to a table in PDF format.
:param images: List of images.
:type images: list(report2d.StructInfo)
:param ncols: Number of columns in table
:type ncols: int
:param pdf_file: Name of PDF file
:type pdf_file: str
:param page_size: Page size
:type page_size: option_utils.PageSize
"""
printer = QtPrintSupport.QPrinter()
printer.setFullPage(True)
if page_size == option_utils.PageSize.A4:
printer.setPageSize(QtPrintSupport.QPrinter.A4)
else:
printer.setPageSize(QtPrintSupport.QPrinter.Letter)
printer.setOrientation(QtPrintSupport.QPrinter.Portrait)
printer.setOutputFileName(pdf_file)
if sys.platform == 'win32':
printer.setOutputFormat(QtPrintSupport.QPrinter.PdfFormat)
elif sys.platform == 'darwin':
printer.setOutputFormat(QtPrintSupport.QPrinter.NativeFormat)
pp = report2d.PicturePrinter(printer, images, ncols, None)
pp.run()
[docs]def print_report(args, logger=None):
"""
Generates a PDF file containing a table of annotated 2D hits that
match a set of features in the shape query. Must construct a
QApplication before calling this function.
:param args: argparser.Namespace with command line arguments
:type args: argparser.Namespace
:param logger: Logger for info level output
:type logger: logging.Logger
:raise: ValueError if any of the feature names, feature tolerances or
feature rules are illegal.
"""
if logger:
t1 = time.process_time()
reporter = phase.PhpShapeScreenReporter(args.vsdb)
hypo = setup_screen(reporter, args.features, args.tol, args.rules)
features = ",".join(site.getSiteName() for site in hypo.getHypoSites())
if args.features:
min_sites = hypo.getSiteCount()
else:
min_sites = min(3, hypo.getSiteCount())
if logger:
nstr = reporter.getHitCount() + 1
mesg = (f'\nScreening {nstr} structures for {min_sites}-point matches '
f'to {features}')
logger.info(mesg)
# Screen query + hits so that query will be the first result.
results = list(reporter.screenSitesInPlace(hypo, min_sites, [0]))
results += list(reporter.screenSitesInPlace(hypo, min_sites))
if logger:
mesg = f'A total of {len(results)} matches found, including shape query'
logger.info(mesg)
# Setup for 2D images.
image_height = option_utils.IMAGE_HEIGHT[args.quality]
image_width = 1.5 * image_height
adaptor = canvas2d.ChmMmctAdaptor()
stereo = canvas2d.ChmMmctAdaptor.StereoFromAnnotationAndGeometry_Safe
query2d = adaptor.create(reporter.getStructures([0])[0], stereo)
hydrogen_opt = canvas2d.ChmAtomOption.H_Never
canvas2d.Chm2DCoordGen.generateAndApply(query2d, hydrogen_opt)
pdf_file = args.pdf if args.pdf else hypo.getHypoID() + '.pdf'
# Accumulate 2D images for query + hits.
max_images = min(len(results), args.limit)
update_freq = 10
if logger:
logger.info(f'Generating 2D images for {max_images} structures\n')
images = []
rows = [result.row_number for result in results]
base_features_hits = reporter.getBaseFeatures(rows)
base_features_query = base_features_hits[0]
for i, result, st, base_features_hit in zip(range(max_images), results,
get_structures(reporter, rows),
base_features_hits):
hit2d = adaptor.create(st, stereo)
canvas2d.Chm2DCoordGen.generateAndApply(hit2d, hydrogen_opt)
mappings = get_feature_overlap_mappings(base_features_hit,
base_features_query, result)
cview = canvas2d.ChemView()
if i == 0:
cview.maximizeWidth(query2d)
cview.alignFeatures(hit2d, query2d, mappings)
qrect = QRect(0, 0, image_width, image_height)
gen_coords = False
pic = cview.getPicture_FeatureOverlap(hit2d, qrect, mappings,
gen_coords)
if i == 0:
st_labels = ['Shape Query']
else:
title = f'Title: {st.title}'
sim = 'Shape similarity: %.4f' % result.shape_sim
st_labels = [title, sim]
images.append(report2d.StructInfo(pic, None, None, st_labels, ""))
if logger:
if (i + 1) % update_freq == 0 or i == max_images - 1:
logger.info(f'{i + 1} images generated')
if logger:
logger.info(f'\nPrinting images to {pdf_file}')
page_size = option_utils.PAGE_SIZES[args.size]
print_images(images, args.cols, pdf_file, page_size)
if logger:
t2 = time.process_time()
logger.info('\n\nCPU time = %.2f sec' % (t2 - t1))
[docs]def run_diversity_filter(reporter, diverse_fraction, rows=None, logger=None):
"""
Returns row numbers for a diverse fraction of structures in a shape
screen report database.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param diverse_fraction: Fraction on the interval (0.0, 0.2].
:type diverse_fraction: float
:param rows: The rows to screen. Row 0 corresponds to the shape query,
while rows 1, 2, etc. correspond to the hits. Rows may be in
any order. If omitted, rows for all hits are screened.
:type rows: list(int)
:param logger: Logger for info level output
:type logger: logging.Logger
:return: Row numbers for the diverse structures
:rtype: list(int)
"""
to_screen = rows or list(range(1, reporter.getHitCount() + 1))
if logger:
logger.info(f'\nApplying diversity filter to {len(to_screen)} hits')
t1 = time.process_time()
matches = list(reporter.selectDiverseFraction(to_screen, diverse_fraction))
if logger:
logger.info(f'Number of surviving hits: {len(matches)}')
t2 = time.process_time()
logger.info('CPU time = %.2f sec' % (t2 - t1))
return matches
[docs]def run_filters(reporter,
property_filter=None,
smarts_filter=None,
diverse_fraction=None,
logger=None):
"""
Returns row numbers of structures in a shape screen report database
which satisfy the provided property, SMARTS and diversity filters.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param property_filter: Property filter to be satisfied
:type property_filter: phase.PhpDbPropertyQuery or NoneType
:param smarts_filter: SMARTS filter to be satisfied
:type smarts_filter: filter.SmartsFilter or NoneType
:param diverse_fraction: Optional diverse fraction of rows to retain
after applying other filters. Must lie on (0.0, 0.2].
:type diverse_fraction: float or NoneType
:param logger: Logger for info level output
:type logger: logging.Logger or NoneType
:return: Row numbers that satisfy the filter
:rtype: list(int)
"""
if property_filter:
matches = run_property_filter(reporter, property_filter, logger)
else:
matches = list(range(1, reporter.getHitCount() + 1))
if smarts_filter and matches:
matches = run_smarts_filter(reporter, smarts_filter, matches, logger)
if diverse_fraction and matches:
matches = run_diversity_filter(reporter, diverse_fraction, matches,
logger)
return matches
[docs]def run_property_filter(reporter, property_filter, logger=None):
"""
Returns row numbers of structures in a shape screen report database
which satisfy the provided property filter.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param property_filter: Property filter to be satisfied
:type property_filter: phase.PhpDbPropertyQuery
:param logger: Logger for info level output
:type logger: logging.Logger
:return: Row numbers that satisfy the filter
:rtype: list(int)
"""
if logger:
hit_count = reporter.getHitCount()
logger.info(f'\nApplying property filter to {hit_count} hits')
t1 = time.process_time()
ptable = reporter.getPropertyTable()
matches = list(ptable.runQuery(property_filter))
if logger:
logger.info(f'Number of surviving hits: {len(matches)}')
t2 = time.process_time()
logger.info('CPU time = %.2f sec' % (t2 - t1))
return matches
[docs]def run_smarts_filter(reporter, smarts_filter, rows=None, logger=None):
"""
Returns row numbers of structures in a shape screen report database
which satisfy the provided SMARTS filter.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param smarts_filter: SMARTS filter to be satisfied
:type smarts_filter: filter.SmartsFilter
:param rows: The rows to screen. Row 0 corresponds to the shape query,
while rows 1, 2, etc. correspond to the hits. Rows may be in
any order. If omitted, rows for all hits are screened.
:type rows: list(int)
:param logger: Logger for info level output
:type logger: logging.Logger
:return: Row numbers that satisfy the filter
:rtype: list(int)
"""
to_screen = rows if rows else list(range(1, reporter.getHitCount() + 1))
if logger:
logger.info(f'\nApplying SMARTS filter to {len(to_screen)} hits')
t1 = time.process_time()
matches = []
for i, st in enumerate(get_structures(reporter, to_screen)):
if smarts_filter.checkStructure(st):
matches.append(to_screen[i])
if logger:
logger.info(f'Number of surviving hits: {len(matches)}')
t2 = time.process_time()
logger.info('CPU time = %.2f sec' % (t2 - t1))
return matches
[docs]def setup_screen(reporter, features, tol, rules):
"""
Creates a pharmacophore hypothesis from user-supplied strings of
features, tolerances and feature-matching rules.
:param reporter: Connection to a shape screen report database
:type reporter: phase.PhpShapeScreenReporter
:param features: String containing a comma-delimited list of query
features to match. If None, all features in the query will
be used.
:type features: str or NoneType
:param tol: String containing a dictionary-like specification of
matching tolerances
:type tol: str
:param rules: String containing a dictionary-like specification of
feature type matching rules
:type rules: str
:return: The pharmacophore hypothesis with which to perform the screen
:rtype: phase.PhpHypoAdaptor
:raise: ValueError if any of the supplied feature names is illegal
or not found in the shape query, or if any of the tolerances
or permitted feature strings is illegal.
"""
if features:
feature_list = features.split(',')
else:
query_sites = reporter.getSites([0])[0]
feature_list = [site.getSiteName() for site in query_sites]
tol_dict, mesg = option_utils.parse_feature_tol(tol, True)
if mesg:
raise ValueError(mesg)
rules_dict, mesg = option_utils.parse_feature_rules(rules, True)
if mesg:
raise ValueError(mesg)
return create_user_hypo(reporter, feature_list, tol_dict, rules_dict)