# Source code for schrodinger.application.desmond.starter.generator.constant_ph

"""
Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import subprocess
from typing import List
from pathlib import Path

from schrodinger import structure
from schrodinger.application.desmond import cmdline
from schrodinger.application.desmond.constants import UiMode
from schrodinger.application.desmond import launch_utils
from schrodinger.application.desmond import mapper_msj_generator
from schrodinger.application.desmond import multisim
from schrodinger.application.desmond import stage
from schrodinger.application.desmond.starter import ui
from schrodinger.application.desmond import license


def _cmd_for_new_job(args: ui.constant_ph.Args) -> List[str]:
    """
    Write the input files of a new constant pH job and return the command
    that launches it.

    The first structure in `args.inp_file` is read, its titration sites are
    marked, and the result is written to `<JOBNAME>-in.mae`. The msj and cfg
    files are then written by a `ConstantpHMsjGenerator`.

    :param args: Command line arguments.
    :return: Command line to launch the new job.
    """
    # Function-scope import, as in the original implementation.
    from schrodinger.application.scisol.packages.lambda_dynamics import constant_ph

    # Only the first structure of the input file is used.
    input_st = next(structure.StructureReader(args.inp_file))
    constant_ph.mark_titration_sites(input_st, args.titratable_sites,
                                     args.skip_sidechain_mapping)
    marked_mae_fname = f'{args.JOBNAME}-in.mae'
    input_st.write(marked_mae_fname)

    # The site information is collected after the sites have been marked on
    # the structure.
    ph_values = constant_ph.expand_phs(args.ph_lower_limit,
                                       args.ph_upper_limit, args.ph_interval)
    titratable_sites = constant_ph.get_titratable_sites_list(input_st)
    site_info = constant_ph.get_site_info(titratable_sites)

    cd_params = {"processors_per_replica": 1, "cpus": args.ppj}
    generator = mapper_msj_generator.ConstantpHMsjGenerator(
        args.JOBNAME,
        cd_params,
        forcefield=args.forcefield,
        sim_time=args.time,
        phs=ph_values,
        site_info=site_info,
        rand_seed=args.seed,
    )
    msj_fname = generator.write_msj()
    # The returned cfg file name is not needed; the call is made so that the
    # cfg file gets written.
    generator.write_cfg()

    # Append the processor count unless the host entry already contains ':'.
    if ':' in args.HOST:
        host = args.HOST
    else:
        host = f'{args.HOST}:{args.ppj}'

    cmd = launch_utils.prepare_command_for_launch(
        host,
        None,
        args.JOBNAME,
        msj_fname,
        args.ppj,
        input_fname=marked_mae_fname,
        lics=[license.epik_lic(), license.fep_lic(host)])

    # A new job has no stage data files from previous stages to pass along.
    no_stage_data_fnames = []
    cmd += launch_utils.additional_command_arguments(
        no_stage_data_fnames, args.RETRIES, args.WAIT, args.LOCAL,
        args.DEBUG, args.TMPDIR, args.forcefield, args.OPLSDIR, args.NICE,
        args.SAVE)
    return cmd


def _cmd_for_extend_restart_job(args: ui.constant_ph.Args) -> List[str]:
    """
    Build the command that restarts (or extends) an existing multisim job.
    Exit if the multisim stage could not be found.

    When `args.extend` is set, the msj stored in the checkpoint is rewritten
    to `<JOBNAME>.extend.msj` with the extend stage's `added_time` set to
    `args.time`, and `args.msj` is updated to point to that file.

    :param args: Command line arguments.
    :return: Command line to launch the restart job.
    """
    checkpoint_fname, restart_idx = (
        launch_utils.get_checkpoint_file_and_restart_number(args.checkpoint))
    # Decided from the stage number given with the checkpoint, before any
    # fallback value is looked up below.
    restart_whole = restart_idx is not None and restart_idx > 0
    engine = launch_utils.read_checkpoint_file(checkpoint_fname)
    if not restart_idx:
        restart_idx = launch_utils.get_restart_stage_from_engine(engine)

    if args.extend:
        # Write a modified copy of the msj with the requested extra time.
        msj = multisim.parse(string=engine.msj_content)
        extend_stage = msj.find_stages(stage.DesmondExtend.NAME)[0]
        extend_stage['added_time'] = args.time
        args.msj = f'{args.JOBNAME}.extend.msj'
        msj.write(args.msj)
        # Extending always restarts the extend stage as a whole.
        restart_idx = extend_stage.STAGE_INDEX
        restart_whole = True

    launch_utils.validate_restart_stage(engine, restart_idx)

    # When restarting a partially complete stage, its own out.tgz is part of
    # the input list as well, hence the index is moved one further. The
    # updated index is also the one passed on to the restart command below.
    if not restart_whole:
        restart_idx += 1
    candidate_archives = (Path(f"{engine.jobname}_{idx}-out.tgz")
                          for idx in range(1, restart_idx))
    stage_data_fnames = [
        archive.name for archive in candidate_archives if archive.exists()
    ]

    # The host with the processor count is only used for the license request;
    # the restart command receives args.HOST unchanged.
    if ':' in args.HOST:
        host = args.HOST
    else:
        host = f'{args.HOST}:{args.ppj}'

    cmd = launch_utils.prepare_command_for_restart(
        engine,
        stage_data_fnames,
        args.HOST,
        args.SUBHOST,
        checkpoint_fname,
        maxjob=args.ppj,
        jobname=args.JOBNAME,
        msj=args.msj,
        rst_stage_idx=restart_idx,
        rst_whole=restart_whole,
        lics=[license.epik_lic(), license.fep_lic(host)])

    # No forcefield is passed along for a restart.
    cmd += launch_utils.additional_command_arguments(
        stage_data_fnames, args.RETRIES, args.WAIT, args.LOCAL, args.DEBUG,
        args.TMPDIR, None, args.OPLSDIR, args.NICE, args.SAVE)
    return cmd


def generate(args: ui.constant_ph.Args) -> List[str]:
    """
    Generate the files and command line to run multisim for the constant pH
    workflow.

    :param args: Object with input arguments.
    :return: Command line to launch multisim
    """
    if args.mode == UiMode.NEW:
        cmd = _cmd_for_new_job(args)
    else:
        cmd = _cmd_for_extend_restart_job(args)

    # NOTE(review): the options below are appended for both new and
    # restart/extend jobs; the flattened original did not preserve the
    # indentation, so confirm they are not meant for the restart branch only.
    cmd += ['-set', f'stage[1].set_family.remd.total_proc={args.ppj}']
    cmd += ['-mode', 'umbrella']
    cmd += ['-o', f'{args.JOBNAME}-out.mae']

    # Show the command with the installation path abbreviated. The message is
    # purely informational, so an unset or empty SCHRODINGER must neither
    # raise nor garble the output (str.replace with an empty pattern inserts
    # the replacement between every character).
    launch_cmd = subprocess.list2cmdline(cmd)
    schrodinger_dir = os.environ.get("SCHRODINGER")
    if schrodinger_dir:
        launch_cmd = launch_cmd.replace(schrodinger_dir, "$SCHRODINGER")
    print("Launch command:", launch_cmd)

    # Added after printing, so the encoded description is not part of the
    # displayed command.
    cmd.extend([
        "-encoded_description",
        cmdline.get_b64encoded_str(cmdline.get_job_command_in_startup()),
    ])
    return cmd