"""
Functions to help with launching and restarting an FEP+ simulation.

Copyright Schrodinger, LLC. All rights reserved.
"""
import glob
import os
import sys
import tarfile
from typing import List
from typing import Optional
from typing import Tuple

from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import stage
from schrodinger.infra import mm


def prepare_command_for_launch(host: str,
                               subhost: str,
                               jobname: str,
                               msj: Optional[str],
                               maxjob: int,
                               input_fname: Optional[str] = None,
                               program: str = "multisim",
                               lics: Optional[List[str]] = None) -> List[str]:
    """
    :param host: Hostname for the job.
    :param subhost: Subhost for the job.
    :param jobname: Jobname for the job.
    :param msj: The msj filename to pass to multisim.
    :param maxjob: The maximum number of concurrent jobs.
    :param input_fname: If set, the structure or fmp input filename. If an
        fmp file, the pv_file will be written to the same directory and used
        as the structure input.
    :param program: The name of the program that will be used by fep_plus to
        run the workflow. Default: "multisim"
    :param lics: Additional licenses to request for the main job, in the
        form of 'LICENSE:NTOKENS'.

    :return: Multisim command to launch the job.
    """
    executor = os.path.join(os.environ["SCHRODINGER"], "utilities",
                            "multisim") if program == "multisim" else program
    cmd = [executor, "-HOST", host]
    if subhost:
        cmd += ["-SUBHOST", subhost]
    cmd += [
        "-JOBNAME",
        jobname,
        "-maxjob",
        str(maxjob),
    ]
    if lics:
        for lic in lics:
            cmd += ["-lic", lic]
    if msj:
        cmd.extend(["-m", msj])
    if input_fname:
        root, ext = os.path.splitext(input_fname)
        if ext in ['.fmp', '.pkl'] and program == "multisim":
            from schrodinger.application.scisol.packages.fep import utils
            pv_file = f"{root}_pv.maegz"
            utils.fmp_to_mae(input_fname, pv_file)
            cmd.insert(1, pv_file)
        else:
            cmd.insert(1, input_fname)
        # Note: Either `pv_file` or `input_fname` must be right after the
        # program's name. Otherwise, we might get the error:
        # "toplevel.py: error: Please specify a single input PV or FMP file."
    return cmd
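

# A minimal usage sketch (illustrative only; the hostnames, jobname, and
# filenames below are hypothetical):
#
#   cmd = prepare_command_for_launch(host="driver-cpu",
#                                    subhost="compute-gpu",
#                                    jobname="fep_demo",
#                                    msj="fep_demo.msj",
#                                    maxjob=4,
#                                    input_fname="fep_demo.fmp",
#                                    lics=["FEP_GPGPU:16"])
#
# With an .fmp input and the default "multisim" program, the fmp file is
# converted to "fep_demo_pv.maegz", which is inserted right after the
# executable so multisim sees a single structure input.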


def prepare_command_for_restart(engine: cmj.Engine,
                                stage_data_fnames: List[str],
                                host: str,
                                subhost: str,
                                cpt_fname: str,
                                maxjob: Optional[int] = None,
                                jobname: Optional[str] = None,
                                msj: Optional[str] = None,
                                rst_stage_idx: Optional[int] = None,
                                rst_whole: bool = False,
                                input_fname: Optional[str] = None,
                                lics: Optional[List[str]] = None) -> List[str]:
    """
    :param engine: Represents the current job state.
    :param stage_data_fnames: List of filenames to be used for restarting
        the job.
    :param host: Hostname for the job.
    :param subhost: Subhost for the job.
    :param cpt_fname: Checkpoint filename.
    :param maxjob: If set, the maximum number of concurrent jobs.
    :param jobname: Jobname if set, otherwise get it from the engine.
    :param msj: If set, the msj filename to pass to multisim.
    :param rst_stage_idx: If set, the restart stage number.
    :param rst_whole: Set to True to restart the job from scratch or False
        to continue a partially complete job.
    :param input_fname: If set, the structure input filename. Only needed
        for a full restart at stage 2.
    :param lics: Additional licenses to request for the main job, in the
        form of 'LICENSE:NTOKENS'.

    :return: Multisim command to restart the job.

    :raise SystemExit: If a file from `stage_data_fnames` could not be found.
    """
    jobname = jobname or engine.jobname
    multisim_stage_numbers = get_multisim_stage_numbers(engine)
    previous_jobname = _get_previous_jobname(
        engine, multisim_stage_numbers[0]) if multisim_stage_numbers else None
    cmd = [
        os.path.join(os.environ["SCHRODINGER"], "utilities", "multisim"),
        "-HOST", host,
        "-SUBHOST", subhost,
        "-JOBNAME", jobname,
        "-RESTART",
        f"{cpt_fname}:{rst_stage_idx}" if rst_whole else cpt_fname,
    ]
    cmd += ["-m", msj] if msj else []  # yapf: disable
    cmd += ["-maxjob", str(maxjob)] if maxjob else []  # yapf: disable

    # Add files, taking care of a change in the jobname
    old_jobnames = engine.old_jobnames if hasattr(engine,
                                                  'old_jobnames') else []
    old_jobnames.reverse()
    if previous_jobname:
        old_jobnames.append(previous_jobname)
    # There may be duplicates
    for fname in sorted(set(stage_data_fnames)):
        if not os.path.exists(fname):
            for oj in old_jobnames:
                new_fname = fname.replace(engine.jobname, oj)
                if os.path.exists(new_fname):
                    fname = new_fname
                    break
            else:
                sys.exit(f"Cannot find file: {fname}")
        # Other files, such as msj files, will be passed in by multisim
        # directly and do not need to be added here.
        if not fname.endswith('.fmpdb'):
            # tar file that will be unpacked
            cmd += ["-d", fname]
    if input_fname:
        cmd += ['-ADD_FILE', input_fname]
    if lics:
        for lic in lics:
            cmd += ["-lic", lic]
    return cmd
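

# A minimal restart sketch (hypothetical names; `engine` would come from
# read_checkpoint_file() below and `stage_fnames` from
# prepare_multisim_files_for_restart() below):
#
#   cmd = prepare_command_for_restart(engine, stage_fnames,
#                                     host="driver-cpu",
#                                     subhost="compute-gpu",
#                                     cpt_fname="fep_demo-multisim_checkpoint",
#                                     rst_stage_idx=5,
#                                     rst_whole=True)
#
# With rst_whole=True the -RESTART value becomes
# "fep_demo-multisim_checkpoint:5", so stage 5 is rerun from scratch; with
# rst_whole=False the checkpoint filename is passed unmodified to continue
# a partially complete job. Each stage tgz in stage_fnames is attached
# with "-d" so the backend can unpack the prior results.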


def additional_command_arguments(
    stage_data_fnames: List[str],
    retries: str,
    wait: bool,
    local: bool,
    debug: bool,
    tmpdir: Optional[str],
    forcefield: Optional[str],
    opls_dir: Optional[str],
    nice: bool,
    save: bool,
) -> List[str]:
    """
    :param stage_data_fnames: List of filenames to be used for restarting
        the job.
    :param retries: Set to control the number of retries for a failed job.
    :param wait: Set to True to wait until the job has completed before
        returning.
    :param local: Set to True to run the job in the current directory.
    :param debug: Set to True to enable debugging.
    :param tmpdir: Set to override the temporary directory location.
    :param forcefield: Set to the name of the forcefield.
    :param opls_dir: If present, the path to the opls directory.
    :param nice: Set to True to run the job at reduced priority.
    :param save: Set to True to return a zip archive of the job directory on
        completion.

    :return: Additional arguments for the multisim command to launch the job.
    """
    cmd = []
    # yapf: disable
    for fname in stage_data_fnames:
        if fname.endswith('.fmpdb'):
            cmd += ["-ADD_FILE", fname]
    cmd += ["-RETRIES", retries] if retries else []
    cmd += ["-WAIT"] if wait else []
    cmd += ["-LOCAL"] if local else []
    cmd += ["-DEBUG"] if debug else []
    cmd += ["-TMPDIR", tmpdir] if tmpdir else []
    cmd += ["-NICE"] if nice else []
    cmd += ["-SAVE"] if save else []
    if opls_dir:
        _check_compatible_forcefield(forcefield, opls_dir)
        cmd += ["-OPLSDIR", opls_dir]
    # yapf: enable
    return cmd
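

# Sketch of combining the builders (flag values are hypothetical). These
# arguments only control job execution, so the result can be appended to
# either a launch or a restart command:
#
#   cmd += additional_command_arguments(stage_fnames,
#                                       retries="3",
#                                       wait=True,
#                                       local=False,
#                                       debug=False,
#                                       tmpdir=None,
#                                       forcefield="OPLS4",
#                                       opls_dir=None,
#                                       nice=False,
#                                       save=True)
#
# Note that `forcefield` is only consulted when `opls_dir` is given, to
# verify that the custom parameter directory is compatible.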


def find_checkpoint_file() -> str:
    """
    Find and return a multisim checkpoint file (whose name should end with
    the "-multisim_checkpoint" substring) in the current working directory.
    If multiple multisim checkpoint files exist in the directory, return
    the latest one.

    :raise SystemExit: If no checkpoint file was found.
    """
    checkpoint_files = glob.glob("*-multisim_checkpoint")
    if len(checkpoint_files) == 1:
        print("Restarting with multisim checkpoint file: ",
              checkpoint_files[0])
        return checkpoint_files[0]
    elif checkpoint_files:
        checkpoint_files = sorted(checkpoint_files, key=os.path.getmtime)
        print("Found multiple multisim checkpoint files in the current dir:")
        for checkpoint_file in checkpoint_files:
            print(f"    {checkpoint_file}")
        print(f"Restarting with the latest one: {checkpoint_files[-1]}.")
        return checkpoint_files[-1]
    sys.exit("ERROR: Failed to find multisim checkpoint file.")


def read_checkpoint_file(cpt_fname: str) -> cmj.Engine:
    """
    Read a checkpoint file and return the initialized Engine object.

    :param cpt_fname: The checkpoint filename.

    :raise SystemExit: If the checkpoint could not be read.
    """
    try:
        with open(cpt_fname, "rb") as fh:
            engine = cmj.Engine.deserialize(fh)
    except EOFError:
        sys.exit(
            f"ERROR: Checkpoint file {cpt_fname} seems corrupted. Cannot restart."
        )

    if not cmj.is_restartable_version(engine.version):
        print(f"Current version of multisim is {cmj.VERSION},")
        print(
            f"whereas this checkpoint file was generated by version {engine.version}."
        )
        sys.exit(
            "This checkpoint file cannot be restarted with the current version of multisim."
        )

    if not cmj.is_restartable_build(engine):
        sys.exit(
            "This workflow cannot be restarted with the Academic version of Desmond/Maestro."
        )

    stage_state = [s.__getstate__() for s in engine.stage]
    stage_list = cmj.parse_msj(None, engine.msj_content)
    engine.stage = cmj.build_stages(stage_list, stage_state=stage_state)
    engine._find_restart_stage()
    return engine


def get_checkpoint_file_and_restart_number(
        checkpoint: Optional[str] = None) -> Tuple[str, Optional[int]]:
    """
    Return the checkpoint filename and the restart stage number if
    specified.

    :param checkpoint: If specified, the checkpoint string, in the format
        "checkpoint_filename:restart_stage_number" or "checkpoint_filename".
        Otherwise, find the checkpoint file in the current working
        directory.

    :raise SystemExit: If `checkpoint` is not valid or could not be found.
    """
    if not checkpoint:
        checkpoint = find_checkpoint_file()
    split_str = checkpoint.split(':')
    try:
        rst_stage_idx = int(split_str[-1])
        cpt_fname = ':'.join(split_str[:-1])
    except ValueError:
        # There is no ':'
        rst_stage_idx = None
        cpt_fname = checkpoint
    if not cpt_fname:
        sys.exit("ERROR: Checkpoint filename must be specified.")
    return cpt_fname, rst_stage_idx
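

# Parsing sketch (hypothetical filenames). The stage number is taken from
# the text after the last ':' when it parses as an integer; otherwise the
# whole string is treated as the checkpoint filename:
#
#   get_checkpoint_file_and_restart_number("fep_demo-multisim_checkpoint:5")
#   # -> ("fep_demo-multisim_checkpoint", 5)
#   get_checkpoint_file_and_restart_number("fep_demo-multisim_checkpoint")
#   # -> ("fep_demo-multisim_checkpoint", None)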


def get_restart_stage_from_engine(engine: cmj.Engine) -> Optional[int]:
    """
    :param engine: Represents the current job state.

    :return: The restart stage number from the job `engine` if found.
        Otherwise, None.
    """
    for i, stg in enumerate(engine.stage):
        if stg == engine.restart_stage:
            return i


def get_multisim_stage_numbers(engine: cmj.Engine) -> List[int]:
    """
    :param engine: Represents the current job state.

    :return: List of multisim stage numbers.
    """
    multisim_stage_numbers = []
    for i, s in enumerate(engine.stage):
        if isinstance(s, stage.Multisim):
            multisim_stage_numbers.append(i)
    return multisim_stage_numbers


def validate_restart_stage(engine: cmj.Engine,
                           rst_stage_idx: Optional[int] = None) -> None:
    """
    Validate the `rst_stage_idx` for the given job `engine`.

    :param engine: Represents the current job state.
    :param rst_stage_idx: The restart stage index.

    :raise SystemExit: If the restart stage is not valid.
    """
    if rst_stage_idx is None:
        print(
            "This job has successfully completed. If you intend to rerun\n"
            " certain stages, specify the stage number using the following\n"
            " syntax: -checkpoint <checkpoint-file>:<stage-number>.")
        sys.exit(0)
    if rst_stage_idx == 1:
        sys.exit(
            "Restart from the very beginning of the whole workflow? Why not\n"
            " rerun the job from scratch with the original input files?")
    elif (engine.restart_stage and
          engine.stage.index(engine.restart_stage) < rst_stage_idx):
        print(f"ERROR: Cannot restart from stage {rst_stage_idx}.")
        sys.exit(" You can restart from either the first uncompleted stage "
                 "or a completed stage.")
    elif rst_stage_idx < len(engine.stage):
        print(f"Will restart the workflow at stage {rst_stage_idx}: "
              f"{engine.stage[rst_stage_idx].NAME}")
    else:
        sys.exit(f"There is no stage {rst_stage_idx} in this particular job.")


def prepare_multisim_files_for_restart(engine: cmj.Engine,
                                       multisim_stage_numbers: List[int],
                                       rst_stage_idx: int,
                                       rst_whole: bool,
                                       skip_traj: bool = False) -> List[str]:
    """
    Prepare the files needed to restart a multisim job and return the list
    of those files.

    :param engine: Represents the current job state.
    :param multisim_stage_numbers: List of multisim stage numbers.
    :param rst_stage_idx: The restart stage index.
    :param rst_whole: Set to True to restart the job from scratch or False
        to continue a partially complete job.
    :param skip_traj: Set to True to not compress the trajectory information
        in the tar.

    :return: List of filenames to be used for restarting the job.

    :raise SystemExit: If files could not be prepared.
    """
    # Always need the task stage
    stage_data_fnames = [f"{engine.jobname}_1-out.tgz"]

    # If the prior stage was skipped, find the first active stage.
    prior_stage_idx = rst_stage_idx - 1
    while engine.stage[prior_stage_idx].param.should_skip.val:
        prior_stage_idx = max(1, prior_stage_idx - 1)
        if prior_stage_idx == 1:
            break

    # If the stage before restart is multisim, tar it;
    # otherwise just add the existing tgz to the list.
    if prior_stage_idx in multisim_stage_numbers:
        # Prior stage was a multisim stage
        _tar_multisim_stage_if_needed(engine,
                                      prior_stage_idx,
                                      skip_traj=skip_traj)
    stage_data_fnames.append(f"{engine.jobname}_{prior_stage_idx}-out.tgz")

    # If restarting from a partially complete job, also need the
    # rst_stage_idx out.tgz
    if not rst_whole:
        # If the restart stage is multisim, tar it;
        # otherwise just add the existing tgz to the list.
        if rst_stage_idx in multisim_stage_numbers:
            _tar_multisim_stage_if_needed(engine,
                                          rst_stage_idx,
                                          skip_traj=skip_traj)
        stage_data_fnames.append(f"{engine.jobname}_{rst_stage_idx}-out.tgz")

    # If restarting after the multisim stage(s), then they all need
    # to be passed to the backend.
    if rst_stage_idx > multisim_stage_numbers[-1]:
        for multisim_stage_number in multisim_stage_numbers:
            stage_data_fname = f"{engine.jobname}_{multisim_stage_number}-out.tgz"
            if stage_data_fname not in stage_data_fnames:
                _tar_multisim_stage_if_needed(engine,
                                              multisim_stage_number,
                                              skip_traj=skip_traj)
                stage_data_fnames.append(stage_data_fname)

    return stage_data_fnames
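

# End-to-end restart sketch tying the helpers together (a rough outline,
# not the exact fep_plus driver logic; hostnames are hypothetical):
#
#   cpt_fname, rst_stage_idx = get_checkpoint_file_and_restart_number()
#   engine = read_checkpoint_file(cpt_fname)
#   rst_stage_idx = rst_stage_idx or get_restart_stage_from_engine(engine)
#   validate_restart_stage(engine, rst_stage_idx)
#   stage_fnames = prepare_multisim_files_for_restart(
#       engine, get_multisim_stage_numbers(engine), rst_stage_idx,
#       rst_whole=False)
#   cmd = prepare_command_for_restart(engine, stage_fnames, "driver-cpu",
#                                     "compute-gpu", cpt_fname,
#                                     rst_stage_idx=rst_stage_idx)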


def _get_multisim_stage_dirname(engine: cmj.Engine,
                                multisim_stage_number: int) -> str:
    """
    :param engine: Represents the current job state.
    :param multisim_stage_number: Multisim stage number.

    :return: Multisim stage directory name.
    """
    multisim_stage_dirname = f"{engine.jobname}_{multisim_stage_number}"
    multisim_stage = engine.stage[multisim_stage_number]
    if failed_jobs := multisim_stage.filter_jobs(failed=[True]):
        multisim_stage_dirname = os.path.basename(failed_jobs[0].dir)
    if successful_jobs := multisim_stage.filter_jobs(failed=[False]):
        multisim_stage_dirname = os.path.basename(successful_jobs[0].dir)
    return multisim_stage_dirname


def _get_previous_jobname(engine: cmj.Engine,
                          multisim_stage_number: int) -> str:
    """
    Return the previous jobname using the job `engine` and the
    `multisim_stage_number`. This may be an empty string if it is not found.

    :return: The previous jobname or an empty string if not found.
    """
    multisim_stage_dirname = _get_multisim_stage_dirname(
        engine, multisim_stage_number)
    return multisim_stage_dirname[:-len(f"_{multisim_stage_number}")]


def _check_compatible_forcefield(forcefield: Optional[str],
                                 opls_dir: Optional[str]) -> None:
    """
    Check if a given opls_dir and a given forcefield name are compatible.
    If not, exit with an error message.

    :param forcefield: Name of the forcefield.
    :param opls_dir: Path to the opls_dir if present.

    :raise SystemExit: If `opls_dir` is not compatible with the `forcefield`.
    """
    if forcefield and opls_dir:
        try:
            mm.validate_opls_directory(opls_dir)
        except RuntimeError as e:
            sys.exit(f'ERROR: {e}. You can run with default '
                     f'{forcefield} parameters by removing the -OPLSDIR option '
                     f'from the job command, but we recommend using the Force '
                     f'Field Builder to generate new compatible custom '
                     f'parameters.')


def _tar_multisim_stage(dirname: str,
                        output_fname: str,
                        skip_traj: bool = False,
                        multisim_stage: Optional[stage.Multisim] = None):
    """
    Compress the multisim `dirname` to `output_fname`.

    :param dirname: Path to the multisim job result directory.
    :param output_fname: Output filename for the tgz.
    :param skip_traj: Set to True to not compress the trajectory.
        Default is False.
    :param multisim_stage: If `skip_traj` is True, still include failed and
        released jobs in the resulting tar file. Ignored if `skip_traj` is
        False.
    """
    with tarfile.open(output_fname, 'w:gz') as tar:
        if skip_traj:
            for f in glob.glob(f"{dirname}/*-out.mae*"):
                tar.add(f)
            if multisim_stage:
                for subjob in multisim_stage.filter_jobs(is_incomplete=[True]):
                    for f in glob.glob(f"{dirname}/{subjob.jobname}*"):
                        tar.add(f)
        else:
            tar.add(dirname)


def _tar_multisim_stage_if_needed(engine: cmj.Engine,
                                  multisim_stage_number: int,
                                  skip_traj: bool = False) -> str:
    """
    Create a tar for the multisim stage if it does not exist or if it is
    older than the checkpoint file.

    :param engine: Represents the current job state.
    :param multisim_stage_number: Multisim stage number.
    :param skip_traj: Set to True to not compress the trajectory information
        in the tar.

    :return: Multisim tar filename.

    :raise SystemExit: If the tar file could not be prepared.
    """
    multisim_out_tgz = f"{engine.jobname}_{multisim_stage_number}-out.tgz"
    multisim_stage_dirname = _get_multisim_stage_dirname(
        engine, multisim_stage_number)
    print("Preparing the data files for restarting.")
    print("This may take a while to finish.")
    if os.path.exists(multisim_stage_dirname):
        _tar_multisim_stage(multisim_stage_dirname,
                            multisim_out_tgz,
                            skip_traj=skip_traj,
                            multisim_stage=engine.stage[multisim_stage_number])
    else:
        sys.exit(f"Cannot find {multisim_stage_dirname}/.")
    return multisim_out_tgz


def find_stage(stages: List[cmj.StageBase],
               stage_name: str) -> Optional[cmj.StageBase]:
    """
    Given the raw stages, find the first stage that matches the given
    stage name.

    :param stages: List of schrodinger.application.desmond.stage objects.
    :param stage_name: Stage name to look for.

    :return: The first matching stage, or None if no stage matches.
    """
    for stg in stages:
        try:
            if stage_name == stg.NAME:
                return stg
        except AttributeError:
            if stage_name == stg.__NAME__:
                return stg


def find_stage_number(stages: List[cmj.StageBase],
                      stage_name: str,
                      picker: Optional[int] = None) -> Optional[int]:
    """
    Given the raw stages, find the index of the first matching stage
    (1-based index).

    :param stages: List of schrodinger.application.desmond.stage objects.
    :param stage_name: Stage name to look for.
    :param picker: Set to return the (`picker` + 1)-th matching stage.
        Default of None means to return the first matching stage.

    :return: The 1-based stage index, or None if there are not enough
        matches.
    """
    picker = picker or 0

    def matcher(args):
        _, stg = args
        return ((hasattr(stg, 'NAME') and stg.NAME == stage_name) or
                (hasattr(stg, '__NAME__') and stg.__NAME__ == stage_name))

    indices = [idx + 1 for idx, _ in filter(matcher, enumerate(stages))]
    if len(indices) < picker + 1:
        return None
    return indices[picker]
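

# Picker sketch: given stages whose names are, hypothetically,
# ["task", "lambda_hopping", "analysis", "lambda_hopping"],
# find_stage_number(stages, "lambda_hopping") returns 2 (1-based index of
# the first match), find_stage_number(stages, "lambda_hopping", picker=1)
# returns 4 (the second match), and any picker beyond the last match
# returns None.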