Source code for schrodinger.active_learning.al_node

import csv
import heapq
import itertools
import os
import pickle
import random
import sys

import more_itertools

from schrodinger.active_learning import al_utils
from schrodinger.active_learning.al_utils import EVAL_TASK
from schrodinger.active_learning.al_utils import MINIMAL_LIGAND_ML_TRAIN
from schrodinger.active_learning.al_utils import PILOT_TASK
from schrodinger.active_learning.al_utils import TRUTH_COL
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import fileutils
from schrodinger.utils import log

logger = log.get_output_logger(__file__)

UNCERTAINTY = "uncertainty"
SCORE = "score"


def estimate_time_cost(num_ligands,
                       num_iter,
                       train_size,
                       train_time,
                       num_score_license,
                       num_autoqsar_license,
                       available_cpu=None,
                       score_per_ligand_cost=20,
                       autoqsar_per_ligand_cost=0.02,
                       num_rescore_ligand=0,
                       multiplier=1.0,
                       application=''):
    """
    Roughly estimate the time cost of an active learning job based on the
    inputs and the number of available licenses.

    :param num_ligands: total number of ligands in the library.
    :type num_ligands: int
    :param num_iter: number of active learning iterations.
    :type num_iter: int
    :param train_size: Ligand_ML training size per iteration.
    :type train_size: int
    :param train_time: Ligand_ML training time per iteration in hours.
    :type train_time: float
    :param num_score_license: total number of the application licenses.
    :type num_score_license: int
    :param num_autoqsar_license: total number of AutoQSAR licenses.
    :type num_autoqsar_license: int
    :param available_cpu: number of available CPUs.
    :type available_cpu: int
    :param score_per_ligand_cost: estimated scoring time per ligand in
        seconds.
    :type score_per_ligand_cost: float
    :param autoqsar_per_ligand_cost: estimated Ligand_ML time per ligand in
        seconds.
    :type autoqsar_per_ligand_cost: float
    :param num_rescore_ligand: number of ligands to be rescored.
    :type num_rescore_ligand: int
    :param multiplier: estimated expansion number per ligand.
    :type multiplier: float
    :param application: name of the application that provides the score.
    :type application: str

    :return: estimated time cost in hours.
    :rtype: float
    """
    if num_score_license < 1:
        raise ValueError(f"No available {application} license.")
    if num_autoqsar_license < 1:
        raise ValueError("No available DeepChem/AutoQSAR license.")
    estimate_train_capacity_per_hour = 10_000
    estimate_max_train_hour = 24.0
    if available_cpu is not None:
        num_score_job = min(available_cpu, num_score_license)
        num_autoqsar_job = min(available_cpu, num_autoqsar_license)
    else:
        num_score_job = num_score_license
        num_autoqsar_job = num_autoqsar_license

    def time_per_iteration(current_iter_num):
        current_train_size = train_size * current_iter_num
        minimal_train_time = min(
            estimate_max_train_hour,
            current_train_size / estimate_train_capacity_per_hour)
        training_time = max(minimal_train_time, train_time)
        score_time = (multiplier * train_size * score_per_ligand_cost / 60 /
                      60 / num_score_job)
        eval_time = (num_ligands * autoqsar_per_ligand_cost / 60 / 60 /
                     num_autoqsar_job)
        return training_time + score_time + eval_time

    rescore_time = (multiplier * num_rescore_ligand * score_per_ligand_cost /
                    60 / 60 / num_score_job)
    total_time_cost = sum(
        time_per_iteration(current_iter_num)
        for current_iter_num in range(1, num_iter + 1)) + rescore_time
    return total_time_cost

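# Illustrative usage sketch (not part of the module): estimate_time_cost()
# only needs counts and per-ligand timings, so it can be called before any
# jobs are launched. All numbers and the application name below are made up
# for the example.
#
#   hours = estimate_time_cost(num_ligands=1_000_000,
#                              num_iter=3,
#                              train_size=10_000,
#                              train_time=4.0,
#                              num_score_license=500,
#                              num_autoqsar_license=8,
#                              available_cpu=200,
#                              application="Glide")
#   logger.info(f"Rough active learning wall-clock estimate: {hours:.1f} h")
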
def get_jobdj(host_list=None):
    """
    Return a JobDJ with the specified host list.

    :param host_list: A list of (<hostname>, <maximum_concurrent_subjobs>)
    :type host_list: [(str, int)] or None

    :return: JobDJ with specific settings.
    :rtype: queue.JobDJ object
    """
    return queue.JobDJ(hosts=host_list,
                       default_max_retries=1,
                       verbosity="normal",
                       max_failures=queue.NOLIMIT)

def get_top_ligands_from_csv_list(csv_list, output_csv, num_ligands):
    """
    Get the top ligands from a list of .csv files and write the selected
    ligands to the output .csv file.

    :param csv_list: list of .csv files containing the ligands.
    :type csv_list: list(str)
    :param output_csv: name of the output .csv file.
    :type output_csv: str
    :param num_ligands: number of ligands to select.
    :type num_ligands: int
    """
    count = 0
    with open(output_csv, "wb") as f_out:
        for ligands_csv_index, ligands_csv in enumerate(csv_list):
            with open(ligands_csv, "rb") as f_in:
                for line_index, line in enumerate(f_in):
                    if line_index == 0:
                        # Only copy the header of the first file.
                        if ligands_csv_index == 0:
                            f_out.write(line)
                        continue
                    if count < num_ligands:
                        f_out.write(line)
                    else:
                        break
                    count += 1
            # Stop opening further files once enough ligands are collected.
            if count >= num_ligands:
                break

class ActiveLearningNode:
    def __init__(self, iter_num=1, job_name="active_learning", job_dir="."):
        """
        Initialize a node for the active learning workflow.

        :param iter_num: current active learning iteration number.
        :type iter_num: int
        :param job_name: active learning job name.
        :type job_name: str
        :param job_dir: directory where the jobs in the node will run.
        :type job_dir: str
        """
        self.iter_num = iter_num
        self.job_dir = job_dir
        self.input_for_next_node = None
        self.node_name = self.getName(self.iter_num)
        self.job_name = job_name + "_" + self.node_name
        self.restart_files = []
        self.optional_restart_files = []
        self.log_file = None
        self.time_cost = (0, 0, 0)  # (hours, minutes, seconds)

    @classmethod
    def getName(cls, iter_num):
        """
        Return the node name for the given iteration number.
        """
        return f"{cls.__name__}_iter_{iter_num}"

    def addOptionalRestartFiles(self, active_learning_job):
        """
        Add node's optional restart file(s) to driver's restart dict.
        Dump the restart dict to the restart .pkl file.

        :param active_learning_job: current AL driver
        :type active_learning_job: ActiveLearningJob instance
        """
        al_utils.add_output_file(*self.optional_restart_files)
        with open(active_learning_job.restart_file, 'wb') as f:
            active_learning_job.restart_dict["optional_restart_files"] = \
                self.optional_restart_files
            pickle.dump(active_learning_job.restart_dict, f, 0)
        al_utils.add_output_file(active_learning_job.restart_file)

class PrepareSmilesNode(ActiveLearningNode):
    def __init__(self, args, iter_num, job_name, job_dir):
        """
        Initialize a node for selecting the ligands (SMILES) to be scored by
        ScoreProviderNode.
        """
        super().__init__(iter_num, job_name, job_dir)
        self.batch_size = args.train_size
        self.random_seed = args.random_seed
        self.csv_list = None
        if args.selection_rule == "random":
            self.select = self.randomSelect
        elif args.selection_rule == "diversity":
            self.select = self.diversitySelect
        elif args.selection_rule == "most_uncertain":
            # We do random selection for the first iteration since we do not
            # have the uncertainty values yet.
            if iter_num == 1:
                self.select = self.randomSelect
            else:
                self.select = self.uncertaintySelect
        else:
            raise ValueError(f"Unknown selection rule"
                             f" {args.selection_rule}")

    @staticmethod
    def readScoredLigands(scored_csv_file_list):
        """
        Read the ligands that were already scored by ScoreProviderNode.

        :param scored_csv_file_list: list of ligand_ml training .csv files.
        :type scored_csv_file_list: list(str)

        :return: set of titles of the scored ligands.
        :rtype: set(str)
        """
        scored_ligands = set()
        for csv_file in scored_csv_file_list:
            reader = al_utils.my_csv_reader(csv_file)
            # In writeScoreCsv we put the title in the third column.
            scored_ligands.update({x[2] for x in reader})
        return scored_ligands

    def checkOutcome(self, smi_file):
        """
        Validate the generated SMILES file.

        :param smi_file: name of the SMILES file to be validated.
        :type smi_file: str
        """
        if not os.path.isfile(smi_file):
            logger.error(f"Error: Failed to generate SMILES file {smi_file} "
                         f"for running iteration {self.iter_num} scoring job")
            sys.exit(1)
        num_ligands = fileutils.count_lines(smi_file)
        if num_ligands == 0:
            logger.error(f"Error: Failed to select any ligands for "
                         f"iteration {self.iter_num} training.")
            sys.exit(1)
        if self.iter_num == 1 and num_ligands < MINIMAL_LIGAND_ML_TRAIN:
            logger.error(f"Error: Only selected {num_ligands} ligands "
                         f"for training. ligand_ml requires at least "
                         f"{MINIMAL_LIGAND_ML_TRAIN} training ligands.")
            sys.exit(1)

    @al_utils.node_run_timer
    def runNode(self, csv_list, active_learning_job, smi_file_name=None,
                **kwargs):
        """
        Select ligands to be scored.

        :param csv_list: list of .csv files that contain candidate ligands.
        :type csv_list: list(str)
        :param active_learning_job: current active learning job.
        :type active_learning_job: ActiveLearningJob instance
        :param smi_file_name: SMILES file name that contains selected ligands.
        :type smi_file_name: str
        """
        self.csv_list = csv_list
        scored_csv_file_list = active_learning_job.scored_csv_file_list
        if smi_file_name is None:
            smi_file_name = os.path.join(self.job_dir, self.job_name + ".smi")
        # Select from the whole library in the first round.
        if self.iter_num == 1:
            self.select(smi_file_name,
                        scored_csv_file_list,
                        sample_size=self.batch_size,
                        sort=False)
        # Select from the top ligands predicted by the ML model generated in
        # the previous iteration.
        else:
            y_index = None
            if self.select == self.uncertaintySelect:
                with open(self.csv_list[0], "r", newline='') as f:
                    reader = csv.reader(f)
                    header = next(reader)
                    y_index = header.index(UNCERTAINTY)
            self.select(smi_file_name,
                        scored_csv_file_list,
                        sample_size=self.batch_size,
                        y_index=y_index,
                        sort=True)
        self.checkOutcome(smi_file_name)
        al_utils.add_output_file(smi_file_name)
        self.restart_files.append(smi_file_name)
        self.input_for_next_node = {"smi_file_name": smi_file_name}

    def uncertaintySelect(self, smi_file_name, scored_csv_file_list,
                          sample_size, y_index=None, **kwargs):
        """
        Select the ligands with the largest uncertainty from the sorted
        ligand_ml .csv output files.

        :param smi_file_name: SMILES file name that contains selected ligands.
        :type smi_file_name: str
        :param scored_csv_file_list: list of ligand_ml training .csv files.
        :type scored_csv_file_list: list(str)
        :param sample_size: number of ligands to be sampled.
        :type sample_size: int
        :param y_index: column index of the values to sort by.
        :type y_index: int
        """
        # In random_split() we always put SMILES and name as the first and
        # second columns of the sub .csv files.
        smi_index = 0
        name_index = 1
        scored_ligands = self.readScoredLigands(scored_csv_file_list)
        reader_list = [
            al_utils.my_csv_reader(filename) for filename in self.csv_list
        ]
        smi_writer = open(smi_file_name, "w")
        selected_smi = 0
        # Select ligands based on y_index.
        merged = heapq.merge(*reader_list,
                             key=lambda x: float(x[y_index]),
                             reverse=True)
        for result in merged:
            if result[name_index] not in scored_ligands:
                smi_writer.write("{} {}\n".format(result[smi_index],
                                                  result[name_index]))
                selected_smi += 1
                if selected_smi >= sample_size:
                    break
        smi_writer.close()

    def randomSelect(self, smi_file_name, scored_csv_file_list, sample_size,
                     sort=True, **kwargs):
        """
        Select sample_size random ligands from the input .csv file(s).

        :param smi_file_name: SMILES file name that contains selected ligands.
        :type smi_file_name: str
        :param scored_csv_file_list: list of ligand_ml training .csv files.
        :type scored_csv_file_list: list(str)
        :param sample_size: number of ligands to be sampled.
        :type sample_size: int
        :param sort: whether the .csv files are sorted ligand_ml outputs
            (True) or the initial randomized inputs (False).
        :type sort: bool
        """
        smi_index = 0
        name_index = 1
        scored_ligands = self.readScoredLigands(scored_csv_file_list)
        reader_list = [
            al_utils.my_csv_reader(filename) for filename in self.csv_list
        ]
        smi_writer = open(smi_file_name, "w")
        # The ligands are sorted, so we need to sample them randomly.
        if sort:
            rand = random.Random(self.random_seed)
            selected_ligands = []
            current_count = 0
            # Reservoir sampling: every unscored ligand ends up in the sample
            # with equal probability without loading the library into memory.
            for reader in reader_list:
                for result in reader:
                    if result[name_index] in scored_ligands:
                        continue
                    current_count += 1
                    if current_count <= sample_size:
                        selected_ligands.append(
                            (result[smi_index], result[name_index]))
                    else:
                        x = rand.randint(0, current_count - 1)
                        if x < sample_size:
                            selected_ligands[x] = (result[smi_index],
                                                   result[name_index])
            for smi, name in selected_ligands:
                smi_writer.write("{} {}\n".format(smi, name))
        # We only need roundrobin to select random ligands since the ligands
        # were already randomized when the sub .csv files were created.
        else:
            rr = more_itertools.roundrobin(*reader_list)
            for result in itertools.islice(rr, sample_size):
                smi_writer.write("{} {}\n".format(result[smi_index],
                                                  result[name_index]))
        smi_writer.close()

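# Minimal sketch of the reservoir sampling used by randomSelect() above,
# assuming a plain iterable of (smiles, title) tuples (illustration only):
# the first sample_size items fill the reservoir, and the i-th item then
# replaces a uniformly chosen slot with probability sample_size / i, so each
# item is kept with equal probability in a single streaming pass.
#
#   def reservoir_sample(items, sample_size, seed=None):
#       rand = random.Random(seed)
#       reservoir = []
#       for count, item in enumerate(items, start=1):
#           if count <= sample_size:
#               reservoir.append(item)
#           else:
#               slot = rand.randint(0, count - 1)
#               if slot < sample_size:
#                   reservoir[slot] = item
#       return reservoir
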
    def diversitySelect(self, smi_file_name, scored_csv_file_list,
                        sample_size, sort=True, **kwargs):
        """
        Use combinatorial_diversity to select diverse ligands from the input
        .csv files or the sorted ligand_ml .csv output.

        :param smi_file_name: SMILES file name that contains selected ligands.
        :type smi_file_name: str
        :param scored_csv_file_list: list of ligand_ml training .csv files.
        :type scored_csv_file_list: list(str)
        :param sample_size: number of ligands to be sampled.
        :type sample_size: int
        :param sort: whether the .csv files are sorted ligand_ml outputs
            (True) or the initial randomized inputs (False).
        :type sort: bool
        """
        tmp_smi_file_name = os.path.join(
            self.job_dir, self.job_name + "_random_for_diversity.smi")
        max_diversity_sample_size = 10_000_000
        max_diversity_selection_cpu = 100
        self.randomSelect(tmp_smi_file_name, scored_csv_file_list,
                          max_diversity_sample_size, sort)
        if sample_size >= fileutils.count_lines(tmp_smi_file_name):
            fileutils.force_rename(tmp_smi_file_name, smi_file_name)
            return
        host, ncpu = al_utils.get_host_ncpu()
        ncpu = min(max_diversity_selection_cpu, ncpu)
        logger.info(f"Diversity selection will run with {ncpu} CPU(s)")
        cmd = [
            "utilities/combinatorial_diversity",
            os.path.basename(tmp_smi_file_name),
            str(sample_size)
        ]
        cmd += ["-out", os.path.basename(smi_file_name), "-no3d", "-nosplit"]
        cmd += ["-JOBNAME", self.job_name, "-HOST", host + ":" + str(ncpu)]
        cmd += ["-DRIVERHOST", "localhost"]
        self.log_file = os.path.join(self.job_dir, self.job_name + ".log")
        diversity_job = jobcontrol.launch_job(cmd, launch_dir=self.job_dir)
        diversity_job.wait()

class ScoreProviderNode(ActiveLearningNode):
    def __init__(self, iter_num, job_name, job_dir):
        """
        Initialize a node for obtaining the score of each ligand (SMILES).
        """
        super().__init__(iter_num, job_name, job_dir)
        self.smi_file_name = None
        self.score_csv_file = None
        self.known_title_to_score = None

    def checkOutcome(self, score_csv_file):
        """
        Validate the .csv score file.

        :param score_csv_file: name of the generated .csv score file.
        :type score_csv_file: str
        """
        if not os.path.isfile(score_csv_file):
            logger.error(f"Error: Failed to generate the .csv score file for "
                         f"iteration {self.iter_num} training ligands.")
            sys.exit(1)
        num_ligands = fileutils.count_lines(score_csv_file) - 1
        if num_ligands <= 0:
            logger.error(f"Error: Failed to generate any score for "
                         f"iteration {self.iter_num} training ligands.")
            sys.exit(1)
        if self.iter_num == 1 and num_ligands < MINIMAL_LIGAND_ML_TRAIN:
            logger.error(f"Error: Only {num_ligands} ligands produced a "
                         f"score. ligand_ml requires at least "
                         f"{MINIMAL_LIGAND_ML_TRAIN} training ligands.")
            sys.exit(1)

    def writeScoreCsv(self, title_to_score, output_csv):
        """
        Write the scores to the .csv file that ligand_ml needs for training.

        :param title_to_score: dict that maps ligand title to score
        :type title_to_score: defaultdict(lambda: BAD_SCORE)
        :param output_csv: ligand_ml training .csv file.
        :type output_csv: str
        """
        with open(self.smi_file_name, "r") as f:
            smi_lines = f.readlines()
        with open(output_csv, "w", newline='') as csv_f:
            csv_writer = csv.writer(csv_f)
            csv_writer.writerow(["SMILES", TRUTH_COL, "Title"])
            for smi_line in smi_lines:
                smi_title = al_utils.split_smi_line(smi_line)
                if not smi_title:
                    continue
                smi, title = smi_title
                csv_writer.writerow([smi, title_to_score[title], title])

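# For reference, writeScoreCsv() produces a training .csv in the layout
# sketched below, where the middle header is the TRUTH_COL constant imported
# at the top of the module and the values are invented for illustration.
# readScoredLigands() relies on the title being in the third column.
#
#   SMILES,<TRUTH_COL>,Title
#   CCO,-7.25,ligand_1
#   c1ccccc1,-5.10,ligand_2
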
class KnownScoreProviderNode(ScoreProviderNode):
    """
    Class for obtaining the scores from an external .csv file. This class is
    only used for testing the performance of the active learning workflow.
    """
    def __init__(self, args, iter_num, job_name, job_dir):
        super().__init__(iter_num, job_name, job_dir)

    @al_utils.node_run_timer
    def runNode(self, smi_file_name, active_learning_job,
                score_csv_file=None):
        """
        Read the scores from active_learning_job.known_title_to_score.

        :param smi_file_name: SMILES file that contains the ligands to be
            scored.
        :type smi_file_name: str
        :param active_learning_job: current active learning job.
        :type active_learning_job: ActiveLearningJob instance
        :param score_csv_file: ligand_ml training .csv file.
        :type score_csv_file: str
        """
        self.smi_file_name = smi_file_name
        known_title_to_score = active_learning_job.known_title_to_score
        if score_csv_file is None:
            score_csv_file = os.path.join(
                self.job_dir, "score_iter_{}_result.csv".format(self.iter_num))
        self.writeScoreCsv(known_title_to_score, score_csv_file)
        self.checkOutcome(score_csv_file)
        al_utils.add_output_file(score_csv_file)
        self.restart_files.append(score_csv_file)
        self.input_for_next_node = {}
        self.score_csv_file = score_csv_file

class LigandMLTrainNode(ActiveLearningNode):
    """
    Class for ligand_ml model generation.
    """
    def __init__(self, args, iter_num, job_name, job_dir):
        super().__init__(iter_num, job_name, job_dir)
        self.combined_train_csv_file = os.path.join(
            job_dir, "ligand_ml_combined_training.csv")
        self.train_csv_file_list = None
        self.train_host = args.train_host
        self.num_train_core = args.num_train_core
        self.train_time = args.train_time
        self.random_seed = args.random_seed
        self.chosen_models = args.chosen_models
        self.max_iter = args.num_iter
        self.mix_score_failed = args.mix_score_failed
        self.score_failed_ratio = args.score_failed_ratio

    def checkOutcome(self, model_file):
        """
        Check whether the ligand_ml model exists.

        :param model_file: name of the ligand_ml .qzip model file
        :type model_file: str
        """
        if not os.path.isfile(model_file):
            logger.error(f"Error: Failed to generate machine learning model "
                         f"for iteration {self.iter_num}")
            sys.exit(1)

    def createTrainingCsvFile(self, discard_cutoff, ascending=True):
        """
        Generate the training .csv file for ligand_ml model generation.

        :param discard_cutoff: score cutoff for excluding ligands from the
            ML training set.
        :type discard_cutoff: float
        :param ascending: lower value means better ligand if ascending is True
        :type ascending: bool
        """
        scored_smi_title_to_score = {}
        failed_smi_title_to_score = {}
        for score_file in self.train_csv_file_list:
            with open(score_file, "r", newline='') as f_in:
                score_reader = csv.reader(f_in)
                header = next(score_reader)
                for smi, score, title in score_reader:
                    try:
                        score = float(score)
                    except ValueError:
                        continue
                    score_decorated = score if ascending else -score
                    discard_cutoff_decorated = discard_cutoff if ascending \
                        else -discard_cutoff
                    if score_decorated < discard_cutoff_decorated:
                        scored_smi_title_to_score[(smi, title)] = score
                    else:
                        failed_smi_title_to_score[(smi, title)] = score
        train_smi_title_to_score = scored_smi_title_to_score.copy()
        # Add ligands that failed scoring (such as undockable ligands) to the
        # training set.
        if self.mix_score_failed:
            if self.iter_num == self.max_iter or \
                    (not failed_smi_title_to_score):
                pass
            else:
                num_scored = len(scored_smi_title_to_score)
                num_selected_score_failed = min(
                    int(num_scored * self.score_failed_ratio),
                    len(failed_smi_title_to_score))
                failed_selected = random.sample(
                    list(failed_smi_title_to_score),
                    num_selected_score_failed)
                failed_selected_dict = {
                    x: discard_cutoff for x in failed_selected
                }
                train_smi_title_to_score.update(failed_selected_dict)
        if len(train_smi_title_to_score) < MINIMAL_LIGAND_ML_TRAIN:
            logger.error(f"Error: Only selected "
                         f"{len(train_smi_title_to_score)} "
                         f"ligands for training. ligand_ml requires at least "
                         f"{MINIMAL_LIGAND_ML_TRAIN} training ligands.")
            sys.exit(1)
        with open(self.combined_train_csv_file, "w", newline='') as f_out:
            writer = csv.writer(f_out)
            writer.writerow(header)
            for (smi, title), score in train_smi_title_to_score.items():
                writer.writerow([smi, score, title])

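# Worked example of the cutoff logic in createTrainingCsvFile() (numbers are
# illustrative): with ascending=True and discard_cutoff=-5.0, a docking score
# of -7.2 satisfies -7.2 < -5.0 and is kept as a scored ligand, while -3.1
# does not and is treated as score-failed. With ascending=False both the
# score and the cutoff are negated, so the same comparison instead keeps
# ligands whose raw score is above the cutoff.
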
    @al_utils.node_run_timer
    def runNode(self, active_learning_job):
        """
        Perform ligand_ml training with all the scored ligands.

        :param active_learning_job: current active learning job.
        :type active_learning_job: ActiveLearningJob instance
        """
        from schrodinger.active_learning.al_report import make_train_report
        self.train_csv_file_list = active_learning_job.scored_csv_file_list
        self.createTrainingCsvFile(active_learning_job.discard_cutoff,
                                   active_learning_job.ascending)
        al_utils.add_output_file(self.combined_train_csv_file)
        csv_file_abspath = os.path.abspath(self.combined_train_csv_file)
        model_file = os.path.join(self.job_dir, self.job_name + ".qzip")
        sub_model_list = []
        subjob_log_list = []
        jobdj = get_jobdj()
        for model_index in range(0, self.num_train_core):
            sub_ligand_ml_model = f"{self.job_name}_sub_model_" \
                                  f"{model_index}.tar.gz"
            input_basename = fileutils.get_jobname(csv_file_abspath)
            model_basename = fileutils.get_jobname(sub_ligand_ml_model)
            subjob_name = f"{model_basename}_{input_basename}_build"
            subjob_log = os.path.join(self.job_dir, subjob_name + ".log")
            subjob_log_list.append(subjob_log)
            cmd = ["run", "al_ligand_ml_worker.py", "build"]
            cmd += [
                "-model", sub_ligand_ml_model, "-input_csv", csv_file_abspath,
                "-task_type", "regression", "-target_col", TRUTH_COL,
                "-train_time",
                str(self.train_time)
            ]
            if self.chosen_models is not None:
                cmd += ["-chosen_models", self.chosen_models]
            if self.random_seed is not None:
                cmd += ["-seed", str(self.random_seed)]
            cmd += ["-HOST", f"{self.train_host}:1"]
            sub_model_list.append(sub_ligand_ml_model)
            al_utils.add_output_file(subjob_log)
            jobdj.addJob(cmd, command_dir=self.job_dir)
        self.log_file = os.path.join(self.job_dir,
                                     self.job_name + "_build.log")
        jobdj.run()
        al_utils.merge_ligand_ml_models(sub_model_list, model_file,
                                        self.job_dir)
        self.checkOutcome(model_file)
        self.restart_files.append(model_file)
        fileutils.force_remove(*[
            os.path.join(self.job_dir, sub_model)
            for sub_model in sub_model_list
        ])
        try:
            report_pdf = os.path.join(
                self.job_dir, f"ML_model_iter_"
                f"{self.iter_num}_testset_metric.pdf")
            make_train_report(model_file, report_pdf, self.iter_num)
            al_utils.add_output_file(report_pdf)
        except RuntimeError:
            logger.warning(f"Failed to generate metric file for iteration "
                           f"{self.iter_num} machine learning model.")
        al_utils.add_output_file(model_file)
        al_utils.add_output_file(*subjob_log_list)
        self.input_for_next_node = {"model_file": model_file}

class LigandMLEvalNode(ActiveLearningNode):
    """
    Class for performing ligand_ml prediction with the generated model.
    """
    def __init__(self, args, iter_num, job_name, job_dir):
        super().__init__(iter_num, job_name, job_dir)
        self.model_file = None
        self.eval_csv_list = None
        self.uncertainty_sample_ratio = args.uncertainty_sample_ratio
        self.result_prefix = args.result_prefix
        self.keep = args.keep
        self.task = args.task
        self.sub_csv_header = None
        self.max_num_subjobs = args.max_ml_eval_cpu

    def getBestResults(self, file_list, outfile, ascending=True):
        """
        Get the best ligands (those with the lowest score when ascending is
        True) predicted by ligand_ml.

        :param file_list: list of ligand_ml .csv output files. Each file is
            sorted by ligand_ml prediction score.
        :type file_list: list(str)
        :param outfile: .csv file that contains the best ligands.
        :type outfile: str
        :param ascending: lower value means better ligand if ascending is True
        :type ascending: bool
        """
        with open(file_list[0], "r", newline='') as f:
            reader = csv.reader(f)
            header = next(reader)
            y_index = header.index(SCORE)
        # Keep the last two columns (score, uncertainty) of the ligand_ml
        # .csv output file.
        result_header = self.sub_csv_header + header[-2:]
        reader_list = [
            al_utils.my_csv_reader(filename) for filename in file_list
        ]
        with open(outfile, "w", newline='') as result_f:
            result_writer = csv.writer(result_f)
            result_writer.writerow(result_header)
            merged = heapq.merge(*reader_list,
                                 key=lambda x: float(x[y_index]),
                                 reverse=not ascending)
            for result in itertools.islice(merged, self.keep):
                result_writer.writerow(result)

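# Minimal sketch of the k-way merge pattern used by getBestResults() above
# (the file names are invented, and y_index/keep mirror the variables in the
# method): because each per-chunk prediction .csv is already sorted by score,
# heapq.merge() streams a globally sorted sequence without loading every row
# into memory, and itertools.islice() keeps only the top `keep` results. The
# real method additionally passes reverse=not ascending for descending scores.
#
#   readers = [al_utils.my_csv_reader(name)
#              for name in ("chunk_0_pred.csv", "chunk_1_pred.csv")]
#   best = itertools.islice(
#       heapq.merge(*readers, key=lambda row: float(row[y_index])), keep)
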
    def checkOutcome(self, pred_csv_list, uncertain_csv_list):
        """
        Check the existence of the ligand_ml prediction files.

        :param pred_csv_list: list of ligand_ml prediction .csv file(s)
        :type pred_csv_list: list(str)
        :param uncertain_csv_list: list of ligand_ml prediction-with-
            uncertainty .csv file(s).
        :type uncertain_csv_list: list(str)
        """
        if not (pred_csv_list and uncertain_csv_list):
            logger.error("Error: Failed to generate machine learning "
                         "prediction for any ligand")
            sys.exit(1)
        for csv_file in pred_csv_list + uncertain_csv_list:
            if not os.path.isfile(csv_file):
                logger.error(f"Error: Prediction result file: {csv_file} "
                             f"does not exist.")

    @al_utils.node_run_timer
    def runNode(self, model_file, active_learning_job):
        """
        Use the trained model to evaluate all the ligands.

        :param model_file: ligand_ml .qzip model file.
        :type model_file: str
        :param active_learning_job: current active learning job.
        :type active_learning_job: ActiveLearningJob instance
        """
        self.model_file = model_file
        self.eval_csv_list = active_learning_job.sub_infile_list
        if self.task == PILOT_TASK:
            self.eval_csv_list, _ = al_utils.random_split(
                [active_learning_job.args.pilot_ligands_csv],
                active_learning_job.args.pilot_size,
                prefix="pilot_eval_split",
                block_size=active_learning_job.args.block_size,
                name_index=1,
                smi_index=0)
        self.sub_csv_header = active_learning_job.sub_csv_header
        pred_csv_list = []
        pred_csv_uncertain_sorted_list = []
        output_csv = "{}_pred_iter_{}.csv".format(self.result_prefix,
                                                  self.iter_num)
        if self.task == EVAL_TASK:
            output_csv = "{}_pred.csv".format(self.result_prefix)
        host, ncpu = al_utils.get_host_ncpu()
        max_subjobs = min(ncpu, self.max_num_subjobs) if \
            self.max_num_subjobs else ncpu
        jobdj = get_jobdj(host_list=[(host, max_subjobs)])
        logger.info(f"ML evaluation jobs will run with {max_subjobs} CPU(s)")
        tar_model_file = \
            al_utils.convert_ligand_ml_model_format(self.model_file)
        with open(self.eval_csv_list[0], 'r', newline='') as f_in:
            reader = csv.reader(f_in)
            header = next(reader)
        for eval_csv in self.eval_csv_list:
            eval_csv_basename = fileutils.get_jobname(eval_csv)
            pred_csv = "{}_iter_{}_pred.csv".format(eval_csv_basename,
                                                    self.iter_num)
            sorted_by_uncertain = "{}_iter_{}_pred_uncertain_sorted.csv".format(
                eval_csv_basename, self.iter_num)
            cmd = [
                "run", "al_ligand_ml_worker.py", "evaluate", "-model",
                os.path.basename(tar_model_file), "-smiles_col", header[0]
            ]
            cmd += [
                "-input_csv", eval_csv, "-output_csv", pred_csv,
                "-sorted_by_uncertainy_csv", sorted_by_uncertain,
                "-uncertainty_sample_ratio",
                str(self.uncertainty_sample_ratio)
            ]
            if not active_learning_job.ascending:
                cmd += ["-descending"]
            jobdj.addJob(cmd, command_dir=self.job_dir)
            pred_csv_list.append(os.path.join(self.job_dir, pred_csv))
            pred_csv_uncertain_sorted_list.append(
                os.path.join(self.job_dir, sorted_by_uncertain))
        jobdj.run()
        pred_csv_finished_list = []
        pred_csv_uncertain_sorted_finished_list = []
        for pred_csv, pred_csv_uncertain_sorted in zip(
                pred_csv_list, pred_csv_uncertain_sorted_list):
            if os.path.isfile(pred_csv):
                pred_csv_finished_list.append(pred_csv)
            else:
                logger.warning(f"Cannot locate Ligand_ML prediction file "
                               f"{pred_csv}")
            if os.path.isfile(pred_csv_uncertain_sorted):
                pred_csv_uncertain_sorted_finished_list.append(
                    pred_csv_uncertain_sorted)
            else:
                logger.warning(f"Cannot locate Ligand_ML prediction file "
                               f"{pred_csv_uncertain_sorted}")
        self.checkOutcome(pred_csv_finished_list,
                          pred_csv_uncertain_sorted_finished_list)
        self.getBestResults(pred_csv_finished_list, output_csv,
                            active_learning_job.ascending)
        if self.task == EVAL_TASK:
            al_utils.add_output_file(output_csv)
            self.restart_files.append(output_csv)
            self.input_for_next_node = {"ligands_csv": output_csv}
        else:
            al_utils.add_output_file(output_csv,
                                     *pred_csv_uncertain_sorted_finished_list)
            self.restart_files.append(output_csv)
            self.restart_files += pred_csv_uncertain_sorted_finished_list
            self.input_for_next_node = {
                "ligands_csv": output_csv,
                "csv_list": pred_csv_uncertain_sorted_finished_list
            }
        try:
            fileutils.force_remove(*pred_csv_finished_list)
        except OSError:
            pass

class ActiveLearningNodeSupplier:
    def __init__(self,
                 calculate_score_node,
                 pilot_score_node,
                 rescore_node,
                 score_provider_node=ScoreProviderNode,
                 prepare_smi_node=PrepareSmilesNode,
                 known_score_provider_node=KnownScoreProviderNode,
                 ligand_ml_train_node=LigandMLTrainNode,
                 ligand_ml_eval_node=LigandMLEvalNode):
        self.calculate_score_node = calculate_score_node
        self.pilot_score_node = pilot_score_node
        self.rescore_node = rescore_node
        self.score_provider_node = score_provider_node
        self.prepare_smi_node = prepare_smi_node
        self.known_score_provider_node = known_score_provider_node
        self.ligand_ml_train_node = ligand_ml_train_node
        self.ligand_ml_eval_node = ligand_ml_eval_node