Source code for schrodinger.active_learning.al_driver

"""
Implementation of screening a large library with an active learning scheme.

Active learning scheme:
1. Select N ligands from the library.
2. Dock the selected portion of the library.
3. Train a ligand_ml model with the docking scores.
4. Evaluate the whole library with the trained ligand_ml model.
5. Pick N ligands from the top M best ligands predicted by the ligand_ml
   model.
6. Dock the ligands picked in step 5 and repeat from step 3 until num_iter
   iterations have been run.

Copyright Schrodinger Inc, All Rights Reserved.
"""

import argparse
import csv
import itertools
import math
import os
import pickle
import sys
from collections import OrderedDict

from rdkit import Chem

from schrodinger.active_learning import al_utils
from schrodinger.active_learning.al_utils import CSV_FORMAT
from schrodinger.active_learning.al_utils import EVAL_TASK
from schrodinger.active_learning.al_utils import MINIMAL_LIGAND_ML_TRAIN
from schrodinger.active_learning.al_utils import PILOT_TASK
from schrodinger.active_learning.al_utils import SCREEN_TASK
from schrodinger.active_learning.al_utils import SMILES_FORMAT
from schrodinger.active_learning.al_utils import TRUTH_COL
from schrodinger.active_learning.al_utils import get_file_ext
from schrodinger.active_learning.al_utils import positive_int
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
from schrodinger.utils import log

try:
    import resource
except ModuleNotFoundError:
    resource = None

logger = log.get_output_logger(__file__)

LOCALHOST = "localhost"
PRED_Y = "Pred"
JOBDIR_PREFIX = "iter"
MINIMAL_TOTAL_LIGAND = 100

TASK_LIST = [SCREEN_TASK, EVAL_TASK, PILOT_TASK]

# Map translating Python types into ConfigObj types.
TYPEMAP = {
    bool: 'boolean',
    int: 'integer',
    float: 'float',
    str: 'string',
    positive_int: 'integer'
}
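# For example (illustrative): Option.toConfigObj() below uses this map to
# render a hypothetical Option("-max_cpu", type=positive_int, default=4)
# as the validator spec line
#     MAX_CPU = integer(min=1, default=4)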

FINISHALL = 'FinishAll'


class Option:
    """
    A class to represent "options" which may be translated into argparse
    command-line arguments or an InputConfig spec for parsing input files.

    This is used to support the behavior of the legacy SiteMap driver, where
    every option could be specified in an input file or on the command line,
    with the latter taking precedence.
    """
    def __init__(self,
                 *names,
                 dest=None,
                 help=None,
                 type=str,
                 metavar=None,
                 default=None,
                 action=None,
                 nargs=None,
                 choices=None,
                 required=False):
        """
        The arguments all have the same meaning as for
        argparse.ArgumentParser.add_argument().
        """
        self.names = names
        self.dest = dest
        self.help = help
        self.metavar = metavar
        self.type = type
        self.action = action
        self.nargs = nargs
        self.choices = choices
        self.default = default
        self.required = required
    def toArgparse(self, parser):
        """
        Add an option to an argument parser.

        :param parser: argument parser
        :type parser: argparse.ArgumentParser
        """
        if self.action is None:
            parser.add_argument(*self.names,
                                dest=self.dest,
                                help=self.help,
                                type=self.type,
                                nargs=self.nargs,
                                choices=self.choices,
                                metavar=self.metavar,
                                required=self.required,
                                action=self.action,
                                default=self.default)
        else:
            parser.add_argument(*self.names,
                                dest=self.dest,
                                help=self.help,
                                required=self.required,
                                action=self.action)
    def toConfigObj(self):
        """
        Return a ConfigObj validator spec for self.

        :return: validation spec
        :rtype: str
        """
        typename = TYPEMAP[self.type]
        restrictions = ''
        default = self.default
        if self.type == str and default is not None:
            default = f"'{self.default}'"
        if self.type == positive_int:
            restrictions += 'min=1, '  # because default always follows
        if self.action == 'append':
            default = 'list()'
            typename += '_list'
        comment = ""
        if self.help:
            comment = f" # {self.help}"
        name = self.names[0].upper().replace('-', '')
        if self.action == "store_false":
            typename = "boolean"
            default = True
            name = self.dest.upper()
        if self.action == "store_true":
            typename = "boolean"
            default = False
            name = self.dest.upper()
        if self.choices and self.type == str:
            typename = 'option'
            restrictions += ', '.join(f'"{s}"' for s in self.choices)
            restrictions += ', '  # because default always follows
        line = "{} = {}({}default={}){}".format(name, typename, restrictions,
                                                default, comment)
        return line
    def toSubparser(self, subparsers):
        """
        Create a subparser for a certain task.

        :return: argument parser
        :rtype: argparse.ArgumentParser
        """
        subparse = subparsers.add_parser(*self.names, help=self.help)
        return subparse
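# Example (hedged sketch): rendering one Option both as an argparse argument
# and as a ConfigObj spec line. The parser variable is hypothetical; the
# Option mirrors -num_iter from SCREEN_OPTIONS below.
#
#     parser = argparse.ArgumentParser()
#     opt = Option("-num_iter", type=int, default=3,
#                  metavar="<num_of_iteration>",
#                  help="Number of active learning iterations. Default is 3.")
#     opt.toArgparse(parser)    # registers -num_iter with the parser
#     spec = opt.toConfigObj()
#     # "NUM_ITER = integer(default=3) # Number of active learning
#     #  iterations. Default is 3."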
TASK_OPTIONS = [
    Option(SCREEN_TASK,
           help='Screen library with active learning. '
           'For help message type screen -h'),
    Option(EVAL_TASK,
           help='Evaluate library with generated ML model. '
           'For help message type evaluate -h'),
    Option(PILOT_TASK,
           help='Assess active learning workflow on the library with pilot '
           'mode. For help message type pilot -h')
]

SHARED_OPTIONS = [
    # Hidden option: sample the ligands with the largest uncertainty in the
    # defined ratio.
    Option("-uncertainty_sample_ratio",
           type=float,
           default=0.1,
           help=argparse.SUPPRESS),
    Option("-random_seed",
           type=int,
           metavar="<random_seed_number>",
           help="Random seed number for shuffling all the ligands "
           "and seeding ligand_ml training."),
    Option("-overwrite_args",
           action="store_true",
           dest="overwrite_args",
           help="Overwrite previous arguments. Default is False."),
    Option("-force_restart",
           action="store_true",
           dest="force_restart",
           help="Force the workflow to restart when some restarting files "
           "are missing.")
]

SHARED_FILE_OPTIONS = [
    Option("-infile_list_file",
           metavar="<infile_path_list>",
           help="A file that contains a list of infile paths."),
    Option("-block_size",
           metavar="<num_lig_per_block>",
           type=int,
           default=100000,
           help="Number of ligands in each sub input ligands file. Default "
           "is 100,000."),
    Option("-smi_index",
           metavar="<smiles_column_index>",
           type=int,
           default=1,
           help="1-based column index of ligand's SMILES. Default is 1."),
    Option("-name_index",
           metavar="<title_column_index>",
           type=int,
           default=2,
           help="1-based column index of ligand's title. Default is 2."),
    Option("-no_header",
           action="store_false",
           dest="with_header",
           help="Specify this flag if the input file(s) have no header in "
           "the first line."),
    Option("-result_prefix",
           metavar="<output_file_prefix>",
           help="Prefix of the .csv result files. Default is -jobname."),
    Option("-remote_input_ligands",
           action="store_true",
           dest="remote_input_ligands",
           help="Whether input ligand files are located on a remote host. "
           "Absolute paths of input ligand files are required if this flag "
           "is provided."),
    Option("-restart_file",
           type=str,
           metavar="<restart_pkl_file>",
           help=".pkl file for restarting or continuing the active learning "
           "workflow."),
    Option("-score_file", help=argparse.SUPPRESS),
    Option("-local_args_file", help=argparse.SUPPRESS)
]

SHARED_JOB_OPTIONS = [
    Option("-jobname",
           metavar="<jobname>",
           help="Job name of the active learning workflow run."),
    Option("-stop_after",
           metavar="<stop_workflow_after_stage>",
           type=str,
           help="Terminate the workflow after the specified stage has "
           f"finished. Specify '{FINISHALL}' to run all the remaining "
           "stages."),
    Option("-max_ml_eval_cpu",
           metavar="<maximum_ml_evaluation_cpu>",
           type=positive_int,
           help="Allowed maximum number of CPUs for machine learning "
           "evaluation subjobs.")
]

SCREEN_OPTIONS = [
    Option("-selection_rule",
           metavar="<training_ligands_selection_protocol>",
           type=str,
           default="diversity",
           choices=["random", "diversity", "most_uncertain"],
           help="Rule for selecting training set ligands. Supported "
           "selection rules: [random, diversity, most_uncertain]. Default "
           "is diversity."),
    Option("-num_iter",
           metavar="<num_of_iteration>",
           type=int,
           default=3,
           help="Number of active learning iterations. Default is 3."),
    Option("-train_time",
           metavar="<hours>",
           type=float,
           default=4.0,
           help="Floating point time limit (in hours) for training deep "
           "learning models. Default is 4 hours."),
    Option("-train_host",
           metavar="<ligand_ml_training_host>",
           help="ligand_ml training host name. Default is the same as "
           "-HOST."),
    Option("-num_train_core",
           type=int,
           default=1,
           metavar="<ligand_ml_training_core>",
           help="Number of CPUs or GPUs for ligand_ml training. Default "
           "is 1."),
    Option("-chosen_models",
           metavar="<ligand_ml_chosen_models>",
           type=str,
           help="Type of sub-models to consider in ligand_ml. chosen_models "
           "should be contained in a single string and separated by "
           "spaces. Default is all available models in ligand_ml."),
    Option("-pilot_score_file",
           type=str,
           metavar="<score_csv_file>",
           help="Pilot ligands score .csv file generated by pilot mode."),
    Option("-score_failed_ratio",
           type=float,
           default=1,
           help=argparse.SUPPRESS)
]

EVAL_OPTIONS = [
    Option("-model",
           metavar="<ml_model.qzip>",
           help="Generated ligand_ml .qzip model.")
]

PILOT_OPTIONS = [
    Option("-selection_rule",
           metavar="<training_ligands_selection_protocol>",
           type=str,
           default="diversity",
           choices=["random", "diversity", "most_uncertain"],
           help="Rule for selecting training set ligands. Supported "
           "selection rules: [random, diversity, most_uncertain]. Default "
           "is diversity."),
    Option("-train_time",
           metavar="<hours>",
           type=float,
           default=4,
           help="Floating point time limit (in hours) for training pilot "
           "deep learning models. Default is 4 hours."),
    Option("-chosen_models",
           metavar="<ligand_ml_chosen_models>",
           type=str,
           help="Type of sub-models to consider in ligand_ml. chosen_models "
           "should be contained in a single string and separated by "
           "spaces. Default is all available models in ligand_ml."),
    Option("-train_host",
           metavar="<ligand_ml_training_host>",
           help="ligand_ml training host name. Default is the same as "
           "-HOST."),
    Option("-num_train_core",
           type=int,
           default=1,
           metavar="<ligand_ml_training_core>",
           help="Number of CPUs or GPUs for ligand_ml training. Default "
           "is 1."),
]

SHARED_MUTUALLY_EXCLUSIVE_OPTIONS = [
    [
        Option("-keep",
               metavar="<num_returned_ligand>",
               type=int,
               default=10_000_000,
               help="Number of best ligands to be returned. Default is "
               "10,000,000."),
        Option("-keep_fraction",
               metavar="<fraction_of_returned_ligand>",
               type=float,
               help="Fraction of the ligands to be returned."),
    ],
    [
        Option("-num_rescore_ligand",
               metavar="<num_rescore_ligand>",
               type=int,
               default=1_000_000,
               help="Number of the best ligands to rescore with Glide. "
               "Default is 1,000,000."),
        Option("-rescore_ligand_fraction",
               metavar="<fraction_of_rescore_ligand>",
               type=float,
               help="Fraction of the ligands to be rescored."),
    ]
]
def get_workflow_node_names(task, num_iter, use_known_score,
                            run_rescore_ligand, al_node_supplier):
    """
    Return a list of stages needed to complete the workflow based on the
    task type, the number of iterations, whether the score is known and
    whether to run the rescore stage.

    :param task: workflow task type
    :type task: str in [SCREEN_TASK, PILOT_TASK, EVAL_TASK]

    :param num_iter: number of iterations
    :type num_iter: int

    :param use_known_score: use known scores in score_file to obtain the
        score.
    :type use_known_score: bool

    :param run_rescore_ligand: run the rescore stage for ligands.
    :type run_rescore_ligand: bool

    :param al_node_supplier: supplier of active learning nodes
    :type al_node_supplier: ActiveLearningNodeSupplier

    :return: list of names of stages needed to complete the workflow
    :rtype: list(str)
    """
    candidate_node_names = []
    if task == PILOT_TASK:
        current_iter = 0
        node_class = al_node_supplier.pilot_score_node
        candidate_node_names.append(node_class.getName(current_iter))
    if task in [SCREEN_TASK, PILOT_TASK]:
        node_class_list = ActiveLearningJob.getNodeClasses(
            use_known_score, al_node_supplier)
        for current_iter in range(1, num_iter + 1):
            for node_class in node_class_list:
                candidate_node_names.append(node_class.getName(current_iter))
    if task == EVAL_TASK:
        current_iter = 0
        node_class = al_node_supplier.ligand_ml_eval_node
        candidate_node_names.append(node_class.getName(current_iter))
    if run_rescore_ligand:
        node_class = al_node_supplier.rescore_node
        candidate_node_names.append(node_class.getName(current_iter))
    return candidate_node_names
def validate_stop_after(stop_after_node, task, num_iter, use_known_score,
                        run_rescore_ligand, restart_file, al_node_supplier):
    """
    Check whether the node name the user specified in -stop_after is valid.

    :param stop_after_node: name of the node after which the workflow will
        exit once it has finished.
    :type stop_after_node: str in [ActiveLearningNode name] or 'FinishAll'

    :param task: workflow task type
    :type task: str in [SCREEN_TASK, PILOT_TASK, EVAL_TASK]

    :param num_iter: number of iterations
    :type num_iter: int

    :param use_known_score: use known scores in score_file to obtain the
        score.
    :type use_known_score: bool

    :param run_rescore_ligand: run the rescore stage for ligands.
    :type run_rescore_ligand: bool

    :param restart_file: filename of the AL .pkl restart file, if any
    :type restart_file: str or None

    :param al_node_supplier: supplier of active learning nodes
    :type al_node_supplier: ActiveLearningNodeSupplier

    :return: error message if validation failed; None if it passed
    :rtype: str or None
    """
    finished_nodes = ActiveLearningJob.LoadPreviousNodes(restart_file) \
        if restart_file else []
    candidate_node_names = [
        node_name for node_name in get_workflow_node_names(
            task, num_iter, use_known_score, run_rescore_ligand,
            al_node_supplier) if node_name not in finished_nodes
    ] + [FINISHALL]
    if candidate_node_names == [FINISHALL]:
        error_msg = "Error: All stages have finished. No more stage to run."
        return error_msg
    if stop_after_node not in candidate_node_names:
        error_msg = f"Error: Stop after stage {stop_after_node} is not in " \
                    f"the stages to run. Please choose from" \
                    f" {candidate_node_names}."
        return error_msg
    stages_to_run = candidate_node_names[:candidate_node_names.
                                         index(stop_after_node) + 1]
    stages_to_run = [
        node_name for node_name in stages_to_run if node_name != FINISHALL
    ]
    left_stages = [
        node_name for node_name in candidate_node_names
        if node_name not in stages_to_run and node_name != FINISHALL
    ]
    logger.info(f"Stages in current job: {stages_to_run}.\n"
                f"Stages not in current job: {left_stages}.")
def validate_input_files(input_files, remote_input_ligands=False,
                         allowed_format=None):
    """
    Check the existence and format of the input files. Return an error
    message if validation failed, otherwise return None.

    :param input_files: paths of input files.
    :type input_files: list(str)

    :param remote_input_ligands: whether input ligand files are located on
        a remote host.
    :type remote_input_ligands: bool

    :param allowed_format: allowed input file formats.
    :type allowed_format: list or None

    :return: error message if validation failed; None if it passed
    :rtype: str or None
    """
    if allowed_format is None:
        allowed_format = [SMILES_FORMAT, CSV_FORMAT]
    for file_index, file in enumerate(input_files):
        if remote_input_ligands:
            if not os.path.isabs(file):
                error_msg = "Error: absolute path of input file: {} is " \
                            "required for remote mode".format(file)
                return error_msg
        else:
            if not os.path.isfile(file):
                error_msg = "Error: input file: {} does not exist".format(
                    file)
                return error_msg
        input_type = get_file_ext(file)
        if file_index == 0:
            first_input_type = input_type
        if input_type not in allowed_format:
            error_msg = f"Error: Unknown input file type: {file}"
            return error_msg
        if input_type != first_input_type:
            error_msg = "Error: input file {} does not have consistent " \
                        "format".format(file)
            return error_msg
    return
def validate_input_smiles(input_files, smi_index, name_index,
                          with_header=True, max_check=10):
    """
    Validate SMILES in input files.

    :param input_files: paths of input files.
    :type input_files: list(str)

    :param smi_index: column index of molecule SMILES
    :type smi_index: int

    :param name_index: column index of molecule name
    :type name_index: int

    :param with_header: whether the file has a header in its first line
    :type with_header: bool

    :param max_check: maximum number of SMILES to validate.
    :type max_check: int

    :return: error message if validation failed; None if it passed
    :rtype: str or None
    """
    for file in input_files:
        input_type = get_file_ext(file)
        with fileutils.open_maybe_compressed(file, "rt") as f_in:
            if with_header:
                _ = next(f_in)
            if input_type == CSV_FORMAT:
                csv_reader = csv.reader(f_in)
                line_list = [
                    x for x in itertools.islice(csv_reader, max_check)
                ]
            elif input_type == SMILES_FORMAT:
                if smi_index != 1 or name_index != 2:
                    error_msg = "Error: smi_index and name_index of SMILES " \
                                "file(s) must be 1 and 2 respectively."
                    return error_msg
                line_list = [
                    al_utils.split_smi_line(x)
                    for x in itertools.islice(f_in, max_check)
                ]
            line_list = [x for x in line_list if x]
            if not line_list:
                error_msg = f"Error: First {max_check} lines of input " \
                            f"file: {file} are all empty."
                return error_msg
            try:
                for line in line_list:
                    smi, _ = line[smi_index - 1], line[name_index - 1]
                    if not Chem.MolFromSmiles(smi):
                        error_msg = f"Error: Unable to convert {smi} to " \
                                    f"molecule."
                        return error_msg
            except IndexError:
                error_msg = "Error: Missing SMILES or title in input file" \
                            f" {file}"
                return error_msg
    return
def count_ligands(file_list, with_header=True):
    """
    Count the number of ligands in all the files by counting the total
    number of lines. We assume each line contains a SMILES string.

    :param file_list: list of input file paths.
    :type file_list: list(str)

    :param with_header: whether the input files have a header.
    :type with_header: bool

    :return: number of ligands in all the input files.
    :rtype: int
    """
    total_line = 0
    for file in file_list:
        total_line += fileutils.count_lines(file)
        if with_header:
            total_line -= 1
    return total_line
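# Worked example: two .csv files of 1001 lines each, both with a header
# line, count as 2 * (1001 - 1) = 2000 ligands.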
class ActiveLearningJob:
    def __init__(self, args, al_node_supplier):
        """
        Initialize the ActiveLearningJob from the command-line arguments.

        :param args: argument namespace with command line options
        :type args: argparse.Namespace

        :param al_node_supplier: supplier of active learning nodes
        :type al_node_supplier: ActiveLearningNodeSupplier
        """
        self.args = args
        self.al_node_supplier = al_node_supplier
        self.jobname = self.args.jobname
        self.subjob_prefix = self.jobname + "_sub"
        self.sub_infile_list = []
        self.total_ligand_num = None
        self.sub_csv_header = None
        self.backend = None
        self.known_title_to_score = None
        self.nodes_to_run = None
        self.finished_nodes = None
        self.pilot_score_file = None
        self.optional_restart_files = []
        self.restart_dict = {}
        self.restart_file = "{}_restart.pkl".format(self.jobname)
        self.discard_cutoff = 10
        self.ascending = True
    @staticmethod
    def LoadPreviousNodes(restart_file):
        """
        Load nodes that were finished in a previous job.

        :param restart_file: filename of the AL .pkl restart file
        :type restart_file: str

        :return: nodes that were finished in a previous job.
        :rtype: OrderedDict that maps node name to node instance.
        """
        finished_nodes = OrderedDict()
        if restart_file:
            with open(restart_file, 'rb') as f:
                finished_nodes = pickle.load(f)["finished_nodes"]
        return finished_nodes
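    # The restart .pkl written by runNodes() below is a pickled dict with
    # the keys "remote_args", "local_args", "finished_nodes",
    # "restart_files" and "optional_restart_files"; this loader reads only
    # "finished_nodes".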
    @staticmethod
    def getNodeClasses(use_known_score, al_node_supplier):
        """
        Return a list of node classes to run based on the job type.

        :param use_known_score: use known scores in score_file to obtain
            the score.
        :type use_known_score: bool

        :param al_node_supplier: supplier of active learning nodes
        :type al_node_supplier: ActiveLearningNodeSupplier

        :return: a list of ActiveLearningNode subclasses
        :rtype: list
        """
        if use_known_score:
            score_node = al_node_supplier.known_score_provider_node
        else:
            score_node = al_node_supplier.calculate_score_node
        node_class_list = [
            al_node_supplier.prepare_smi_node, score_node,
            al_node_supplier.ligand_ml_train_node,
            al_node_supplier.ligand_ml_eval_node
        ]
        return node_class_list
    def LoadOptionalRestartFiles(self):
        """
        Load the restart files for the possible restarting of the running
        node.

        :return: list of filenames
        :rtype: list(str) or None
        """
        optional_restart_files = []
        if self.args.restart_file:
            with open(self.args.restart_file, 'rb') as f:
                optional_restart_files = pickle.load(f).get(
                    "optional_restart_files")
        return optional_restart_files
    def configure(self):
        """
        Prepare the active learning job.
        """
        self.total_ligand_num = count_ligands(self.args.infile_list,
                                              self.args.with_header)
        if self.args.keep is None:
            self.args.keep = math.ceil(self.args.keep_fraction *
                                       self.total_ligand_num)
        if self.args.num_rescore_ligand is None:
            self.args.num_rescore_ligand = math.ceil(
                self.args.rescore_ligand_fraction * self.total_ligand_num)
        self.args.num_rescore_ligand = min(self.args.num_rescore_ligand,
                                           self.args.keep)
        if self.args.task == PILOT_TASK:
            self.args.pilot_size = min(self.total_ligand_num,
                                       self.args.pilot_size)
        if self.args.task == SCREEN_TASK:
            if self.total_ligand_num < MINIMAL_LIGAND_ML_TRAIN:
                logger.error(
                    f"Number of total ligands is smaller than the "
                    f"minimal requirement ({MINIMAL_LIGAND_ML_TRAIN}).")
                sys.exit(1)
            if self.total_ligand_num < MINIMAL_TOTAL_LIGAND:
                logger.warning(
                    f"Warning: Number of total ligands is smaller than the "
                    f"suggested minimal number ({MINIMAL_TOTAL_LIGAND}).")
        self.checkOSFileLimit()
        if jobcontrol.get_backend():
            al_utils.check_driver_disk_space(self)
        self.backend = jobcontrol.get_backend()
        self.known_title_to_score = al_utils.read_score(self.args.score_file)
        self.pilot_score_file = self.getPilotScoreFile()
        self.splitInputfiles()
        self.optional_restart_files = self.LoadOptionalRestartFiles()
        self.nodes_to_run, self.finished_nodes = self.getNodesToRun()
    @property
    def scored_csv_file_list(self):
        """
        Get all the .csv files that contain scored ligands from
        ScoreProviderNode.

        :return: list of .csv files that contain scored ligands.
        :rtype: list(str)
        """
        csv_list = []
        if self.args.task == SCREEN_TASK and self.args.pilot_score_file:
            csv_list.append(self.pilot_score_file)
        for node_name, node in self.finished_nodes.items():
            if isinstance(node, self.al_node_supplier.score_provider_node):
                if not os.path.isfile(node.score_csv_file):
                    logger.warning(f"Warning: Score file for ML training "
                                   f"{node.score_csv_file} does not exist.")
                    continue
                csv_list.append(node.score_csv_file)
        return csv_list

    @property
    def restart_files(self):
        """
        Get all the necessary files for restarting the workflow from the
        finished nodes.

        :return: a set of files for restarting.
        :rtype: set(str)
        """
        restart_files = set()
        for node in self.finished_nodes.values():
            for node_restart in node.restart_files:
                restart_files.add(node_restart)
        return restart_files
    def getPilotScoreFile(self):
        """
        Reorder the columns in the pilot ligand score file for use as
        machine learning model training input.

        :return: name of the reordered .csv file.
        :rtype: str
        """
        import pandas as pd
        if self.args.task != SCREEN_TASK or \
                self.args.pilot_score_file is None:
            return None
        pilot_score_file_reordered = os.path.splitext(
            self.args.pilot_score_file)[0] + "_reordered.csv"
        score_df = pd.read_csv(self.args.pilot_score_file)
        # In PilotScoreNode the pilot score file follows the column order:
        # title, score, SMILES
        score_df.columns = ["Title", TRUTH_COL, "SMILES"]
        score_df.to_csv(pilot_score_file_reordered,
                        columns=["SMILES", TRUTH_COL, "Title"],
                        index=False)
        return pilot_score_file_reordered
    def checkOSFileLimit(self):
        """
        Check the system file descriptor limit.
        """
        if resource is None:
            return
        padding = 100
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
        target_limit = math.ceil(self.total_ligand_num /
                                 self.args.block_size)
        if target_limit + padding > soft_limit:
            logger.error(
                f"Error: Current job settings will result in {target_limit} "
                f"open files. We estimate that more than {soft_limit-padding} "
                f"open files will likely lead to job failure. Please "
                f"increase -block_size so that fewer files are opened by "
                f"the job or increase the OS file descriptor limit.")
            sys.exit(1)
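    # Worked example: 10,000,000 ligands with the default block_size of
    # 100,000 gives target_limit = 100 sub-files; with padding = 100, the
    # job errors out unless the soft limit is at least 200 file descriptors.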
    def getNodesToRun(self):
        """
        Return nodes to run and finished nodes for the current active
        learning job.
        """
        finished_nodes = ActiveLearningJob.LoadPreviousNodes(
            self.args.restart_file)
        nodes_to_run = OrderedDict()
        if self.args.task == PILOT_TASK:
            iter_num = 0
            job_dir = "{}_{}_pilot".format(self.jobname, JOBDIR_PREFIX)
            fileutils.mkdir_p(job_dir)
            pilot_score_node = self.al_node_supplier.pilot_score_node(
                self.args, iter_num, self.jobname, job_dir)
            if pilot_score_node.node_name not in finished_nodes:
                nodes_to_run[pilot_score_node.node_name] = pilot_score_node
        if self.args.task in [SCREEN_TASK, PILOT_TASK]:
            node_class_list = ActiveLearningJob.getNodeClasses(
                bool(self.args.score_file), self.al_node_supplier)
            for iter_num in range(1, self.args.num_iter + 1):
                current_iter = iter_num
                job_dir = "{}_{}_{}".format(self.jobname, JOBDIR_PREFIX,
                                            iter_num)
                fileutils.mkdir_p(job_dir)
                for node_class in node_class_list:
                    node = node_class(self.args, iter_num, self.jobname,
                                      job_dir)
                    if node.node_name not in finished_nodes:
                        nodes_to_run[node.node_name] = node
        if self.args.task == EVAL_TASK:
            current_iter = 0
            node = self.al_node_supplier.ligand_ml_eval_node(
                self.args, current_iter, self.jobname, ".")
            if node.node_name not in finished_nodes:
                nodes_to_run[node.node_name] = node
        if self.args.num_rescore_ligand:
            node = self.al_node_supplier.rescore_node(self.args,
                                                      current_iter,
                                                      self.jobname, ".")
            if node.node_name not in finished_nodes:
                nodes_to_run[node.node_name] = node
        return nodes_to_run, finished_nodes
    def splitInputfiles(self):
        """
        Separate the input files into small blocks randomly.
        """
        self.sub_infile_list, self.sub_csv_header = al_utils.random_split(
            self.args.infile_list,
            self.total_ligand_num,
            prefix=self.subjob_prefix,
            block_size=self.args.block_size,
            name_index=self.args.name_index - 1,
            smi_index=self.args.smi_index - 1,
            random_seed=self.args.random_seed,
            with_header=self.args.with_header)
    def getRestartNode(self):
        """
        Get the node for restarting the workflow.

        :return: last finished node
        :rtype: ActiveLearningNode
        """
        for node in list(self.finished_nodes.values())[::-1]:
            if not isinstance(node, self.al_node_supplier.rescore_node):
                return node
    def getInitialInputs(self):
        """
        Get the inputs for the runNode() method of the first node in the
        workflow.

        :return: dict that contains the keyword arguments and values for
            the runNode() of the first node.
        :rtype: dict{keyword argument: value}
        """
        if self.finished_nodes:
            last_restart_node = self.getRestartNode()
            return last_restart_node.input_for_next_node
        if self.args.task in [SCREEN_TASK, PILOT_TASK]:
            return {"csv_list": self.sub_infile_list}
        if self.args.task == EVAL_TASK:
            return {"model_file": self.args.model}
    def getLocalArgs(self):
        """
        :return: arguments on the local machine.
        :rtype: argparse.Namespace
        """
        if jobcontrol.get_backend():
            with open(self.args.local_args_file, "rb") as f:
                local_args = pickle.load(f)
        else:
            local_args = self.args
        return local_args
    def runNodes(self):
        """
        Run all the ActiveLearningNode instances in self.nodes_to_run.
        """
        local_args = self.getLocalArgs()
        self.restart_dict = {
            "remote_args": self.args,
            "local_args": local_args,
            "finished_nodes": self.finished_nodes,
            "restart_files": self.restart_files,
            "optional_restart_files": self.optional_restart_files
        }
        with open(self.restart_file, 'wb') as f:
            pickle.dump(self.restart_dict, f, 0)
        if self.backend:
            self.backend.addOutputFile(self.restart_file)
        inputs = dict(self.getInitialInputs())
        try:
            for node_name, node in self.nodes_to_run.items():
                logger.info("Running node {}".format(node_name))
                inputs.update({"active_learning_job": self})
                node.runNode(**inputs)
                inputs = dict(node.input_for_next_node)
                self.finished_nodes[node.node_name] = node
                self.optional_restart_files = node.optional_restart_files
                with open(self.restart_file, 'wb') as f:
                    self.restart_dict["restart_files"] = self.restart_files
                    self.restart_dict["optional_restart_files"] = \
                        self.optional_restart_files
                    pickle.dump(self.restart_dict, f, 0)
                al_utils.add_output_file(self.restart_file)
                if self.args.stop_after is not None and \
                        node_name == self.args.stop_after:
                    logger.info(f"Last requested stage"
                                f" {self.args.stop_after} finished.")
                    break
        finally:
            combined_logfile = self.jobname + "_subjobs.log"
            subjob_logfile_list = [
                node.log_file for node in self.nodes_to_run.values()
                if node.log_file and os.path.isfile(node.log_file)
            ]
            al_utils.concatenate_logs(combined_logfile, subjob_logfile_list,
                                      logger)
            al_utils.add_output_file(combined_logfile)
def read_paths_listed_in_file(old_paths, paths_list_file):
    """
    Add the paths specified in paths_list_file to old_paths.

    :param old_paths: None or list of original paths
    :type old_paths: list

    :param paths_list_file: path of the file that contains the paths to be
        added
    :type paths_list_file: str

    :return: list of paths
    :rtype: list(str)
    """
    paths = old_paths.copy() if old_paths else []
    if paths_list_file:
        with open(paths_list_file, 'r') as fh:
            paths += [
                x.strip() for x in fh if not x.startswith("#") and x.strip()
            ]
    return list(set(paths))
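# Example (illustrative) -infile_list_file content: one path per line;
# blank lines and lines starting with "#" are skipped, and duplicates are
# dropped by the set() conversion above:
#
#     # ligand blocks
#     /data/libs/block_001.csv
#     /data/libs/block_002.csv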
def restart_args_handler(args):
    """
    Load the previous arguments stored in args.restart_file.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace

    :return: updated argument namespace, and the argument namespace of the
        previous job or None
    :rtype: argparse.Namespace, argparse.Namespace or None
    """
    if args.restart:
        restart_file = args.jobname + "_restart.pkl"
        if not args.restart_file:
            if not os.path.isfile(restart_file):
                logger.error(f"Error: Cannot find restart .pkl file"
                             f" {restart_file}.")
                sys.exit(1)
            args.restart_file = restart_file
    if args.restart_file:
        with open(args.restart_file, 'rb') as f:
            restart_dict = pickle.load(f)
        restart_input_files = restart_dict["restart_files"]
        optional_restart_files = restart_dict.get("optional_restart_files",
                                                  [])
        args.restart_input_files = restart_input_files
        args.optional_restart_files = optional_restart_files
        if not args.overwrite_args:
            if jobcontrol.get_backend():
                prev_args = restart_dict["remote_args"]
            else:
                prev_args = restart_dict["local_args"]
            prev_args.restart_file = args.restart_file
            prev_args.restart_input_files = restart_input_files
            prev_args.optional_restart_files = optional_restart_files
            # Always overwrite args.stop_after and args.train_host if they
            # were specified in the restart run.
            if args.stop_after is not None:
                prev_args.stop_after = args.stop_after
            if args.task in [SCREEN_TASK, PILOT_TASK] and \
                    args.train_host is not None:
                prev_args.train_host = args.train_host
            for key, value in args.__dict__.items():
                if not hasattr(prev_args, key):
                    setattr(prev_args, key, value)
            logger.info("Restoring previous workflow...")
            logger.info("Previous arguments:")
            for key, value in prev_args.__dict__.items():
                logger.info("argument: {:<30} value: {}".format(key, value))
            return args, prev_args
    return args, None
def common_parse_args(args):
    """
    Parse command-line arguments.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace

    :return: argument namespace with command line options
    :rtype: argparse.Namespace
    """
    # Append files listed in infile_list_file to infile_list
    if args.infile_list_file:
        args.infile_list = read_paths_listed_in_file(args.infile,
                                                     args.infile_list_file)
    else:
        args.infile_list = args.infile
    # Set the default result file prefix
    if not args.result_prefix:
        args.result_prefix = args.jobname
    if args.keep_fraction is not None:
        args.keep = None
    if args.rescore_ligand_fraction is not None:
        args.num_rescore_ligand = None
    host_list = jobcontrol.get_backend_host_list()
    if not host_list:
        host_list = [(LOCALHOST, 1)]
    if args.task in [SCREEN_TASK, PILOT_TASK]:
        if not args.train_host:
            args.train_host = host_list[0][0]
    if args.task == EVAL_TASK:
        args.outfile = "{}_pred.csv".format(args.result_prefix)
    if args.task == PILOT_TASK:
        args.num_iter = 1
        args.keep = args.pilot_size
        args.num_rescore_ligand = 0
        args.score_file = f"{args.jobname}_pilot_score.csv"
        args.pilot_ligands_csv = f"{args.jobname}_pilot_ligands.csv"
        # To be changed in the specific application
        args.mix_score_failed = False
        # To be changed in the specific application
        args.score_failed_ratio = 1
    return args
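# Note on the fraction flags: when -keep_fraction (or
# -rescore_ligand_fraction) is given, the absolute count is cleared here and
# recomputed later in ActiveLearningJob.configure(). For example,
# keep_fraction=0.25 on a 1,000,000-ligand library yields
# keep = math.ceil(0.25 * 1000000) = 250000.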
def common_validate_args(args):
    """
    Validate command-line arguments.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace

    :return: (validation outcome, error message)
    :rtype: (bool, str)
    """
    if not args.infile_list:
        msg = "Error: input ligands file is not specified"
        return False, msg
    msg = validate_input_files(args.infile_list, args.remote_input_ligands)
    if msg:
        return False, msg
    if not args.remote_input_ligands:
        msg = validate_input_smiles(args.infile_list, args.smi_index,
                                    args.name_index, args.with_header)
        if msg:
            return False, msg
    if args.keep is not None and args.keep < 1:
        msg = "Error: keep: {} is smaller than 1".format(args.keep)
        return False, msg
    if args.keep_fraction is not None:
        if not (0.0 < args.keep_fraction <= 1.0):
            msg = "Error: keep_fraction: {} is <= 0 or > 1".format(
                args.keep_fraction)
            return False, msg
    if args.num_rescore_ligand is not None and args.num_rescore_ligand < 0:
        msg = "Error: number of rescore ligands: {} is smaller than " \
              "0".format(args.num_rescore_ligand)
        return False, msg
    if args.rescore_ligand_fraction is not None:
        if args.rescore_ligand_fraction > 1 or \
                args.rescore_ligand_fraction < 0:
            msg = "Error: rescore_ligand_fraction: {} is < 0 or > 1".format(
                args.rescore_ligand_fraction)
            return False, msg
    if args.task in [SCREEN_TASK, PILOT_TASK]:
        if args.train_time <= 0:
            msg = "Error: train time: {} is smaller than 0".format(
                args.train_time)
            return False, msg
        if args.train_size < MINIMAL_LIGAND_ML_TRAIN:
            msg = "Error: train size ({}) is smaller than the minimal " \
                  "training size requirement of ligand_ml ({})".format(
                      args.train_size, MINIMAL_LIGAND_ML_TRAIN)
            return False, msg
        if args.num_iter < 1:
            msg = "Error: iteration: {} is smaller than 1".format(
                args.num_iter)
            return False, msg
    if args.task == EVAL_TASK:
        if args.model is None:
            msg = "ML model file is not specified."
            return False, msg
        if not os.path.isfile(args.model):
            msg = f"ML model file {args.model} does not exist."
            return False, msg
    return True, None
def common_get_job_spec_from_args(args, jsb):
    """
    Register the input and output files shared by all tasks with the job
    specification builder.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace

    :param jsb: job specification builder
    """
    if args.input_file:
        al_utils.add_input_file(jsb, args.input_file)
    if args.restart_file:
        al_utils.add_input_file(jsb, args.restart_file)
        for file in args.restart_input_files:
            if os.path.isfile(file):
                al_utils.add_input_file(jsb, file)
            else:
                if args.force_restart:
                    logger.warning(f"Warning: restarting file: {file} does "
                                   f"not exist.")
                else:
                    logger.error(f"Error: restarting file: {file} does not "
                                 f"exist.")
                    sys.exit(1)
        for file in args.optional_restart_files:
            if os.path.isfile(file):
                al_utils.add_input_file(jsb, file)
    if args.infile_list_file:
        al_utils.add_input_file(jsb, args.infile_list_file)
    if not args.remote_input_ligands:
        al_utils.add_input_file(jsb, *args.infile_list)
    if args.task == SCREEN_TASK and args.score_file:
        al_utils.add_input_file(jsb, args.score_file)
    if args.task == SCREEN_TASK and args.pilot_score_file:
        al_utils.add_input_file(jsb, args.pilot_score_file)
    # This is only used for restarting the pilot job.
    if args.task == PILOT_TASK and os.path.isfile(args.score_file):
        al_utils.add_input_file(jsb, args.score_file)
    if args.task == EVAL_TASK:
        al_utils.add_input_file(jsb, args.model)
    jsb.setOutputFile(args.outfile)