Source code for schrodinger.application.combinatorial_diversity.driver_utils

"""
Provides miscellaneous functionality for combinatorial_diversity_driver.py.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import argparse
import csv
import math
import os
import random
import zipfile

from rdkit import Chem

from schrodinger import structure
from schrodinger.application.combinatorial_diversity import \
    diversity_fingerprinter
from schrodinger.application.combinatorial_diversity import diversity_selector
from schrodinger.application.combinatorial_diversity import diversity_splitter
from schrodinger.application.pathfinder import molio
from schrodinger.infra import canvas
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import cmdline
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils

COMBINATORIAL_DIVERSITY = "combinatorial_diversity"

# Default ratio of products/ndiverse:
PRODUCT_RATIO = 20

# Minimum number of diverse structures per chunk:
MIN_DIVERSE_PER_CHUNK = 100

# User must specify minimum number of products if ndiverse exceeds this value:
NDIVERSE_THRESHOLD = 50000

# Default minimum population of each chunk:
DEFAULT_MIN_POP = diversity_splitter.MIN_POP

# Default (and minimum legal) number of diverse probe structures used to
# construct the similarity space from which chunks are defined.
DEFAULT_NUM_PROBES = diversity_splitter.NUM_PROBES

# Default maximum number of chunks:
DEFAULT_MAX_CHUNKS = 512

# Default inflation of the number of requested random products to ensure
# that combinatorial_synthesis doesn't come up short:
DEFAULT_INFLATION_FACTOR = 1.25

# Default random seed for intializing diversity algorithm:
DEFAULT_RAND_SEED = diversity_selector.RAND_SEED

# Legal input file types:
JSON, FP, CSV, SMI = "json", "fp", fileutils.SMILESCSV, fileutils.SMILES
INFILE_EXT_TO_TYPE = {".json": JSON, ".fp": FP, ".csv": CSV, ".smi": SMI}

LEGAL_OUTFILE_TYPES = [
    fileutils.MAESTRO, fileutils.SD, fileutils.SMILESCSV, fileutils.SMILES
]
# Note that Chem.SDWriter cannot write compressed SD files.
OUTFILE_TYPE_TO_EXT = {
    fileutils.MAESTRO: ".maegz",
    fileutils.SD: ".sdf",
    fileutils.SMILESCSV: ".csv",
    fileutils.SMILES: ".smi"
}

LEGAL_FP_TYPES = diversity_fingerprinter.LEGAL_FP_TYPES
# This default is chosen mainly for speed and disk space requirements:
DEFAULT_FP_TYPE = diversity_fingerprinter.MOLPRINT2D
MIN_FP_PER_SUBJOB = 1000

# Properties calculated automatically for .json and .smi inputs:
AUTO_PROPERTIES = diversity_fingerprinter.PROPERTY_NAMES
AUTO_KEYS = diversity_fingerprinter.PROPERTY_DESCRIPTIONS
AUTO_TYPES = diversity_fingerprinter.PROPERTY_TYPES

MAX_ROWS_DETECT = 1000


[docs]def add_property_biasing_options(parser): """ Adds property biasing options to the provided parser. :param parser: Argument parser object. :type parser: argparser.ArgumentParser """ property_biasing_options = parser.add_argument_group( title="Property Biasing Options") property_biasing_options.add_argument( "-filter", metavar="<file>", help="CSV file containing one or more property filters, with one " "filter per line. Each filter consists of the name of a property, " "followed by the preferred minimum and maximum values of that " "property, e.g., AlogP,2.0,5.0. In the case of .json or .smi input, " "use of this option triggers the creation of a set of default " "physicochemical properties to which filters may be applied. In the " "case of .fp or .csv input, filters may be applied only to the numeric " "properties present in those files. Use -list_props to see available " "properties. Note that diverse structures are selected with a bias " "toward satisfying as many filters as possible, but not necessarily " "all filters. Note also that a given property may appear in more " "than one filter, so that multiple desired ranges are possible.") property_biasing_options.add_argument( "-list_props", action="store_true", help="Get the list of properties available for biasing. Will be the " "automatically calculated properties for .json and .smi inputs, and " "the properties present in the file for .fp and .csv inputs.") property_biasing_options.add_argument( "-hba", metavar="<file>", help="Use supplied rules to assign hydrogen bond acceptor counts for " ".json and .smi input. Default rules are in the file HbondAcceptor.typ " "in the Schrodinger software installation.") property_biasing_options.add_argument( "-hbd", metavar="<file>", help="Use supplied rules to assign hydrogen bond donor counts for " ".json and .smi input. Default rules are in the file HbondDonor.typ " "in the Schrodinger software installation.")
[docs]def adjust_min_pop(min_pop, ndiverse, min_diverse_per_chunk, pool_size): """ Adjusts the minimum population per chunk, if necessary, to ensure a minimum number of diverse structures per chunk. :param min_pop: Requested minimum population per chunk. :type min_pop: int :param ndiverse: Total number of diverse structures to select. :type ndiverse: int :param min_diverse_per_chunk: Minimum allowed number of diverse structures per chunk. :type min_diverse_per_chunk: int :param pool_size: Total number of structures in the pool. :type pool_size: int :return: The appropriate minimum population. :rtype: int """ min_pop_adj = min(min_pop, pool_size) ndiverse_per_chunk = ndiverse / (pool_size / min_pop_adj) if ndiverse_per_chunk < min_diverse_per_chunk: return int(math.ceil(min_diverse_per_chunk * pool_size / ndiverse)) return min_pop_adj
[docs]def combine_diverse_structures(subjob_names, outfile): """ Combines diverse structures from subjobs to the indicated output file. :param subjob_names: Subjob names. :type subjob_names: list(str) :param outfile: Output Maestro, SD, CSV or SMILES file. Diverse structures from subjobs must be in the same format. :type outfile: str """ ftype = fileutils.get_structure_file_format(outfile) subjob_file_ext = f'_diverse{OUTFILE_TYPE_TO_EXT[ftype]}' existing_files = [] for subjob_name in subjob_names: subjob_file = subjob_name + subjob_file_ext if os.path.isfile(subjob_file): existing_files.append(subjob_file) if not existing_files: return molio.combine_output_files(existing_files, outfile, dedup=False)
[docs]def detect_property_types(infile, max_rows=MAX_ROWS_DETECT, sticky_missing=False): """ Given a .json, .fp, .csv or .smi input file, this function returns a dictionary of property names to property types for all properties, excluding SMILES and title, which are present in the file (.fp, .csv) or automatically calculated (.json, .smi). In the case of .fp and .csv, the first max_rows are examined to deduce property types. :param infile: Input file (.json, .fp, .csv or .smi). :type infile: str :param max_rows: The maximum number of rows to examine. :type max_rows: int :param sticky_missing: If True, a property with any missing values will be assigned a type of PropertyType.MISSING. If False, the property type will be deduced from non-missing values. :type sticky_missing: bool :return: Dictionary of property name to PropertyType. :rtype: dict{str: diversity_fingerprinter.PropertyType} """ infile_type = get_infile_type(infile) if infile_type in [JSON, SMI]: return dict(zip(AUTO_PROPERTIES, AUTO_TYPES)) prop_names, prop_rows = read_properties(infile, max_rows) prop_dict = {} for j in range(len(prop_names)): prop_dict[prop_names[j]] = get_property_type(prop_rows[0][j]) missing_int = (diversity_fingerprinter.PropertyType.MISSING, diversity_fingerprinter.PropertyType.INT) for i in range(1, len(prop_rows)): for prop_name, prop_value in zip(prop_names, prop_rows[i]): old_type = prop_dict[prop_name] new_type = get_property_type(prop_value) if sticky_missing: if old_type == diversity_fingerprinter.PropertyType.MISSING: continue elif new_type == diversity_fingerprinter.PropertyType.MISSING: # (FLOAT, INT, STR)-->MISSING prop_dict[ prop_name] = diversity_fingerprinter.PropertyType.MISSING continue if old_type == diversity_fingerprinter.PropertyType.STR: # Do not override. continue if new_type in (diversity_fingerprinter.PropertyType.STR, diversity_fingerprinter.PropertyType.FLOAT): # (FLOAT, INT, MISSING)-->STR or (FLOAT, INT, MISSING)-->FLOAT prop_dict[prop_name] = new_type elif (old_type, new_type) == missing_int: prop_dict[prop_name] = new_type return prop_dict
[docs]def extract_subjob_chunks(subjob_name, infile): """ Exracts chunk files from the archive <subjob_name>.zip and returns lists of the fingerprint files, numbers of diverse structures, and fingerprint domains that should be supplied to the DiversitySelector object that will operate on each chunk. One of two behaviors will occur: 1. If the archive contains .csv files, then each fingerprint file will be infile, and the row numbers in each .csv file will be returned as the fingerprint domains. 2. If the archive contains .fp files, then those fingerprint file names will be returned, and each fingerprint domain will be None. :param subjob_name: The subjob name. :type subjob_name: str :param infile: If the archive contains .csv files, this should be the name of the fingerprint file for which in-place splitting was done. Will be either the user-supplied input fingerprint file (-nocopy, -nosplit) or the fingerprint file generated from the input structures (-nosplit). Ignored if the archive contains .fp files. :type infile: str :return: Lists of fingerprint file names, numbers of diverse structures and fingerprint domains. :rtype: list(str), list(int), list(list(int)) """ with zipfile.ZipFile(f'{subjob_name}.zip') as zfile: zfile.extractall() chunk_files = [] ndiverse_per_chunk = [] manifest_file = f'{subjob_name}_manifest.csv' with open(manifest_file, newline='') as fh: for row in csv.reader(fh): chunk_files.append(row[0]) ndiverse_per_chunk.append(int(row[1])) nchunks = len(chunk_files) if chunk_files[0].endswith(".csv"): fp_files = nchunks * [infile] fp_domains = [] for chunk_file in chunk_files: with open(chunk_file, newline='') as fh: fp_domain = next(csv.reader(fh)) fp_domains.append(list(map(int, fp_domain))) else: fp_files = chunk_files fp_domains = nchunks * [None] return fp_files, ndiverse_per_chunk, fp_domains
[docs]def generate_fingerprints(infile, outfile, fptype, want_props=False, hba_file=None, hbd_file=None, logger=None): """ Generates Canvas fingerprints and, optionally, a default set of physicochemical properties for the structures in a SMILES or CSV file. :param infile: Input SMILES or CSV file. :type infile: str :param outfile: Output fingerprint file. :type outpfile: str :param fptype: Fingerprint type (see LEGAL_FP_TYPES). :type fptype: str :param want_props: Whether to generate properties. Should be True only for SMILES input. :type want_props: bool or NoneType :param hba_file: File with customized hydrogen bond acceptor rules. Ignored if want_props is False. :type hba_file: str or NoneType :param hbd_file: File with customized hydrogen bond donor rules. Ignored if want_props is False. :type hbd_file: str or NoneType :param logger: Logger for warning and info messages. :type logger: logging.Logger or NoneType :raises ValueError: If properties are requested for CSV input. """ csv_input = fileutils.is_csv_file(infile) if want_props and csv_input: raise ValueError("Properties are generated only for SMILES input") div_fp = diversity_fingerprinter.DiversityFingerprinter( fptype, want_props, hba_file, hbd_file) fp_type_info = diversity_fingerprinter.FP_DICT[fptype]().getTypeInfo() fpout = canvas.ChmCustomOut32(fp_type_info, True) fpout.open(outfile) if csv_input: total_input, total_output = generate_fingerprints_from_csv( infile, div_fp, fpout, logger) else: total_input, total_output = generate_fingerprints_from_smi( infile, want_props, div_fp, fpout, logger) fpout.close() if logger: mesg = (f"Successfully processed {total_output} of {total_input} " "structures") logger.info(mesg)
[docs]def generate_fingerprints_from_csv(csv_file, div_fp, fpout, logger): """ Generates fingerprints for the SMILES in a .csv file, and writes the fingerprints, titles, properties from columns 2 and beyond and SMILES to an open fingeprint file. Returns the total number of input rows and the total number of fingerprint rows written. :param csv_file: CSV file name. :type csv_file: str :param div_fp: Diversity fingerprinter configured to generate only fingerprints. :type div_fp: diversity_fingerprinter.DiversityFingerprinter :param fpout: 32-bit custom fingerprint connection. :type fpout: canvas.ChmCustomOut32 :param logger: Logger for warning and info messages. :type logger: logging.Logger or NoneType :return: Tuple of the number of input rows and the number of fingerprints successfully generated and written. :rtype: int, int """ total_input = 0 total_output = 0 with open(csv_file, newline='') as fh: reader = csv.reader(fh) col_names = next(reader)[2:] + ['SMILES'] for line in reader: total_input += 1 if logger and total_input and total_input % 1000 == 0: logger.info(f"Processed {total_input} structures") smiles = line[0] title = line[1] prop_values = line[2:] + [smiles] try: fp, _ = div_fp.compute(smiles) extra_data = dict(zip(col_names, prop_values)) fpout.write(fp, title, extra_data) total_output += 1 except canvas.ChmException as err: if logger: logger.warning(str(err)) return total_input, total_output
[docs]def generate_fingerprints_from_smi(smi_file, want_props, div_fp, fpout, logger): """ Generates fingerprints and properties for the SMILES in a .smi file, and writes the fingerprints, titles, properties and SMILES to an open fingerprint file. Returns the total number of input rows and the total number of fingerprint rows written. :param smi_file: SMILES file name. :type smi_file: str :param want_props: Whether properties are being generated. :type want_props: bool :param div_fp: Diversity fingerprinter configured to generate fingerprints and, if want_props is True, properties. :type div_fp: diversity_fingerprinter.DiversityFingerprinter :param fpout: 32-bit custom fingerprint connection. :type fpout: canvas.ChmCustomOut32 :param logger: Logger for warning and info messages. :type logger: logging.Logger or NoneType :return: Tuple of the number of input rows and the number of fingerprints successfully generated and written. :rtype: int, int """ total_input = 0 total_output = 0 col_names = AUTO_PROPERTIES + ['SMILES'] if want_props else ['SMILES'] with open(smi_file) as fh: for line in fh: total_input += 1 if logger and total_input and total_input % 1000 == 0: logger.info(f"Processed {total_input} structures") tokens = line.rstrip("\n").split(None, 1) smiles = tokens[0] title = "" if len(tokens) == 1 else tokens[1] try: fp, prop_values = div_fp.compute(smiles) prop_strings = list(map(str, prop_values)) + [smiles] extra_data = dict(zip(col_names, prop_strings)) fpout.write(fp, title, extra_data) total_output += 1 except canvas.ChmException as err: if logger: logger.warning(str(err)) return total_input, total_output
[docs]def get_available_properties(infile, descriptions=False): """ Returns a list of the available properties in the provided input file. If .json or .smi, the properties that are calculated automatically are returned. If .csv, properties in columns 3 and beyond are returned. If .fp, extra data columns other than SMILES are returned. :param infile: Input file with source of structures. :type infile: str :param descriptions: Whether to include descriptions for automatically calculated properties. :type descriptions: bool :return: Property names. :rtype: list(str) :raises KeyError: If any required columns are missing. """ infile_type = get_infile_type(infile) if infile_type in [JSON, SMI]: if descriptions: return [" - ".join(x) for x in zip(AUTO_PROPERTIES, AUTO_KEYS)] else: return AUTO_PROPERTIES elif infile_type == FP: fpin = canvas.ChmFPIn32(infile) # This will raise KeyError if no SMILES column is found. smiles_col = diversity_selector.get_smiles_column(fpin) props = list(fpin.getExtraColumnNames()) del props[smiles_col] return props else: with open(infile, newline='') as fh: header = next(csv.reader(fh)) ncol = len(header) if ncol < 2: mesg = f'{infile} must contain SMILES and title columns' raise KeyError(mesg) if 'SMILES' not in header[0] and 'smiles' not in header[0]: mesg = f'First column of {infile} is not named "SMILES"' raise KeyError(mesg) if ncol > 2: return header[2:] return []
[docs]def get_distributed_fp_generation_commands(args, nsub): """ Returns lists of subjob commands for running distributed fingerprint and property generation. :param args: Command line arguments. :type args: argparse.Namespace :param nsub: Number of subjobs. :type nsub: int :return: list of subjob commands. :rtype: list(list(str)) """ jobname = get_jobname(args) commands = [] for i in range(nsub): subjob_name = f"{jobname}_fpgen_sub_{i + 1}" command = [ COMBINATORIAL_DIVERSITY, args.infile, str(args.ndiverse), "-fsubjob", subjob_name, "-fptype", args.fptype ] if args.filter: command += ["-filter", args.filter] if args.hba: command += ["-hba", args.hba] if args.hbd: command += ["-hbd", args.hbd] commands.append(command) return commands
[docs]def get_distributed_selection_commands(args, nsub): """ Returns lists of subjob commands for running distributed diverse structure selection. :param args: Command line arguments. :type args: argparse.Namespace :param nsub: Number of subjobs. :type nsub: int :return: list of subjob commands. :rtype: list(list(str)) """ jobname = get_jobname(args) commands = [] for i in range(nsub): subjob_name = f"{jobname}_select_sub_{i + 1}" command = [ COMBINATORIAL_DIVERSITY, args.infile, str(args.ndiverse), "-dsubjob", subjob_name ] if args.nosplit: # Subjob needs these parent job arguments in order to correctly # handle the case of generated fingerprints. command += ["-nosplit", "-fptype", args.fptype] outfile_ext = ".csv" if args.outfile: ftype = fileutils.get_structure_file_format(args.outfile) outfile_ext = OUTFILE_TYPE_TO_EXT[ftype] command += ["-out", f"{subjob_name}_diverse{outfile_ext}"] if not args.gen_coords: command.append("-no3d") if args.v3000: command.append("-v3000") if args.verbose: command.append("-verbose") if args.filter: command += ["-filter", args.filter] commands.append(command) return commands
[docs]def get_generated_fingerprint_filename(args): """ Returns the name of the generated fingerprint file supplied to a diversity subjob when -nosplit is in effect and the original source of structures was anything other than fingerprints. :param args: Command line arguments for a diversity subjob :type args: argparse.Namespace :return: Generated fingerprint file name :rtype: str """ pos = args.dsubjob.find('_select_sub_') parent_jobname = args.dsubjob[0:pos] return f'{parent_jobname}_{args.fptype}.fp'
[docs]def get_infile_type(infile): """ Returns the input file type (JSON, FP, CSV, SMI) based on extension, or an empty string if the extension isn't recognized. :param infile: Input file with source of structures. :type infile: str :return: Input file type or empty string. :rtype: str """ root, ext = fileutils.splitext(infile) if ext in INFILE_EXT_TO_TYPE: return INFILE_EXT_TO_TYPE[ext] return ""
[docs]def get_jobname(args): """ Returns an appropriate job name based on args.fsubjob, args.dsubjob, SCHRODINGER_JOBNAME, the job control backend, or the base name of args.infile. :param args: Command line arguments :type args: argparse.Namespace :return: job name :rtype: str """ if args.fsubjob: return args.fsubjob if args.dsubjob: return args.dsubjob return jobcontrol.get_jobname(args.infile)
[docs]def get_parser(): """ Creates argparse.ArgumentParser with supported command line options. :return: Argument parser object :rtype: argparser.ArgumentParser """ parser = argparse.ArgumentParser( prog=COMBINATORIAL_DIVERSITY, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False) parser.add_argument("-h", "--help", action="help", help="Show this message and exit.") parser.add_argument( "infile", metavar="<infile>", help="Source of input structures. May be a combinatorial synthetic " "route file (.json), a 32-bit Canvas fingerprint file with SMILES and " "properties (.fp), a CSV file with SMILES, titles and properties " "(.csv) or a SMILES file (.smi).") parser.add_argument( "ndiverse", metavar="<ndiverse>", type=int, help="The number of diverse structures to select. Linear scaling and " "distributed processing are achieved by splitting chemical space into " "2**N distinct regions (where N is determined by -min_pop) and " "selecting the appropriate number of diverse structures from each " "region. To ensure speedy selection and high diversity, it is strongly " "recommended that <ndiverse> be no larger than 5%% of the total pool " "from which selections are to be made.") parser.add_argument( "-min_pop", metavar="<m>", type=int, default=DEFAULT_MIN_POP, help="The minimum population of each distinct region of chemical " "space. This option would normally be used to speed up a job for which " "the 5%% rule is being exceeded. For example, if selecting 10,000 " "diverse structures from a pool of 100,000, reducing the minimum " "population from 10,000 to 5,000 would typically double the number of " "regions and halve total selection time (default: " f"{DEFAULT_MIN_POP:,}).") parser.add_argument( "-ndim", metavar="<n>", type=int, default=DEFAULT_NUM_PROBES, help="The number of dimensions in the chemical space from which the " "distinct regions are defined. A maximum of 2**(n-1) regions are " f"possible, so if n={DEFAULT_NUM_PROBES}, up to {DEFAULT_MAX_CHUNKS:,} " "regions can be defined. This parameter would normally be adjusted " "only when the pool of structures is so large that the population of " f"each region significantly exceeds {DEFAULT_MIN_POP:,} even when " "splitting over the maximum number of regions (default and minimum " f"legal value: {DEFAULT_NUM_PROBES}).") parser.add_argument( "-rand", dest="seed", metavar="<seed>", type=int, default=DEFAULT_RAND_SEED, help="Random seed integer for initializing diversity algorithm. " "Results are always the same for a given random seed (default: " "%(default)d).") parser.add_argument( "-nocopy", action="store_true", help="Utilize <infile> at its specified location and do not copy to " "the job directory. This option is most useful for very large input " "fingerprint files, as it allows a given diversity subjob to " "directly access the fingerprint rows assigned to it, without the " "cost of copying or physically splitting the fingerprint file. The " "file name must be specified using an absolute path, and that path " "must be accessible to all compute nodes on the host where the job is " "to run.") parser.add_argument( "-nosplit", action="store_true", help="Do not physically split an input fingerprint file or an " "intermediate fingerprint file generated from the input structures. " "Similar to -nocopy, in that it avoids the expense of splitting the " "fingerprint file, and it allows each diversity subjob to directly " "access its fingerprint rows. Differs from -nocopy, in that it does " "not require an absolute path, but it does result in the entire " "fingerprint file being copied to the job directory of each diversity " "subjob. Use of this option is strongly recommended when the number of " "input structures or fingerprints exceeds 5 million.") parser.add_argument( "-products", metavar="<p>", type=int, help="The minimum number of products that must be successfully " "enumerated before selecting diverse structures. Applies only to .json " f"input. The default is {PRODUCT_RATIO} times the number of diverse " "structures. This option MUST be specified if the number of diverse " f"structures is greater than {NDIVERSE_THRESHOLD:,}.") parser.add_argument( "-inflate", metavar="<factor>", type=float, default=DEFAULT_INFLATION_FACTOR, help="Product inflation factor. This value is multiplied by the " "minimum number of products and supplied to combinatorial_synthesis " "to ensure that an excess of products are made. Applies only to .json " "input (default: %(default).2f).") parser.add_argument( "-fptype", choices=LEGAL_FP_TYPES, default=DEFAULT_FP_TYPE, help="The type of Canvas fingerprints to generate for .json, .csv and " ".smi inputs (default: %(default)s).") parser.add_argument( "-savefp", action="store_true", help="Save generated fingerprints to <jobname>_<fptype>.fp. A default " "set of physicochemical properties are saved with the fingerprints for " ".json and .smi inputs if a property filter is supplied (see -filter).") parser.add_argument( "-onlyfp", action="store_true", help="Save generated fingerprints and exit without selecting diverse " "structures. This option is provided to allow large fingerprint files " "to be moved to a cross-mounted location and supplied in a subsequent " "job with the -nocopy option.") parser.add_argument( "-out", dest="outfile", metavar="<outfile>", help="Output Maestro, SD, CSV or SMILES file for diverse structures " "(default: <jobname>_diverse.csv).") parser.add_argument( "-no3d", dest="gen_coords", action="store_false", help="Skip 3D coordinate generation for diverse structures.") parser.add_argument("-v3000", action="store_true", help="Write SD file structures in V3000 format.") parser.add_argument( "-verbose", action="store_true", help="Output details of diversity selection/property biasing.") add_property_biasing_options(parser) jobcontrol_options = [cmdline.HOST, cmdline.JOBNAME, cmdline.TMPDIR] cmdline.add_jobcontrol_options(parser, options=jobcontrol_options) # A subjob that computes fingerprints: parser.add_argument("-fsubjob", help=argparse.SUPPRESS) # A subjob that selects diverse structures: parser.add_argument("-dsubjob", help=argparse.SUPPRESS) return parser
[docs]def get_property_type(value): """ Returns the apparent PropertyType of the supplied value. :param value: The value whose type is to be deduced. :type value: str :return: The apparent type of value. :rtype: diversity_fingerprinter.PropertyType """ if value == "": return diversity_fingerprinter.PropertyType.MISSING try: float(value) try: int(value) return diversity_fingerprinter.PropertyType.INT except: return diversity_fingerprinter.PropertyType.FLOAT except ValueError: return diversity_fingerprinter.PropertyType.STR
[docs]def read_properties(infile, max_rows=MAX_ROWS_DETECT): """ Given a .fp or .csv file, this function returns the list of property names, excluding SMILES and title, followed by the property values for the first max_rows rows. :param infile: Input .fp or .csv file. :type infile: str :param max_rows: The maximum number of rows to read. :type max_rows: int :return: list of property names followed by lists of property values :rtype: list(str), list(list(str)) :raises ValueError: If infile is of the wrong type. :raises RuntimeError: If .csv file has inconsistent numbers of values. """ infile_type = get_infile_type(infile) if infile_type == FP: return read_properties_from_fp_file(infile, max_rows) elif infile_type == CSV: return read_properties_from_csv_file(infile, max_rows) else: mesg = f'Unsupported file type: "{infile}". Must be .fp or .csv.' raise ValueError(mesg)
[docs]def read_properties_from_csv_file(infile, max_rows=MAX_ROWS_DETECT): """ Given a .csv file, this function returns the list of property names from columns 2 and beyond, which excludes SMILES and title, followed by the property values for the first max_rows rows. :param infile: Input .csv file. :type infile: str :param max_rows: The maximum number of rows to read. :type max_rows: int :return: list of property names followed by lists of property values :rtype: list(str), list(list(str)) :raises RuntimeError if .csv file has inconsistent numbers of values. """ prop_rows = [] with open(infile, newline='') as fh: reader = csv.reader(fh) prop_names = next(reader) ncol = len(prop_names) prop_cols = list(range(2, ncol)) del prop_names[0:2] if prop_cols: row_count = 0 for row in reader: row_count += 1 if len(row) != ncol: mesg = (f'File {infile}, row {row_count + 1}: ' f'Expecting {ncol} values but found {len(row)}') raise RuntimeError(mesg) prop_rows.append([row[i] for i in prop_cols]) if row_count == max_rows: break return prop_names, prop_rows
[docs]def read_properties_from_fp_file(infile, max_rows=MAX_ROWS_DETECT): """ Given a .fp file, this function returns the list of property names, excluding SMILES and title, followed by the property values for the first max_rows rows. :param infile: Input .fp file. :type infile: str :param max_rows: The maximum number of rows to read. :type max_rows: int :return: list of property names followed by lists of property values :rtype: list(str), list(list(str)) """ prop_rows = [] fpin = canvas.ChmFPIn32(infile) prop_names = list(fpin.getExtraColumnNames()) prop_cols = list(range(len(prop_names))) smiles_col = diversity_selector.get_smiles_column(fpin) del prop_names[smiles_col] del prop_cols[smiles_col] if prop_cols: row_count = 0 while fpin.hasNext(): row_count += 1 fp, title, props = fpin.nextExtra() prop_rows.append([props[i] for i in prop_cols]) if row_count == max_rows: break return prop_names, prop_rows
[docs]def read_property_filters(filter_file): """ Reads property filters from the provided CSV file. The format of each line is: prop_name,min_value,max_value :param filter_file: CSV file containing property filters. :type filter_file: str :return: List of property filters. :rtype: list(diversity_selector.PropertyFilter) :raises RuntimeError: If filter_file is incorrectly formatted. :raises ValueError: If limits are invalid. """ filters = [] with open(filter_file, newline='') as fh: for i, line in enumerate(csv.reader(fh), start=1): if len(line) != 3: mesg = (f'File {filter_file}: Incorrect number of values on ' f'line {i}: {line}') raise RuntimeError(mesg) filters.append( diversity_selector.PropertyFilter(line[0], float(line[1]), float(line[2]))) return filters
[docs]def split_fingerprints(fp_file, ndiverse, nsub, jobname, inplace=False, min_pop=DEFAULT_MIN_POP, num_probes=DEFAULT_NUM_PROBES): """ Splits a fingerprint file literally or figuratively into chunks using DiversitySplitter, and places the chunks into a series of zip archives named <jobname>_select_sub_i.zip, where i = 1, 2,...,nsub. Each archive contains one or more chunks to be processed by the associated subjob. Chunk j consists of exactly one of the following two files: <jobname>_chunk_j.fp - Fingerprints in the chunk (if inplace=False) <jobname>_chunk_j.csv - Row numbers in the chunk (if inplace=True) The value of inplace determines whether fp_file is literally split into smaller fingerprint files, or figuratively split by way or reporting the 0-based row numbers in each chunk. In addition to the chunk files, <jobname>_select_sub_i.zip contains the file <jobname>_select_sub_i_manifest.csv, which contains an ordered list of the chunk file names and the number of diverse structures to select from each chunk. :param fp_file: 32-bit Canvas fingerprint file containing SMILES and any properties to be biased. :type fp_file: str :param ndiverse: The total number of diverse structures to select. Must be at least twice as large as the number of chunks. :type ndiverse: int :param nsub: The desired number of subjobs. This would normally be the number of CPUs over which the job is to be distributed, since finer grained processing is already achieved by assigning one or more chunks to each subjob. The actual number of subjobs run may end up being smaller than this value. :type nsub: int :param jobname: Job name. Determines the names of the archives and chunk files that will be created. :type jobname: str :param inplace: Controls whether to split fp_file into smaller files (inplace=False), or simply write the row numbers of each chunk (inplace=True). :type inplace: bool :param min_pop: Suggested minimum number of structures in each chunk. An adjustment is made, as necessary, to ensure the number of diverse structures per chunk is at least MIN_DIVERSE_PER_CHUNK. :type min_pop: int :param num_probes: The number of diverse probe structures used to construct the similarity space from which chunks are defined. :type num_probes: int :return: tuple of the actual number of subjobs and the number of chunks :rtype: int, int """ # Adjust min_pop, if necessary. row_count = canvas.ChmFPIn32(fp_file).getRowCount() min_pop_adj = adjust_min_pop(min_pop, ndiverse, MIN_DIVERSE_PER_CHUNK, row_count) # Perform split. splitter = diversity_splitter.DiversitySplitter(fp_file, min_pop_adj, num_probes) if inplace: # Just write out the rows of each chunk. orthant_rows = splitter.getOrthantRows() all_chunk_files = [] for j, rows in enumerate(orthant_rows, start=1): chunk_file = f'{jobname}_chunk_{j}.csv' with csv_unicode.writer_open(chunk_file) as fh: csv_unicode.writer(fh).writerow(rows) all_chunk_files.append(chunk_file) else: # Split the fingerprint file. all_chunk_files = splitter.splitFingerprints(f'{jobname}_chunk') # Divide all_chunk_files as equally as possible over subjobs. nchunks = len(all_chunk_files) nsub_actual = min(nsub, nchunks) chunks_per_subjob = phase.partitionValues(nchunks, nsub_actual) ndiverse_per_chunk = phase.partitionValues(ndiverse, nchunks) isub = 0 subjob_chunk_files = [] subjob_ndiverse = [] for i in range(nchunks): subjob_chunk_files.append(all_chunk_files[i]) subjob_ndiverse.append(ndiverse_per_chunk[i]) if len(subjob_chunk_files) == chunks_per_subjob[isub]: isub += 1 subjob_manifest_file = f'{jobname}_select_sub_{isub}_manifest.csv' with csv_unicode.writer_open(subjob_manifest_file) as fh: writer = csv_unicode.writer(fh) for file, ndiv in zip(subjob_chunk_files, subjob_ndiverse): writer.writerow([file, ndiv]) zip_file = f'{jobname}_select_sub_{isub}.zip' with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zfile: zfile.write(subjob_manifest_file) fileutils.force_remove(subjob_manifest_file) for subjob_chunk_file in subjob_chunk_files: zfile.write(subjob_chunk_file) fileutils.force_remove(subjob_chunk_file) subjob_chunk_files = [] return nsub_actual, nchunks
[docs]def split_structures(struct_file, nsub, jobname): """ Splits a SMILES or CSV file into nsub chunks, creating the files <jobname>_fpgen_sub_i.<ext>, where i=1,2,...,nsub and <ext> is "smi" or "csv". Each chunk will contain a minimum of MIN_FP_PER_SUBJOB structures, so the number of chunks actually created may be less than nsub. :param struct_file: SMILES or CSV file to be split. :type struct_file: str :param nsub: The desired number of subjobs. :type nsub: int :return: The actual number of files created. Will be <= nsub. :rtype: int """ nstruct = structure.count_structures(struct_file) file_ext = ".smi" csv_file = fileutils.is_csv_file(struct_file) if csv_file: file_ext = ".csv" nsub_max = max(1, int(nstruct / MIN_FP_PER_SUBJOB)) nsub_actual = min(nsub, nsub_max) structs_per_file = phase.partitionValues(nstruct, nsub_actual) header = "" ifile = 0 fh_out = open(f'{jobname}_fpgen_sub_{ifile + 1}{file_ext}', 'w') struct_count_file = 0 struct_count_total = 0 with open(struct_file, 'r') as fh_in: if csv_file: header = fh_in.readline().rstrip() fh_out.write(f'{header}\n') for line in fh_in: fh_out.write(f'{line.rstrip()}\n') struct_count_file += 1 if struct_count_file == structs_per_file[ifile]: fh_out.close() struct_count_total += struct_count_file if struct_count_total == nstruct: break struct_count_file = 0 ifile += 1 fh_out = open(f'{jobname}_fpgen_sub_{ifile + 1}{file_ext}', 'w') if csv_file: fh_out.write(f'{header}\n') if not fh_out.closed: fh_out.close() return nsub_actual
[docs]def summarize_property_filters(filter_file): """ Generates a string with a summary of the property filters in the provided file. :param filter_file: CSV file with property filters. :type filter_file: str :return: Summary of property filters. :rtype: str """ filters = read_property_filters(filter_file) summary = ["Property Filters", "----------------"] for f in filters: s = "%.2f <= %s <= %.2f" % (f.min_value, f.name, f.max_value) summary.append(s) return "\n".join(summary)
[docs]def validate_args(args, startup=False): """ Checks the validity of command line arguments. :param args: argparser.Namespace with command line arguments :type args: argparser.Namespace :param startup: Set to True if validating at starup time :type startup: bool :return: tuple of validity and non-empty error message if not valid :rtype: bool, str """ infile_type = get_infile_type(args.infile) if not infile_type: return False, f'Unrecognized input file type: "{args.infile}"' if args.nocopy and args.nosplit: return False, '-nocopy and -nosplit are mutually exclusive options' if not args.nocopy or not startup: if not os.path.isfile(args.infile): return False, f'Input file "{args.infile}" not found' ok, mesg = validate_properties(args.infile, args.filter) if not ok: return False, mesg if infile_type == FP: if args.savefp: return False, "-savefp is not legal with fingerprint input" if args.onlyfp: return False, "-onlyfp is not legal with fingerprint input" if args.outfile: ftype = fileutils.get_structure_file_format(args.outfile) if ftype not in LEGAL_OUTFILE_TYPES: return False, f'Illegal output file type: "{args.outfile}"' if args.ndiverse < 2: return False, "Number of diverse structures must be 2 or greater" if args.min_pop < 10: mesg = "Minimum population of each region must be 10 or greater" return False, mesg if args.ndim < DEFAULT_NUM_PROBES: mesg = f"Number of dimensions must be {DEFAULT_NUM_PROBES} or greater" return False, mesg if infile_type == JSON: if args.products is not None: if args.products <= args.ndiverse: mesg = ("Number of products must be greater than number of " "diverse structures") return False, mesg elif args.ndiverse > NDIVERSE_THRESHOLD: mesg = ("Number of products must be specified when number of " f"diverse structures is > {NDIVERSE_THRESHOLD:,}") return False, mesg if args.inflate <= 1.0: return False, "Product inflation factor must be > 1.0" elif args.products: mesg = "-products is valid only with synthetic route file input" return False, mesg if args.hba: if not os.path.isfile(args.hba): mesg = f'Hydrogen bond acceptor rules file "{args.hba}" not found' return False, mesg if args.hbd: if not os.path.isfile(args.hbd): mesg = f'Hydrogen bond donor rules file "{args.hbd}" not found' return False, mesg return True, ""
[docs]def validate_properties(infile, filter_file=None): """ Validates the input file and the property filter file to ensure that the required properties are present and numeric. :param infile: Input file with source of structures. :type infile: str :param filter_file: Property filter file, if any. :type filter_file: str or NoneType :return: tuple of validity and non-empty error message if not valid :rtype: bool, str """ if filter_file: if not os.path.isfile(filter_file): return False, f'Property filter file "{filter_file}" not found' try: get_available_properties(infile) except KeyError as err: return False, str(err) if filter_file: try: filters = read_property_filters(filter_file) except (ValueError, RuntimeError) as err: return False, str(err) prop_types = detect_property_types(infile, sticky_missing=True) numeric_types = [ diversity_fingerprinter.PropertyType.FLOAT, diversity_fingerprinter.PropertyType.INT ] for filter in filters: if filter.name not in prop_types: if get_infile_type(infile) in [JSON, SMI]: missing = "is not calculated for" else: missing = "is not present in" mesg = f'Filter property "{filter.name}" {missing} {infile}' return False, mesg prop_type = prop_types[filter.name] if prop_type == diversity_fingerprinter.PropertyType.MISSING: mesg = f'Filter property "{filter.name}" has missing values' return False, mesg if prop_type not in numeric_types: return False, f'Filter property "{filter.name}" is not numeric' return True, ""
[docs]def write_random_smi_subset(infile, outfile, nsub, rand_seed=DEFAULT_RAND_SEED): """ Selects a random subset of rows from a .smi file and writes them to another .smi file. :param infile: Input .smi file. :type infile: str :param outfile: Output .smi file. :type outfile: str :param nsub: Random subset size. :type nsub: int :param rand_seed: Seed to initialize random number generator. :type rand_seed: int :raises ValueError: If nsub exceeds the number of rows in infile. """ nstruct = structure.count_structures(infile) if nsub > nstruct: mesg = (f"Random subset size ({nsub}) exceeds the number of " f"structures ({nstruct}) in {infile}") raise ValueError(mesg) sample = set(random.Random(rand_seed).sample(range(nstruct), nsub)) with open(infile) as fh_in: with open(outfile, 'w') as fh_out: for i, line in enumerate(fh_in): if i in sample: fh_out.write(line)
[docs]def write_subjob_selections(fp_files, diverse_subset_rows, outfile, gen_coords=False, v3000=False, logger=None): """ Reads diverse structures and properties from the supplied fingerprint files and writes them to the indicated output Maestro, SD, CSV or SMILES file. :param fp_files: Fingerprint file names. :type fp_files: list(str) :param diverse_subset_rows: Zero-based lists of row numbers for diverse structures in each fingerprint file. :type diverse_subset_rows: list(list(int)) :param outfile: Output file for diverse structures and properties. :type outfile: str :param gen_coords: Whether to generate 3D coordinates for Maestro or SD output. :type gen_coords: bool :param v3000: Whether to write SD file structures in V3000 format. :type v3000: bool :param logger: Logger for warning messages. :type logger: logging.Logger or NoneType """ fpin = canvas.ChmFPIn32(fp_files[0]) prop_names = fpin.getExtraColumnNames() prop_cols = list(range(len(prop_names))) smiles_col = diversity_selector.get_smiles_column(fpin) del prop_cols[smiles_col] prop_types = detect_property_types(fp_files[0]) writer = molio.get_mol_writer(outfile, generate_coordinates=gen_coords, require_stereo=False, v3000=v3000) with writer: for fp_file, subset_rows in zip(fp_files, diverse_subset_rows): fpin = canvas.ChmFPIn32(fp_file) for row in subset_rows: fpin.setPos(row + 1) fp, title, prop_values = fpin.nextExtra() smiles = prop_values[smiles_col] chem_mol = Chem.MolFromSmiles(smiles) chem_mol.SetProp("_Name", title) for j in prop_cols: prop_name = prop_names[j] prop_value = prop_values[j] prop_type = prop_types[prop_name] if prop_type == diversity_fingerprinter.PropertyType.FLOAT: chem_mol.SetDoubleProp(prop_name, float(prop_value)) elif prop_type == diversity_fingerprinter.PropertyType.INT: chem_mol.SetIntProp(prop_name, int(prop_value)) else: chem_mol.SetProp(prop_name, prop_value) try: writer.append(chem_mol) except structure.UndefinedStereoChemistry as e: if logger: logger.warning(e)