import copy
import glob
import os
import shutil
import string
import warnings
import weakref
from collections import defaultdict
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple
import numpy as np
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import deltaE
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import gcmc_utils
from schrodinger.application.desmond import util
from schrodinger.utils import sea
from .jobs import DesmondBackendJob
from .jobs import DesmondJob
from .jobs import FepJob
from .jobs import SnapcoreCoordinates
from .utils import SystemBuilder
from .utils import check_cms_file
from .utils import run_stage_method_in_parallel
__all__ = [
'SimulateBase', 'RemovesInitialGCMCSolvent', 'GCMCCapable', 'Simulate',
'ReplicaExchange', 'LambdaHopping', 'DesmondExtend', 'Vrun', 'FepVrun',
'Concatenate', 'ReInit'
]
[docs]def concatenate_and_backup(text_fname):
text_fname_previous = text_fname + ".previous"
if os.path.exists(text_fname_previous):
if os.path.exists(text_fname):
with open(text_fname_previous, "a") as fh_a:
with open(text_fname, "r") as fh_r:
for line in fh_r:
if (line and '\n' == line[-1]):
line = line[:-1]
print(line, file=fh_a)
os.remove(text_fname)
elif os.path.exists(text_fname):
os.rename(text_fname, text_fname + ".previous")
[docs]def backup_trajectories(fname):
fname_previous = fname + ".previous"
if os.path.exists(fname_previous):
raise IOError(fname_previous + " already exists...")
elif os.path.exists(fname):
os.rename(fname, fname_previous)
[docs]def concatenate_trajectories(trajectories):
"""
Concatenate a list of previously backed-up trajectory files (or dirs) with
the respective current ones.
:type trajectories: list[str]
:param trajectories: A list of file names. Each name should have the suffix
of either "_trj.previous" or ".xtc.previous". Elements without such a
suffix will be ignored without error. These are the names of the
backed-up trajectories. The current trajectories should have the same
basenames, but not required to be of the same format as the previous
ones'. This means we can concatenate a DTR trajectory with a XTC
trajectory. The final (complete) trajectories will be in the same
formats as the current ones'.
"""
from schrodinger.application.desmond.packages import traj
part1_fnames = []
part1_formats = []
part2_fnames = []
for fname in trajectories:
for suffix, fmt in [('_trj.previous', traj.Fmt.DTR),
('.xtc.previous', traj.Fmt.XTC)]:
if fname.endswith(suffix):
if os.path.exists(fname):
if os.path.getsize(fname):
traj_name0 = fname.replace(suffix, '_trj')
traj_name1 = fname.replace(suffix, '.xtc')
if os.path.exists(traj_name0):
part1_fnames.append(fname)
part1_formats.append(fmt)
part2_fnames.append(traj_name0)
elif os.path.exists(traj_name1) and os.path.getsize(
traj_name1):
part1_fnames.append(fname)
part1_formats.append(fmt)
part2_fnames.append(traj_name1)
else:
# `fname[:9]` drops the ".previous" suffix.
os.rename(fname, fname[:-9])
else:
# Remove empty file
os.unlink(fname)
break
for part1_fname, part1_format, part2_fname in \
zip(part1_fnames, part1_formats, part2_fnames):
part1_traj = traj.read_traj(part1_fname, part1_format)
part2_traj = traj.read_traj(part2_fname)
complete_traj = traj.merge(part1_traj, part2_traj)
if complete_traj:
# part2_fname may be absolute path, so only append ".tmp"
fmt = traj.get_fmt(part2_fname)
tmp_fname = part2_fname + ".tmp"
traj.write_traj(complete_traj, tmp_fname, format=fmt)
# Removes the current trajectory
try:
os.remove(part2_fname)
except OSError:
# Close the trajectory, otherwise the folder may
# not be able to be deleted
if len(part2_traj) > 0:
part2_traj[0].source().close()
shutil.rmtree(part2_fname)
# Renames the tmp file to the current trajectory file/dir's name.
os.rename(tmp_fname, part2_fname)
# Removes the previous trajectory.
try:
os.remove(part1_fname)
except OSError:
# Close the trajectory, otherwise the folder may
# not be able to be deleted
if len(part1_traj) > 0:
part1_traj[0].source().close()
shutil.rmtree(part1_fname)
[docs]class SimulateBase(cmj.StageBase):
"""
A stage that can launch desmond simulations.
"""
RESTARTABLE = True
DESMOND_CMD = os.path.join(envir.CONST.SCHRODINGER, "desmond")
CFG_SIM_NAME = "mdsim"
parameter_string = string.Template("""
DATA = {
jobname = "$$MAINJOBNAME_$$STAGENO$$[_lambda$$LAMBDA$$]"
dir = "$$[$$JOBPREFIX/$$]$$[$$PREFIX/$$]$$MAINJOBNAME_$$STAGENO$$[_lambda$$LAMBDA$$]"
host = "$$SUBHOST"
fep.type = $small_molecule
jin_file = []
jin_must_transfer_file= []
jout = ""
cfg_file = ""
window = ?
atom_group = none
restraints.new = []
restraints.existing = $restraint_ignore
print_restraint = false
print_expected_memory = false
jlaunch_opt= [""]
}
VALIDATE = {
host = {type = str range = [1 10000000000]}
fep.type = {type = enum range = [$all_fep_types]}
cfg_file = {type = str range = [0 10000000000]}
jin_file = [
{type = str range = [0 10000000000]}
{type = list size = 0
elem = {type = str range = [0 10000000000]}}
]
jin_must_transfer_file = {type = list size = 0 elem = {type = str range = [0 10000000000]}}
jout = [
{type = str range = [0 10000000000]}
{type = list size = 0
elem = {type = str range = [0 10000000000]}}
]
window = [
{type = none}
{type = int0 _check = check_iwindow}
{type = list size = 0 elem = {type = int0 _check = check_iwindow}}
]
atom_group = [
{type = enum range = [retain none]}
{atom = {type = str range = [0 10000000000]}
name = {type = str range = [0 10000000000]}
index = {type = int range = [0 255]}
}
{type = list size = 0
elem = {atom = {type = str range = [0 10000000000]}
name = {type = str range = [0 10000000000]}
index = {type = int range = [0 7]}
}
}
]
restraints.existing = {type = enum range = [$existing_restraint]}
restraints.new = [ {type = list}
{_skip = all}
]
print_restraint = {type = bool}
print_expected_memory = {type = bool}
jlaunch_opt.black_list = ["!append!" "-exec" "-PROCS" "-in" "-c" "-overwrite"]
}
""").substitute(restraint_ignore=constants.EXISTING_RESTRAINT.IGNORE,
small_molecule=constants.FEP_TYPES.SMALL_MOLECULE,
all_fep_types=" ".join(constants.FEP_TYPES),
existing_restraint=" ".join(constants.EXISTING_RESTRAINT))
PARAM = cmj._create_param_when_needed([
config.MD,
parameter_string,
])
[docs] def __init__(self, *arg, **kwarg):
"""
"""
cmj.StageBase.__init__(self, *arg, **kwarg)
self._cfg = None
[docs] def migrate_param(self, param: sea.Map):
"""
The Berendsen method is no longer supported (it was CPU only). If
we encounter it we automatically change it to Langevin, which was the
behavior before it was removed via the gpu effect_if param.
:param param: a param corresponding to this stage class to migrate
"""
try:
if param.ensemble.method.val == 'Berendsen':
warnings.warn(
DeprecationWarning(
"Berendsen thermostat is not supported in GPU Desmond, "
"Langevin will be used instead. Using 'Berendsen' will "
"result in an error in a future release."))
param.ensemble.method.val = 'Langevin'
except AttributeError:
# possible the ensemble attribute does not exist or is not a map
# with method attribute, ie with Concatenate stage
pass
[docs] def prestage(self):
# Updates the parameters set in the config file.
# Needs to run before crunch().
self._update_cfg()
def _update_model_snapcore(self, model: cms.Cms, job: DesmondJob):
"""
If snapcore is not None, update the model to match the snapcore coordinates.
"""
is_alchemical = cms.get_model_system_type(
model.comp_ct) == constants.SystemType.ALCHEMICAL
snapcore = getattr(job, 'snapcore', None)
if is_alchemical and snapcore is not None:
(ref_ct, mut_ct) = model.get_fep_cts()
if job.i_win < job.n_win // 2:
ref_ct.setXYZ(snapcore.fh_ref_coord)
mut_ct.setXYZ(snapcore.fh_mut_coord)
else:
ref_ct.setXYZ(snapcore.sh_ref_coord)
mut_ct.setXYZ(snapcore.sh_mut_coord)
# we don't want to use any existing restrains or their reference
# coordinates since we've changed the positions
model.clear_restrain()
model.synchronize_fsys_ct()
def _print_expected_memory(self, model: cms.Cms, job: DesmondJob):
# Print the predicted memory utlization for this job
if self.param.print_expected_memory.val and job.i_win == 0:
cpu_mem, gpu_mem = util.predict_memory_utilization(
self.param.fep.type.val, model.fsys_ct.atom_total, job.n_win)
self._print(
"quiet",
f"Expected maximum CPU Memory (GB): {cpu_mem/1024:.2f}")
self._print(
"quiet",
f"Expected maximum GPU Memory (GB): {gpu_mem/1024:.2f}")
def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map):
from schrodinger.application.desmond.packages.restraint import \
RestraintBuilder
fname = Path(fname)
if not fname.is_absolute() and not fname.is_file():
# If we do not find the file in the current directory, we try the base directory.
fname = Path(cmj.ENGINE.base_dir) / fname.name
model = cms.Cms(file=fname)
model = SystemBuilder.set_atom_group(model, param.atom_group,
job.permanent_group)
self._update_model_snapcore(model, job)
self._print_expected_memory(model, job)
model = SystemBuilder.set_restrain(model, param.restrain,
job.permanent_restrain, job)
restraint_builder = RestraintBuilder(param.restraints.new,
param.restraints.existing.val,
model)
restraint_builder.addRestraints()
if param.print_restraint.val:
# Skip printing posre_harm, since it can be very large
self._print(
"quiet",
restraint_builder.getString(skip_tables={'posre_harm'},
compact=True))
return model
def _write_cms(self, jobname, model):
"""
"""
cms_fname = jobname + "-in.cms"
# Use compressed input if output cms is compressed
if isinstance(self.param.maeff_output, sea.Map) and \
self.param.maeff_output.val.get('name') and \
self.param.maeff_output.name.val.endswith('gz'):
cms_fname += '.gz'
model.write(cms_fname)
return cms_fname
@staticmethod
def _incfg_name(jobname):
return jobname + "-in.cfg"
@staticmethod
def _outcfg_name(jobname):
return jobname + "-out.cfg"
def _write_cfg(self,
jobname,
task,
add_tag=config.TAG_SPECS.md,
tag="MD",
param=None):
"""
"""
if (param is None):
param = copy.deepcopy(config.get_default_setting(
self.param)).update(self.param)
else:
param = copy.deepcopy(param)
if (task == "regular"):
if ("fep" in param):
del param["fep"]
if (task == "afep"):
if (not param.energy_group.has_tag("setbyuser")):
param["energy_group"] = True
config.tag_sim_map(param, add_tag)
cfg_fname = self._incfg_name(jobname)
fh = open(cfg_fname, "w")
print(param.__str__(tag=tag), file=fh)
fh.close()
return cfg_fname
def _update_cfg(self):
if self.param.cfg_file.val:
base_dir = cmj.ENGINE.base_dir
# Saves explicit settings in `self.param'.
explicit_setting = sea.sea_filter(self.param, "setbyuser")
# Updates `self.param' from the file
self.param.update(
file=os.path.join(base_dir, self.param.cfg_file.val))
# Applies the explicit ones back.
self.param.update(explicit_setting, tag="setbyuser")
def _get_output_names(self,
param: sea.Map) -> (Optional[str], Optional[str]):
"""
Parse param to get names of output files.
:return: the cms and trajectory output filenames
"""
fname_out = (None if isinstance(param.maeff_output, sea.Atom) else
param.maeff_output.name.val)
traj_name = (None if (config.is_minimize(param) or isinstance(
param.trajectory, sea.Atom)) else param.trajectory.name.val)
return fname_out, traj_name
def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map,
jobname: str) -> ([str], [str], [cms.Box]):
"""
Create input files as needed and return a list of the filenames created
This function can modify `param`, so should be executed before anything
that depends on `param`.
:param default_model: The base model from previous job, can be
superseded in subclasses
:param pj: The previous job
:param param: Settings for this job. Can be updated
:param jobname: The jobname of this job
"""
fname_in = self._write_cms(jobname, default_model)
return [fname_in], [], [default_model.box]
def _get_device_args(self, pj, num_allotted_devices):
if pj.task in ("fep", "afep") and num_allotted_devices > 1:
# explicitly set the device so we can handle nodes with
# Default Compute Mode and nodes with CUDA MPS (cases where
# CUDA allows >1 process to acquire the same GPU)
# GPUs will be assigned in round-robin fashion.
device = pj.i_win % num_allotted_devices
return ['-device', str(device)]
return []
def _add_eneseq(self, new_job: DesmondJob, param: sea.Map):
if (isinstance(param.eneseq, sea.Map) and
param.time.val > param.eneseq.first.val):
new_job.output.add(os.path.abspath(param.eneseq.name.val),
tag="ENESEQ")
def _get_working_param(self):
"""
Extracted for easier mocking
:return: a copy of self.param with the default values filled in
"""
return copy.deepcopy(config.get_default_setting(self.param)).update(
self.param)
def _set_box(self, param: sea.Map, boxes: List[cms.Box]):
"""
Update the param to set the box information.
"""
for box in boxes:
param.box = sea.List(box)
param.box.add_tag('setbyuser')
@run_stage_method_in_parallel
def _create_job(self, pre_job=[], job_param=[]): # noqa: M511
"""
We're going to loop through pre_job/job_param and do a few things here
For each pre job/job_param
- Create a job directory and create/copy the necessary inputs to that
directory
- Create a Desmond job launching command, with the inputs specified by
-in/-jin keywords, and the outputs that Desmond does not automatically
determine by -jout keywords, as well as additional keywords.
- Use that command to create a new DesmondJob object
- Update the inputs and outputs on the DesmondJob object - these should
correspond to at least a subset, if not all, of the inputs and outputs
that the backend will be made aware of via -in/-jin/-jout keywords.
These inputs and outputs will be used to check the success of a job,
archive directories and copy files back to the parent launch directory,
and expose inputs/outputs to other stage code, but are not involved in
jobcontrol registration for the jobs submitted (ie the jobcontrol
registration only goes upwards in job launch hierarchy, not downwards).
# TODO Integrate cmj.Job inputs/outputs and jlaunch command
-in/-jin/-jout args into a class that maintains coherence between the
two (this will require importing desmond driver code that determines
plugin output)
"""
base_dir = Path(cmj.ENGINE.base_dir)
num_allotted_devices = self._get_num_allotted_devices()
new_jobs = []
for iii, pj in enumerate(pre_job):
self._print("debug", f"Job task: {pj.task}")
# get and modify param
param = self._get_working_param()
if job_param:
param.update(job_param[iii])
if config.is_meta(param) and param.meta_file.val:
param.meta_file.val = os.path.join(base_dir,
param.meta_file.val)
param.meta_file.add_tag("setbyuser")
# Sets i_window.
if pj.task in ["fep", "afep"]:
param.fep.i_window.val = pj.i_win
# modify macro dict for this job and get directory which depends
# on macro values
macro_dict = {
"$LAMBDA": str(pj.i_win),
} if (pj.task in [
"fep",
"afep",
]) else {}
jobname, jobdir = self._get_jobname_and_dir(pj, macro_dict)
# TODO why do we do this?? some job types don't even use default_model.
# we should let _get_input decide if default_model is necessary, and raise
# an error if it cannot be loaded.
try:
default_model = self._get_model(pj, pj.output.struct_file(),
param)
except RuntimeError as e:
self._print("quiet", e)
continue
new_job = self._create_single_job_common(pj, default_model, jobname,
jobdir, param,
num_allotted_devices)
new_job.other_parent = pj.other_parent
new_jobs.append(new_job)
return new_jobs
def _create_single_job_common(self, pj: DesmondJob, default_model: cms.Cms,
jobname: str, jobdir: str, param: sea.Map,
num_allotted_devices: int) -> DesmondJob:
"""
Given a previous job, model, and job params, create a job.
:param pj: The previous input job
:param default_model: The cms model to use as input. Can be overridden
by subclass implementation of `_get_input`
:param jobname: The name for the job
:param jobdir: The directory for the job
:param param: The working param corresponding to this job, with any
per-job customizations applied.
:param num_allotted_devices: The number of devices allotted to this job
:return: The created job
"""
# Makes the directory.
if not os.path.isdir(jobdir):
os.makedirs(jobdir)
util.chdir(jobdir)
# creates and writes input files and returns filenames
# can modify param so must be done before write_cfg
# TODO can we make it so get_input does not modify param?
# only happens in GCMC to the buffer width
input_cms_fnames, additional_inputs, boxes = self._get_input(
default_model, pj, param, jobname)
self._set_box(param, boxes)
# modifications to param complete, now we write config
fname_cfg = self._write_cfg(jobname, pj.task, param=param)
# Constructs the job-launching command.
jlaunch_cmd = [
self.DESMOND_CMD, "-JOBNAME", jobname, "-overwrite", "-c", fname_cfg
]
# get inputs and add them to jlaunch command
for fname in input_cms_fnames:
jlaunch_cmd += ["-in", fname]
jlaunch_cmd += self._get_additional_in_out_args(param,
additional_inputs)
# Gets number of replicas to set the '-P' option.
n_replica = config.num_replica(param, default_model, self._log)
n_cpu = param.cpu.val
try:
total_proc = param.total_proc.val
except AttributeError:
total_proc = n_cpu * n_replica
if total_proc is None:
total_proc = n_cpu * n_replica
jlaunch_cmd += ["-P", str(total_proc)]
# TODO move this to lambda hopping stage
if pj.task == "lambda_hopping":
jlaunch_cmd += [
"-fep_convergence",
str(self.param.fep_convergence.val)
]
jlaunch_cmd += self._get_device_args(pj, num_allotted_devices)
if cmj.GENERAL_LOGLEVEL == "debug":
jlaunch_cmd.append("-DEBUG")
desmond_kwargs = dict(jobname=jobname,
parent=pj,
stage=self,
dir=jobdir,
jlaunch_cmd=jlaunch_cmd,
task=pj.task,
permanent_restrain=pj.permanent_restrain,
permanent_group=pj.permanent_group)
if pj.task == "fep" or pj.task == "afep":
fepout = config.get_fep_output_filename(param)
fepout = os.path.abspath(fepout) if fepout else None
egout = config.get_energy_group_output_filename(param)
egout = os.path.abspath(egout) if egout else None
# TODO weird that we use default_model explicitly when we might
# not be using it as input
new_job = FepJob(
fepout=fepout, # dE disabled
egout=egout,
fragname=default_model.get_fragname(),
**desmond_kwargs)
else:
new_job = DesmondBackendJob(**desmond_kwargs)
self._register_job_inputs_outputs(new_job, param, input_cms_fnames)
return new_job
def _get_additional_in_out_args(self, param: sea.Map,
additional_inputs: List[str]):
"""
Get -jin and -jout command line args by parsing -jin_file,
-jin_must_transfer_file and -jout fields of `param`, and any other
explicit inputs via `additional_inputs`. These instruct the desmond
backend to register these inputs and outputs via jobcontrol.
These params should represent all of the inputs and outputs outside of
config files, input structures, and plugin outputs, which are all
handled separately.
:param param: The working param for a given job
:param additional_inputs: Other inputs which should be registered via
-jin in launch command
"""
jlaunch_args = []
jout = []
if isinstance(param.jout, sea.Atom) and "" != param.jout.val:
jout = [param.jout]
elif isinstance(param.jout, sea.List):
jout = param.jout
for e in (se.val for se in jout if se.val):
jlaunch_args.extend(["-jout", e])
jin = []
if (isinstance(param.jin_file, sea.Atom) and "" != param.jin_file.val):
jin = [param.jin_file]
elif isinstance(param.jin_file, sea.List):
jin = param.jin_file
for e in (se.val for se in jin if se.val):
if os.path.isabs(e) or os.path.exists(e):
jin_fname = e
else:
fullpath = os.path.join(cmj.ENGINE.base_dir,
os.path.basename(e))
jin_fname = util.relpath(fullpath)
jlaunch_args.extend(["-jin", jin_fname])
# TODO not sure why we need relative path for absolute paths here
# but not above?
# DESMOND-10355 filed to remove this parameter
for e in param.jin_must_transfer_file.val:
if e:
jin_fname = util.relpath(e) if os.path.isabs(e) else e
jlaunch_args.extend(["-jin", jin_fname])
for additional_input in additional_inputs:
jlaunch_args.extend(["-jin", additional_input])
return jlaunch_args
def _register_job_inputs_outputs(self, job: DesmondJob, param: sea.Map,
input_cms_fnames: List[str]):
"""
Register the output files for the job. These include the standard
cms output as well as the output of backend plugins such as trajectory
and eneseq. These outputs are not necessarily all of the outputs that
are created by the backend, but are the outputs that must be produced
for multisim to consider the job a success.
:param job: The job to add inputs and outputs to
:param param: The current working param corresponding to the given job
"""
jobname = job.jobname
job.input.add(os.path.abspath(self._incfg_name(jobname)))
# TODO why is outcfg passed as input? It is created by the job
job.input.add(os.path.abspath(self._outcfg_name(jobname)))
job.input.cms = []
for e in input_cms_fnames:
job.input.cms.append(os.path.abspath(e))
fname_out, traj_name = self._get_output_names(param)
if fname_out:
job.output.add(os.path.abspath(fname_out), check_cms_file)
if traj_name and param.time.val > param.trajectory.first.val:
traj_type = 'dir' if param.trajectory.format.val == 'dtr' else 'file'
job.output.add(os.path.abspath(traj_name),
tag="TRJ",
type=traj_type)
if config.is_meta(param):
if isinstance(param.meta, sea.Map):
job.output.add(os.path.abspath(param.meta.name.val),
tag="KERSEQ")
job.output.add(os.path.abspath(param.meta.cv_name.val),
tag="CVSEQ")
elif (isinstance(param.meta, sea.Atom) and param.meta == 'FILE' and
param.meta_file):
job.output.add(os.path.abspath(param.meta_file.val))
# currently this is a method so remd can no-op
self._add_eneseq(job, param)
fname_log = job.jobname + ".log"
job.output.add(os.path.abspath(fname_log))
def _get_snapcore_coordinates(
self, cms_fname: str) -> Optional[SnapcoreCoordinates]:
"""
Return the snapcore coordinates, that is the phase-aligned structures,
for both the reference and mutant molecules. If the fep type does not use
snapcore or this is an academic build, then return None.
:param cms_fname: Input cms filename.
"""
from schrodinger.application.desmond.frag_snap import \
get_aligned_coordinates_from_fragments
from schrodinger.application.scisol.packages.core_hopping.int_fepio import \
get_core_atoms_from_mapping
from schrodinger.application.scisol.packages.core_hopping.int_fepio import \
get_reference_coordinates_for_two_molecules
from schrodinger.application.scisol.packages.core_hopping.int_match import \
get_bonds_deleted
snapcore = self.param.fep.type.val == constants.FEP_TYPES.SMALL_MOLECULE
if not snapcore:
return None
cms_model = cms.Cms(file=cms_fname)
(ct1, ct2) = cms_model.get_fep_cts()
if ct1 is None or ct2 is None:
print('WARNING: cannot find two fep cts')
return None
(matched_atom1, matched_atom2) = get_core_atoms_from_mapping(ct2)
# Recovers the original coordinates.
(ct1_ref_coord, ct2_ref_coord) = \
get_reference_coordinates_for_two_molecules(ct1, ct2)
ct1.setXYZ(ct1_ref_coord)
ct2.setXYZ(ct2_ref_coord)
(broken_bond_ct1, broken_bond_ct2) = \
get_bonds_deleted(ct1, matched_atom1, ct2, matched_atom2)
(aligned_ct1_coord, aligned_ct2_coord) = \
get_aligned_coordinates_from_fragments(ct1, ct2,
matched_atom1, matched_atom2,
broken_bond_ct1, broken_bond_ct2)
if aligned_ct2_coord is not None and aligned_ct1_coord is not None:
return SnapcoreCoordinates(fh_ref_coord=ct1_ref_coord,
fh_mut_coord=aligned_ct2_coord,
sh_ref_coord=aligned_ct1_coord,
sh_mut_coord=ct2_ref_coord)
elif aligned_ct2_coord is not None:
return SnapcoreCoordinates(fh_ref_coord=ct1_ref_coord,
fh_mut_coord=aligned_ct2_coord,
sh_ref_coord=ct1_ref_coord,
sh_mut_coord=aligned_ct2_coord)
elif aligned_ct1_coord is not None:
return SnapcoreCoordinates(fh_ref_coord=aligned_ct1_coord,
fh_mut_coord=ct2_ref_coord,
sh_ref_coord=aligned_ct1_coord,
sh_mut_coord=ct2_ref_coord)
raise RuntimeError(
"PhaseAlign fails with both end as template molecules")
def _get_num_allotted_devices(self):
"""
Return the number of mps pipe directories, if set, otherwise the number of visible devices, if set, otherwise 0
"""
visible_devices = self._get_visible_devices()
mps_pipes = self._get_mps_pipes()
if not mps_pipes:
return len(visible_devices)
# sanity check
if not visible_devices:
raise ValueError("$SCHRODINGER_CUDA_MPS_PIPE_DIRECTORY set but "
"$(SCHRODINGER_)CUDA_VISIBLE_DEVICES unset, this "
"can result in undefined behavior")
if visible_devices != ['0']:
raise ValueError("$(SCHRODINGER_)CUDA_VISIBLE_DEVICES must have the"
" value '0' when "
"$SCHRODINGER_CUDA_MPS_PIPE_DIRECTORY is set")
return len(mps_pipes)
def _get_mps_pipes(self):
mps_pipe_str = os.environ.get('SCHRODINGER_CUDA_MPS_PIPE_DIRECTORY', '')
if mps_pipe_str:
return mps_pipe_str.split(',')
return []
def _get_visible_devices(self):
visible_devices_str = os.environ.get(
'SCHRODINGER_CUDA_VISIBLE_DEVICES') or os.environ.get(
'CUDA_VISIBLE_DEVICES')
if visible_devices_str:
return visible_devices_str.split(',')
return []
[docs] def crunch(self):
"""
"""
self._print("debug", "In Simulate.crunch")
pre_job = []
n_win = config.num_lambda_window(self.param.fep.lambda_)
for pj in self.get_prejobs():
# TODO: DESMOND-10413: This shouldn't be needed but too
# often the job is copied give it a stale state.
if hasattr(pj, 'snapcore'):
pj.snapcore = None
# In the first simulate stage the prejob is not FepJob.
# In the following stages the prejob is an FepJob,
# so not isinstance(pj, FepJob) is checking that this
# is the first simulate stage for fep.
# TODO: Can we handle this in a better way?
if ((pj.task == "fep" or pj.task == "afep") and
not isinstance(pj, FepJob)):
snapcore = None
if pj.task == "fep":
try:
snapcore = self._get_snapcore_coordinates(
pj.output.struct_file())
except ImportError:
self._print(
"quiet",
"Ligand FEP: Using academic FEP atom-matching code")
job_template = FepJob(n_win=n_win,
i_win=0,
jobname=pj.jobname,
parent=pj,
stage=pj.stage,
dir=pj.dir,
jlaunch_cmd=pj.jlaunch_cmd,
task=pj.task,
permanent_restrain=pj.permanent_restrain,
permanent_group=pj.permanent_group,
snapcore=snapcore)
if (self.param.window.val is not None):
if (isinstance(self.param.window, sea.Atom)):
job = job_template
job.i_win = self.param.window.val
job.output = pj.output
pre_job.append(job)
elif (isinstance(self.param.window, sea.List)):
for i_win in self.param.window:
job = copy.deepcopy(job_template)
job.i_win = i_win.val
job.output = pj.output
pre_job.append(job)
else:
raise ValueError(
"The value of the window parameter should be either a sea.List or sea.Atom object,"
" but we got %s" % type(self.param.window.val))
else:
for i in range(n_win):
job = copy.deepcopy(job_template)
job.i_win = i
job.output = pj.output
pre_job.append(job)
else:
if (self.param.window.val is not None):
if (isinstance(self.param.window, sea.Atom)):
if (pj.i_win == self.param.window.val):
pre_job.append(pj)
elif (isinstance(self.param.window, sea.List)):
if (pj.i_win in self.param.window.val):
pre_job.append(pj)
else:
raise ValueError(
"The value of the window parameter should be either a sea.List or sea.Atom object,"
" but we got %s" % type(self.param.window.val))
else:
pre_job.append(pj)
self.add_jobs(self._create_job(pre_job))
self._print("debug", "Out Simulate.crunch")
def _checkpoint_path(self, outcfg: sea.Map):
return outcfg.mdsim.checkpt.name.val
def _base_restore_args(self, outcfg: sea.Map,
added_time: float) -> List[str]:
sim_name = self.CFG_SIM_NAME
sim_cfg = getattr(outcfg, sim_name)
last_time = self.param.time.val + added_time
args = [
"-restore", sim_cfg.checkpt.name.val, "-cfg",
f"{sim_name}.last_time={last_time:f}", "-cfg",
f"{sim_name}.plugin.trajectory.mode=append", "-cfg",
f"{sim_name}.checkpt.interval={self.param.checkpt.interval.val:f}"
]
# Include cfg extracted from cpt if it exists
cpt_cfg_path = Path(sim_cfg.checkpt.name.val + '.cfg')
if cpt_cfg_path.exists():
args += ['-jin', str(cpt_cfg_path)]
return args
def _get_restore_args(self, outcfg: sea.Map,
added_time: float) -> ([str], [str]):
"""
Get the basic restore arguments needed for the jlaunch command,
including the checkpoint args, extra -cfg params, and any -jin and -in
args, and a list of any input files registered with -in args.
"""
sim_cfg = getattr(outcfg, self.CFG_SIM_NAME)
trj_opts = ["-jin", sim_cfg.plugin.trajectory.name.val]
return self._base_restore_args(outcfg, added_time) + trj_opts, []
def _get_outcfg(self, job: DesmondJob) -> Optional[sea.Map]:
outcfg_fname = job.input.outcfg_file()
if outcfg_fname is None:
outcfg_fname = job.input.incfg_file().replace("-in.cfg", "-out.cfg")
try:
with open(outcfg_fname, "r") as f:
return sea.Map(f.read())
except IOError:
return None
[docs] def restart_subjobs(self, jobs):
for job in jobs:
util.chdir(job.dir)
outcfg = self._get_outcfg(job)
run_from_scratch = outcfg is None or not os.path.exists(
self._checkpoint_path(outcfg))
if not getattr(job, "jlaunch_cmd_scratch", None):
job.jlaunch_cmd_scratch = copy.copy(job.jlaunch_cmd)
job.status.set(cmj.JobStatus.WAITING)
if run_from_scratch:
self._log(
"No data of this subjob were saved from the previous run.")
self._log("Will restart it from scratch.")
# Runs this subjob from scratch.
job.status.set(cmj.JobStatus.WAITING)
arg_to_drop = []
for i, e in enumerate(job.jlaunch_cmd):
if (e == "-HOST"):
arg_to_drop.append(i)
arg_to_drop.append(i + 1)
arg_to_drop.reverse()
for i in arg_to_drop:
del job.jlaunch_cmd[i]
continue
arg_to_drop = []
for i, e in enumerate(job.jlaunch_cmd):
if e in ("-in", "-c", "-HOST"):
arg_to_drop.append(i)
arg_to_drop.append(i + 1)
arg_to_drop.reverse()
for i in arg_to_drop:
del job.jlaunch_cmd[i]
added_time = getattr(job, "added_time", 0.)
restore_args, cms_fnames = self._get_restore_args(
outcfg, added_time)
job.jlaunch_cmd += restore_args
job.input.cms = cms_fnames
self._backup_data_files()
self.add_jobs(jobs)
[docs] def hook_captured_successful_job(self, job):
"""
"""
cmj.StageBase.hook_captured_successful_job(self, job)
self._concatenate_data_files(job)
@staticmethod
def _concatenate_data_files(job):
# Concatenate all .ene files.
util.chdir(job.dir)
for fname in (glob.glob("*.ene") + glob.glob("*.dat") +
glob.glob("*.txt") + glob.glob("*.dE") +
glob.glob("*.cvseq") + glob.glob("*.kerseq")):
concatenate_and_backup(fname)
# Concatenate trajectories
concatenate_trajectories(
glob.glob("*_trj.previous") + glob.glob("*.xtc.previous"))
for fname_previous in glob.glob("*.previous"):
os.rename(fname_previous, fname_previous[:-9])
def _backup_data_files(self):
for fname in (glob.glob("*.ene") + glob.glob("*.dat") +
glob.glob("*.txt") + glob.glob("*.dE") +
glob.glob("*.cvseq") + glob.glob("*.kerseq")):
concatenate_and_backup(fname)
for trjname in glob.glob('*_trj') + glob.glob('*.xtc'):
backup_trajectories(trjname)
for previous in glob.glob("*.previous"):
self._files4copy.append(os.path.abspath(previous))
[docs] def capture(self, job):
cmj.StageBase.capture(self, job)
if job.status == cmj.JobStatus.NON_RETRIABLE_FAILURE:
# Being nontriable means to be revived from the job's .cpt file.
# Revives this job, and sends it back into `QUEUE`.
self._concatenate_data_files(job)
self.restart_subjobs([
job,
])
jlaunch_opt = list(map(str, self.param.jlaunch_opt.val))
if jlaunch_opt and jlaunch_opt != [""]:
job.jlaunch_cmd += jlaunch_opt
self._job_manager.submit_jobs(cmj.QUEUE)
[docs] def pack_stage(self, force=False):
# When a job is stopped for checkpointing,
# we want to make sure the trajectories are concatenated.
# hook_captured_successful_job is not called on checkpoint.
trajectories = [
x for x in self._files4copy
if (x.endswith('_trj.previous') or x.endswith('.xtc.previous'))
]
concatenate_trajectories(trajectories)
# remove previous trajectories and non-existent files from previous runs
self._files4copy = [
x for x in self._files4copy
if (not x.endswith(('_trj.previous',
'.xtc.previous')) and Path(x).exists())
]
super(SimulateBase, self).pack_stage(force=force)
[docs]class RemovesInitialGCMCSolvent(SimulateBase):
"""
Any stage that actually does simulation should inherit from this class.
Only vrun-type stages, which rely on the consistency of the previous stage's
trajectory and the cms model for the current stage, should derive from
SimulateBase instead.
"""
def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map):
model = super()._get_model(job, fname, param)
gcmc_utils.remove_inactive_solvent(model)
return model
[docs]class GCMCCapable(RemovesInitialGCMCSolvent):
"""
An extension to SimulateBase that allows adding a GCMC plugin to the
desmond simulation. Subclasses of this class become 'GCMC-capable'. If
no 'gcmc' block is passed to the class's config block, this class behaves
exactly as SimulateBase.
"""
PARAM = cmj._create_param_when_needed([
"""
# TODO remove this parameter and update checkpoint files in bpfep test
VALIDATE = {
fep.equal_water_number_for_all_windows = { type = bool }
}
""", config.GCMC
])
[docs] def __init__(self, *args, **kwargs):
super(GCMCCapable, self).__init__(*args, **kwargs)
self.is_gcmc = False
def __getstate__(self, state=None):
# Need to store is_gcmc for the checkpoint
state = super().__getstate__(state)
# Check for compatibility with old checkpoints
if hasattr(self, 'is_gcmc'):
state.is_gcmc = self.is_gcmc
return state
[docs] def check_param(self):
"""
See super for details. This subclass implementation checks if the gcmc
block was passed in by the user to determine if this stage needs to
do gcmc-functionality.
"""
ev = super(GCMCCapable, self).check_param()
# see if gcmc block explicitly set by user (ie passed in, not default)
self.is_gcmc = "gcmc" in self.param.keys(tag="setbyuser")
# if not, remove the default gcmc block
if not self.is_gcmc and "gcmc" in self.param.keys():
del self.param.gcmc
return ev
def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map):
"""
See super for details. This subclass implementation adds inactive solvent
to the model if needed.
"""
model = super(GCMCCapable, self)._get_model(job, fname, param)
if not self.is_gcmc:
return model
self._print("verbose", "Getting GCMC model for fname {}".format(fname))
if not self._has_inactive_solvent(model):
self._print("verbose",
"Need to add inactive atoms to fname {}".format(fname))
solvent_param = param.gcmc.solvent
if not self._has_solvent_ct(model):
if solvent_param.s_file.val:
s_file = os.path.join(cmj.ENGINE.base_dir,
solvent_param.s_file.val)
solvent_model = cms.Cms(s_file)
else:
raise ValueError('Provided cms file has no solvent '
'candidates, you must provide a solvent '
'file.')
else:
solvent_model = model
num_copies = gcmc_utils.NUM_COPIES_DEFAULT
nc_key = 'num_copies'
if nc_key in solvent_param:
num_copies = solvent_param[nc_key].val
num_copies = self._adjust_num_copies(model, num_copies, param)
model = gcmc_utils.add_inactive_solvent(model,
solvent_model,
num_copies=num_copies)
self._print("verbose", "Added inactive solvent atoms")
# this will add gcmc ligand tags if model is FEP
tag_only_mutated = param.fep.type.val in [
constants.FEP_TYPES.LIGAND_SELECTIVITY,
constants.FEP_TYPES.PROTEIN_STABILITY,
constants.FEP_TYPES.PROTEIN_SELECTIVITY
]
model = gcmc_utils.tag_gcmc_ligand(model, tag_only_mutated)
# if there's still nothing gcmc would interpret as being a ligand
# (ie tagged atoms or a ct with `constants.MOLTYPE` == `constants.MOLTYPE.VAL.LIGAND`),
# then check if there is a supplied ligand file; if so tag nearby atoms
if not self._has_gcmc_ligand(model):
try:
if param.gcmc.ligand_file.val:
ligand_param = param.gcmc.ligand_file
else:
# this param is set by the set_family mechanism, which is in
# turn set during systype.WatermapTyper.prepare
ligand_param = param.backend.mdsim.plugin. \
SpatialActiveSite.ligand_file
except AttributeError:
pass
else:
self._tag_atoms_from_ligand_model(model, ligand_param, param)
return model
def _adjust_num_copies(self, model, num_copies, param):
"""
Perform any adjustments needed to the `num_copies` parameter that will
be passed to `gcmc_utils.add_inactive_solvent`. Subclasses can override
this method as needed but should always call the super implementation.
The base class implementation checks if the batch size backend
parameter is set and rescales as needed.
This method is only called for gcmc simulations, ie `self.is_gcmc` is
True.
:param model: The cms model that inactive solvent will be added to.
:type model: `cms.Cms`
:param num_copies: The unadjusted value of the `num_copies` parameter
:type num_copies: int
:return: The adjusted value of the `num_copies` parameter
:rtype: int
"""
# if batch size explicitly set, we raise num_copies as needed
try:
batch_size = param.backend.mdsim.plugin.gcmc.batch_size.val
except AttributeError:
pass
else:
# we need approx batch_size/2, but up to batch_size copies to
# perform insertions, plus whatever amount will be accepted.
min_num_copies = int(1.5 * batch_size)
num_copies = max(num_copies, min_num_copies)
return num_copies
def _tag_atoms_from_ligand_model(self, model: cms.Cms,
ligand_param: sea.Map, param: sea.Map):
"""
Tag the solute atoms within a cutoff (specified in
`param.gcmc.gcmc_region.region_buffer`) of the ligand_model defined
in ligand_param. Also resets the region buffer value to zero, as the
tagged atoms take the place of the buffer.
:param model: the cms model
:param ligand_param: the param specifying the ligand file
:param param: the working param corrsponding to the model
:return: the model with atoms near to the atoms in the ligand file
tagged with the 'i_gcmc_ligand' property.
"""
lig_file = os.path.join(cmj.ENGINE.base_dir, ligand_param.val)
if not os.path.isfile(lig_file):
raise IOError("ERROR: Ligand file not found: %s" % lig_file)
self._print('quiet',
'Tagging GCMC atoms using ligand file %s' % lig_file)
lig_model = None
for st in structure.StructureReader(lig_file):
if lig_model is None:
lig_model = st
else:
self._print(
'quiet',
'WARNING: More than one structure in ligand file. '
'Using the first one...')
break
if lig_model is None:
raise ValueError("No structures found in ligand file.")
lig_distance = param.gcmc.gcmc_region.region_buffer.val
model = gcmc_utils.tag_gcmc_near_ligand(model, lig_model, lig_distance)
# set the buffer to zero since we have already extended our region
# outside of the ligand
param.gcmc.gcmc_region.region_buffer.val = 0
self._print(
"quiet", "Added GCMC ligand tag to atoms near supplied "
"ligand file")
return model
def _has_inactive_solvent(self, model):
return self._has_solvent_ct(model) and \
model.active_total != model.comp_atom_total
def _has_solvent_ct(self, model):
return self._has_ct(model, constants.CT_TYPE.VAL.SOLVENT)
@staticmethod
def _has_gcmc_ligand(model):
for ct in model.comp_ct + [model]:
if ct.property.get(constants.MOLTYPE,
'') == constants.MOLTYPE.VAL.LIGAND:
return True
for a in ct.atom:
if a.property.get(constants.GCMC_LIGAND, 0) != 0:
return True
return False
@staticmethod
def _has_ct(model, ct_type):
for st in model.comp_ct:
if st.property.get(constants.CT_TYPE, '') == ct_type:
return True
return False
def _add_eneseq(self, new_job: DesmondJob, param: sea.Map):
super()._add_eneseq(new_job, param)
if self.is_gcmc:
new_job.output.add(os.path.abspath(param.gcmc.ene_name.val),
tag="GCMC_ENESEQ")
def _write_cfg(self,
jobname,
task,
add_tag=config.TAG_SPECS.md,
tag="MD",
param=None):
"""
See super for details. This subclass implementation adds tags to
gcmc-specific keys to make sure they are also written out if the stage
has a gcmc block.
"""
if self.is_gcmc:
add_tag = copy.deepcopy(add_tag)
add_tag.keys.extend(config.TAG_SPECS.gcmc.keys)
return super(GCMCCapable, self)._write_cfg(jobname, task, add_tag, tag,
param)
[docs]class Simulate(GCMCCapable):
"""
This class defines the GCMC-capable simulate stage, which can be thought
of as the basic simulate stage, and is what the user will get when they
use the name `simulate`. This is just a thin wrapper to associate
the name 'simulate' with a corresponding class, even though it is
otherwise identical to GCMCCapable.
Edits to basic simulate functionality should be made to SimulateBase,
not here. This class should be subclassed if GCMC support is needed,
otherwise subclass SimulateBase
"""
NAME = "simulate"
[docs]class ReplicaExchange(Simulate):
"""
"""
NAME = "replica_exchange"
CFG_SIM_NAME = "remd"
PARAM = cmj._create_param_when_needed([
config.REMD,
"""
DATA = {
total_proc = ?
}
VALIDATE = {
jlaunch_opt.black_list = ["!append!" "-t"]
total_proc = [
{type = none}
{type = int1}
]
}
""",
])
[docs] def __init__(self, *arg, **kwarg):
"""
"""
super(ReplicaExchange, self).__init__(*arg, **kwarg)
self.natoms_max = None # used only when gcmc is enabled
def __getstate__(self, state=None):
# Need to store natoms_max for the checkpoint
state = super().__getstate__(state)
# Check for compatibility with old checkpoints
if hasattr(self, 'natoms_max'):
state.natoms_max = self.natoms_max
return state
def _write_cfg(self,
jobname,
task,
add_tag=config.TAG_SPECS.remd,
tag="REMD",
param=None):
"""
"""
return super(ReplicaExchange, self)._write_cfg(jobname, task, add_tag,
tag, param)
def _set_box(self, param: sea.Map, boxes: List[cms.Box]):
"""
Update the param to set the box information.
"""
for replica, box in zip(param.replica, boxes):
replica.box = sea.List(box)
replica.box.add_tag('setbyuser')
def _get_output_names(self, param: sea.Map):
"""
We return None here because we will instead register outputs onto the
job parents during hook_captured_successful_job. (I am not sure of the
specific reason for this -Ellery)
"""
# FIXME: It will be nice if we can set `fname_out' and `traj_name' to
# be lists of file (or dir) names.
return None, None
def _get_device_args(self, pj, num_allotted_devices):
# Device is not needed, handled internally based on MPI rank for
# replica exchange jobs
return []
def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map,
jobname: str) -> ([str], [str], [cms.Box]):
"""
See super method for documentation
"""
if isinstance(param.replica, sea.Map):
return [self._write_cms(jobname,
default_model)], [], [default_model.box]
else:
assert isinstance(param.replica, sea.List)
replicas_with_idx = list(enumerate(param.replica))
cms_fnames_and_boxes = self._get_replica_input(
replicas_with_idx, default_model, pj, param, jobname)
cms_fnames = [result[0] for result in cms_fnames_and_boxes]
boxes = [result[1] for result in cms_fnames_and_boxes]
return cms_fnames, [], boxes
@run_stage_method_in_parallel
def _get_replica_input(self, replicas_with_idx: List[Tuple[int, sea.Map]],
default_model: cms.Cms, pj: DesmondJob,
param: sea.Map,
jobname: str) -> List[Tuple[str, cms.Box]]:
cms_fnames_and_boxes = []
for (i, e) in replicas_with_idx:
if "model_file" in e and e.model_file.val:
model = self._get_model(
pj,
Path(cmj.ENGINE.base_dir) / e.model_file.val, param)
else:
model = default_model
fname = self._write_cms(f"{jobname}_replica_{i}", model)
cms_fnames_and_boxes.append((fname, model.box))
return cms_fnames_and_boxes
def _add_eneseq(self, new_job: DesmondJob, param: sea.Map):
# Don't register eneseq files for replica exchange jobs
return
def _adjust_num_copies(self, model, num_copies, param):
"""
Perform any adjustments needed to the `num_copies` parameter that will
be passed to `gcmc_utils.add_inactive_solvent`. Subclasses can override
this method as needed but should always call the super implementation.
The Replica Exchange subclass implementation makes sure that all
replicas will have the same total number of particles by adding
different numbers of inactive solvent to each replica.
This method is only called for gcmc simulations, ie `self.is_gcmc` is
True.
:param model: The cms model that inactive solvent will be added to.
:type model: `cms.Cms`
:param num_copies: The unadjusted value of the `num_copies` parameter
:type num_copies: int
:return: The adjusted value of the `num_copies` paramter
:rtype: int
"""
num_copies = super()._adjust_num_copies(model, num_copies, param)
if self.natoms_max is None:
if isinstance(param.replica, sea.List):
base_dir = cmj.ENGINE.base_dir
fnames = []
for i, e in enumerate(param.replica):
if "model_file" in e and e.model_file.val:
fnames.append(os.path.join(base_dir, e.model_file.val))
self.natoms_max = 0
for other_fname in fnames:
with structure.StructureReader(other_fname) as reader:
fsys_ct = next(reader)
self.natoms_max = max(fsys_ct.atom_total, self.natoms_max)
if self.natoms_max:
solvent_cts = model.get_solvent_cts()
# only one gcmc solvent is currently supported
if len(solvent_cts) != 1:
raise ValueError("GCMC systems can only have one solvent ct")
solvent_ct = solvent_cts[0]
if not len(solvent_ct.molecule):
raise ValueError("Solvent CT in GCMC system used for replica "
"exchange should always have at least one "
"molecule!")
molecule_size = len(solvent_ct.molecule[1])
# there should be no inactive atoms here, so we can use atom_total
num_copies += (self.natoms_max - model.atom_total) // molecule_size
return num_copies
def _checkpoint_path(self, outcfg: sea.Map):
return outcfg.remd.checkpt.name.val
def _get_restore_args(self, outcfg: sea.Map,
added_time: float) -> ([str], [str]):
trj_opt = []
cms_fnames = []
# TODO first version with remd.cfg is from CPU config file. Remove this
# if block and use only else block when CPU Desmond is fully removed.
if 'cfg' in outcfg.remd:
for replica in outcfg.remd.cfg:
trj_opt += ["-jin", replica.plugin.trajectory.name.val]
for replica in outcfg.remd.cfg:
bootfile = replica.boot.file.val
trj_opt += ["-in", bootfile]
cms_fnames.append(bootfile)
else:
graph = outcfg.remd.graph
nodes = graph.edges.nodes
for o in nodes:
trj_opt += [
"-jin",
graph[o.val].remd.plugin.trajectory.name.val,
]
for o in nodes:
bootfile = graph[o.val].boot.file.val
trj_opt += ["-in", bootfile]
cms_fnames.append(bootfile)
return self._base_restore_args(outcfg, added_time) + trj_opt, cms_fnames
[docs] def hook_captured_successful_job(self, job):
"""
"""
super(ReplicaExchange, self).hook_captured_successful_job(job)
job.is_output = False
# We need to figure out the number of replicas.
# User may specify a replica generation than a list of replicas. In the former case, we need to read the *-out.cfg.
incfg_fname = job.input.incfg_file()
with open(incfg_fname, "r") as fh:
incfg = sea.Map(fh.read())
replica = []
if (isinstance(incfg.replica, sea.Map)):
outcfg_fname = job.input.outcfg_file()
outcfg = sea.Map(open(outcfg_fname, "r").read())
try:
n_replica = len(outcfg.remd.cfg)
except:
n_replica = len(outcfg.remd.graph.edges.nodes)
replica = [incfg] * n_replica
else:
for e in incfg.replica:
new_cfg = copy.deepcopy(incfg)
new_cfg.update(e)
replica.append(new_cfg)
parent = job
sea.update_macro_dict({"$JOBNAME": job.jobname})
for i, e in enumerate(replica):
sea.update_macro_dict({"$REPLICA": str(i)})
newjob = copy.deepcopy(parent)
newjob.is_output = True
newjob.output = cmj.JobOutput()
newjob.input.cms = [
parent.input.cms[(i if (i < len(parent.input.cms)) else 0)]
]
fname_out = None if (isinstance(
e.maeff_output, sea.Atom)) else e.maeff_output.name.val
traj_name = e.trajectory.name.val
fname_dE = config.get_fep_output_filename(e)
fname_eg = config.get_energy_group_output_filename(e)
fname_dE = os.path.join(newjob.dir,
fname_dE) if (fname_dE) else None
fname_eg = os.path.join(newjob.dir,
fname_eg) if (fname_eg) else None
if (fname_out):
newjob.output.add(os.path.join(newjob.dir, fname_out),
check_cms_file)
if (traj_name and (e.time.val > e.trajectory.first.val)):
newjob.output.add(os.path.join(newjob.dir, traj_name),
tag="TRJ",
type="dir")
if (not isinstance(e.eneseq, sea.Atom) and
e.time.val > e.eneseq.first.val):
newjob.output.add(os.path.join(newjob.dir, e.eneseq.name.val),
tag="ENESEQ")
newjob.output.add(parent.output.log_file())
newjob.output.add(fname_dE, tag="dE")
newjob.output.add(fname_eg, tag="eg")
self.add_job(newjob)
return "dissolved"
def _get_restart_job(self,
jobs: List[DesmondJob],
task='regular') -> DesmondJob:
"""
If the previous job has finished, return a copy of it with the new
job(s) as its parents, otherwise return the new job (assumed to be
singular). Update the added_time and is_extend attributes on the
returned job.
:param task: task type for the extend job
"""
if self.filter_jobs(is_output=[False], status=[cmj.JobStatus.SUCCESS]):
# The previous replica exchange job has finished.
prev_job = self.filter_jobs(is_output=[False])[0]
job = copy.deepcopy(prev_job)
job.task = task
job.stage = self
job.parent = jobs[0]
# DESMOND-10153: Fix desmond_extend if added_time > 0
# and the previous lambda stage just finished.
# hook_captured_successful_job uses parent.parent and
# other_parent, so both must be updated here.
job.parent.parent = jobs[0]
job.other_parent = jobs[1:]
if getattr(job.parent, "is_extend", False):
# For extension jobs, clear the captured job
# so checkpoint/restart will continue from the LambdaHopping stage
self._job_manager.clear()
else:
# The previous replica exchange job has NOT finished.
job = jobs[0]
# Used for extending replica exchange jobs
job.added_time = getattr(jobs[0], "added_time", 0.0)
job.is_extend = getattr(jobs[0], "is_extend", False)
return job
[docs] def restart_subjobs(self, jobs):
if not jobs:
return
jobs = [self._get_restart_job(jobs)]
super().restart_subjobs(jobs)
self.add_job(*jobs)
[docs]class LambdaHopping(ReplicaExchange):
"""
"""
NAME = "lambda_hopping"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
jobname = "$MAINJOBNAME_$STAGENO"
dir = "$[$JOBPREFIX/$]$[$PREFIX/$]$MAINJOBNAME_$STAGENO"
fep.output.name = "$JOBNAME$[_replica$REPLICA$].dE"
should_sync = yes
solute_tempering = off
restraints.existing = %s
restraints.new = []
total_proc = ?
fep_convergence = 0.0
}
VALIDATE = {
solute_tempering = [
{type = bool0}
{atom = {type = str1}
temperature = [
{generator = {type = enum range = [PvdS]}
highest_temperature = {type = float+ }
exchange_probability = {type = float0_1}
start_win = {type = int0}
end_win = {type = int}
_mapcheck = check_remd_temp_generator2
}
{type = list size = -1 elem = {type = float+}}
]
}
]
total_proc = [
{type = none}
{type = int1}
]
fep_convergence = {type = float+}
}
restraints.existing = {type = enum range = [%s]}
restraints.new = [ {type = list}
{_skip = all}
]
print_restraint = {type = bool}
""" % (constants.EXISTING_RESTRAINT.IGNORE, " ".join(
constants.EXISTING_RESTRAINT)),
])
[docs] def __init__(self, *arg, **kwarg):
"""
"""
super(LambdaHopping, self).__init__(*arg, **kwarg)
# FIXME: The 'parent' and 'other_parent' business might make the whole thing complicated than necessary. We should enable
# the fep_analysis stage to directly handle a lambda_hopping subjob, instead of faking a lambda_hopping subjob
# as a series of normal MD jobs.
[docs] def crunch(self):
"""
"""
self._print("debug", "In LambdaHopping.crunch")
from .. import remd
has_lhjob = False
fep_job = {}
for pj in self.get_prejobs():
if isinstance(pj, FepJob):
fep_job.setdefault(pj.fepid, []).append(pj)
elif isinstance(pj, DesmondJob) and pj.task == "lambda_hopping":
pre_fepjob = [pj.parent] + pj.other_parent
pre_fepjob.sort(key=lambda x: x.i_win)
self.param["replica"] = sea.List()
replica = sea.Map("model_file = '' fep.i_window = 0")
for e in pre_fepjob:
replica.model_file.val = e.output.struct_file()
replica.fep.i_window.val = e.i_win
self.param.replica.append(replica)
has_lhjob = True
else:
self._log(
"WARNING: Simulation performed by %s seems not a FEP simulation."
% str(pj))
self._log(
"WARNING: Ignore this job for lambda hopping simulation.")
if has_lhjob:
self.add_jobs(self._create_job(self.get_prejobs()))
if not fep_job:
if not has_lhjob:
self._log("No FEP job was performed.")
else:
new_pjs = []
job_params = []
for fepid in fep_job:
pre_fepjob = fep_job[fepid]
if (len(pre_fepjob) < 2):
self._log(
"WARNING: Job %s: No peer jobs for the other windows." %
str(pre_fepjob[0]))
self._log(
"WARNING: Ignore this job for lambda hopping simulation."
)
continue
pre_fepjob.sort(key=lambda x: x.i_win)
n_win = pre_fepjob[0].n_win
fragname = pre_fepjob[0].fragname
fragname = "absolute" if (not fragname) else fragname
ref_temp = self.param.temperature.val
solute_t = []
hotatoms = []
if (isinstance(self.param.solute_tempering, sea.Map)):
asl = self.param.solute_tempering.atom.val
model = cms.Cms(file=pre_fepjob[0].output.struct_file())
hotatoms = model.select_atom(asl)
if (len(hotatoms) == 0):
self._log(
"WARNING: No atoms selected by ASL expression: %s" %
asl)
else:
self._print(
'quiet',
"Hot atoms selected by ASL expression: %s" % asl)
self._print('quiet',
"Number of hot atoms: %s" % len(hotatoms))
self._print('quiet', "Hot atoms: %s" % hotatoms)
temp = self.param.solute_tempering.temperature
if (isinstance(temp, sea.List)):
if (len(temp) != n_win):
self._log(
"ERROR: Number of temperatures for solute tempering does not match number of replicas."
)
self._log(
"ERROR: Failed to turn on solute tempering on FEP [%s]"
% fragname)
continue
else:
solute_t = temp.val
else:
start_win = temp.start_win.val if 'start_win' in temp else 0
end_win = temp.end_win.val if 'end_win' in temp else -1
if end_win <= 0:
end_win = n_win
n_tot = n_win
n_win = end_win - start_win or n_tot
n_replica = (n_win + 1) // 2
if (len(hotatoms) > 0):
# TODO: Move this into `remd` module.
if ("exchange_probability" in temp):
exch = temp.exchange_probability.val
temp_ladder, prob = remd.predict_with_nreplica_and_exch(
n_replica, exch, ref_temp, model, asl)
else:
temp_range = (
ref_temp,
temp.highest_temperature.val,
)
temp_ladder, prob = remd.predict_with_temp_and_nreplica(
temp_range, n_replica, model, asl)
n = n_win // 2
temp_ladder0 = copy.copy(temp_ladder[:n])
temp_ladder.reverse()
temp_ladder = temp_ladder0 + temp_ladder
temp_ladder = [ref_temp] * (
start_win) + temp_ladder + [ref_temp
] * (n_tot - end_win)
prob = remd.get_prob_from_temp_ladder(
temp_ladder, model, asl)
self._log(
"Temperature ladder (exchange probability) for solute tempering:"
)
for e, p in zip(temp_ladder, prob):
self._log(" %.3g (%.3g)" % (
e,
p,
))
self._log(" %.3g (n/a)" % temp_ladder[-1])
self._log(
"Estimation of exchange probabilities did not take into account lambda schedule."
)
self._log(
"Actual exchange probabilities might be lower.")
solute_t = temp_ladder
self.param["replica"] = sea.List()
replica = sea.Map("model_file = '' fep.i_window = 0")
parent = []
for i, pj in enumerate(pre_fepjob):
model_file = pj.output.struct_file()
macro_dict = {
"$LAMBDA": str(pj.i_win),
}
jobname, dir = self._get_jobname_and_dir(pj, macro_dict)
if solute_t:
model = cms.Cms(file=model_file)
model_file = sea.Atom(
"$JOBNAME_st_lambda$LAMBDA-in.cms").val
remd.write_ff_solute_tempering(
model, model_file,
"asl:atom.num " + str(hotatoms)[1:-1],
ref_temp / solute_t[i])
replica.model_file.val = model_file
replica.fep.i_window.val = pj.i_win
self.param.replica.append(replica)
parent.append(pj)
pj = pre_fepjob[0]
new_pj = DesmondBackendJob(
jobname=pj.jobname,
parent=parent[0],
stage=pj.stage,
dir=pj.dir,
jlaunch_cmd=None,
task="lambda_hopping",
permanent_restrain=pj.permanent_restrain,
permanent_group=pj.permanent_group)
new_pj.other_parent = parent[1:]
new_pj.output = pj.output
# customize fep.output.first and fep.output.interval
# to disable gibbs plugin output (DESMOND-10303)
# using `job_param` argument of `self._create_job`
# (keep `self.param` unchanged to be able to
# recover "fep.output.name" in
# `self.hook_capture_successful_job`; see
# `config.get_fep_output_filename` for details)
new_job_param = sea.Map(
"fep { output { first = inf interval = inf } }")
new_job_param.add_tag("REMD", propagate=True)
new_pjs.append(new_pj)
job_params.append(new_job_param)
new_desmond_jobs = self._create_job(new_pjs, job_param=job_params)
for job in new_desmond_jobs:
# the DesmondBackendJobs created here are not the stage's
# output. The output jobs are created in
# hook_captured_successful_job
job.is_output = False
self.add_jobs(self._create_job(new_pjs, job_param=job_params))
self._print("debug", "Out LambdaHopping.crunch")
def _get_restart_job(self, jobs: List[DesmondJob]) -> DesmondJob:
"""
If the previous job has finished, return a copy of it with the new
job(s) as its parents, otherwise return the new job (assumed to be
singular). Update the added_time and is_extend attributes on the
returned job.
"""
return super()._get_restart_job(jobs, task='lambda_hopping')
[docs] def restart_subjobs(self, jobs):
if not jobs:
return
job = self._get_restart_job(jobs)
outcfg = self._get_outcfg(job)
if outcfg is None or not Path(job.dir,
self._checkpoint_path(outcfg)).exists():
# The previous lambda-hopping job has NOT finished, and no data of
# the job were saved.
# We have to rerun the LH job from scratch.
self._log(
"No data of this subjob were saved from the previous run.")
self._log("Will restart it from scratch.")
self.crunch()
return
if not getattr(job, "jlaunch_cmd_scratch", None):
job.jlaunch_cmd_scratch = copy.copy(job.jlaunch_cmd)
job.status.set(cmj.JobStatus.WAITING)
util.chdir(job.dir)
job.jlaunch_cmd = [
self.DESMOND_CMD, "-JOBNAME", job.jobname, "-overwrite",
"-fep_convergence",
str(self.param.fep_convergence.val)
]
try:
n_replica = len(outcfg.remd.cfg)
except AttributeError:
n_replica = len(outcfg.remd.graph.edges.nodes)
total_proc = self.param.total_proc.val
if total_proc is None:
param = self._get_working_param()
total_proc = param.cpu.val * n_replica
job.jlaunch_cmd += ["-P", str(total_proc)]
arg_to_drop = []
for i, e in enumerate(job.jlaunch_cmd_scratch):
if e == "-P":
arg_to_drop.append(i)
arg_to_drop.append(i + 1)
arg_to_drop.reverse()
for i in arg_to_drop:
del job.jlaunch_cmd_scratch[i]
job.jlaunch_cmd_scratch += ["-P", str(total_proc)]
restore_args, cms_fnames = self._get_restore_args(
outcfg, job.added_time)
job.jlaunch_cmd += restore_args
job.input.cms = cms_fnames
self.param["replica"] = sea.List()
for i in range(n_replica):
replica = sea.Map("model_file = '' fep.i_window = 0")
replica.fep.i_window.val = i
self.param.replica.append(replica)
if cmj.GENERAL_LOGLEVEL == "debug":
job.jlaunch_cmd.append("-DEBUG")
job.jlaunch_cmd_scratch.append("-DEBUG")
self._backup_data_files()
fname_log = job.jobname + ".log"
job.output = cmj.JobOutput()
job.output.add(os.path.abspath(fname_log))
job._jctrl_hist = []
job.stage = weakref.proxy(self)
self.add_job(job)
[docs] def hook_captured_successful_job(self, job):
"""
"""
Simulate.hook_captured_successful_job(self, job)
job.is_output = False
outcfg_fname = job.input.outcfg_file() or job.input.incfg_file(
).replace("-in.cfg", "-out.cfg")
outcfg = sea.Map(open(outcfg_fname, "r").read())
deltaE_fname = outcfg.remd.deltaE.name.val
exchange_dat_fname = outcfg.remd.exchange_dat.name.val
dE = {}
parent = [job.parent.parent] + job.other_parent
replica = config.get_replica_setting(self.param)
sea.update_macro_dict({"$JOBNAME": job.jobname})
for e in parent:
i = e.i_win
sea.update_macro_dict({"$REPLICA": str(i)})
e = copy.deepcopy(e)
e.is_output = True
# Clear this out!
e.input.__dict__['_file'] = {}
cfg = replica[i]
e.dir = job.dir
e.jobname = job.jobname
e.output = cmj.JobOutput()
e.stage = weakref.proxy(self)
# Used for extending lambda hopping jobs
e.added_time = getattr(job, "added_time", 0.0)
e.is_extend = getattr(job, "is_extend", False)
fname_out = cfg.maeff_output.name.val
traj_name = cfg.trajectory.name.val
fname_dE = config.get_fep_output_filename(cfg)
fname_eg = config.get_energy_group_output_filename(cfg)
fname_dE = os.path.join(e.dir, fname_dE) if (fname_dE) else None
fname_eg = os.path.join(e.dir, fname_eg) if (fname_eg) else None
e.input.cms = [job.input.cms[i]]
if fname_out:
e.output.add(os.path.join(e.dir, fname_out), check_cms_file)
if traj_name and (cfg.time.val > cfg.trajectory.first.val):
e.output.add(os.path.join(e.dir, traj_name),
tag="TRJ",
type="dir")
if (not isinstance(cfg.eneseq, sea.Atom) and
cfg.time.val > cfg.eneseq.first.val):
e.output.add(os.path.join(e.dir, cfg.eneseq.name.val),
tag="ENESEQ")
e.output.add(job.output.log_file())
if exchange_dat_fname:
e.output.add(exchange_dat_fname)
e.output.add(fname_dE, tag="dE")
e.output.add(fname_eg, tag="eg")
e.input.add(os.path.abspath(outcfg_fname))
e.status.set(cmj.JobStatus.SUCCESS)
self.add_job(e)
dE[i] = fname_dE
# Because the gibbs plugin does not handle REST scaling, here we are
# overwriting the .dE plugin output with deltaE file differences
deltaE.gen_dE_files(deltaE_fname, dE)
[docs]class DesmondExtend(cmj.StageBase):
"""
"""
NAME = "desmond_extend"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
added_time = 0
}
VALIDATE = {
added_time = {type = float+}
}
""",
])
[docs] def crunch(self):
self._print("debug", "In DesmondExtend.crunch")
# If restarting a partially complete extend,
# set up the stage without changing the job state.
if getattr(self, 'should_restart_extend', False):
self._print("debug", "In DesmondExtend.crunch restart")
self.should_restart_extend = False
self._is_shown = False
self._PREV_STAGE._is_packed = False
# Need to call prestage to set up the stage
self._PREV_STAGE.prestage()
self._PREV_STAGE.release(True)
self._print("debug", "Out DesmondExtend.crunch restart")
return
if (0 == self.param.added_time.val):
self.add_jobs(self.get_prejobs())
else:
should_extend = False
for pj in self.get_prejobs():
if getattr(pj, "is_extend", False):
self.add_job(pj)
else:
if getattr(pj, "added_time", 0):
pj.added_time += self.param.added_time.val
else:
pj.added_time = self.param.added_time.val
# Mark this as an extension job so this stage
# will only run once
pj.is_extend = True
pj.status.set(cmj.JobStatus.WAITING)
pj.old = False
should_extend = True
if should_extend:
prev_stage = self._PREV_STAGE
self._is_shown = False
prev_stage._is_packed = False
# Need to call prestage to set up the stage
prev_stage.prestage()
prev_stage.release(True)
self._print("debug", "Out DesmondExtend.crunch")
[docs] def hook_captured_successful_job(self, job):
"""
Run after successfully completing this stage.
:param job: Job object representing the captured job
:type job: `FepJob`
"""
super(DesmondExtend, self).hook_captured_successful_job(job)
# Once the lambda hopping job is complete, reset the extend marker
# so the job can be extended again by calling the DesmondExtend stage
self.should_restart_extend = False
for prejob in self.get_prejobs():
prejob.is_extend = False
[docs] def restart_subjobs(self, jobs):
self._print("debug", "In DesmondExtend.restart_subjobs")
# If no jobs were passed to this stage but the previous stage has
# jobs released, it means the previous stage failed or was stopped.
# In this case, restart the previous stage without changing the job stage.
self.should_restart_extend = len(self.get_prejobs()) == 0 and len(
self._PREV_STAGE.filter_jobs(is_incomplete=[True]))
self._print("debug", "Out DesmondExtend.restart_subjobs")
[docs]class VrunBase(SimulateBase):
PARAM = cmj._create_param_when_needed([
config.VRUN,
"""
DATA = {
trajectory = off
checkpt = off
maeff_output = off
eneseq.interval = 0.0
}
VALIDATE = {
}
""",
])
def _write_cfg(self,
jobname,
task,
add_tag=config.TAG_SPECS.vrun,
tag="VRUN",
param=None):
"""
"""
return super()._write_cfg(jobname, task, add_tag, tag, param)
def _get_additional_in_out_args(self, param: sea.Map,
additional_inputs: List[str]):
additional_inputs.append(param.vrun_frameset.val)
return super()._get_additional_in_out_args(param, additional_inputs)
def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map,
jobname: str) -> ([str], [str], [cms.Box]):
cms_fnames, additional_inputs, boxes = super()._get_input(
default_model, pj, param, jobname)
traj_name = self._setup_vrun_trajectory(pj, param)
return cms_fnames, additional_inputs + [traj_name], boxes
def _setup_vrun_trajectory(self, pj: DesmondJob, param: sea.Map):
# TODO this is a bit strange - we always symlink the previous
# trajectory, but we don't necessarily use it - if param.vrun_frameset
# has been set explicitly to another path, we would use that instead.
orig_traj = pj.output.get("TRJ")
traj_name = os.path.basename(orig_traj)
sea.update_macro_dict({"$FRAMESET": traj_name})
# updates param.vrun_frameset to set the traj_name
# This will $FRAMESET and set it to traj_name
param.vrun_frameset.val = param.vrun_frameset.val
# this needs to take place in the directory setup by the super call.
util.symlink(orig_traj, traj_name)
return traj_name
[docs]class Vrun(VrunBase):
"""
"""
NAME = "vrun"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
skip_intermediates = False
}
VALIDATE = {
skip_intermediates = {type = bool}
}
""",
])
[docs] def __init__(self, *arg, **kwarg):
"""
"""
super(Vrun, self).__init__(*arg, **kwarg)
self._pack = "\\tar cfz"
[docs] def crunch(self):
"""
"""
self._print("debug", "In Vrun.crunch")
pre_job = []
jobs_to_vrun = []
if self.param.skip_intermediates.val:
for pj_ in self.get_prejobs():
if not isinstance(pj_, FepJob):
jobs_to_vrun.append(pj_)
self._print(
"quiet", "Warning: skip_intermediates option "
"has no effect when running Vrun "
"with non FepJobs")
continue
# skip_intermediates means we want only want to keep the jobs
# for the first and last lambda windows
if pj_.i_win == 0 or pj_.i_win == pj_.n_win - 1:
jobs_to_vrun.append(pj_)
else:
jobs_to_vrun = self.get_prejobs()
for pj_ in jobs_to_vrun:
pj = copy.deepcopy(pj_)
pj.input.cms = copy.copy(pj_.input.cms)
cms_fname = pj.output.struct_file()
pj.output.remove(cms_fname)
pj.output.add(pj.input.cms[0])
pre_job.append(pj)
self.add_jobs(self._create_job(pre_job))
self._print("debug", "Out Vrun.crunch")
if self.param.skip_intermediates.val:
# For skip intermediates workflow, the FepJobs getting passed to
# vrun will not generate dE files
for job in self.jobs:
dE_fname = job.output.get("dE")
if dE_fname:
job.output.remove(dE_fname)
[docs]class FepVrun(VrunBase):
"""
"""
NAME = "fep_vrun"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
simbox = off
fep.output.interval = 0.0
time = inf
}
VALIDATE = {
}
""",
])
[docs] def __init__(self, *arg, **kwarg):
"""
"""
super(FepVrun, self).__init__(*arg, **kwarg)
# key is (fepid, i_win), value is [forward job, reversed job, current job,]
self._fepvrun_job = defaultdict(list)
self._parent = {}
self._pack = "\\tar cfz"
def _get_parent(self, parent):
"""
"""
if (parent not in self._parent):
self._parent[parent] = copy.deepcopy(parent)
return self._parent[parent]
def _create_job_vrun(self, job_request):
"""
"""
model_file, i_win, direction, parent = job_request
pj = self._get_parent(parent)
base_dir = cmj.ENGINE.base_dir
num_allotted_devices = self._get_num_allotted_devices()
# Gets the working param.
param = self._get_working_param()
# Sets i_window.
param.fep.i_window.val = i_win + (1 if (direction == "f") else
(-1 if (direction == "r") else 0))
self._print("debug", "Job task: %s" % pj.task)
macro_dict = {
"$LAMBDA": i_win,
}
jobname, jobdir = self._get_jobname_and_dir(pj, macro_dict)
jobname += "_" + direction
jobdir += "_" + direction
sea.update_macro_dict({"$JOBNAME": jobname})
model = self._get_model(pj, model_file, param)
new_job = self._create_single_job_common(pj, model, jobname, jobdir,
param, num_allotted_devices)
# TODO create FepVrunJob subclass of FepJob and give it this direction
# attribute instead of just adding it here
new_job.direction = direction
self.add_jobs([new_job, pj])
[docs] def crunch(self):
"""
"""
self._print("debug", "In FepVrun.crunch")
fep_job = defaultdict(list)
for pj in self.get_prejobs():
# TODO add cms to JobOutput/Input and to deepcopy? It's already
# referenced within the class
cms = pj.input.cms
pj = copy.deepcopy(pj)
pj.input.cms = cms
if (isinstance(pj, FepJob)):
fep_job[pj.fepid].append(pj)
else:
self._log(
"WARNING: Simulation performed by %s seems not a FEP simulation."
% str(pj))
self._log("WARNING: Ignore this job for FEP vrun.")
if not fep_job:
self._log("No FEP job was performed.")
else:
for fepid in fep_job:
pre_fepjob = fep_job[fepid]
if (len(pre_fepjob) < 2):
self._log(
"WARNING: Job %s: No peer jobs for the other windows." %
str(pre_fepjob[0]))
self._log("WARNING: Ignore this job for FEP vrun.")
continue
n_win = pre_fepjob[0].n_win
index_map = defaultdict(list)
to_be_del = []
pre_fepjob.sort(key=lambda x: x.i_win)
for i, e in enumerate(pre_fepjob):
index_map[e.i_win].append(i)
for i_win in index_map:
print(i_win, len(index_map[i_win]))
new_job = [
] # A list of (model_file, i_win, "f", parent) tuples
for i, pj in enumerate(pre_fepjob):
if i < n_win - 1:
model_file = pre_fepjob[i + 1].input.cms[0]
new_job.append((model_file, i, "f", pj))
if i > 0:
model_file = pre_fepjob[i - 1].input.cms[0]
new_job.append((model_file, i, "r", pj))
model_file = pre_fepjob[i].input.cms[0]
new_job.append((model_file, i, "o", pj))
for e in new_job:
self._create_job_vrun(e)
self._print("debug", "Out FepVrun.crunch")
[docs] def hook_captured_successful_job(self, job):
"""
"""
if (job.stage._ID != self._ID):
return
if not hasattr(job, 'direction'):
return
key = (
job.fepid,
job.i_win,
)
self._fepvrun_job[key].append(job)
alljob = []
if (job.i_win == 0 or job.i_win == job.n_win - 1):
if (2 == len(self._fepvrun_job[key])):
alljob = self._fepvrun_job[key]
else:
if (3 == len(self._fepvrun_job[key])):
alljob = self._fepvrun_job[key]
if (alljob == []):
return "dissolved"
job_f = None
job_r = None
job_o = None
for e in alljob:
if (e.direction == "f"):
job_f = e
elif (e.direction == "o"):
job_o = e
elif (e.direction == "r"):
job_r = e
util.chdir(job_o.dir)
pj = job_o.parent
enes = []
time_name, pot_e_name = eneseq_col_names = ['time', 'E_p']
for job_d in (job_r, job_o, job_f):
if not job_d:
enes.append(None)
else:
fname_eneseq = job_d.output.get("ENESEQ")
enes.append(fepana.parse_eneseq(fname_eneseq)[eneseq_col_names])
ene_r, ene_o, ene_f = enes
assert ene_o is not None, "Original job must exist for all windows"
ene_diffs = [ene_o[time_name]]
for ene in (ene_r, ene_f):
if ene is not None:
assert ene.dtype == ene_o.dtype, \
"ene files have different datatypes, likely resulting " \
"from different columns"
assert (ene[time_name] == ene_o[time_name]).all(), \
"ene timestamps differ"
ene_diffs.append(ene[pot_e_name] - ene_o[pot_e_name])
else:
ene_diffs.append(np.zeros_like(ene_o[pot_e_name]))
diff_arr = np.stack(ene_diffs, axis=-1)
# Because the gibbs plugin does not handle REST scaling, here we are
# overwriting the .dE plugin output and replacing it with vrun energy
# differences
dE_fname = os.path.basename(pj.output.get("dE"))
additional_header = "Output recreated from vrun .ene files"
deltaE.write_dE(dE_fname, diff_arr, additional_header=additional_header)
pj.output.add(os.path.abspath(dE_fname), tag="dE")
return "dissolved"
[docs]class ConcatParam(sea.Map):
[docs] def update(self, ark=None, file=None, tag=set()): # noqa: M511
# because simulate is a list, it gets overwritten during update.
# thus, we need to update each list parameter individually
if isinstance(ark, sea.Map):
if 'simulate' in ark and len(ark.simulate) == len(self.simulate):
for ark_param, sim_param in zip(ark.simulate, self.simulate):
sim_param.update(ark_param, tag=tag)
self.simulate.add_tag(tag, propagate=False)
ark = copy.deepcopy(ark)
del ark.simulate
super(ConcatParam, self).update(ark, file, tag)
[docs]class Concatenate(Simulate):
# TODO do we want to allow gcmc in concatenated (relaxation) stages?
NAME = 'concatenate'
PARAM = cmj._create_param_when_needed([
config.CONCAT,
])
[docs] def __init__(self, *args, **kwargs):
self._param = None
super(Concatenate, self).__init__(*args, **kwargs)
@property
def param(self):
return self._param
@param.setter
def param(self, value):
self._param = value
if value is None:
return
self._param = ConcatParam()
self._param.simulate = sea.List()
# create a new list of blank simulate stages with default params
# and initialize self._param.simulate
for i in range(len(value.simulate)):
self._param.simulate.append(copy.deepcopy(Simulate.PARAM.DATA))
self._param.update(value)
for tag in value.tag():
self._param.update(sea.sea_filter(value, tag), tag=tag)
last_time = 0.0
for sim_param in self._param.simulate:
last_time += sim_param.time.val
# handle params that need consistency between top-level and simulate
# blocks
self._handle_restraints()
self._handle_gcmc()
self._param.time.val = last_time
self._param.time.add_tag('setbyuser')
def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map):
# now verify with permanent restraints
has_permanent_restrain = bool(job.permanent_restrain)
for sim_block in self.param.simulate[1:]:
msg = cmj.restraints_incompatible(sim_block, self.param,
has_permanent_restrain)
if msg:
raise ValueError(msg)
return super()._get_model(job, fname, param)
def _handle_restraints(self):
"""
Enforce consistency between first simulate block's and top level block's
restrain(ts)?$ parameters, and check that the subsequent simulate
blocks' restrain(ts)?$ parameters are valid.
"""
# if first simulate block's restrain(ts)?$ is not 'none', and top level
# restrain(ts)?$ is not set, we overwrite top level. This will allow
# restraints to be added to the model properly.
for name, not_none in (('restrain', lambda r: r.val != 'none'),
('restraints',
lambda r: r.existing.val != 'ignore' or r.new)):
first = getattr(self._param.simulate[0], name)
if not_none(first):
stage_level = getattr(self._param, name)
if not_none(stage_level) and stage_level != first:
raise ValueError(
f"Different `{name}` cannot be set on both "
"the concatenate stage and its first simulate "
"subblock")
setattr(self._param, name, first)
# the frontend config will use the first restrain(ts)?$ only via
# `self._param.restrain(ts)?$`, the backend config will use only
# the simulate blocks restrain via
# `self._param.simulate[i].restrain(ts)?$. We ensure here
# that the first restrain is consistent between the two.
setattr(self._param.simulate[0], name,
copy.deepcopy(getattr(self._param, name)))
def check_restraints(param):
# verify for now without knowing if permanent restraints exist, we
# will verify with permanent restraints again in `_get_model`
msg = cmj.restraints_incompatible(param, self._param, None)
if msg:
raise ValueError(msg)
# check top level param (for example to ensure no explicit restraints)
check_restraints(self._param)
# ensure that the subsequent restrain blocks are valid, ie they only
# differ in force constant from the first block.
for sim_block in self._param.simulate:
check_restraints(sim_block)
def _handle_gcmc(self):
"""
Require equivalence between the GCMC parameter of the top level block
and the simulate blocks. Either the top level block or all simulate
blocks can have GCMC parameter unset, they will be updated to match the
other, but both cannot define GCMC parameter unless they are exactly
equivalent.
"""
# all simulate blocks must be consistent
get_gcmc_block = lambda param: (param.gcmc if "gcmc" in param.keys(
tag="setbyuser") else None)
sub_gcmc_blocks = [
get_gcmc_block(sim_param) for sim_param in self.param.simulate
]
all_equal = all(x == sub_gcmc_blocks[0] for x in sub_gcmc_blocks)
if not all_equal:
raise ValueError("GCMC blocks in Concatenate substages must"
" be identical")
# if top block and subblocks define GCMC, enforce consistency
# if top block or subblocks exclusively define GCMC, copy from defined
# to undefined block
top_gcmc_block = get_gcmc_block(self.param)
if top_gcmc_block:
for sim_param, sub_gcmc_block in zip(self.param.simulate,
sub_gcmc_blocks):
if not sub_gcmc_block:
sim_param.gcmc = copy.deepcopy(top_gcmc_block)
elif sub_gcmc_block != top_gcmc_block:
raise ValueError("Top level GCMC block in Concatenate "
"stage conflicts with substage GCMC block")
elif sub_gcmc_blocks and sub_gcmc_blocks[0]:
self.param.gcmc = copy.deepcopy(sub_gcmc_blocks[0])
[docs] def check_param(self):
ev = super(Concatenate, self).check_param()
if not ev.is_ok:
return ev
for sim_param in self.param.simulate:
sim_stage = Simulate()
sim_stage.param = sim_param
ev = sim_stage.check_param()
if not ev.is_ok():
return ev
return ev
def _write_cfg(self,
jobname,
task,
add_tag=config.TAG_SPECS.concat,
tag="CONCAT",
param=None):
if param is None:
# removes all tags
param = copy.deepcopy(config.get_default_setting(
self.param)).update(self.param)
i_win = param.fep.i_window.val
# necessary for getting proper tags on simulate params
# use self.param to set param on substages, then they will create their
# own param inside _write_cfg
for i, sim_param in enumerate(self.param.simulate):
sim_stage = Simulate()
sim_stage.param = sim_param
sim_stage.param.fep.i_window.val = i_win
# necessary for GCMC substages
sim_stage.check_param()
sim_cfg = sim_stage._write_cfg(jobname, task)
with open(sim_cfg) as f:
sim_param = sea.Map(f.read())
param.simulate[i] = sim_param
return super(Concatenate, self)._write_cfg(jobname, task, add_tag, tag,
param)
def _effect(self, param):
if 'simulate' in param:
for i, sim_param in enumerate(param.simulate):
if isinstance(sim_param.effect_if, sea.List):
param.simulate[i] = super(Concatenate,
self)._effect(sim_param)
return super(Concatenate, self)._effect(param)
[docs] def migrate_param(self, param: sea.Map):
if 'simulate' in param:
for sim_param in param.simulate:
super().migrate_param(sim_param)
return super().migrate_param(param)
[docs]class ReInit(SimulateBase):
NAME = 'reinit'
PARAM = cmj._create_param_when_needed([config.REINIT])
def _write_cfg(self,
jobname,
task,
add_tag=config.TAG_SPECS.reinit,
tag="REINIT",
param=None):
return super()._write_cfg(jobname, task, add_tag, tag, param)
def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map,
jobname: str) -> ([str], [str], [cms.Box]):
"""
Create input files as needed and return a list of the filenames created
This function can modify `param`, so should be executed before anything
that depends on `param`.
:param default_model: The base model from previous job, can be
superseded in subclasses
:param pj: The previous job
:param param: Settings for this job. Can be updated
:param jobname: The jobname of this job
"""
cms_fnames, additional_inputs, boxes = super()._get_input(
default_model, pj, param, jobname)
DOLLAR_RCF = "$REINIT_CONFIGURATIONS_FILE"
if param.reinit_frameset.configurations_file.val == DOLLAR_RCF:
# use parent job trajectory
prev_traj = pj.output.get("TRJ")
traj_name = os.path.basename(prev_traj)
sea.update_macro_dict({DOLLAR_RCF: traj_name})
# this replaces $REINIT_CONFIGURATIONS_FILE with `traj_name`
param.reinit_frameset.configurations_file.val = \
param.reinit_frameset.configurations_file.val
util.symlink(prev_traj, traj_name)
additional_inputs.append(traj_name)
return (cms_fnames, additional_inputs, boxes)
[docs] def crunch(self):
self._print("debug", "in reinit.crunch")
pre_jobs = []
for prejob in self.get_prejobs():
new_prejob = DesmondBackendJob(
task=prejob.task,
gid=prejob.gid,
permanent_restrain=prejob.permanent_restrain,
permanent_group=prejob.permanent_group,
jobname=prejob.jobname,
parent=prejob.parent,
stage=prejob.stage,
dir=prejob.dir,
jlaunch_cmd=None)
new_prejob.output.set_struct_file(prejob.output.struct_file())
new_prejob.output.add(prejob.output.get('TRJ'), tag='TRJ')
pre_jobs.append(new_prejob)
self.add_jobs(self._create_job(pre_jobs))
self._print("debug", "out reinit.crunch")