import csv
import functools
import math
import os
import random
import shutil
import sys
import tarfile
import time
import zipfile
from collections import defaultdict
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
from schrodinger.utils import log
logger = log.get_output_logger(__file__)
# Minimum number of ligands needed to train a ligand_ml model
# (per its name; not referenced in this chunk — confirm against callers).
MINIMAL_LIGAND_ML_TRAIN = 40
# Sentinel score assigned when a ligand's score cannot be read
# (used as the defaultdict default in read_score; lower score is better).
BAD_SCORE = 1e9
# Active-learning task names (compared against args.task in
# check_driver_disk_space).
EVAL_TASK = "evaluate"
SCREEN_TASK = "screen"
PILOT_TASK = "pilot"
# Recognized ligand input-file extensions (see get_file_ext / random_split).
SMILES_FORMAT = ".smi"
CSV_FORMAT = ".csv"
MAE_FORMAT = ".mae"
# Presumably the csv column header holding known truth values — TODO confirm.
TRUTH_COL = "Value"
def positive_int(s):
    """
    ArgumentParser type function to check whether input can be converted
    to a positive integer.

    :param s: input string
    :type s: str

    :return: integer value of input string
    :rtype: int

    :raise ValueError: if the input is not a positive integer
    """
    i = int(s)
    if i <= 0:
        # argparse treats ValueError from a type function as invalid input;
        # include a message so non-argparse callers get a useful error.
        raise ValueError(f"{s!r} is not a positive integer")
    return i
def split_smi_line(line):
    """
    Split a line from a .smi file into its SMILES pattern and title.

    Return an empty list if the line contains no fields.

    :param line: line from .smi file
    :type line: str

    :return: SMILES pattern, title
    :rtype: [str, str] or []
    """
    fields = line.rstrip("\n").split(None, 1)
    if not fields:
        return []
    smiles = fields[0]
    # A missing title defaults to the SMILES pattern itself.
    title = fields[1] if len(fields) > 1 else smiles
    return smiles, title
def my_csv_reader(filename):
    """
    Yield rows of a .csv file, skipping the first (header) line.

    :param filename: .csv file name
    :type filename: str

    :return: csv.reader rows, first line of the file excluded.
    :rtype: iterator
    """
    with open(filename, 'r', newline='') as f:
        reader = csv.reader(f)
        # Use a default so an empty file yields nothing instead of raising
        # StopIteration, which PEP 479 converts to RuntimeError inside a
        # generator.
        next(reader, None)
        yield from reader
def read_score(score_file):
    """
    Read known scores of ligands from a score .csv file.

    The first line is assumed to be a header and is skipped. When a title
    appears multiple times, the minimum (best) score is kept.

    :param score_file: path of the score .csv file, or None.
    :type score_file: str or None

    :return: a dictionary that maps ligand title to ligand score, or None
        when no score file is available.
    :rtype: dict or None
    """
    if score_file is None or not os.path.isfile(score_file):
        return None
    known_title_to_score = defaultdict(lambda: BAD_SCORE)
    for value in my_csv_reader(score_file):
        # Skip completely empty rows; value[0] below would raise otherwise.
        if not value:
            continue
        try:
            # Rows missing the score column raise IndexError; unparsable
            # scores raise ValueError. Both are reported and skipped.
            title, score = value[0], float(value[1])
        except (ValueError, IndexError):
            logger.warning("Fail to read score of ligand {}".format(value[0]))
            continue
        known_title_to_score[title] = min(known_title_to_score[title], score)
    return known_title_to_score
def random_split(file_list,
                 num_ligands,
                 prefix="splited",
                 block_size=100000,
                 name_index=0,
                 smi_index=1,
                 random_seed=None,
                 delimiter=",",
                 with_header=True):
    """
    Combine input files, shuffle lines, and split them into files with
    block_size lines per file. Reorder the columns so that SMILES and name
    are in the first and second column respectively.

    :param file_list: paths of input files.
    :type file_list: list

    :param num_ligands: total number of ligands in all the input files.
    :type num_ligands: int

    :param prefix: prefix of split files
    :type prefix: str

    :param block_size: number of ligands in each sub .csv file.
    :type block_size: int

    :param name_index: column index of molecule name
    :type name_index: int

    :param smi_index: column index of molecule SMILES
    :type smi_index: int

    :param random_seed: random seed number for shuffling the ligands
    :type random_seed: int or None

    :param delimiter: delimiter of input csv files
    :type delimiter: str

    :param with_header: Whether input file(s) has header in its first line.
    :type with_header: bool

    :return: list of split files, reordered csv header
    :rtype: list, list
    """
    rand = random.Random(random_seed)
    # The header and column order depend on the input format.
    input_type = get_file_ext(file_list[0])
    if input_type == SMILES_FORMAT:
        header_list, reorder_index = get_smi_header()
    elif input_type == CSV_FORMAT:
        header_list, reorder_index = get_csv_header(file_list[0], smi_index,
                                                    name_index, delimiter,
                                                    with_header)
    else:
        logger.error(f"Unknown input file type: {file_list[0]}")
        sys.exit(1)

    # One temporary binary file per final output block.
    num_subfile = math.ceil(num_ligands / block_size)
    tmp_out_fname_list = [
        os.path.join(os.getcwd(), 'tmp_{}_{}.txt'.format(prefix, findex))
        for findex in range(num_subfile)
    ]
    tmp_out_writer_list = [
        open(tmp_fname, 'wb') for tmp_fname in tmp_out_fname_list
    ]

    # Distribute every input line into a randomly chosen temporary file.
    for infile in file_list:
        with fileutils.open_maybe_compressed(infile, "rb") as f_in:
            if with_header:
                f_in.readline()  # discard the header line
            for raw_line in f_in:
                rand.choice(tmp_out_writer_list).write(raw_line)
    for handle in tmp_out_writer_list:
        handle.close()

    # Shuffle each temporary file in memory and emit the final .csv files.
    out_fname_list = []
    for file_index, tmp_fname in enumerate(tmp_out_fname_list):
        if not (os.path.isfile(tmp_fname) and
                os.path.getsize(tmp_fname) > 0):
            continue
        with open(tmp_fname, 'r', newline='') as f_in:
            if input_type == CSV_FORMAT:
                line_list = list(csv.reader(f_in))
            elif input_type == SMILES_FORMAT:
                line_list = [split_smi_line(raw) for raw in f_in]
            else:
                logger.error(f'Unknown input file type: {tmp_fname}')
                sys.exit(1)
        rand.shuffle(line_list)
        shuffled_file = os.path.join(os.getcwd(),
                                     '{}_{}.csv'.format(prefix, file_index))
        with open(shuffled_file, 'w', newline='') as f_out:
            writer = csv.writer(f_out)
            writer.writerow(header_list)
            for row in line_list:
                if row:
                    # Reorder columns so SMILES/name come first.
                    writer.writerow([row[col] for col in reorder_index])
        out_fname_list.append(shuffled_file)
    fileutils.force_remove(*tmp_out_fname_list)
    return out_fname_list, header_list
def merge_ligand_ml_models(sub_model_name_list, final_model, job_directory):
    """
    Merge multiple .tar.gz ligand_ml models into a single zipped
    deepautoqsar model.

    :param sub_model_name_list: list of .tar.gz ligand_ml model name.
    :type sub_model_name_list: [str]

    :param final_model: full path of the final zipped deepautoqsar model.
    :type final_model: str

    :param job_directory: directory of the .tar.gz ligand_ml models.
    :type job_directory: str
    """
    import ligand_ml

    # Unpack each available sub-model tarball; warn and skip missing ones.
    run_dir_list = []
    for sub_model_name in sub_model_name_list:
        sub_model_path = os.path.join(job_directory, sub_model_name)
        if not os.path.isfile(sub_model_path):
            logger.warning(f"ligand_ml model {sub_model_path} does not "
                           f"exist.")
            continue
        # NOTE(review): extractall() trusts archive member paths; models are
        # produced by our own subjobs, but consider the 'filter' argument
        # once the supported Python version allows it.
        with tarfile.open(sub_model_path) as tf:
            tf.extractall(path=job_directory)
        run_dir_list.append(fileutils.strip_extension(sub_model_path))

    smasher_list = [
        ligand_ml.Smasher.load_from_dir(run_dir) for run_dir in run_dir_list
    ]
    pre_package_path = os.path.join(job_directory, 'smasher_pre_package')
    fileutils.force_rmtree(pre_package_path)
    final_smasher = ligand_ml.Smasher.merge(smasher_list, pre_package_path)

    # Package the merged smasher into a fresh directory named after the
    # final model, then zip that whole directory tree.
    final_model_basename = fileutils.get_jobname(final_model)
    fileutils.force_rmtree(final_model_basename)
    os.makedirs(final_model_basename)
    package_path = os.path.join(final_model_basename, 'smasher')
    final_smasher.package(package_path, make_tarball=False)
    with zipfile.ZipFile(final_model, 'w', zipfile.ZIP_DEFLATED) as zfile:
        for root, dirs, files in os.walk(package_path):
            zfile.write(root)
            for fname in files:
                zfile.write(os.path.join(root, fname))

    # Remove all intermediate directories.
    for run_dir in run_dir_list:
        fileutils.force_rmtree(run_dir)
    fileutils.force_rmtree(final_model_basename)
    fileutils.force_rmtree(pre_package_path)
def get_file_ext(filename):
    """
    Get the extension of the file name, ignoring gz compression suffixes.

    Both "x.smi.gz" and "x.smigz" style names resolve to ".smi".

    :param filename: name of the file.
    :type filename: str

    :return: 'gz' excluded extension of the file.
    :rtype: str
    """
    root, ext = os.path.splitext(filename)
    lowered = ext.lower()
    if lowered == '.gz':
        # e.g. "ligands.smi.gz" -> ".smi"
        return os.path.splitext(root)[1]
    if lowered.endswith('gz') and len(ext) > 2:
        # fused gz suffix, e.g. ".maegz" -> ".mae"
        return ext[:-2]
    return ext
def check_driver_disk_space(active_learning_job):
    """
    Estimate the driver disk usage of an active learning job with some
    assumed parameters. Log an error and exit if the available driver disk
    space is smaller than the estimated usage.

    :param active_learning_job: current AL driver.
    :type active_learning_job: ActiveLearningJob instance.
    """
    backend = jobcontrol.get_backend()
    free_disk = shutil.disk_usage(backend.job_dir).free

    # Assumed per-item sizes used throughout the estimate.
    bytes_per_ml_model = 1024**3  # each ML model is ~ 1GB
    extra_bytes_per_ml_score = 50  # each ligand adds 50B for ML score
    bytes_per_scored_ligand = 5 * 1024  # 5KB of scoring-related output/ligand
    redundancy = 1.2  # assume 20% redundancy

    args = active_learning_job.args
    input_ligand_file_size = sum(
        os.path.getsize(x) for x in args.infile_list)
    num_ligands = active_learning_job.total_ligand_num
    keep_size = min(args.keep, num_ligands)
    rescore_size = min(args.num_rescore_ligand, keep_size)

    if args.task == SCREEN_TASK:
        train_size = args.train_size
        num_iter = args.num_iter
        ml_model_size = bytes_per_ml_model * num_iter
        ml_top_ligand_size = ((input_ligand_file_size +
                               num_ligands * extra_bytes_per_ml_score) *
                              args.uncertainty_sample_ratio * num_iter)
        ligand_file_size = (2 * input_ligand_file_size +
                            num_ligands * extra_bytes_per_ml_score)
        scoring_result_size = ((rescore_size + train_size * num_iter) *
                               bytes_per_scored_ligand)
        pred_csv_size = ((keep_size * extra_bytes_per_ml_score +
                          keep_size / num_ligands * input_ligand_file_size) *
                         num_iter)
        total_size = (ml_model_size + ml_top_ligand_size + ligand_file_size +
                      scoring_result_size + pred_csv_size)
    elif args.task == EVAL_TASK:
        ml_model_size = os.path.getsize(args.model)
        pred_csv_size = (keep_size * extra_bytes_per_ml_score +
                         keep_size / num_ligands * input_ligand_file_size)
        ligand_file_size = (2 * input_ligand_file_size +
                            num_ligands * extra_bytes_per_ml_score)
        scoring_result_size = rescore_size * bytes_per_scored_ligand
        total_size = (ml_model_size + pred_csv_size + ligand_file_size +
                      scoring_result_size)
    elif args.task == PILOT_TASK:
        pilot_size = min(args.pilot_size, num_ligands)
        ligand_file_size = (2 * input_ligand_file_size +
                            pilot_size * extra_bytes_per_ml_score)
        scoring_result_size = pilot_size * bytes_per_scored_ligand
        pred_csv_size = (pilot_size * extra_bytes_per_ml_score +
                         pilot_size / num_ligands * input_ligand_file_size)
        total_size = (ligand_file_size + scoring_result_size +
                      bytes_per_ml_model + pred_csv_size)
    else:
        logger.error(
            f"Error:Unknown task type: {active_learning_job.args.task}")
        sys.exit(1)

    logger.info(f"Estimate driver disk usage : "
                f"{total_size * redundancy / 1024 ** 3:.2f} GB.")
    logger.info(f"Current free space on driver disk: "
                f"{free_disk / 1024 ** 3:.2f} GB.")
    if free_disk < total_size * redundancy:
        logger.error(
            "Error: Current free space on driver disk is lower than the "
            "estimate disk usage. Please increase the driver disk space or "
            "specify another driver node.")
        sys.exit(1)
def node_run_timer(func):
    """
    Decorator for timing the running time of the runNode method in
    ActiveLearningNode.

    The elapsed wall-clock time is logged and also stored on the node as
    an (hours, minutes, seconds) tuple in ``node.time_cost``.
    """

    @functools.wraps(func)
    def wrap(node, *args, **kwargs):
        start = time.time()
        result = func(node, *args, **kwargs)
        elapsed = time.time() - start
        minutes, seconds = divmod(elapsed, 60)
        hours, minutes = divmod(minutes, 60)
        logger.info(
            f"Running time of node {node.node_name}: {int(hours)} hours, "
            f"{int(minutes)} minutes, {round(seconds)} seconds")
        node.time_cost = (int(hours), int(minutes), round(seconds))
        return result

    return wrap
def add_output_file(*output_files, incorporate=False):
    """
    Add files to jobcontrol output files.

    No-op when not running under job control.

    :param output_files: files to be transferred.
    :type output_files: str

    :param incorporate: marked files for incorporation by maestro.
    :type incorporate: bool
    """
    backend = jobcontrol.get_backend()
    if not backend:
        return
    # The resolved job directory is loop-invariant; compute it once
    # instead of once per file.
    job_dir = os.path.realpath(backend.job_dir)
    for output_file in output_files:
        output_file_rel_path = os.path.relpath(output_file, job_dir)
        backend.addOutputFile(output_file_rel_path)
        if incorporate:
            backend.setStructureOutputFile(output_file_rel_path)
def concatenate_logs(combined_logfile, subjob_logfile_list, logger=None):
    """
    Combine subjob logfiles into single combined logfile.

    Each subjob log is wrapped between begin/end marker lines. When a
    logger is supplied, an unreadable subjob log is reported and skipped;
    otherwise the IOError propagates.

    :param combined_logfile: combined log file name
    :type combined_logfile: str

    :param subjob_logfile_list: list of subjob logfile names to be combined.
    :type subjob_logfile_list: list(str)

    :param logger: logger for receiving the info and error message.
    :type logger: Logger or None
    """
    if logger:
        logger.info("Combining log files to %s", combined_logfile)
    with open(combined_logfile, 'w') as out_handle:
        for logfile in subjob_logfile_list:
            out_handle.write("######## begin of %s #########\n" % logfile)
            try:
                with open(logfile) as in_handle:
                    shutil.copyfileobj(in_handle, out_handle)
            except IOError as err:
                if logger:
                    # Best-effort merge: record the failure and continue.
                    logger.error(err)
                else:
                    raise
            out_handle.write("######## end of %s #########\n" % logfile)
def get_host_ncpu():
    """
    Return the host and number of CPU that should be used to submit
    subjobs. This function works both running under job control and not.

    :rtype: str, int
    """
    # Inspired by JobDJ.__init__
    if jobcontrol.get_backend():
        host_list = jobcontrol.get_backend_host_list()
    else:
        host_list = jobcontrol.get_command_line_host_list()
    if not host_list:
        host_list = [('localhost', 1)]
    host, ncpu = host_list[0]
    # A falsy CPU count (None/0) defaults to a single CPU.
    return host, ncpu or 1