Source code for schrodinger.application.desmond.starter.ui.cmdline

"""
Common command line arguments
Lightweight framework for defining new command line arguments.

Copyright Schrodinger, LLC. All rights reserved.
"""

import argparse
import os
import sys
import time
import warnings
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

from schrodinger.application.desmond.constants import UiMode
from schrodinger.application.desmond import launch_utils
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.infra import mm
from schrodinger.structure import Structure
from schrodinger.utils.cmdline import DEBUG
from schrodinger.utils.cmdline import HOST
from schrodinger.utils.cmdline import LOCAL
from schrodinger.utils.cmdline import NICE
from schrodinger.utils.cmdline import OPLSDIR
from schrodinger.utils.cmdline import RESTART
from schrodinger.utils.cmdline import RETRIES
from schrodinger.utils.cmdline import SAVE
from schrodinger.utils.cmdline import SUBHOST
from schrodinger.utils.cmdline import TMPDIR
from schrodinger.utils.cmdline import WAIT
from schrodinger.utils.cmdline import add_jobcontrol_options
from schrodinger.utils.cmdline import add_standard_options
from schrodinger.utils.fileutils import is_maestro_file

Destination = Union[None, str, Dict[str, str]]
ERROR_MISSING_INPUT = ('ERROR: Must provide input file to run a new job. See '
                       '--help for more information.')
ERROR_WRONG_EXTENSION = ('ERROR: Invalid input file extension. See --help for '
                         'more information.')
ERROR_DUPLICATE_TITLE = (
    "ERROR: The input file contains structures with "
    "duplicate titles. Please ensure all titles are unique "
    "and try again.")
MIN_LEG_TIME = 500.0
ERROR_TOO_SHORT = f'ERROR: Simulation time must be >= {MIN_LEG_TIME:.1f} ps.'


[docs]def get_sim_time_message( default_time: float, leg_type: Optional[str] = None, default_arg: Optional[str] = None, ): """ Get the help message for the sim time arguments to the FEP interfaces """ msg = "Specify the production-simulation time (in ps)" if leg_type: msg += f" for the {leg_type} leg of the FEP stage" msg += ". " if default_arg: msg += (f"If provided, this value will override {default_arg} for this " f"leg. ") msg += ("For extension, this option specifies the additional simulation " "time (in ps). ") msg += (f"For new jobs - Default: {default_time:.1f} Min value: " f"{MIN_LEG_TIME:.1f}. ") msg += f"For extension - Default: {default_time:.1f} Min value: 0.0. " return msg
[docs]class Option(NamedTuple): name: Union[str, List] default: object help: str dest: Dict = None
[docs]def define_options(parser: argparse.ArgumentParser, options: List[Option]): """ Define the options on a specified parser. :param parser: Add options to this parser. :param options: List of options in the format (name, default, help, dest). """ for opt in options: default = opt.default kwarg = { "default": default, "help": opt.help, } if isinstance(default, str): kwarg["metavar"] = "<string>" elif isinstance(default, bool): kwarg["action"] = "store_%s" % ("false" if default else "true") elif isinstance(default, int): kwarg["metavar"], kwarg["type"] = "<integer>", int elif isinstance(default, float): kwarg["metavar"], kwarg["type"] = "<real>", float elif isinstance(default, list): if len(default) >= 1: if (isinstance(default[0], str)): kwarg["metavar"] = "<string>" elif (isinstance(default[0], int)): kwarg["metavar"], kwarg["type"] = "<integer>", int elif (isinstance(default[0], float)): kwarg["metavar"], kwarg["type"] = "<real>", float kwarg["nargs"] = '*' kwarg["action"] = "append" dest = opt.dest if isinstance(dest, str): kwarg["dest"] = dest elif isinstance(dest, dict): kwarg.update(dest) opt_names = isinstance(opt.name, list) and opt.name or [opt.name] parser.add_argument(*opt_names, **kwarg)
[docs]def auto_int(string): if string == "auto": return string else: try: return int(string) except TypeError: msg = f'{string} is not an integer or the word \"auto\"' raise argparse.ArgumentTypeError(msg)
[docs]def get_common_options() -> List[Option]: """ Return list of options common to all scripts. """ ffld_names = mm.opls_names() return [ Option( "-ff", ffld_names[-1], f"Specify the forcefield to use. Default: {ffld_names[-1]}.", { "dest": "forcefield", "metavar": "{%s}" % "|".join(ffld_names) }, ), Option( "-seed", 2014, "Specify seed of pseudorandom number generator for initial atom" " velocities. Default: 2014", ), Option( "-ppj", 0, "Specify number of processors per job. Default: 4.", {"metavar": "PPJ"}, ), Option("-mps", 0, argparse.SUPPRESS, {"type": auto_int}), Option( "-checkpoint", "", "Specify the multisim checkpoint file.", {"metavar": "<multisim-checkpoint-file>"}, ), Option( "-prepare", False, "Do not run job. Only prepare multisim input files.", ), Option( "-skip_traj", False, argparse.SUPPRESS, ), # "Do not copy trajectories to host machine on restart (saves bandwidth, # "but may affect the cleanup stage)." Option( "-JOBNAME", "", "Specify the job name.", ), Option( "-buffer", 5.0, "Specify a larger buffer size (in Angstroms). Defaults: 5 in" " complex leg; 5 in solvent leg of protein-residue-mutation FEP;" " 10 in solubility FEP; 10 in solvent leg of other types of FEP." " The custom value will be used only if it's greater than the" " corresponding default values.", ), Option( "-maxjob", 0, "Maximum number of simultaneous subjobs. Default: 0 (unlimited)", ), Option( ["-lambda-windows", "-lambda_windows"], 12, "Number of lambda windows for the default protocol. Default: 12", ), Option( "-no_concat", False, argparse.SUPPRESS, ), Option( "-max-walltime", 0, argparse.SUPPRESS, # "Specify the maximum number of seconds to run the subjobs " # "before automatically checkpointing and requeuing them. " # "The default of 0 means to run all subjobs to completion. "" ), Option("-ffbuilder", False, "Run the ffbuilder workflow."), Option( "-ff-host", "", "Host for the ffbuilder jobs specified as HOST:MAX_FF_BUILDER_JOBs. " "This must be set if using -ffbuilder. ") ]
[docs]def get_common_fep_options() -> List[Option]: """ Return list of options common to all fep scripts. """ return [ Option( "-salt", 0.0, "Add salt to simulation box with default or specified salt concentration (in M). " "Charged protocol will always have a minimum of 0.15 M salt added. " "Default: 0.0 (only adds 0.15 M salt to charged protocol). ", {"type": float}) ]
[docs]def suppress_options(options: List[Option], excluded: Set[str]): """ Modify the options as specified by `excluded` by replacing the help text with `argparse.SUPPRESS`, which will effectively hide the specified options in the command line. No effects if either `options` or `excluded` is an empty container. """ for index, (name, default, _, dest) in enumerate(options): name = isinstance(name, list) and name[0] or name if name in excluded: suppressed_opt = Option(name, default, argparse.SUPPRESS, dest) options[index] = suppressed_opt
[docs]def get_parser(usage: str, options: List[Option], add_help: bool = True, add_subhost: bool = True) \ -> argparse.ArgumentParser: """ Return a command-line parser with the given options. :param usage: Usage to display if no arguments given. :param options: List of options :param add_help: Whether to add help flag to the parser :return: Configured command-line parser """ parser = argparse.ArgumentParser(usage=usage, add_help=add_help, allow_abbrev=False) define_options(parser, options) add_jobcontrol_options(parser, options=( HOST, WAIT, LOCAL, DEBUG, TMPDIR, NICE, SAVE, OPLSDIR, )) standard_options = (SUBHOST, RETRIES, RESTART) if add_subhost else \ (RETRIES, RESTART) add_standard_options(parser, options=standard_options) return parser
[docs]def parse_known_options( usage: str, options: List[Option], argv: List[str], add_subhost: bool = True ) -> Tuple[argparse.Namespace, List[str], argparse.ArgumentParser]: """ Parse and return the parsed options. :param usage: Usage to display if no arguments given. :param options: List of options in the format (name, default, help, destination). :param argv: List of input arguments. :return: (Known parsed options, unknown options) :raise SystemExit: If no arguments given, show the usage and exit. """ parser = get_parser(usage, options, add_subhost=add_subhost) if len(argv) < 1: parser.print_help() sys.exit(0) opts, other_args = parser.parse_known_args(argv) return opts, other_args, parser
[docs]def parse_options(usage: str, options: List[Option], argv: List[str], add_subhost: bool =True) \ -> argparse.Namespace: """ Parse and return the parsed options. :param usage: Usage to display if no arguments given. :param options: List of options in the format (name, default, help, dest). :param argv: List of input arguments :param add_subhost: Add the -SUBHOST option? :return: Parsed options :raise SystemExit: If no arguments given or if there is unknown arguments, show the usage and exit. """ check_discontinued_args(argv) opts, other_args, parser = parse_known_options(usage, options, argv, add_subhost=add_subhost) for other_arg in other_args: if other_arg.startswith('-'): parser.print_help() print("ERROR: Unrecognized option {}".format(other_arg)) sys.exit(1) return opts
[docs]def check_discontinued_args(args: List[str]): """ Check for the presence of arguments that have been removed and exit if any are used. """ for arg in args: if arg == "-m": sys.exit( "ERROR: The -m option has been removed in favor of using the " "'-prepare' option and then directly editing the msj before " "running multisim.")
[docs]def check_jobname(jobname: str) -> Optional[str]: """ Check whether the given job name contains problematic characters. It cannot start with a "-" or contain a "=". """ if jobname.startswith("-"): return f'ERROR: The job name {jobname} cannot start with "-"' if "=" in jobname: return f'ERROR: The job name {jobname} cannot contain "="' return None
[docs]class BaseArgs: """ Base class for arguments. A subclass of this class contains all the command-line arguments. The jobcontrol-related arguments, which are consumed by the toplevel script, are recovered. """
[docs] def __init__(self, opt: argparse.Namespace): """ :param opt: Command line options with corresponding values. """ self.copy_parser_attributes(opt) self.msj = None self.forcefield = opt.forcefield self.seed = opt.seed # FIXME: Remove `self.ppr`? # Was processors per replica, deprecated option self.ppr = 1 self.ppj_set_by_user = bool(opt.ppj) self.ppj = opt.ppj or 4 self.mps_factor = 0 self.set_mps_factor(opt.mps) self.checkpoint = opt.checkpoint self.prepare = opt.prepare self.skip_traj = opt.skip_traj self.buffer = opt.buffer self.maxjob = opt.maxjob self.lambda_windows = opt.lambda_windows self.concat = not opt.no_concat self.max_walltime = opt.max_walltime self.ffbuilder = opt.ffbuilder self.ff_host = opt.ff_host self.inp_file = opt.inp_file # Jobcontrol options self.JOBNAME = opt.JOBNAME self.restart = opt.restart self.RETRIES = "1" if opt.retries is None else str(opt.retries) # NOTE: JOBHOST and SCHRODINGER_NODELIST may be "" # in which case localhost should be used. self.HOST = os.getenv("JOBHOST") or "localhost" self.SUBHOST = os.getenv("SCHRODINGER_NODELIST") or "localhost" self.WAIT = opt.wait self.LOCAL = opt.local self.DEBUG = opt.debug self.TMPDIR = os.getenv("SCHRODINGER_TMPDIR") # NOTE: -OPLSDIR option will be stripped from the command line by # toplevel script and it will set OPLS_DIR environment. self.OPLSDIR = os.getenv("OPLS_DIR") self.NICE = os.getenv("SCHRODINGER_NICE") self.SAVE = opt.save self.generate_jobname() self._normalize_mode_settings() self.validate()
def __str__(self): s = [f"{k} = {v}" for k, v in vars(self).items()] return "\n".join(s) @property def mode(self) -> UiMode: if self.checkpoint or self.restart: return UiMode.RESTART else: return UiMode.NEW
[docs] def validate(self): """ Validate the parameters. :raise SystemExit: For invalid parameters. """ try: ff_int = mm.opls_name_to_version(self.forcefield) if mm.mmffld_license_ok(ff_int) != mm.TRUE: sys.exit( f"ERROR: The {self.forcefield} forcefield requires a license.\n" f"The {mm.OPLS_NAME_F14} forcefield does not require a license.\n" f"You can specify this with \"-ff {mm.OPLS_NAME_F14}\" in the command." ) except IndexError: pass # CHARMM, AMBER, etc. don't require license check if self.ffbuilder and not self.ff_host: sys.exit( "ERROR: The -ffbuilder option requires the -ff-host HOSTNAME:MAX_FFBUILDER_SUBJOBS argument." ) self._validate_files() if err := check_jobname(self.JOBNAME): sys.exit(err)
def _normalize_mode_settings(self): """ Ensure that the settings are consistent with the chosen mode. """ if self.mode != UiMode.NEW: if not self.checkpoint: # Will ERROR if can't find checkpoint in cwd self.checkpoint = launch_utils.find_checkpoint_file() if self.inp_file: print("WARNING: An input file is not required and will be " "ignored.") self.inp_file = None def _validate_files(self): """ Ensure that the requisite files are present for the chosen mode. """ if self.mode == UiMode.NEW: # Start from scratch if not self.inp_file: sys.exit(ERROR_MISSING_INPUT) util.ensure_file_exists(self.inp_file) self._validate_input_file() else: cpt_fname, _ = launch_utils.get_checkpoint_file_and_restart_number( self.checkpoint) util.ensure_file_exists(cpt_fname) engine = launch_utils.read_checkpoint_file(cpt_fname) if self.JOBNAME != engine.jobname: sys.exit(f'ERROR: The given jobname of "{self.JOBNAME}" must ' f'be the same as the jobname used in the checkpoint ' f'file: {engine.jobname}.') def _validate_input_file(self): """ Check that the input file is valid for a new job """ if not is_maestro_file(self.inp_file): sys.exit(ERROR_WRONG_EXTENSION)
[docs] def copy_parser_attributes(self, opt: argparse.Namespace): """ Copy parser options (e.g: time, buffer, ...) from `opt` to `self`. Subclass needs to call this method in __init__ """ for attr in dir(opt): if not attr.startswith('_'): setattr(self, attr, getattr(opt, attr))
[docs] def generate_jobname(self): """ If the JOBNAME was not set and this is a new job, automatically generate a job name. """ if not self.JOBNAME: self.JOBNAME = os.environ.get("SCHRODINGER_JOBNAME") if not self.JOBNAME and self.mode == UiMode.NEW: self.JOBNAME = util.getlogin() + time.strftime("%Y%m%dT%H%M%S") print( f"Using an automatically-generated job name: {self.JOBNAME}" )
[docs] def set_mps_factor(self, val): """ Set the mps oversubcription factor. If val is `auto`, the mps factor will be determined automatically. Otherwise it is set directly, and should have an `int` value. `0` is treated equivalently to the value `1`. """ if val != "auto": if not (0 <= val < 9): raise ValueError("explicit MPS value must be an integer " "between 0 and 8") # interpret 0 as mps_factor == 1 if not val: val = 1 if val == "auto" or val > 1: warnings.warn( 'WARNING: MPS is enabled - this may cause undefined behavior ' 'if the queue being used does not support MPS') self.mps_factor = val
[docs]class FepArgs(BaseArgs): PROGRAM_NAME: str SUPPORTED_FEP_TYPES: List[str] # a list of constants.FEP_TYPES @property def mode(self) -> UiMode: if self.extend: return UiMode.EXTEND elif self.checkpoint or self.restart: return UiMode.RESTART else: return UiMode.NEW
[docs] def copy_parser_attributes(self, opt: argparse.Namespace): super().copy_parser_attributes(opt) self.extend = opt.extend self.salt_concentration = opt.salt
[docs] def validate(self): super().validate() self.check_ppj()
def _normalize_mode_settings(self): super()._normalize_mode_settings() if self.mode == UiMode.EXTEND: cpt_fname, rst_stage_idx = launch_utils.get_checkpoint_file_and_restart_number( self.checkpoint) if rst_stage_idx: print(f"WARNING: A restart stage index ({rst_stage_idx}) is " "not required for extending and will be ignored.") self.checkpoint = cpt_fname
[docs] def check_ppj(self): """ Raise a warning if restarting and trying to set ppj. :raise UserWarning: If ppj set for a restarted job. """ if self.ppj_set_by_user and self.mode != UiMode.NEW: warnings.warn( 'WARNING: When restarting, the "-ppj" option is not supported' ' and will be ignored.')
def _validate_files(self): super()._validate_files() if self.mode == UiMode.EXTEND: util.ensure_file_exists(self.extend) util.ensure_file_exists(f"{self.JOBNAME}_out.fmp") def _validate_input_file(self): """ Support fmp and maestro files """ from schrodinger.application.scisol.packages.fep import graph if self.inp_file.endswith("fmp"): g = graph.Graph.deserialize(self.inp_file) if g.fep_type not in self.SUPPORTED_FEP_TYPES: sys.exit(f"ERROR: {self.inp_file} uses the {g.fep_type} FEP " f"type but needs to be generated for " f"{self.PROGRAM_NAME}.") elif not is_maestro_file(self.inp_file): sys.exit(ERROR_WRONG_EXTENSION) def _validate_sim_times(self, times): for time in times: if not (self.DEBUG or self.extend) and time < MIN_LEG_TIME: sys.exit(ERROR_TOO_SHORT)
[docs] def check_duplicate_titles(self, sts: List[Structure]): duplicate_titles = struc.find_duplicate_titles(sts) if duplicate_titles: for idx, title in duplicate_titles: print(f'Duplicate title "{title}" found at index {idx}.') sys.exit(ERROR_DUPLICATE_TITLE)
[docs] def get_time_for_leg(self, leg_type: str) -> Optional[float]: """ Get simulation time for FEP leg. Implemented in subclasses of FepArgs. """ raise NotImplementedError( "Not implemented for FepArgs. Subclass should override.")