Source code for schrodinger.active_learning.al_utils

import csv
import functools
import math
import os
import random
import shutil
import sys
import tarfile
import time
import zipfile
from collections import defaultdict

from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
from schrodinger.utils import log

# Module-level logger obtained from schrodinger.utils.log for this file.
logger = log.get_output_logger(__file__)

# NOTE(review): presumably the minimum number of ligands needed to train a
# ligand_ml model; not referenced in this module - confirm against callers.
MINIMAL_LIGAND_ML_TRAIN = 40
# Default score given by read_score() to a ligand title that has no score
# recorded yet; read_score() keeps the minimum score seen per title.
BAD_SCORE = 1e9

# Task names; check_driver_disk_space() compares them against args.task.
EVAL_TASK = "evaluate"
SCREEN_TASK = "screen"
PILOT_TASK = "pilot"
# File extensions (with leading dot). random_split() compares SMILES_FORMAT
# and CSV_FORMAT against the result of get_file_ext(); MAE_FORMAT is not
# referenced in this module.
SMILES_FORMAT = ".smi"
CSV_FORMAT = ".csv"
MAE_FORMAT = ".mae"

# NOTE(review): presumably the name of the column holding the known value of
# a ligand; not referenced in this module - confirm against callers.
TRUTH_COL = "Value"


def positive_int(s):
    """
    ArgumentParser ``type`` callable that accepts only positive integers.

    :param s: input string
    :type s: str

    :return: integer value of the input string
    :rtype: int

    :raise ValueError: if the input is not an integer or is not strictly
        positive.
    """
    value = int(s)
    if value > 0:
        return value
    raise ValueError()
def split_smi_line(line):
    """
    Break one line of a .smi file into its SMILES pattern and its title.

    The line is split on the first run of whitespace. When the line holds
    only a SMILES pattern, the pattern doubles as the title.

    :param line: line from .smi file
    :type line: str

    :return: (SMILES pattern, title) pair, or an empty list for a blank line
    :rtype: (str, str) or []
    """
    fields = line.rstrip("\n").split(None, 1)
    if not fields:
        return []
    if len(fields) == 1:
        # No separate title column: reuse the pattern as the title.
        return fields[0], fields[0]
    return fields[0], fields[1]
def get_smi_header():
    """
    Build the header for a .smi input file.

    The SMILES is assumed to be in the first column and the title in the
    second column, so no reordering is needed.

    :return: header list, column indices that put SMILES first and title
        second
    :rtype: list(str), list(int)
    """
    return ['SMILES', 'Title'], [0, 1]
def get_csv_header(filename,
                   smi_index,
                   name_index,
                   delimiter=",",
                   with_header=True):
    """
    Build the header for a .csv input file.

    The returned reorder index puts the SMILES column first and the title
    column second, followed by all remaining columns in their original order.
    When the file has no header line, column names are generated from the
    width of its first row: "SMILES", "Title" and "Property_<n>" for the rest.

    :param filename: .csv input file
    :type filename: str

    :param smi_index: column index of molecule SMILES
    :type smi_index: int

    :param name_index: column index of molecule name
    :type name_index: int

    :param delimiter: delimiter of input csv files
    :type delimiter: str

    :param with_header: Whether the file has header in its first line
    :type with_header: bool

    :return: header list, header index for reordering SMILES and title
    :rtype: list(str), list(int)
    """
    with fileutils.open_maybe_compressed(filename, "rt") as handle:
        rows = csv.reader(handle, delimiter=delimiter)
        if with_header:
            columns = next(rows)
        else:
            # No header in the file: synthesize one from the first data row.
            columns = []
            generic_count = 0
            for position in range(len(next(rows))):
                if position == smi_index:
                    columns.append("SMILES")
                elif position == name_index:
                    columns.append("Title")
                else:
                    generic_count += 1
                    columns.append(f"Property_{generic_count}")

    # Always put SMILES and name as the first and second column.
    leading = [smi_index, name_index]
    trailing = [
        position for position in range(len(columns))
        if position not in leading
    ]
    reorder_index = leading + trailing
    header_list = [columns[position] for position in reorder_index]
    return header_list, reorder_index
def my_csv_reader(filename):
    """
    Yield the rows of a csv file, skipping its first (header) line.

    An empty file yields nothing.

    :param filename: .csv file name
    :type filename: str

    :return: iterator over the rows (lists of str) after the first line.
    :rtype: iterator
    """
    with open(filename, 'r', newline='') as f:
        reader = csv.reader(f)
        # Use the two-argument form of next(): a bare next() on an empty file
        # raises StopIteration inside this generator, which Python converts
        # into a RuntimeError (PEP 479).
        next(reader, None)
        yield from reader
def read_score(score_file):
    """
    Read known scores of ligands from a csv score file.

    The first line of the file is skipped as a header. In every other row the
    first column is the ligand title and the second column is its score. When
    a title occurs more than once, the lowest score is kept. Blank rows are
    ignored; rows whose score is missing or not a number are skipped with a
    warning.

    :param score_file: path of the csv score file, or None.
    :type score_file: str or None

    :return: a dictionary that maps ligand title to ligand score (titles that
        are not in the file map to BAD_SCORE), or None if no score file is
        given or the file does not exist.
    :rtype: defaultdict or None
    """
    if score_file is None or not os.path.isfile(score_file):
        return None

    known_title_to_score = defaultdict(lambda: BAD_SCORE)
    for row in my_csv_reader(score_file):
        if not row:
            # csv.reader yields [] for blank lines (e.g. a trailing newline).
            continue
        title = row[0]
        try:
            score = float(row[1])
        except (IndexError, ValueError):
            # IndexError: the row has no score column at all.
            # ValueError: the score column is not a number.
            logger.warning("Fail to read score of ligand {}".format(title))
            continue
        known_title_to_score[title] = min(known_title_to_score[title], score)
    return known_title_to_score
def random_split(file_list,
                 num_ligands,
                 prefix="splited",
                 block_size=100000,
                 name_index=0,
                 smi_index=1,
                 random_seed=None,
                 delimiter=",",
                 with_header=True):
    """
    Combine input files, shuffle lines, split into files with block_size line
    per file. Reorder the columns such that SMILES and name are in the first
    and second column respectively.

    The split is done in two passes so that the whole input never has to be
    held in memory: every input line is first written to a randomly chosen
    temporary file, then each temporary file is loaded, shuffled and written
    out as '<prefix>_<index>.csv' in the current working directory.

    :param file_list: paths of input files. The type of the first file
        (.smi or .csv, optionally gz compressed) decides how all files are
        parsed.
    :type file_list: list

    :param num_ligands: total number of ligands in all the input files.
    :type num_ligands: int

    :param prefix: prefix of split files
    :type prefix: str

    :param block_size: number of ligands in each sub .csv file.
    :type block_size: int

    :param name_index: column index of molecule name
    :type name_index: int

    :param smi_index: column index of molecule SMILES
    :type smi_index: int

    :param random_seed: random seed number for shuffling the ligands
    :type random_seed: int or None

    :param delimiter: delimiter of input csv files
    :type delimiter: str

    :param with_header: Whether input file(s) has header in its first line.
    :type with_header: bool

    :return: list of split files, reordered csv header
    :rtype: list, list
    """
    rand = random.Random(random_seed)
    input_type = get_file_ext(file_list[0])
    if input_type == SMILES_FORMAT:
        header_list, reorder_index = get_smi_header()
    elif input_type == CSV_FORMAT:
        header_list, reorder_index = get_csv_header(file_list[0], smi_index,
                                                    name_index, delimiter,
                                                    with_header)
    else:
        logger.error(f"Unknown input file type: {file_list[0]}")
        sys.exit(1)

    num_subfile = math.ceil(num_ligands / block_size)
    working_dir = os.getcwd()

    tmp_out_fname_list = []
    tmp_out_writer_list = []
    try:
        # Create writers for all the temporary files.
        for current_out_findex in range(num_subfile):
            current_out_fname = os.path.join(
                working_dir, 'tmp_{}_{}.txt'.format(prefix,
                                                    current_out_findex))
            tmp_out_fname_list.append(current_out_fname)
            tmp_out_writer_list.append(open(current_out_fname, 'wb'))

        # Write each line to temporary files randomly.
        for file in file_list:
            with fileutils.open_maybe_compressed(file, "rb") as f_in:
                if with_header:
                    f_in.readline()
                for line in f_in:
                    if not line.endswith(b"\n"):
                        # The last line of a file may lack a newline; without
                        # one it would be glued to the next record written to
                        # the same temporary file.
                        line += b"\n"
                    rand.choice(tmp_out_writer_list).write(line)
    finally:
        # Close the temporary files even if distributing the lines failed.
        for tmp_writer in tmp_out_writer_list:
            tmp_writer.close()

    out_fname_list = []
    # Shuffle lines of all the temporary files and write them as final file.
    for file_index, tmp_fname in enumerate(tmp_out_fname_list):
        if not os.path.isfile(tmp_fname) or os.path.getsize(tmp_fname) == 0:
            continue
        with open(tmp_fname, 'r', newline='') as f_in:
            if input_type == CSV_FORMAT:
                # The temporary files hold raw input lines, so they must be
                # parsed with the delimiter of the input files.
                line_list = list(csv.reader(f_in, delimiter=delimiter))
            else:
                # input_type was validated above, so this is SMILES_FORMAT.
                line_list = [split_smi_line(x) for x in f_in]
        rand.shuffle(line_list)
        shuffled_file = os.path.join(working_dir,
                                     '{}_{}.csv'.format(prefix, file_index))
        with open(shuffled_file, 'w', newline='') as f_out:
            writer = csv.writer(f_out)
            writer.writerow(header_list)
            for line in line_list:
                if not line:
                    # Blank input line.
                    continue
                writer.writerow([line[x] for x in reorder_index])
        out_fname_list.append(shuffled_file)

    fileutils.force_remove(*tmp_out_fname_list)
    return out_fname_list, header_list
def merge_ligand_ml_models(sub_model_name_list, final_model, job_directory):
    """
    Combine several .tar.gz ligand_ml models into one zipped deepautoqsar
    model.

    Each sub model archive is extracted in job_directory and loaded; the
    loaded models are merged, packaged and zipped into final_model. Sub models
    that do not exist are skipped with a warning. All intermediate
    directories are removed at the end.

    :param sub_model_name_list: list of .tar.gz ligand_ml model name.
    :type sub_model_name_list: [str]

    :param final_model: full path of the final zipped deepautoqsar model.
    :type final_model: str

    :param job_directory: directory of the .tar.gz ligand_ml models.
    :type job_directory: str
    """
    import ligand_ml

    extracted_dirs = []
    for model_name in sub_model_name_list:
        archive = os.path.join(job_directory, model_name)
        if os.path.isfile(archive):
            with tarfile.open(archive) as tar:
                tar.extractall(path=job_directory)
            extracted_dirs.append(fileutils.strip_extension(archive))
        else:
            logger.warning(f"ligand_ml model {archive} does not exist.")

    smashers = []
    for extracted_dir in extracted_dirs:
        smashers.append(ligand_ml.Smasher.load_from_dir(extracted_dir))

    # Merge into a scratch directory that is guaranteed to start out clean.
    pre_package_path = os.path.join(job_directory, 'smasher_pre_package')
    fileutils.force_rmtree(pre_package_path)
    merged = ligand_ml.Smasher.merge(smashers, pre_package_path)

    # Package the merged model under '<model basename>/smasher'.
    model_dir = fileutils.get_jobname(final_model)
    fileutils.force_rmtree(model_dir)
    os.makedirs(model_dir)
    package_path = os.path.join(model_dir, 'smasher')
    merged.package(package_path, make_tarball=False)

    with zipfile.ZipFile(final_model, 'w', zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(package_path):
            archive.write(root)
            for fname in files:
                archive.write(os.path.join(root, fname))

    # Clean up everything that was only needed to build the zip file.
    for leftover in (*extracted_dirs, model_dir, pre_package_path):
        fileutils.force_rmtree(leftover)
def convert_ligand_ml_model_format(qzip_model):
    """
    Repackage a .qzip deepautoqsar model as a .tar.gz ligand_ml model.

    The zip entries under '<model basename>/smasher' are extracted into the
    current directory, added to the tarball under the name
    '<model basename>', and the extracted directory is removed afterwards.

    :param qzip_model: .qzip deepautoqsar model filename.
    :type qzip_model: str

    :return: .tar.gz ligand_ml model filename
    :rtype: str
    """
    stem = os.path.splitext(qzip_model)[0]
    tar_model = stem + '.tar.gz'
    model_basename = fileutils.get_jobname(stem)
    smasher_path = os.path.join(model_basename, 'smasher')

    with zipfile.ZipFile(qzip_model, 'r') as zipobj, \
            tarfile.open(tar_model, 'w:gz') as tar:
        smasher_members = [
            member for member in zipobj.namelist()
            if member.startswith(smasher_path)
        ]
        for member in smasher_members:
            zipobj.extract(member)
        tar.add(smasher_path, arcname=model_basename)

    fileutils.force_rmtree(model_basename)
    return tar_model
def get_file_ext(filename):
    """
    Return the extension of a file name, ignoring gz compression.

    Both a separate '.gz' suffix ('x.csv.gz' -> '.csv') and a fused 'gz'
    suffix ('x.maegz' -> '.mae') are stripped. The case of the returned
    extension is left untouched.

    :param filename: name of the file.
    :type filename: str

    :return: 'gz' excluded extension of the file.
    :rtype: str
    """
    stem, suffix = os.path.splitext(filename)
    if suffix.lower() == '.gz':
        # Separate compression suffix: the real extension is one level down.
        return os.path.splitext(stem)[1]
    fused_gz = suffix[-2:].lower() == 'gz'
    return suffix[:-2] if fused_gz else suffix
def check_driver_disk_space(active_learning_job):
    """
    Estimate the driver disk usage of an active learning job with some assumed
    parameters. Log the estimate and the free space, and exit with an error if
    the available driver disk space is smaller than the estimated space.

    :param active_learning_job: current AL driver.
    :type active_learning_job: ActiveLearningJob instance.

    :raise SystemExit: if the task type is unknown or the free disk space is
        lower than the estimated usage.
    """
    args = active_learning_job.args

    backend = jobcontrol.get_backend()
    # get_backend() returns a false value when not running under job control
    # (other helpers in this module guard for that); fall back to the current
    # working directory in that case.
    # NOTE(review): assumes the driver writes to the cwd outside job control.
    driver_dir = backend.job_dir if backend else os.getcwd()
    free_disk = shutil.disk_usage(driver_dir).free

    bytes_per_ml_model = 1024**3  # Assume each ML model is ~ 1GB.
    extra_bytes_per_ml_score = 50  # Assume each ligands add 50B for ML score.
    bytes_per_scored_ligand = 5 * 1024  # Assume each ligand adds 5KB in
    # scoring related output.
    redundancy = 1.2  # Assume 20% redundancy.

    input_ligand_file_size = sum(os.path.getsize(x) for x in args.infile_list)
    num_ligands = active_learning_job.total_ligand_num
    keep_size = min(args.keep, num_ligands)
    rescore_size = min(args.num_rescore_ligand, keep_size)

    def input_share(ligand_count):
        # Size of the part of the input files that ligand_count ligands
        # account for. With no ligands at all the share is 0, which avoids
        # dividing by zero.
        if not num_ligands:
            return 0
        return ligand_count / num_ligands * input_ligand_file_size

    task = args.task
    if task == SCREEN_TASK:
        train_size = args.train_size
        num_iter = args.num_iter
        ml_model_size = bytes_per_ml_model * num_iter
        ml_top_ligand_size = (input_ligand_file_size + num_ligands *
                              extra_bytes_per_ml_score) * \
            args.uncertainty_sample_ratio * num_iter
        ligand_file_size = 2 * input_ligand_file_size + num_ligands * \
            extra_bytes_per_ml_score
        scoring_result_size = (rescore_size + train_size * num_iter) * \
            bytes_per_scored_ligand
        pred_csv_size = (keep_size * extra_bytes_per_ml_score +
                         input_share(keep_size)) * num_iter
        total_size = ml_model_size + ml_top_ligand_size + ligand_file_size + \
            scoring_result_size + pred_csv_size
    elif task == EVAL_TASK:
        ml_model_size = os.path.getsize(args.model)
        pred_csv_size = keep_size * extra_bytes_per_ml_score + \
            input_share(keep_size)
        ligand_file_size = 2 * input_ligand_file_size + num_ligands * \
            extra_bytes_per_ml_score
        scoring_result_size = rescore_size * bytes_per_scored_ligand
        total_size = ml_model_size + pred_csv_size + ligand_file_size + \
            scoring_result_size
    elif task == PILOT_TASK:
        pilot_size = min(args.pilot_size, num_ligands)
        ligand_file_size = 2 * input_ligand_file_size + \
            pilot_size * extra_bytes_per_ml_score
        scoring_result_size = pilot_size * bytes_per_scored_ligand
        pred_csv_size = pilot_size * extra_bytes_per_ml_score + \
            input_share(pilot_size)
        total_size = ligand_file_size + scoring_result_size + \
            bytes_per_ml_model + pred_csv_size
    else:
        logger.error(f"Error:Unknown task type: {task}")
        sys.exit(1)

    required_size = total_size * redundancy
    logger.info(f"Estimate driver disk usage : "
                f"{required_size / 1024 ** 3:.2f} GB.")
    logger.info(f"Current free space on driver disk: "
                f"{free_disk / 1024 ** 3:.2f} GB.")
    if free_disk < required_size:
        logger.error(
            "Error: Current free space on driver disk is lower than the "
            "estimate disk usage. Please increase the driver disk space or "
            "specify another driver node.")
        sys.exit(1)
def node_run_timer(func):
    """
    Decorator for timing the running time of runNode method in
    ActiveLearningNode.

    After the wrapped method returns, the elapsed wall time is logged and
    stored on the node as ``time_cost``, a (hours, minutes, seconds) tuple of
    integers. Nothing is logged or stored if the wrapped method raises.

    :param func: method to time; its first argument must be the node, which
        provides ``node_name``.
    :type func: callable

    :return: wrapped method with the same return value as func.
    :rtype: callable
    """

    @functools.wraps(func)
    def wrap(node, *args, **kwargs):
        t0 = time.time()
        out = func(node, *args, **kwargs)
        # Round to whole seconds before splitting, so that e.g. 59.6 s is
        # reported as "1 minutes, 0 seconds" instead of "0 minutes,
        # 60 seconds".
        total_seconds = round(time.time() - t0)
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        logger.info(f"Running time of node {node.node_name}: {hours} hours, "
                    f"{minutes} minutes, {seconds} seconds")
        node.time_cost = (hours, minutes, seconds)
        return out

    return wrap
def add_output_file(*output_files, incorporate=False):
    """
    Register files as jobcontrol output files.

    Each file is registered by its path relative to the real path of the job
    directory. Does nothing when there is no jobcontrol backend.

    :param output_files: files to be transferred.
    :type output_files: str

    :param incorporate: marked files for incorporation by maestro.
    :type incorporate: bool
    """
    backend = jobcontrol.get_backend()
    if not backend:
        return

    for path in output_files:
        real_job_dir = os.path.realpath(backend.job_dir)
        relative_path = os.path.relpath(path, real_job_dir)
        backend.addOutputFile(relative_path)
        if incorporate:
            backend.setStructureOutputFile(relative_path)
def add_input_file(jsb, *input_files):
    """
    Register input file(s) with the job specification builder.

    Files are handled in order: each existing file is added as a jobcontrol
    input file, and the first one that does not exist terminates the program
    with an error message.

    :param jsb: job specification builder
    :type jsb: launchapi.JobSpecificationArgsBuilder

    :param input_files: input file(s) to be added.
    :type input_files: str
    """
    for path in input_files:
        if os.path.isfile(path):
            jsb.setInputFile(path)
            continue
        sys.exit(f"Error. input file: {path} does not exist.")
def concatenate_logs(combined_logfile, subjob_logfile_list, logger=None):
    """
    Write the content of all subjob logfiles into one combined logfile.

    The content of every subjob logfile is framed by a "begin of" and an
    "end of" marker line. A subjob logfile that cannot be read is reported to
    the logger and skipped (its markers are still written); without a logger
    the error is raised.

    :param combined_logfile: combined log file name
    :type combined_logfile: str

    :param subjob_logfile_list: list of subjob logfile names to be combined.
    :type subjob_logfile_list: list(str)

    :param logger: logger for receiving the info and error message.
    :type logger: Logger or None
    """
    if logger:
        logger.info("Combining log files to %s", combined_logfile)

    with open(combined_logfile, 'w') as combined:
        for name in subjob_logfile_list:
            combined.write("######## begin of %s #########\n" % name)
            try:
                with open(name) as part:
                    shutil.copyfileobj(part, combined)
            except IOError as err:
                # Without a logger there is nowhere to report the problem.
                if not logger:
                    raise
                logger.error(err)
            combined.write("######## end of %s #########\n" % name)
def get_host_ncpu():
    """
    Return the host and number of CPU that should be used to submit subjobs.
    This function works both running under job control and not.

    The first entry of the backend host list (under job control) or of the
    command line host list (otherwise) is used; ('localhost', 1) is the
    fallback when the list is empty. A CPU count that is unset is reported
    as 1.

    :return: host name, number of CPUs
    :rtype: str, int
    """
    # Inspired by JobDJ.__init__
    if jobcontrol.get_backend():
        hosts = jobcontrol.get_backend_host_list()
    else:
        hosts = jobcontrol.get_command_line_host_list()

    first_entry = hosts[0] if hosts else ('localhost', 1)
    return first_entry[0], first_entry[1] or 1