Source code for schrodinger.application.macromodel.paraUtils

"""

 Copyright Schrodinger, LLC. All rights reserved.

 This is a common module for parallel job execution.
 It provides functionalities to configure and run parallel jobs.
 Currently this script is used by epik_driver, bmin_driver, and ligprep_driver.
"""

# Maintainer: Hiral Oza

import datetime
import gzip
import os
import pickle
import re
import sys
import tarfile
from past.utils import old_div
from shutil import copy as shutil_copy
from shutil import rmtree as shutil_rmtree

from schrodinger import structure
from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobqueue
from schrodinger.utils import deprecation
from schrodinger.utils import fileutils

############################## Global variables ###############################
(NJOBS, JOBCTS, FIRSTCT, LASTCT, RUN_JOBS, STRICT_END, MAX_RETRIES, OUTPUT_ORG,
 NO_EXECUTION, RESTART, NC) = list(range(0, 11))
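# Each name above is an integer tag (0-10); pass a subset of these to
# add_para_job_options() below to register only the corresponding
# command-line options, e.g. add_para_job_options(parser, [NJOBS, RUN_JOBS]).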

###############################################################################

###############################################################################
######################### Common functions ####################################
###############################################################################


def add_para_job_options(parser, options=None):
    """
    Adds common para job control options to a SingleDashOptionParser
    instance.

    :type parser: SingleDashOptionParser
    :param parser: Instance of SingleDashOptionParser

    :type options: list
    :param options: List of module enums that indicate what options to add
        to the parser.
    """
    if not options:
        options = [
            NJOBS, JOBCTS, FIRSTCT, LASTCT, RUN_JOBS, STRICT_END, MAX_RETRIES,
            OUTPUT_ORG, NO_EXECUTION, RESTART, NC
        ]
    if NJOBS in options:
        # Number of subjobs to prepare
        parser.add_option("-NJOBS",
                          "-nprocs",
                          action="store",
                          type="int",
                          dest="njobs",
                          default=1)
    if JOBCTS in options:
        # Max number of cts per subjob
        parser.add_option('-JOBCTS',
                          action="store",
                          type="int",
                          dest="jobcts",
                          default=10000)
    if FIRSTCT in options:
        # First structure to process
        parser.add_option('-first', action="store", type="int",
                          dest="firstct")
    if LASTCT in options:
        # Last structure to process
        parser.add_option('-last', action="store", type="int", dest="lastct")
    if RUN_JOBS in options:
        # Run only the specified subjobs
        parser.add_option('-j', action="store", dest="run_jobs")
    if STRICT_END in options:
        # Parent job dies if any subjob dies
        parser.add_option("-STRICT_END",
                          action="store_true",
                          dest="strict_end",
                          default=False)
    if MAX_RETRIES in options:
        # Number of allowed retries per subjob
        parser.add_option('-MAX_RETRIES',
                          action="store",
                          type="int",
                          default=2,
                          dest="max_retries")
    if OUTPUT_ORG in options:
        # Output organization
        parser.add_option("-OUTPUT_ORG", action="store", dest="output_org")
    if NO_EXECUTION in options:
        # Prepare subjobs but do not run them
        parser.add_option("-nx",
                          action="store_true",
                          dest="no_execution",
                          default=False)
    if RESTART in options:
        # Restart a failed parent job
        parser.add_option("-RESTART",
                          action="store_true",
                          dest="restart",
                          default=False)
    if NC in options:
        # Do not clean up the subdirectories created for each subjob
        parser.add_option("-nc",
                          "-NC",
                          action="store_true",
                          dest="nocleanup",
                          default=False)

def validate_options(options):
    """
    Validate the job control options.

    :type options: Instance
    :param options: object containing values of para job options
    """
    if hasattr(options, 'njobs') and options.njobs < 1:
        print("Error: -NJOBS/-nprocs option (%i) requests less than 1 job." %
              options.njobs)
        sys.exit(1)
    if hasattr(options, 'jobcts') and options.jobcts < 1:
        print("Error: -JOBCTS option (%i) requests less than 1 ct structure "
              "per job." % options.jobcts)
        sys.exit(1)
    if hasattr(options, 'firstct') and options.firstct:
        if options.firstct < 1:
            print("Error: Specified first structure to process (%i) is less "
                  "than 1." % options.firstct)
            sys.exit(1)
        if getattr(options, 'debug', False):
            print("First structure to process set to :", options.firstct)
    if hasattr(options, 'lastct') and options.lastct:
        if options.lastct < 1:
            print("Error: Specified last structure to process (%i) is less "
                  "than 1." % options.lastct)
            sys.exit(1)
        if getattr(options, 'debug', False):
            print("Last structure to process set to :", options.lastct)
    if hasattr(options, 'firstct') and hasattr(options, 'lastct') and \
            options.firstct and options.lastct and \
            options.lastct < options.firstct:
        print("Error: Specified last structure to process (%i) is less than "
              "that specified for the first structure (%i)." %
              (options.lastct, options.firstct))
        sys.exit(1)
    if hasattr(options, 'output_org') and options.output_org and \
            options.output_org != "BY_SUBJOB":
        print("Error: If -OUTPUT_ORG is specified, it must be set to "
              "BY_SUBJOB.")
        sys.exit(1)
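
# Illustrative usage sketch (not part of this module's API): a driver script
# would typically build a parser, register these options, and validate the
# result. SingleDashOptionParser is assumed here to come from
# schrodinger.utils.cmdline.
#
#     from schrodinger.utils.cmdline import SingleDashOptionParser
#
#     parser = SingleDashOptionParser()
#     add_para_job_options(parser)  # register all para job options
#     opts, args = parser.parse_args()
#     validate_options(opts)  # exits with an error message on bad values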

def _subjob_input_id(text):
    """
    To be used for sorting the subjob input names.
    """
    m = re.match(r'^.*_subjob_(\d+)(\..*)?$', text)
    assert m is not None
    return int(m.group(1))


def _append_st_file(st_file, st_writer, raw_append=False):
    """
    Append a structure file to the output writer.

    :type st_file: String
    :param st_file: Structure file name

    :type st_writer: file or structure.StructureWriter
    :param st_writer: Open output destination the structures are appended to

    :type raw_append: Boolean
    :param raw_append: Indicates to append the file using raw I/O
    """
    try:
        if raw_append:
            with open(st_file, 'rb') as in_fh:
                st_writer.write(in_fh.read())
        else:
            for st in structure.StructureReader(st_file):
                st_writer.append(st)
    except Exception:
        print("Error: appending structure output file.")


def _print_subjob_logfile(log_file="", start_msg=None, end_msg=None):
    """
    Prints the given log file with start and end messages.

    :type log_file: String
    :param log_file: Log file name. The content of log_file is printed, if
        specified

    :type start_msg: String
    :param start_msg: Message to place at the start of the log file

    :type end_msg: String
    :param end_msg: Message to place at the end of the log file
    """
    if start_msg:
        print(start_msg)
    if log_file:
        with open(log_file) as in_fh:
            for line in in_fh:
                # The lines already end in a newline, so suppress print()'s
                print(line, end='')
    if end_msg:
        print(end_msg)


def _timestamp():
    now = datetime.datetime.now()
    return now.strftime("%a, %d %b %Y %H:%M:%S.%f")


def _merge_subjobs(backend_instance, subjob_input_mae_files, output_file,
                   output_org, data):
    """
    Merge subjob structure and log files.

    :type backend_instance: _Backend
    :param backend_instance: Instance of the _Backend

    :type subjob_input_mae_files: List
    :param subjob_input_mae_files: List of subjob files with parent directory

    :type output_file: String
    :param output_file: Job output file name

    :type output_org: String
    :param output_org: Produce more than one output structure file
    """
    msg1 = "###########################################\n" \
           "###########################################\n" \
           " Organizing Output Files: \n"
    start_msg = "...........................................\n" \
                " Collecting Results from Job: %s \n" \
                " ------------------------------------- \n"
    msg2 = "End of Log files for subjobs. \n\n" \
           "-------------------------------------------\n\n" \
           "All structure file output placed in one output " \
           "structure file: %s.\n" \
           "Number of structures placed in %s: %d.\n" \
           "###########################################\n" \
           "%s\nJob Complete"
    end_msg = "\n ------------------------------------- \n" \
              "...........................................\n"

    base, ext = fileutils.splitext(output_file)
    st_writer = open(output_file, "wb")
    raw_append = True
    tar = tarfile.open(base + '.tar.gz', "w:gz")
    for count, file_path in enumerate(subjob_input_mae_files):
        subjobdir = os.path.dirname(file_path)
        tar.add(subjobdir)
        # Collect structures
        st_file = os.path.join(subjobdir, subjobdir + ext)
        if os.path.isfile(st_file):
            if backend_instance and output_org:
                # FIXME: we tried backend.addOutputFile() but it didn't
                # work, so copy the subjob output files individually from
                # job_dir to launch_dir
                try:
                    shutil_copy(
                        os.path.join(backend_instance.getJob().JobDir,
                                     st_file),
                        backend_instance.getJob().Dir)
                except Exception:
                    pass
            _append_st_file(st_file, st_writer, raw_append)
        # Collect logs
        tmp1_msg = start_msg % subjobdir
        if count == 0:
            tmp1_msg = msg1 + tmp1_msg
        tmp2_msg = end_msg
        log_file = os.path.join(subjobdir, subjobdir + ".log")
        if os.path.isfile(log_file):
            _print_subjob_logfile(log_file,
                                  start_msg=tmp1_msg,
                                  end_msg=tmp2_msg)
            tmp1_msg = ""
    st_writer.close()
    tar.close()
    frmt = None
    if 'output_format' in data:
        frmt = data['output_format']
        if frmt == 'mae':
            frmt = 'maestro'
    tmp2_msg = end_msg + msg2 % (output_file, output_file,
                                 structure.count_structures(output_file),
                                 _timestamp())
    _print_subjob_logfile(start_msg=tmp1_msg, end_msg=tmp2_msg)


def _get_jobs_to_run(run_jobs_str, njobs):
    """
    Return the list of jobs to run.

    :type run_jobs_str: String
    :param run_jobs_str: String of ':' or ',' separated numbers

    :type njobs: Number
    :param njobs: Total number of jobs requested
    """
    run_jobs = []

    def validate_or_add_num(num_str, range_check=True):
        try:
            num = int(num_str)
        except (TypeError, ValueError):
            print("Error: Wrong job number '%s' specified." % num_str)
            sys.exit(1)
        if range_check:
            if num > 0:
                run_jobs.append(num)
            else:
                print("Warning: The specified job number %i is less than 1. "
                      "Skipping this job." % num)
        return num

    if run_jobs_str:
        for item in run_jobs_str.strip().split(','):
            item = item.strip()
            tmp_list = item.split(':')
            if not tmp_list or len(tmp_list) > 2:
                print("Warning: can not identify the job '%s'. Skipping." %
                      item)
                continue
            if len(tmp_list) == 2:
                start = validate_or_add_num(tmp_list[0].strip(),
                                            range_check=False)
                end = validate_or_add_num(tmp_list[1].strip(),
                                          range_check=False)
                for i in range(start, end + 1):
                    validate_or_add_num(i)
            else:
                validate_or_add_num(item)
        # Remove duplicates if any
        run_jobs = list(set(run_jobs))
        if not run_jobs:
            print("Error: Specified job numbers '%s' are not valid." %
                  run_jobs_str)
            sys.exit(1)
        else:
            # Sort list
            run_jobs.sort()
            tmp_list = []
            for job_num in run_jobs:
                if job_num > njobs:
                    print("Warning: The specified job number %i is greater "
                          "than the number of jobs (%i). Skipping this job." %
                          (job_num, njobs))
                else:
                    tmp_list += [job_num]
            run_jobs = tmp_list
            print("Only running subjobs: %s" %
                  ','.join(["%s" % j for j in run_jobs]))
    return run_jobs


def _generate_subjobs(job_input_file,
                      job_output_file,
                      options,
                      driver_script,
                      split_input_file=True):
    """
    Creates the subjob dirs and fills in the data.

    :type job_input_file: String
    :param job_input_file: Job input file

    :type job_output_file: String
    :param job_output_file: Job output file

    :type options: Instance
    :param options: object containing values of para job options
    """
    in_file_base, in_file_ext = fileutils.splitext(job_input_file)
    in_file_base = fileutils.get_basename(job_input_file)
    out_file_base, out_file_ext = fileutils.splitext(job_output_file)

    subjob_input_dat = {}
    if split_input_file:
        nct_in = structure.count_structures(job_input_file)
        if options.firstct:
            print("First structure to process set to :", options.firstct)
        else:
            options.firstct = 1
        if options.lastct:
            if options.lastct < nct_in:
                print("Number of structures present in input structure file:",
                      nct_in)
                print("Number of last structure to be processed :",
                      options.lastct)
            elif options.lastct > nct_in:
                print("Number of structures present in input structure file:",
                      nct_in)
                print("Number of last structure to be processed :",
                      options.lastct)
                print("Requested number of last structure to process exceeds "
                      "number present.")
                print("Resetting number of last structure to process to "
                      "number present (%i)." % nct_in)
                options.lastct = nct_in
        else:
            options.lastct = nct_in

        nct_use = options.lastct - options.firstct + 1
        print("Number of structures to process :", nct_use)
        if nct_use < options.njobs:
            print("More jobs requested (%i) than number of structures to "
                  "process (%i)." % (options.njobs, nct_use))
            options.njobs = nct_use
            print("Number of jobs set to %i." % options.njobs)
        else:
            if nct_use % options.jobcts != 0:
                tmp_njobs = int(
                    old_div(
                        nct_use + (options.jobcts - nct_use % options.jobcts),
                        options.jobcts))
            else:
                tmp_njobs = int(old_div(nct_use, options.jobcts))
            if tmp_njobs > options.njobs:
                print("Number of jobs requested (%i) requires that more "
                      "than\n JOBCTS (%i) structures be processed by each "
                      "subjob." % (options.njobs, options.jobcts))
                print("Number of jobs increased to: %i" % tmp_njobs)
                options.njobs = tmp_njobs

        run_jobs = _get_jobs_to_run(options.run_jobs, options.njobs)

        cts_per_job = old_div(nct_use, options.njobs)
        remainder = nct_use % options.njobs

        format = fileutils.get_structure_file_format(job_input_file)
        adj = 0
        ict = 1
        pattern = b""
        header = b""
        if in_file_ext[-2:] == 'gz':
            input_fh = gzip.open(job_input_file, 'rb')
        else:
            input_fh = open(job_input_file, 'rb')
        if format == structure.MAESTRO:
            pattern = rb"^\s*f_m_ct\s*{"
            # The first six lines of a Maestro file hold the header block
            input_dat = []
            header_indx = 0
            for line in input_fh:
                input_dat.append(line)
                header_indx += 1
                if header_indx == 6:
                    break
            header = b''.join(input_dat[:6])
            st_reader = structure.StructureReader(job_input_file,
                                                  index=options.firstct)
        elif format == structure.SD:
            pattern = b"$$$$"
            header = b""
            # Renumbering since the delimiter is at the end of each ct
            ict = 0
            adj = 1
            st_reader = structure.StructureReader(job_input_file,
                                                  index=options.firstct)
        elif format == structure.SMILES:
            st_reader = structure.SmilesReader(job_input_file,
                                               index=options.firstct)
        elif format == structure.SMILESCSV:
            st_reader = structure.SmilesCsvReader(job_input_file,
                                                  index=options.firstct)
            for line in input_fh:
                if line[:6] == b"SMILES":
                    header = line
                    break
            if header == b"":
                input_fh.seek(0)
        else:
            print("WARNING: Unsupported format ")
            st_reader = structure.StructureReader(job_input_file,
                                                  index=options.firstct)

        input_mae_file = None
        ijob = 0
        cts_for_job = cts_per_job
        if remainder > 0:
            cts_for_job += 1
        cts_in_job = 0
        ct_delim = 0
        fw = None
        for line in input_fh:
            # For Maestro files the delimiter comes at the beginning of a
            # ct, so deal with it here; sdf is handled later
            if format == structure.MAESTRO:
                ct_delim = 0
                if re.match(pattern, line):
                    ct_delim = 1
            # Open up the subjob output file if required and write the
            # header into it
            if ((ct_delim != 0 and ict > 1 and cts_in_job == cts_for_job and
                 ijob <= options.njobs) or ijob == 0):
                # Close the previous output file
                if ijob != 0:
                    fw.close()
                    fw = None
                if ijob == options.njobs:
                    break
                ijob += 1
                # Extra cts have been added, now back to cts_per_job
                if ijob > remainder:
                    cts_for_job = cts_per_job
                subjobname = out_file_base + "_subjob_" + str(ijob)
                # Create the subjob directory and the input file name
                if not os.path.isdir(subjobname):
                    os.mkdir(subjobname)
                input_mae_file = os.path.join(
                    subjobname,
                    in_file_base + "_subjob_" + str(ijob) + in_file_ext)
                # Record the subjob name and starting index in the returned
                # dictionary
                subjob_input_dat[input_mae_file] = str(ict + adj)
                # If this job is not to be run, remove it from the returned
                # dictionary
                if run_jobs and ijob not in run_jobs:
                    subjob_input_dat.pop(input_mae_file)
                if in_file_ext[-2:] == 'gz':
                    fw = gzip.open(input_mae_file, 'wb')
                else:
                    fw = open(input_mae_file, 'wb')
                fw.write(header)
                cts_in_job = 0
            if format == structure.SD:
                ct_delim = 0
                if pattern in line:
                    # The delimiter belongs to the ct just finished, so
                    # write it to the current file immediately
                    ct_delim = 2
                    fw.write(line)
                    line = b""
            elif format == structure.SMILESCSV:
                ct_delim = 0
                if (len(line) > 1 and line[:6] != b"SMILES" and
                        line[:1] != b"#" and line[:1] != b" "):
                    ct_delim = 3
                else:
                    line = b""
            elif format == structure.SMILES:
                ct_delim = 0
                if len(line) > 1 and line[:1] != b"#" and line[:1] != b" ":
                    ct_delim = 4
            if ct_delim != 0:
                ict += 1
                cts_in_job += 1
            fw.write(line)
        input_fh.close()
        if fw is not None:
            fw.close()
    else:
        input_mae_file = None
        ijob = 1
        while ijob <= options.njobs:
            subjobname = out_file_base + "_subjob_" + str(ijob)
            if not os.path.isdir(subjobname):
                os.mkdir(subjobname)
            input_mae_file = os.path.join(
                subjobname,
                in_file_base + "_subjob_" + str(ijob) + in_file_ext)
            subjob_input_dat[input_mae_file] = "1"
            ijob += 1
    return subjob_input_dat


def _run_subjobs(jdj, restart_file):
    # Dump the JobDJ state on every status change so that the job can be
    # restarted with -RESTART if it dies
    jdj.run(status_change_callback=lambda _: jdj.dump(restart_file))
    if jdj.isComplete():
        # Remove the restart file if present
        fileutils.force_remove(restart_file)
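
# For reference, _get_jobs_to_run() accepts comma-separated job numbers and
# colon-separated ranges, so (illustrative example):
#
#     _get_jobs_to_run("1,3:5,3", njobs=10)  ->  [1, 3, 4, 5]
#
# Duplicates are removed, the list is sorted, and job numbers outside
# 1..njobs are skipped with a warning.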

def launch_subjobs(
        options,
        driver_script,
        backend_instance,
        backend_args,
        job_input_file,
        job_output_file,
        cmd_append="",
        merge=True,
        prepare_subjob_callback=None,
        callback_data={},  # noqa: M511
        split_input_file=True,
        post_execution_processing_callback=None,
        post_execution_processing_callback_data={}):  # noqa: M511
    """
    Launch subjobs.

    :type options: Instance
    :param options: object containing values of para job options

    :type driver_script: String
    :param driver_script: Driver script name

    :type backend_instance: _Backend
    :param backend_instance: Instance of the _Backend

    :type backend_args: List
    :param backend_args: List of arguments

    :type job_input_file: String
    :param job_input_file: Job input file

    :type job_output_file: String
    :param job_output_file: Job output file

    :type cmd_append: String
    :param cmd_append: Command to be appended to the subjob command

    :type merge: Boolean
    :param merge: Whether or not to join the subjob outputs

    :type prepare_subjob_callback: Function
    :param prepare_subjob_callback: Function to be called to prepare subjob
        data

    :type callback_data: Dictionary
    :param callback_data: A dictionary to be passed to the subjob callback
    """
    jobname = fileutils.get_basename(job_output_file)
    restart_file = jobname + '.restart'

    # Job restart functionality
    if options.restart:
        if os.path.isfile(restart_file):
            # The restart file is a pickled JobDJ, so read it in binary mode
            with open(restart_file, 'rb') as fh:
                jdj = pickle.load(fh)
            print("Total subjobs   :", len(jdj.all_jobs))
            print("Finished subjobs:", len(jdj.done_jobs))
            print("Failed subjobs  :", len(jdj.failed_jobs))
            print("Running failed subjobs...")
            _run_subjobs(jdj, restart_file)
            if jdj.isComplete():
                if merge:
                    # Prepare the subjob input files to merge
                    in_file_base, in_file_ext = fileutils.splitext(
                        job_input_file)
                    out_file_base, out_file_ext = fileutils.splitext(
                        job_output_file)
                    subjob_input_mae_files = []
                    all_jobs = jdj.all_jobs
                    all_jobs.sort()
                    for i, job in enumerate(all_jobs):
                        job_obj = job.getJob()
                        subjobname = out_file_base + "_subjob_" + str(i + 1)
                        infile = os.path.join(
                            subjobname,
                            in_file_base + "_subjob_" + str(i + 1) +
                            in_file_ext)
                        if os.path.basename(infile) in job_obj.InputFiles:
                            subjob_input_mae_files += [infile]
                    if subjob_input_mae_files:
                        subjob_input_mae_files.sort(key=_subjob_input_id)
                        _merge_subjobs(
                            backend_instance, subjob_input_mae_files,
                            job_output_file, options.output_org,
                            post_execution_processing_callback_data)
            return
        else:
            print("Warning: couldn't find restart file '%s'." % restart_file)
            print("Running without restart file.")

    if split_input_file:
        if options.output_org:
            if options.debug:
                print("Output organization set to", options.output_org)
            if options.output_org == "BY_SUBJOB":
                print("Separate output structure files will be created for "
                      "each subjob.")
        else:
            print("All output structures will be stored in one file called "
                  "'%s'" % job_output_file)

    # Generate the subjob dirs and data
    subjob_input_dat = _generate_subjobs(job_input_file, job_output_file,
                                         options, driver_script,
                                         split_input_file)
    subjob_input_mae_files = list(subjob_input_dat)
    subjob_input_mae_files.sort(key=_subjob_input_id)
    print("Generated %d jobs" % len(subjob_input_mae_files))

    if hasattr(options, 'no_execution') and options.no_execution:
        print("------------------------------------------------------------")
        print("-nx specified; only creating subdirectories and input "
              "structure files.")
        print(" Jobs will not be run.")
        print("------------------------------------------------------------")
        return

    print("Preparing subjobs...")
    if driver_script == "epik_driver":
        backend = "epik"
    elif driver_script == "bmin_driver":
        backend = "bmin"
    elif driver_script == "ligprep_driver":
        msg = 'Does not work with modern ligprep'
        raise deprecation.DeprecationError(msg)
    else:
        raise RuntimeError(f'Unrecognized driver {driver_script}')
    subjob_cmd = [os.path.join(os.environ['SCHRODINGER'], backend)]
    subjob_cmd += backend_args

    max_failures = jobqueue.NOLIMIT
    if options.strict_end:
        max_failures = 0
    out_file_base, out_file_ext = fileutils.splitext(job_output_file)
    hosts = jobcontrol.get_backend_host_list()
    if not hosts:
        hosts = [('localhost', 1)]
    jdj = jobqueue.JobDJ(hosts=hosts,
                         max_failures=max_failures,
                         max_retries=options.max_retries,
                         verbosity="verbose")
    for i, file_path in enumerate(subjob_input_mae_files):
        subjobdir = os.path.dirname(file_path)
        print(" subjobinfo subjobname", subjobdir)
        in_file = os.path.basename(file_path)
        if split_input_file:
            print(" subjobinfo input     ", in_file)
        out_file = subjobdir + out_file_ext
        print(" subjobinfo output    ", out_file)
        if split_input_file:
            print(" subjobinfo log       ", subjobdir + '.log')
        if prepare_subjob_callback:
            prepare_subjob_callback(subjobdir, in_file, out_file,
                                    callback_data)
        cmd = list(subjob_cmd)
        if driver_script == "epik_driver":
            cmd.extend(['-imae', in_file, '-omae', out_file])
        elif driver_script == "bmin_driver":
            cmd.extend([subjobdir])
        elif driver_script == "ligprep_driver":
            tmp_cmd = cmd_append.strip().split(' ')
            tmp_cmd[1] = in_file
            tmp_cmd[3] = out_file
            cmd.extend(tmp_cmd)
            cmd.extend(['-indx', subjob_input_dat[file_path]])
        if options.debug:
            print("subjob command", ' '.join(cmd))
        jdj.addJob(cmd, command_dir=subjobdir)

    print("Running subjobs...")
    _run_subjobs(jdj, restart_file)
    if jdj.isComplete():
        if merge:
            _merge_subjobs(backend_instance, subjob_input_mae_files,
                           job_output_file, options.output_org,
                           post_execution_processing_callback_data)
        if post_execution_processing_callback:
            base, ext = fileutils.splitext(job_output_file)
            main_log = base + '.log'
            post_execution_processing_callback(
                subjob_input_mae_files, main_log,
                post_execution_processing_callback_data)

    # EV# 109618: Remove the subjob dirs if the job ran locally (i.e. using
    # -LOCAL or -NOJOBID); in all other cases jobcontrol takes care of
    # deleting the subjob dirs
    if backend_instance:
        backend_job = backend_instance.getJob()
        local = (backend_job.JobDir == backend_job.Dir)
    else:
        local = True
    if hasattr(options, 'nocleanup') and not options.nocleanup and local:
        print("-------------------------------------------")
        print("Removing subjob directories")
        for file_path in subjob_input_mae_files:
            shutil_rmtree(os.path.dirname(file_path))
        print("-------------------------------------------")
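
# Illustrative call from a driver script (a sketch only; the backend
# instance normally comes from jobcontrol.get_backend(), and the
# backend_args shown, such as epik's -ph flag, are assumptions):
#
#     backend = jobcontrol.get_backend()
#     launch_subjobs(opts, "epik_driver", backend, ["-ph", "7.0"],
#                    "ligands.mae", "ligands_out.mae")
#
# This splits ligands.mae across opts.njobs subjob directories, runs
# $SCHRODINGER/epik in each under JobDJ, tars the subjob directories into
# ligands_out.tar.gz, appends the per-subjob structure files into
# ligands_out.mae, and echoes each subjob log to the parent job's output.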
###############################################################################
#EOF