Source code for schrodinger.application.desmond.stage.simulate

import copy
import glob
import os
import shutil
import string
import warnings
import weakref
from collections import defaultdict
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple

import numpy as np

from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import deltaE
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import gcmc_utils
from schrodinger.application.desmond import util
from schrodinger.utils import sea

from .jobs import DesmondBackendJob
from .jobs import DesmondJob
from .jobs import FepJob
from .jobs import SnapcoreCoordinates
from .utils import SystemBuilder
from .utils import check_cms_file
from .utils import run_stage_method_in_parallel

__all__ = [
    'SimulateBase', 'RemovesInitialGCMCSolvent', 'GCMCCapable', 'Simulate',
    'ReplicaExchange', 'LambdaHopping', 'DesmondExtend', 'Vrun', 'FepVrun',
    'Concatenate', 'ReInit'
]


[docs]def concatenate_and_backup(text_fname): text_fname_previous = text_fname + ".previous" if os.path.exists(text_fname_previous): if os.path.exists(text_fname): with open(text_fname_previous, "a") as fh_a: with open(text_fname, "r") as fh_r: for line in fh_r: if (line and '\n' == line[-1]): line = line[:-1] print(line, file=fh_a) os.remove(text_fname) elif os.path.exists(text_fname): os.rename(text_fname, text_fname + ".previous")
[docs]def backup_trajectories(fname): fname_previous = fname + ".previous" if os.path.exists(fname_previous): raise IOError(fname_previous + " already exists...") elif os.path.exists(fname): os.rename(fname, fname_previous)
[docs]def concatenate_trajectories(trajectories): """ Concatenate a list of previously backed-up trajectory files (or dirs) with the respective current ones. :type trajectories: list[str] :param trajectories: A list of file names. Each name should have the suffix of either "_trj.previous" or ".xtc.previous". Elements without such a suffix will be ignored without error. These are the names of the backed-up trajectories. The current trajectories should have the same basenames, but not required to be of the same format as the previous ones'. This means we can concatenate a DTR trajectory with a XTC trajectory. The final (complete) trajectories will be in the same formats as the current ones'. """ from schrodinger.application.desmond.packages import traj part1_fnames = [] part1_formats = [] part2_fnames = [] for fname in trajectories: for suffix, fmt in [('_trj.previous', traj.Fmt.DTR), ('.xtc.previous', traj.Fmt.XTC)]: if fname.endswith(suffix): if os.path.exists(fname): if os.path.getsize(fname): traj_name0 = fname.replace(suffix, '_trj') traj_name1 = fname.replace(suffix, '.xtc') if os.path.exists(traj_name0): part1_fnames.append(fname) part1_formats.append(fmt) part2_fnames.append(traj_name0) elif os.path.exists(traj_name1) and os.path.getsize( traj_name1): part1_fnames.append(fname) part1_formats.append(fmt) part2_fnames.append(traj_name1) else: # `fname[:9]` drops the ".previous" suffix. os.rename(fname, fname[:-9]) else: # Remove empty file os.unlink(fname) break for part1_fname, part1_format, part2_fname in \ zip(part1_fnames, part1_formats, part2_fnames): part1_traj = traj.read_traj(part1_fname, part1_format) part2_traj = traj.read_traj(part2_fname) complete_traj = traj.merge(part1_traj, part2_traj) if complete_traj: # part2_fname may be absolute path, so only append ".tmp" fmt = traj.get_fmt(part2_fname) tmp_fname = part2_fname + ".tmp" traj.write_traj(complete_traj, tmp_fname, format=fmt) # Removes the current trajectory try: os.remove(part2_fname) except OSError: # Close the trajectory, otherwise the folder may # not be able to be deleted if len(part2_traj) > 0: part2_traj[0].source().close() shutil.rmtree(part2_fname) # Renames the tmp file to the current trajectory file/dir's name. os.rename(tmp_fname, part2_fname) # Removes the previous trajectory. try: os.remove(part1_fname) except OSError: # Close the trajectory, otherwise the folder may # not be able to be deleted if len(part1_traj) > 0: part1_traj[0].source().close() shutil.rmtree(part1_fname)
[docs]class SimulateBase(cmj.StageBase): """ A stage that can launch desmond simulations. """ RESTARTABLE = True DESMOND_CMD = os.path.join(envir.CONST.SCHRODINGER, "desmond") CFG_SIM_NAME = "mdsim" parameter_string = string.Template(""" DATA = { jobname = "$$MAINJOBNAME_$$STAGENO$$[_lambda$$LAMBDA$$]" dir = "$$[$$JOBPREFIX/$$]$$[$$PREFIX/$$]$$MAINJOBNAME_$$STAGENO$$[_lambda$$LAMBDA$$]" host = "$$SUBHOST" fep.type = $small_molecule jin_file = [] jin_must_transfer_file= [] jout = "" cfg_file = "" window = ? atom_group = none restraints.new = [] restraints.existing = $restraint_ignore print_restraint = false print_expected_memory = false jlaunch_opt= [""] } VALIDATE = { host = {type = str range = [1 10000000000]} fep.type = {type = enum range = [$all_fep_types]} cfg_file = {type = str range = [0 10000000000]} jin_file = [ {type = str range = [0 10000000000]} {type = list size = 0 elem = {type = str range = [0 10000000000]}} ] jin_must_transfer_file = {type = list size = 0 elem = {type = str range = [0 10000000000]}} jout = [ {type = str range = [0 10000000000]} {type = list size = 0 elem = {type = str range = [0 10000000000]}} ] window = [ {type = none} {type = int0 _check = check_iwindow} {type = list size = 0 elem = {type = int0 _check = check_iwindow}} ] atom_group = [ {type = enum range = [retain none]} {atom = {type = str range = [0 10000000000]} name = {type = str range = [0 10000000000]} index = {type = int range = [0 255]} } {type = list size = 0 elem = {atom = {type = str range = [0 10000000000]} name = {type = str range = [0 10000000000]} index = {type = int range = [0 7]} } } ] restraints.existing = {type = enum range = [$existing_restraint]} restraints.new = [ {type = list} {_skip = all} ] print_restraint = {type = bool} print_expected_memory = {type = bool} jlaunch_opt.black_list = ["!append!" "-exec" "-PROCS" "-in" "-c" "-overwrite"] } """).substitute(restraint_ignore=constants.EXISTING_RESTRAINT.IGNORE, small_molecule=constants.FEP_TYPES.SMALL_MOLECULE, all_fep_types=" ".join(constants.FEP_TYPES), existing_restraint=" ".join(constants.EXISTING_RESTRAINT)) PARAM = cmj._create_param_when_needed([ config.MD, parameter_string, ])
[docs] def __init__(self, *arg, **kwarg): """ """ cmj.StageBase.__init__(self, *arg, **kwarg) self._cfg = None
[docs] def migrate_param(self, param: sea.Map): """ The Berendsen method is no longer supported (it was CPU only). If we encounter it we automatically change it to Langevin, which was the behavior before it was removed via the gpu effect_if param. :param param: a param corresponding to this stage class to migrate """ try: if param.ensemble.method.val == 'Berendsen': warnings.warn( DeprecationWarning( "Berendsen thermostat is not supported in GPU Desmond, " "Langevin will be used instead. Using 'Berendsen' will " "result in an error in a future release.")) param.ensemble.method.val = 'Langevin' except AttributeError: # possible the ensemble attribute does not exist or is not a map # with method attribute, ie with Concatenate stage pass
[docs] def prestage(self): # Updates the parameters set in the config file. # Needs to run before crunch(). self._update_cfg()
def _update_model_snapcore(self, model: cms.Cms, job: DesmondJob): """ If snapcore is not None, update the model to match the snapcore coordinates. """ is_alchemical = cms.get_model_system_type( model.comp_ct) == constants.SystemType.ALCHEMICAL snapcore = getattr(job, 'snapcore', None) if is_alchemical and snapcore is not None: (ref_ct, mut_ct) = model.get_fep_cts() if job.i_win < job.n_win // 2: ref_ct.setXYZ(snapcore.fh_ref_coord) mut_ct.setXYZ(snapcore.fh_mut_coord) else: ref_ct.setXYZ(snapcore.sh_ref_coord) mut_ct.setXYZ(snapcore.sh_mut_coord) # we don't want to use any existing restrains or their reference # coordinates since we've changed the positions model.clear_restrain() model.synchronize_fsys_ct() def _print_expected_memory(self, model: cms.Cms, job: DesmondJob): # Print the predicted memory utlization for this job if self.param.print_expected_memory.val and job.i_win == 0: cpu_mem, gpu_mem = util.predict_memory_utilization( self.param.fep.type.val, model.fsys_ct.atom_total, job.n_win) self._print( "quiet", f"Expected maximum CPU Memory (GB): {cpu_mem/1024:.2f}") self._print( "quiet", f"Expected maximum GPU Memory (GB): {gpu_mem/1024:.2f}") def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map): from schrodinger.application.desmond.packages.restraint import \ RestraintBuilder fname = Path(fname) if not fname.is_absolute() and not fname.is_file(): # If we do not find the file in the current directory, we try the base directory. fname = Path(cmj.ENGINE.base_dir) / fname.name model = cms.Cms(file=fname) model = SystemBuilder.set_atom_group(model, param.atom_group, job.permanent_group) self._update_model_snapcore(model, job) self._print_expected_memory(model, job) model = SystemBuilder.set_restrain(model, param.restrain, job.permanent_restrain, job) restraint_builder = RestraintBuilder(param.restraints.new, param.restraints.existing.val, model) restraint_builder.addRestraints() if param.print_restraint.val: # Skip printing posre_harm, since it can be very large self._print( "quiet", restraint_builder.getString(skip_tables={'posre_harm'}, compact=True)) return model def _write_cms(self, jobname, model): """ """ cms_fname = jobname + "-in.cms" # Use compressed input if output cms is compressed if isinstance(self.param.maeff_output, sea.Map) and \ self.param.maeff_output.val.get('name') and \ self.param.maeff_output.name.val.endswith('gz'): cms_fname += '.gz' model.write(cms_fname) return cms_fname @staticmethod def _incfg_name(jobname): return jobname + "-in.cfg" @staticmethod def _outcfg_name(jobname): return jobname + "-out.cfg" def _write_cfg(self, jobname, task, add_tag=config.TAG_SPECS.md, tag="MD", param=None): """ """ if (param is None): param = copy.deepcopy(config.get_default_setting( self.param)).update(self.param) else: param = copy.deepcopy(param) if (task == "regular"): if ("fep" in param): del param["fep"] if (task == "afep"): if (not param.energy_group.has_tag("setbyuser")): param["energy_group"] = True config.tag_sim_map(param, add_tag) cfg_fname = self._incfg_name(jobname) fh = open(cfg_fname, "w") print(param.__str__(tag=tag), file=fh) fh.close() return cfg_fname def _update_cfg(self): if self.param.cfg_file.val: base_dir = cmj.ENGINE.base_dir # Saves explicit settings in `self.param'. explicit_setting = sea.sea_filter(self.param, "setbyuser") # Updates `self.param' from the file self.param.update( file=os.path.join(base_dir, self.param.cfg_file.val)) # Applies the explicit ones back. self.param.update(explicit_setting, tag="setbyuser") def _get_output_names(self, param: sea.Map) -> (Optional[str], Optional[str]): """ Parse param to get names of output files. :return: the cms and trajectory output filenames """ fname_out = (None if isinstance(param.maeff_output, sea.Atom) else param.maeff_output.name.val) traj_name = (None if (config.is_minimize(param) or isinstance( param.trajectory, sea.Atom)) else param.trajectory.name.val) return fname_out, traj_name def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map, jobname: str) -> ([str], [str], [cms.Box]): """ Create input files as needed and return a list of the filenames created This function can modify `param`, so should be executed before anything that depends on `param`. :param default_model: The base model from previous job, can be superseded in subclasses :param pj: The previous job :param param: Settings for this job. Can be updated :param jobname: The jobname of this job """ fname_in = self._write_cms(jobname, default_model) return [fname_in], [], [default_model.box] def _get_device_args(self, pj, num_allotted_devices): if pj.task in ("fep", "afep") and num_allotted_devices > 1: # explicitly set the device so we can handle nodes with # Default Compute Mode and nodes with CUDA MPS (cases where # CUDA allows >1 process to acquire the same GPU) # GPUs will be assigned in round-robin fashion. device = pj.i_win % num_allotted_devices return ['-device', str(device)] return [] def _add_eneseq(self, new_job: DesmondJob, param: sea.Map): if (isinstance(param.eneseq, sea.Map) and param.time.val > param.eneseq.first.val): new_job.output.add(os.path.abspath(param.eneseq.name.val), tag="ENESEQ") def _get_working_param(self): """ Extracted for easier mocking :return: a copy of self.param with the default values filled in """ return copy.deepcopy(config.get_default_setting(self.param)).update( self.param) def _set_box(self, param: sea.Map, boxes: List[cms.Box]): """ Update the param to set the box information. """ for box in boxes: param.box = sea.List(box) param.box.add_tag('setbyuser') @run_stage_method_in_parallel def _create_job(self, pre_job=[], job_param=[]): # noqa: M511 """ We're going to loop through pre_job/job_param and do a few things here For each pre job/job_param - Create a job directory and create/copy the necessary inputs to that directory - Create a Desmond job launching command, with the inputs specified by -in/-jin keywords, and the outputs that Desmond does not automatically determine by -jout keywords, as well as additional keywords. - Use that command to create a new DesmondJob object - Update the inputs and outputs on the DesmondJob object - these should correspond to at least a subset, if not all, of the inputs and outputs that the backend will be made aware of via -in/-jin/-jout keywords. These inputs and outputs will be used to check the success of a job, archive directories and copy files back to the parent launch directory, and expose inputs/outputs to other stage code, but are not involved in jobcontrol registration for the jobs submitted (ie the jobcontrol registration only goes upwards in job launch hierarchy, not downwards). # TODO Integrate cmj.Job inputs/outputs and jlaunch command -in/-jin/-jout args into a class that maintains coherence between the two (this will require importing desmond driver code that determines plugin output) """ base_dir = Path(cmj.ENGINE.base_dir) num_allotted_devices = self._get_num_allotted_devices() new_jobs = [] for iii, pj in enumerate(pre_job): self._print("debug", f"Job task: {pj.task}") # get and modify param param = self._get_working_param() if job_param: param.update(job_param[iii]) if config.is_meta(param) and param.meta_file.val: param.meta_file.val = os.path.join(base_dir, param.meta_file.val) param.meta_file.add_tag("setbyuser") # Sets i_window. if pj.task in ["fep", "afep"]: param.fep.i_window.val = pj.i_win # modify macro dict for this job and get directory which depends # on macro values macro_dict = { "$LAMBDA": str(pj.i_win), } if (pj.task in [ "fep", "afep", ]) else {} jobname, jobdir = self._get_jobname_and_dir(pj, macro_dict) # TODO why do we do this?? some job types don't even use default_model. # we should let _get_input decide if default_model is necessary, and raise # an error if it cannot be loaded. try: default_model = self._get_model(pj, pj.output.struct_file(), param) except RuntimeError as e: self._print("quiet", e) continue new_job = self._create_single_job_common(pj, default_model, jobname, jobdir, param, num_allotted_devices) new_job.other_parent = pj.other_parent new_jobs.append(new_job) return new_jobs def _create_single_job_common(self, pj: DesmondJob, default_model: cms.Cms, jobname: str, jobdir: str, param: sea.Map, num_allotted_devices: int) -> DesmondJob: """ Given a previous job, model, and job params, create a job. :param pj: The previous input job :param default_model: The cms model to use as input. Can be overridden by subclass implementation of `_get_input` :param jobname: The name for the job :param jobdir: The directory for the job :param param: The working param corresponding to this job, with any per-job customizations applied. :param num_allotted_devices: The number of devices allotted to this job :return: The created job """ # Makes the directory. if not os.path.isdir(jobdir): os.makedirs(jobdir) util.chdir(jobdir) # creates and writes input files and returns filenames # can modify param so must be done before write_cfg # TODO can we make it so get_input does not modify param? # only happens in GCMC to the buffer width input_cms_fnames, additional_inputs, boxes = self._get_input( default_model, pj, param, jobname) self._set_box(param, boxes) # modifications to param complete, now we write config fname_cfg = self._write_cfg(jobname, pj.task, param=param) # Constructs the job-launching command. jlaunch_cmd = [ self.DESMOND_CMD, "-JOBNAME", jobname, "-overwrite", "-c", fname_cfg ] # get inputs and add them to jlaunch command for fname in input_cms_fnames: jlaunch_cmd += ["-in", fname] jlaunch_cmd += self._get_additional_in_out_args(param, additional_inputs) # Gets number of replicas to set the '-P' option. n_replica = config.num_replica(param, default_model, self._log) n_cpu = param.cpu.val try: total_proc = param.total_proc.val except AttributeError: total_proc = n_cpu * n_replica if total_proc is None: total_proc = n_cpu * n_replica jlaunch_cmd += ["-P", str(total_proc)] # TODO move this to lambda hopping stage if pj.task == "lambda_hopping": jlaunch_cmd += [ "-fep_convergence", str(self.param.fep_convergence.val) ] jlaunch_cmd += self._get_device_args(pj, num_allotted_devices) if cmj.GENERAL_LOGLEVEL == "debug": jlaunch_cmd.append("-DEBUG") desmond_kwargs = dict(jobname=jobname, parent=pj, stage=self, dir=jobdir, jlaunch_cmd=jlaunch_cmd, task=pj.task, permanent_restrain=pj.permanent_restrain, permanent_group=pj.permanent_group) if pj.task == "fep" or pj.task == "afep": fepout = config.get_fep_output_filename(param) fepout = os.path.abspath(fepout) if fepout else None egout = config.get_energy_group_output_filename(param) egout = os.path.abspath(egout) if egout else None # TODO weird that we use default_model explicitly when we might # not be using it as input new_job = FepJob( fepout=fepout, # dE disabled egout=egout, fragname=default_model.get_fragname(), **desmond_kwargs) else: new_job = DesmondBackendJob(**desmond_kwargs) self._register_job_inputs_outputs(new_job, param, input_cms_fnames) return new_job def _get_additional_in_out_args(self, param: sea.Map, additional_inputs: List[str]): """ Get -jin and -jout command line args by parsing -jin_file, -jin_must_transfer_file and -jout fields of `param`, and any other explicit inputs via `additional_inputs`. These instruct the desmond backend to register these inputs and outputs via jobcontrol. These params should represent all of the inputs and outputs outside of config files, input structures, and plugin outputs, which are all handled separately. :param param: The working param for a given job :param additional_inputs: Other inputs which should be registered via -jin in launch command """ jlaunch_args = [] jout = [] if isinstance(param.jout, sea.Atom) and "" != param.jout.val: jout = [param.jout] elif isinstance(param.jout, sea.List): jout = param.jout for e in (se.val for se in jout if se.val): jlaunch_args.extend(["-jout", e]) jin = [] if (isinstance(param.jin_file, sea.Atom) and "" != param.jin_file.val): jin = [param.jin_file] elif isinstance(param.jin_file, sea.List): jin = param.jin_file for e in (se.val for se in jin if se.val): if os.path.isabs(e) or os.path.exists(e): jin_fname = e else: fullpath = os.path.join(cmj.ENGINE.base_dir, os.path.basename(e)) jin_fname = util.relpath(fullpath) jlaunch_args.extend(["-jin", jin_fname]) # TODO not sure why we need relative path for absolute paths here # but not above? # DESMOND-10355 filed to remove this parameter for e in param.jin_must_transfer_file.val: if e: jin_fname = util.relpath(e) if os.path.isabs(e) else e jlaunch_args.extend(["-jin", jin_fname]) for additional_input in additional_inputs: jlaunch_args.extend(["-jin", additional_input]) return jlaunch_args def _register_job_inputs_outputs(self, job: DesmondJob, param: sea.Map, input_cms_fnames: List[str]): """ Register the output files for the job. These include the standard cms output as well as the output of backend plugins such as trajectory and eneseq. These outputs are not necessarily all of the outputs that are created by the backend, but are the outputs that must be produced for multisim to consider the job a success. :param job: The job to add inputs and outputs to :param param: The current working param corresponding to the given job """ jobname = job.jobname job.input.add(os.path.abspath(self._incfg_name(jobname))) # TODO why is outcfg passed as input? It is created by the job job.input.add(os.path.abspath(self._outcfg_name(jobname))) job.input.cms = [] for e in input_cms_fnames: job.input.cms.append(os.path.abspath(e)) fname_out, traj_name = self._get_output_names(param) if fname_out: job.output.add(os.path.abspath(fname_out), check_cms_file) if traj_name and param.time.val > param.trajectory.first.val: traj_type = 'dir' if param.trajectory.format.val == 'dtr' else 'file' job.output.add(os.path.abspath(traj_name), tag="TRJ", type=traj_type) if config.is_meta(param): if isinstance(param.meta, sea.Map): job.output.add(os.path.abspath(param.meta.name.val), tag="KERSEQ") job.output.add(os.path.abspath(param.meta.cv_name.val), tag="CVSEQ") elif (isinstance(param.meta, sea.Atom) and param.meta == 'FILE' and param.meta_file): job.output.add(os.path.abspath(param.meta_file.val)) # currently this is a method so remd can no-op self._add_eneseq(job, param) fname_log = job.jobname + ".log" job.output.add(os.path.abspath(fname_log)) def _get_snapcore_coordinates( self, cms_fname: str) -> Optional[SnapcoreCoordinates]: """ Return the snapcore coordinates, that is the phase-aligned structures, for both the reference and mutant molecules. If the fep type does not use snapcore or this is an academic build, then return None. :param cms_fname: Input cms filename. """ from schrodinger.application.desmond.frag_snap import \ get_aligned_coordinates_from_fragments from schrodinger.application.scisol.packages.core_hopping.int_fepio import \ get_core_atoms_from_mapping from schrodinger.application.scisol.packages.core_hopping.int_fepio import \ get_reference_coordinates_for_two_molecules from schrodinger.application.scisol.packages.core_hopping.int_match import \ get_bonds_deleted snapcore = self.param.fep.type.val == constants.FEP_TYPES.SMALL_MOLECULE if not snapcore: return None cms_model = cms.Cms(file=cms_fname) (ct1, ct2) = cms_model.get_fep_cts() if ct1 is None or ct2 is None: print('WARNING: cannot find two fep cts') return None (matched_atom1, matched_atom2) = get_core_atoms_from_mapping(ct2) # Recovers the original coordinates. (ct1_ref_coord, ct2_ref_coord) = \ get_reference_coordinates_for_two_molecules(ct1, ct2) ct1.setXYZ(ct1_ref_coord) ct2.setXYZ(ct2_ref_coord) (broken_bond_ct1, broken_bond_ct2) = \ get_bonds_deleted(ct1, matched_atom1, ct2, matched_atom2) (aligned_ct1_coord, aligned_ct2_coord) = \ get_aligned_coordinates_from_fragments(ct1, ct2, matched_atom1, matched_atom2, broken_bond_ct1, broken_bond_ct2) if aligned_ct2_coord is not None and aligned_ct1_coord is not None: return SnapcoreCoordinates(fh_ref_coord=ct1_ref_coord, fh_mut_coord=aligned_ct2_coord, sh_ref_coord=aligned_ct1_coord, sh_mut_coord=ct2_ref_coord) elif aligned_ct2_coord is not None: return SnapcoreCoordinates(fh_ref_coord=ct1_ref_coord, fh_mut_coord=aligned_ct2_coord, sh_ref_coord=ct1_ref_coord, sh_mut_coord=aligned_ct2_coord) elif aligned_ct1_coord is not None: return SnapcoreCoordinates(fh_ref_coord=aligned_ct1_coord, fh_mut_coord=ct2_ref_coord, sh_ref_coord=aligned_ct1_coord, sh_mut_coord=ct2_ref_coord) raise RuntimeError( "PhaseAlign fails with both end as template molecules") def _get_num_allotted_devices(self): """ Return the number of mps pipe directories, if set, otherwise the number of visible devices, if set, otherwise 0 """ visible_devices = self._get_visible_devices() mps_pipes = self._get_mps_pipes() if not mps_pipes: return len(visible_devices) # sanity check if not visible_devices: raise ValueError("$SCHRODINGER_CUDA_MPS_PIPE_DIRECTORY set but " "$(SCHRODINGER_)CUDA_VISIBLE_DEVICES unset, this " "can result in undefined behavior") if visible_devices != ['0']: raise ValueError("$(SCHRODINGER_)CUDA_VISIBLE_DEVICES must have the" " value '0' when " "$SCHRODINGER_CUDA_MPS_PIPE_DIRECTORY is set") return len(mps_pipes) def _get_mps_pipes(self): mps_pipe_str = os.environ.get('SCHRODINGER_CUDA_MPS_PIPE_DIRECTORY', '') if mps_pipe_str: return mps_pipe_str.split(',') return [] def _get_visible_devices(self): visible_devices_str = os.environ.get( 'SCHRODINGER_CUDA_VISIBLE_DEVICES') or os.environ.get( 'CUDA_VISIBLE_DEVICES') if visible_devices_str: return visible_devices_str.split(',') return []
[docs] def crunch(self): """ """ self._print("debug", "In Simulate.crunch") pre_job = [] n_win = config.num_lambda_window(self.param.fep.lambda_) for pj in self.get_prejobs(): # TODO: DESMOND-10413: This shouldn't be needed but too # often the job is copied give it a stale state. if hasattr(pj, 'snapcore'): pj.snapcore = None # In the first simulate stage the prejob is not FepJob. # In the following stages the prejob is an FepJob, # so not isinstance(pj, FepJob) is checking that this # is the first simulate stage for fep. # TODO: Can we handle this in a better way? if ((pj.task == "fep" or pj.task == "afep") and not isinstance(pj, FepJob)): snapcore = None if pj.task == "fep": try: snapcore = self._get_snapcore_coordinates( pj.output.struct_file()) except ImportError: self._print( "quiet", "Ligand FEP: Using academic FEP atom-matching code") job_template = FepJob(n_win=n_win, i_win=0, jobname=pj.jobname, parent=pj, stage=pj.stage, dir=pj.dir, jlaunch_cmd=pj.jlaunch_cmd, task=pj.task, permanent_restrain=pj.permanent_restrain, permanent_group=pj.permanent_group, snapcore=snapcore) if (self.param.window.val is not None): if (isinstance(self.param.window, sea.Atom)): job = job_template job.i_win = self.param.window.val job.output = pj.output pre_job.append(job) elif (isinstance(self.param.window, sea.List)): for i_win in self.param.window: job = copy.deepcopy(job_template) job.i_win = i_win.val job.output = pj.output pre_job.append(job) else: raise ValueError( "The value of the window parameter should be either a sea.List or sea.Atom object," " but we got %s" % type(self.param.window.val)) else: for i in range(n_win): job = copy.deepcopy(job_template) job.i_win = i job.output = pj.output pre_job.append(job) else: if (self.param.window.val is not None): if (isinstance(self.param.window, sea.Atom)): if (pj.i_win == self.param.window.val): pre_job.append(pj) elif (isinstance(self.param.window, sea.List)): if (pj.i_win in self.param.window.val): pre_job.append(pj) else: raise ValueError( "The value of the window parameter should be either a sea.List or sea.Atom object," " but we got %s" % type(self.param.window.val)) else: pre_job.append(pj) self.add_jobs(self._create_job(pre_job)) self._print("debug", "Out Simulate.crunch")
def _checkpoint_path(self, outcfg: sea.Map): return outcfg.mdsim.checkpt.name.val def _base_restore_args(self, outcfg: sea.Map, added_time: float) -> List[str]: sim_name = self.CFG_SIM_NAME sim_cfg = getattr(outcfg, sim_name) last_time = self.param.time.val + added_time args = [ "-restore", sim_cfg.checkpt.name.val, "-cfg", f"{sim_name}.last_time={last_time:f}", "-cfg", f"{sim_name}.plugin.trajectory.mode=append", "-cfg", f"{sim_name}.checkpt.interval={self.param.checkpt.interval.val:f}" ] # Include cfg extracted from cpt if it exists cpt_cfg_path = Path(sim_cfg.checkpt.name.val + '.cfg') if cpt_cfg_path.exists(): args += ['-jin', str(cpt_cfg_path)] return args def _get_restore_args(self, outcfg: sea.Map, added_time: float) -> ([str], [str]): """ Get the basic restore arguments needed for the jlaunch command, including the checkpoint args, extra -cfg params, and any -jin and -in args, and a list of any input files registered with -in args. """ sim_cfg = getattr(outcfg, self.CFG_SIM_NAME) trj_opts = ["-jin", sim_cfg.plugin.trajectory.name.val] return self._base_restore_args(outcfg, added_time) + trj_opts, [] def _get_outcfg(self, job: DesmondJob) -> Optional[sea.Map]: outcfg_fname = job.input.outcfg_file() if outcfg_fname is None: outcfg_fname = job.input.incfg_file().replace("-in.cfg", "-out.cfg") try: with open(outcfg_fname, "r") as f: return sea.Map(f.read()) except IOError: return None
[docs] def restart_subjobs(self, jobs): for job in jobs: util.chdir(job.dir) outcfg = self._get_outcfg(job) run_from_scratch = outcfg is None or not os.path.exists( self._checkpoint_path(outcfg)) if not getattr(job, "jlaunch_cmd_scratch", None): job.jlaunch_cmd_scratch = copy.copy(job.jlaunch_cmd) job.status.set(cmj.JobStatus.WAITING) if run_from_scratch: self._log( "No data of this subjob were saved from the previous run.") self._log("Will restart it from scratch.") # Runs this subjob from scratch. job.status.set(cmj.JobStatus.WAITING) arg_to_drop = [] for i, e in enumerate(job.jlaunch_cmd): if (e == "-HOST"): arg_to_drop.append(i) arg_to_drop.append(i + 1) arg_to_drop.reverse() for i in arg_to_drop: del job.jlaunch_cmd[i] continue arg_to_drop = [] for i, e in enumerate(job.jlaunch_cmd): if e in ("-in", "-c", "-HOST"): arg_to_drop.append(i) arg_to_drop.append(i + 1) arg_to_drop.reverse() for i in arg_to_drop: del job.jlaunch_cmd[i] added_time = getattr(job, "added_time", 0.) restore_args, cms_fnames = self._get_restore_args( outcfg, added_time) job.jlaunch_cmd += restore_args job.input.cms = cms_fnames self._backup_data_files() self.add_jobs(jobs)
[docs] def hook_captured_successful_job(self, job): """ """ cmj.StageBase.hook_captured_successful_job(self, job) self._concatenate_data_files(job)
@staticmethod def _concatenate_data_files(job): # Concatenate all .ene files. util.chdir(job.dir) for fname in (glob.glob("*.ene") + glob.glob("*.dat") + glob.glob("*.txt") + glob.glob("*.dE") + glob.glob("*.cvseq") + glob.glob("*.kerseq")): concatenate_and_backup(fname) # Concatenate trajectories concatenate_trajectories( glob.glob("*_trj.previous") + glob.glob("*.xtc.previous")) for fname_previous in glob.glob("*.previous"): os.rename(fname_previous, fname_previous[:-9]) def _backup_data_files(self): for fname in (glob.glob("*.ene") + glob.glob("*.dat") + glob.glob("*.txt") + glob.glob("*.dE") + glob.glob("*.cvseq") + glob.glob("*.kerseq")): concatenate_and_backup(fname) for trjname in glob.glob('*_trj') + glob.glob('*.xtc'): backup_trajectories(trjname) for previous in glob.glob("*.previous"): self._files4copy.append(os.path.abspath(previous))
[docs] def capture(self, job): cmj.StageBase.capture(self, job) if job.status == cmj.JobStatus.NON_RETRIABLE_FAILURE: # Being nontriable means to be revived from the job's .cpt file. # Revives this job, and sends it back into `QUEUE`. self._concatenate_data_files(job) self.restart_subjobs([ job, ]) jlaunch_opt = list(map(str, self.param.jlaunch_opt.val)) if jlaunch_opt and jlaunch_opt != [""]: job.jlaunch_cmd += jlaunch_opt self._job_manager.submit_jobs(cmj.QUEUE)
[docs] def pack_stage(self, force=False): # When a job is stopped for checkpointing, # we want to make sure the trajectories are concatenated. # hook_captured_successful_job is not called on checkpoint. trajectories = [ x for x in self._files4copy if (x.endswith('_trj.previous') or x.endswith('.xtc.previous')) ] concatenate_trajectories(trajectories) # remove previous trajectories and non-existent files from previous runs self._files4copy = [ x for x in self._files4copy if (not x.endswith(('_trj.previous', '.xtc.previous')) and Path(x).exists()) ] super(SimulateBase, self).pack_stage(force=force)
[docs]class RemovesInitialGCMCSolvent(SimulateBase): """ Any stage that actually does simulation should inherit from this class. Only vrun-type stages, which rely on the consistency of the previous stage's trajectory and the cms model for the current stage, should derive from SimulateBase instead. """ def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map): model = super()._get_model(job, fname, param) gcmc_utils.remove_inactive_solvent(model) return model
[docs]class GCMCCapable(RemovesInitialGCMCSolvent): """ An extension to SimulateBase that allows adding a GCMC plugin to the desmond simulation. Subclasses of this class become 'GCMC-capable'. If no 'gcmc' block is passed to the class's config block, this class behaves exactly as SimulateBase. """ PARAM = cmj._create_param_when_needed([ """ # TODO remove this parameter and update checkpoint files in bpfep test VALIDATE = { fep.equal_water_number_for_all_windows = { type = bool } } """, config.GCMC ])
[docs] def __init__(self, *args, **kwargs): super(GCMCCapable, self).__init__(*args, **kwargs) self.is_gcmc = False
def __getstate__(self, state=None): # Need to store is_gcmc for the checkpoint state = super().__getstate__(state) # Check for compatibility with old checkpoints if hasattr(self, 'is_gcmc'): state.is_gcmc = self.is_gcmc return state
[docs] def check_param(self): """ See super for details. This subclass implementation checks if the gcmc block was passed in by the user to determine if this stage needs to do gcmc-functionality. """ ev = super(GCMCCapable, self).check_param() # see if gcmc block explicitly set by user (ie passed in, not default) self.is_gcmc = "gcmc" in self.param.keys(tag="setbyuser") # if not, remove the default gcmc block if not self.is_gcmc and "gcmc" in self.param.keys(): del self.param.gcmc return ev
def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map): """ See super for details. This subclass implementation adds inactive solvent to the model if needed. """ model = super(GCMCCapable, self)._get_model(job, fname, param) if not self.is_gcmc: return model self._print("verbose", "Getting GCMC model for fname {}".format(fname)) if not self._has_inactive_solvent(model): self._print("verbose", "Need to add inactive atoms to fname {}".format(fname)) solvent_param = param.gcmc.solvent if not self._has_solvent_ct(model): if solvent_param.s_file.val: s_file = os.path.join(cmj.ENGINE.base_dir, solvent_param.s_file.val) solvent_model = cms.Cms(s_file) else: raise ValueError('Provided cms file has no solvent ' 'candidates, you must provide a solvent ' 'file.') else: solvent_model = model num_copies = gcmc_utils.NUM_COPIES_DEFAULT nc_key = 'num_copies' if nc_key in solvent_param: num_copies = solvent_param[nc_key].val num_copies = self._adjust_num_copies(model, num_copies, param) model = gcmc_utils.add_inactive_solvent(model, solvent_model, num_copies=num_copies) self._print("verbose", "Added inactive solvent atoms") # this will add gcmc ligand tags if model is FEP tag_only_mutated = param.fep.type.val in [ constants.FEP_TYPES.LIGAND_SELECTIVITY, constants.FEP_TYPES.PROTEIN_STABILITY, constants.FEP_TYPES.PROTEIN_SELECTIVITY ] model = gcmc_utils.tag_gcmc_ligand(model, tag_only_mutated) # if there's still nothing gcmc would interpret as being a ligand # (ie tagged atoms or a ct with `constants.MOLTYPE` == `constants.MOLTYPE.VAL.LIGAND`), # then check if there is a supplied ligand file; if so tag nearby atoms if not self._has_gcmc_ligand(model): try: if param.gcmc.ligand_file.val: ligand_param = param.gcmc.ligand_file else: # this param is set by the set_family mechanism, which is in # turn set during systype.WatermapTyper.prepare ligand_param = param.backend.mdsim.plugin. \ SpatialActiveSite.ligand_file except AttributeError: pass else: self._tag_atoms_from_ligand_model(model, ligand_param, param) return model def _adjust_num_copies(self, model, num_copies, param): """ Perform any adjustments needed to the `num_copies` parameter that will be passed to `gcmc_utils.add_inactive_solvent`. Subclasses can override this method as needed but should always call the super implementation. The base class implementation checks if the batch size backend parameter is set and rescales as needed. This method is only called for gcmc simulations, ie `self.is_gcmc` is True. :param model: The cms model that inactive solvent will be added to. :type model: `cms.Cms` :param num_copies: The unadjusted value of the `num_copies` parameter :type num_copies: int :return: The adjusted value of the `num_copies` parameter :rtype: int """ # if batch size explicitly set, we raise num_copies as needed try: batch_size = param.backend.mdsim.plugin.gcmc.batch_size.val except AttributeError: pass else: # we need approx batch_size/2, but up to batch_size copies to # perform insertions, plus whatever amount will be accepted. min_num_copies = int(1.5 * batch_size) num_copies = max(num_copies, min_num_copies) return num_copies def _tag_atoms_from_ligand_model(self, model: cms.Cms, ligand_param: sea.Map, param: sea.Map): """ Tag the solute atoms within a cutoff (specified in `param.gcmc.gcmc_region.region_buffer`) of the ligand_model defined in ligand_param. Also resets the region buffer value to zero, as the tagged atoms take the place of the buffer. :param model: the cms model :param ligand_param: the param specifying the ligand file :param param: the working param corrsponding to the model :return: the model with atoms near to the atoms in the ligand file tagged with the 'i_gcmc_ligand' property. """ lig_file = os.path.join(cmj.ENGINE.base_dir, ligand_param.val) if not os.path.isfile(lig_file): raise IOError("ERROR: Ligand file not found: %s" % lig_file) self._print('quiet', 'Tagging GCMC atoms using ligand file %s' % lig_file) lig_model = None for st in structure.StructureReader(lig_file): if lig_model is None: lig_model = st else: self._print( 'quiet', 'WARNING: More than one structure in ligand file. ' 'Using the first one...') break if lig_model is None: raise ValueError("No structures found in ligand file.") lig_distance = param.gcmc.gcmc_region.region_buffer.val model = gcmc_utils.tag_gcmc_near_ligand(model, lig_model, lig_distance) # set the buffer to zero since we have already extended our region # outside of the ligand param.gcmc.gcmc_region.region_buffer.val = 0 self._print( "quiet", "Added GCMC ligand tag to atoms near supplied " "ligand file") return model def _has_inactive_solvent(self, model): return self._has_solvent_ct(model) and \ model.active_total != model.comp_atom_total def _has_solvent_ct(self, model): return self._has_ct(model, constants.CT_TYPE.VAL.SOLVENT) @staticmethod def _has_gcmc_ligand(model): for ct in model.comp_ct + [model]: if ct.property.get(constants.MOLTYPE, '') == constants.MOLTYPE.VAL.LIGAND: return True for a in ct.atom: if a.property.get(constants.GCMC_LIGAND, 0) != 0: return True return False @staticmethod def _has_ct(model, ct_type): for st in model.comp_ct: if st.property.get(constants.CT_TYPE, '') == ct_type: return True return False def _add_eneseq(self, new_job: DesmondJob, param: sea.Map): super()._add_eneseq(new_job, param) if self.is_gcmc: new_job.output.add(os.path.abspath(param.gcmc.ene_name.val), tag="GCMC_ENESEQ") def _write_cfg(self, jobname, task, add_tag=config.TAG_SPECS.md, tag="MD", param=None): """ See super for details. This subclass implementation adds tags to gcmc-specific keys to make sure they are also written out if the stage has a gcmc block. """ if self.is_gcmc: add_tag = copy.deepcopy(add_tag) add_tag.keys.extend(config.TAG_SPECS.gcmc.keys) return super(GCMCCapable, self)._write_cfg(jobname, task, add_tag, tag, param)
[docs]class Simulate(GCMCCapable): """ This class defines the GCMC-capable simulate stage, which can be thought of as the basic simulate stage, and is what the user will get when they use the name `simulate`. This is just a thin wrapper to associate the name 'simulate' with a corresponding class, even though it is otherwise identical to GCMCCapable. Edits to basic simulate functionality should be made to SimulateBase, not here. This class should be subclassed if GCMC support is needed, otherwise subclass SimulateBase """ NAME = "simulate"
[docs]class ReplicaExchange(Simulate): """ """ NAME = "replica_exchange" CFG_SIM_NAME = "remd" PARAM = cmj._create_param_when_needed([ config.REMD, """ DATA = { total_proc = ? } VALIDATE = { jlaunch_opt.black_list = ["!append!" "-t"] total_proc = [ {type = none} {type = int1} ] } """, ])
[docs] def __init__(self, *arg, **kwarg): """ """ super(ReplicaExchange, self).__init__(*arg, **kwarg) self.natoms_max = None # used only when gcmc is enabled
def __getstate__(self, state=None): # Need to store natoms_max for the checkpoint state = super().__getstate__(state) # Check for compatibility with old checkpoints if hasattr(self, 'natoms_max'): state.natoms_max = self.natoms_max return state def _write_cfg(self, jobname, task, add_tag=config.TAG_SPECS.remd, tag="REMD", param=None): """ """ return super(ReplicaExchange, self)._write_cfg(jobname, task, add_tag, tag, param) def _set_box(self, param: sea.Map, boxes: List[cms.Box]): """ Update the param to set the box information. """ for replica, box in zip(param.replica, boxes): replica.box = sea.List(box) replica.box.add_tag('setbyuser') def _get_output_names(self, param: sea.Map): """ We return None here because we will instead register outputs onto the job parents during hook_captured_successful_job. (I am not sure of the specific reason for this -Ellery) """ # FIXME: It will be nice if we can set `fname_out' and `traj_name' to # be lists of file (or dir) names. return None, None def _get_device_args(self, pj, num_allotted_devices): # Device is not needed, handled internally based on MPI rank for # replica exchange jobs return [] def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map, jobname: str) -> ([str], [str], [cms.Box]): """ See super method for documentation """ if isinstance(param.replica, sea.Map): return [self._write_cms(jobname, default_model)], [], [default_model.box] else: assert isinstance(param.replica, sea.List) replicas_with_idx = list(enumerate(param.replica)) cms_fnames_and_boxes = self._get_replica_input( replicas_with_idx, default_model, pj, param, jobname) cms_fnames = [result[0] for result in cms_fnames_and_boxes] boxes = [result[1] for result in cms_fnames_and_boxes] return cms_fnames, [], boxes @run_stage_method_in_parallel def _get_replica_input(self, replicas_with_idx: List[Tuple[int, sea.Map]], default_model: cms.Cms, pj: DesmondJob, param: sea.Map, jobname: str) -> List[Tuple[str, cms.Box]]: cms_fnames_and_boxes = [] for (i, e) in replicas_with_idx: if "model_file" in e and e.model_file.val: model = self._get_model( pj, Path(cmj.ENGINE.base_dir) / e.model_file.val, param) else: model = default_model fname = self._write_cms(f"{jobname}_replica_{i}", model) cms_fnames_and_boxes.append((fname, model.box)) return cms_fnames_and_boxes def _add_eneseq(self, new_job: DesmondJob, param: sea.Map): # Don't register eneseq files for replica exchange jobs return def _adjust_num_copies(self, model, num_copies, param): """ Perform any adjustments needed to the `num_copies` parameter that will be passed to `gcmc_utils.add_inactive_solvent`. Subclasses can override this method as needed but should always call the super implementation. The Replica Exchange subclass implementation makes sure that all replicas will have the same total number of particles by adding different numbers of inactive solvent to each replica. This method is only called for gcmc simulations, ie `self.is_gcmc` is True. :param model: The cms model that inactive solvent will be added to. :type model: `cms.Cms` :param num_copies: The unadjusted value of the `num_copies` parameter :type num_copies: int :return: The adjusted value of the `num_copies` paramter :rtype: int """ num_copies = super()._adjust_num_copies(model, num_copies, param) if self.natoms_max is None: if isinstance(param.replica, sea.List): base_dir = cmj.ENGINE.base_dir fnames = [] for i, e in enumerate(param.replica): if "model_file" in e and e.model_file.val: fnames.append(os.path.join(base_dir, e.model_file.val)) self.natoms_max = 0 for other_fname in fnames: with structure.StructureReader(other_fname) as reader: fsys_ct = next(reader) self.natoms_max = max(fsys_ct.atom_total, self.natoms_max) if self.natoms_max: solvent_cts = model.get_solvent_cts() # only one gcmc solvent is currently supported if len(solvent_cts) != 1: raise ValueError("GCMC systems can only have one solvent ct") solvent_ct = solvent_cts[0] if not len(solvent_ct.molecule): raise ValueError("Solvent CT in GCMC system used for replica " "exchange should always have at least one " "molecule!") molecule_size = len(solvent_ct.molecule[1]) # there should be no inactive atoms here, so we can use atom_total num_copies += (self.natoms_max - model.atom_total) // molecule_size return num_copies def _checkpoint_path(self, outcfg: sea.Map): return outcfg.remd.checkpt.name.val def _get_restore_args(self, outcfg: sea.Map, added_time: float) -> ([str], [str]): trj_opt = [] cms_fnames = [] # TODO first version with remd.cfg is from CPU config file. Remove this # if block and use only else block when CPU Desmond is fully removed. if 'cfg' in outcfg.remd: for replica in outcfg.remd.cfg: trj_opt += ["-jin", replica.plugin.trajectory.name.val] for replica in outcfg.remd.cfg: bootfile = replica.boot.file.val trj_opt += ["-in", bootfile] cms_fnames.append(bootfile) else: graph = outcfg.remd.graph nodes = graph.edges.nodes for o in nodes: trj_opt += [ "-jin", graph[o.val].remd.plugin.trajectory.name.val, ] for o in nodes: bootfile = graph[o.val].boot.file.val trj_opt += ["-in", bootfile] cms_fnames.append(bootfile) return self._base_restore_args(outcfg, added_time) + trj_opt, cms_fnames
[docs] def hook_captured_successful_job(self, job): """ """ super(ReplicaExchange, self).hook_captured_successful_job(job) job.is_output = False # We need to figure out the number of replicas. # User may specify a replica generation than a list of replicas. In the former case, we need to read the *-out.cfg. incfg_fname = job.input.incfg_file() with open(incfg_fname, "r") as fh: incfg = sea.Map(fh.read()) replica = [] if (isinstance(incfg.replica, sea.Map)): outcfg_fname = job.input.outcfg_file() outcfg = sea.Map(open(outcfg_fname, "r").read()) try: n_replica = len(outcfg.remd.cfg) except: n_replica = len(outcfg.remd.graph.edges.nodes) replica = [incfg] * n_replica else: for e in incfg.replica: new_cfg = copy.deepcopy(incfg) new_cfg.update(e) replica.append(new_cfg) parent = job sea.update_macro_dict({"$JOBNAME": job.jobname}) for i, e in enumerate(replica): sea.update_macro_dict({"$REPLICA": str(i)}) newjob = copy.deepcopy(parent) newjob.is_output = True newjob.output = cmj.JobOutput() newjob.input.cms = [ parent.input.cms[(i if (i < len(parent.input.cms)) else 0)] ] fname_out = None if (isinstance( e.maeff_output, sea.Atom)) else e.maeff_output.name.val traj_name = e.trajectory.name.val fname_dE = config.get_fep_output_filename(e) fname_eg = config.get_energy_group_output_filename(e) fname_dE = os.path.join(newjob.dir, fname_dE) if (fname_dE) else None fname_eg = os.path.join(newjob.dir, fname_eg) if (fname_eg) else None if (fname_out): newjob.output.add(os.path.join(newjob.dir, fname_out), check_cms_file) if (traj_name and (e.time.val > e.trajectory.first.val)): newjob.output.add(os.path.join(newjob.dir, traj_name), tag="TRJ", type="dir") if (not isinstance(e.eneseq, sea.Atom) and e.time.val > e.eneseq.first.val): newjob.output.add(os.path.join(newjob.dir, e.eneseq.name.val), tag="ENESEQ") newjob.output.add(parent.output.log_file()) newjob.output.add(fname_dE, tag="dE") newjob.output.add(fname_eg, tag="eg") self.add_job(newjob) return "dissolved"
def _get_restart_job(self, jobs: List[DesmondJob], task='regular') -> DesmondJob: """ If the previous job has finished, return a copy of it with the new job(s) as its parents, otherwise return the new job (assumed to be singular). Update the added_time and is_extend attributes on the returned job. :param task: task type for the extend job """ if self.filter_jobs(is_output=[False], status=[cmj.JobStatus.SUCCESS]): # The previous replica exchange job has finished. prev_job = self.filter_jobs(is_output=[False])[0] job = copy.deepcopy(prev_job) job.task = task job.stage = self job.parent = jobs[0] # DESMOND-10153: Fix desmond_extend if added_time > 0 # and the previous lambda stage just finished. # hook_captured_successful_job uses parent.parent and # other_parent, so both must be updated here. job.parent.parent = jobs[0] job.other_parent = jobs[1:] if getattr(job.parent, "is_extend", False): # For extension jobs, clear the captured job # so checkpoint/restart will continue from the LambdaHopping stage self._job_manager.clear() else: # The previous replica exchange job has NOT finished. job = jobs[0] # Used for extending replica exchange jobs job.added_time = getattr(jobs[0], "added_time", 0.0) job.is_extend = getattr(jobs[0], "is_extend", False) return job
[docs] def restart_subjobs(self, jobs): if not jobs: return jobs = [self._get_restart_job(jobs)] super().restart_subjobs(jobs) self.add_job(*jobs)
[docs]class LambdaHopping(ReplicaExchange): """ """ NAME = "lambda_hopping" PARAM = cmj._create_param_when_needed([ """ DATA = { jobname = "$MAINJOBNAME_$STAGENO" dir = "$[$JOBPREFIX/$]$[$PREFIX/$]$MAINJOBNAME_$STAGENO" fep.output.name = "$JOBNAME$[_replica$REPLICA$].dE" should_sync = yes solute_tempering = off restraints.existing = %s restraints.new = [] total_proc = ? fep_convergence = 0.0 } VALIDATE = { solute_tempering = [ {type = bool0} {atom = {type = str1} temperature = [ {generator = {type = enum range = [PvdS]} highest_temperature = {type = float+ } exchange_probability = {type = float0_1} start_win = {type = int0} end_win = {type = int} _mapcheck = check_remd_temp_generator2 } {type = list size = -1 elem = {type = float+}} ] } ] total_proc = [ {type = none} {type = int1} ] fep_convergence = {type = float+} } restraints.existing = {type = enum range = [%s]} restraints.new = [ {type = list} {_skip = all} ] print_restraint = {type = bool} """ % (constants.EXISTING_RESTRAINT.IGNORE, " ".join( constants.EXISTING_RESTRAINT)), ])
[docs] def __init__(self, *arg, **kwarg): """ """ super(LambdaHopping, self).__init__(*arg, **kwarg)
# FIXME: The 'parent' and 'other_parent' business might make the whole thing complicated than necessary. We should enable # the fep_analysis stage to directly handle a lambda_hopping subjob, instead of faking a lambda_hopping subjob # as a series of normal MD jobs.
[docs] def crunch(self): """ """ self._print("debug", "In LambdaHopping.crunch") from .. import remd has_lhjob = False fep_job = {} for pj in self.get_prejobs(): if isinstance(pj, FepJob): fep_job.setdefault(pj.fepid, []).append(pj) elif isinstance(pj, DesmondJob) and pj.task == "lambda_hopping": pre_fepjob = [pj.parent] + pj.other_parent pre_fepjob.sort(key=lambda x: x.i_win) self.param["replica"] = sea.List() replica = sea.Map("model_file = '' fep.i_window = 0") for e in pre_fepjob: replica.model_file.val = e.output.struct_file() replica.fep.i_window.val = e.i_win self.param.replica.append(replica) has_lhjob = True else: self._log( "WARNING: Simulation performed by %s seems not a FEP simulation." % str(pj)) self._log( "WARNING: Ignore this job for lambda hopping simulation.") if has_lhjob: self.add_jobs(self._create_job(self.get_prejobs())) if not fep_job: if not has_lhjob: self._log("No FEP job was performed.") else: new_pjs = [] job_params = [] for fepid in fep_job: pre_fepjob = fep_job[fepid] if (len(pre_fepjob) < 2): self._log( "WARNING: Job %s: No peer jobs for the other windows." % str(pre_fepjob[0])) self._log( "WARNING: Ignore this job for lambda hopping simulation." ) continue pre_fepjob.sort(key=lambda x: x.i_win) n_win = pre_fepjob[0].n_win fragname = pre_fepjob[0].fragname fragname = "absolute" if (not fragname) else fragname ref_temp = self.param.temperature.val solute_t = [] hotatoms = [] if (isinstance(self.param.solute_tempering, sea.Map)): asl = self.param.solute_tempering.atom.val model = cms.Cms(file=pre_fepjob[0].output.struct_file()) hotatoms = model.select_atom(asl) if (len(hotatoms) == 0): self._log( "WARNING: No atoms selected by ASL expression: %s" % asl) else: self._print( 'quiet', "Hot atoms selected by ASL expression: %s" % asl) self._print('quiet', "Number of hot atoms: %s" % len(hotatoms)) self._print('quiet', "Hot atoms: %s" % hotatoms) temp = self.param.solute_tempering.temperature if (isinstance(temp, sea.List)): if (len(temp) != n_win): self._log( "ERROR: Number of temperatures for solute tempering does not match number of replicas." ) self._log( "ERROR: Failed to turn on solute tempering on FEP [%s]" % fragname) continue else: solute_t = temp.val else: start_win = temp.start_win.val if 'start_win' in temp else 0 end_win = temp.end_win.val if 'end_win' in temp else -1 if end_win <= 0: end_win = n_win n_tot = n_win n_win = end_win - start_win or n_tot n_replica = (n_win + 1) // 2 if (len(hotatoms) > 0): # TODO: Move this into `remd` module. if ("exchange_probability" in temp): exch = temp.exchange_probability.val temp_ladder, prob = remd.predict_with_nreplica_and_exch( n_replica, exch, ref_temp, model, asl) else: temp_range = ( ref_temp, temp.highest_temperature.val, ) temp_ladder, prob = remd.predict_with_temp_and_nreplica( temp_range, n_replica, model, asl) n = n_win // 2 temp_ladder0 = copy.copy(temp_ladder[:n]) temp_ladder.reverse() temp_ladder = temp_ladder0 + temp_ladder temp_ladder = [ref_temp] * ( start_win) + temp_ladder + [ref_temp ] * (n_tot - end_win) prob = remd.get_prob_from_temp_ladder( temp_ladder, model, asl) self._log( "Temperature ladder (exchange probability) for solute tempering:" ) for e, p in zip(temp_ladder, prob): self._log(" %.3g (%.3g)" % ( e, p, )) self._log(" %.3g (n/a)" % temp_ladder[-1]) self._log( "Estimation of exchange probabilities did not take into account lambda schedule." ) self._log( "Actual exchange probabilities might be lower.") solute_t = temp_ladder self.param["replica"] = sea.List() replica = sea.Map("model_file = '' fep.i_window = 0") parent = [] for i, pj in enumerate(pre_fepjob): model_file = pj.output.struct_file() macro_dict = { "$LAMBDA": str(pj.i_win), } jobname, dir = self._get_jobname_and_dir(pj, macro_dict) if solute_t: model = cms.Cms(file=model_file) model_file = sea.Atom( "$JOBNAME_st_lambda$LAMBDA-in.cms").val remd.write_ff_solute_tempering( model, model_file, "asl:atom.num " + str(hotatoms)[1:-1], ref_temp / solute_t[i]) replica.model_file.val = model_file replica.fep.i_window.val = pj.i_win self.param.replica.append(replica) parent.append(pj) pj = pre_fepjob[0] new_pj = DesmondBackendJob( jobname=pj.jobname, parent=parent[0], stage=pj.stage, dir=pj.dir, jlaunch_cmd=None, task="lambda_hopping", permanent_restrain=pj.permanent_restrain, permanent_group=pj.permanent_group) new_pj.other_parent = parent[1:] new_pj.output = pj.output # customize fep.output.first and fep.output.interval # to disable gibbs plugin output (DESMOND-10303) # using `job_param` argument of `self._create_job` # (keep `self.param` unchanged to be able to # recover "fep.output.name" in # `self.hook_capture_successful_job`; see # `config.get_fep_output_filename` for details) new_job_param = sea.Map( "fep { output { first = inf interval = inf } }") new_job_param.add_tag("REMD", propagate=True) new_pjs.append(new_pj) job_params.append(new_job_param) new_desmond_jobs = self._create_job(new_pjs, job_param=job_params) for job in new_desmond_jobs: # the DesmondBackendJobs created here are not the stage's # output. The output jobs are created in # hook_captured_successful_job job.is_output = False self.add_jobs(self._create_job(new_pjs, job_param=job_params)) self._print("debug", "Out LambdaHopping.crunch")
def _get_restart_job(self, jobs: List[DesmondJob]) -> DesmondJob: """ If the previous job has finished, return a copy of it with the new job(s) as its parents, otherwise return the new job (assumed to be singular). Update the added_time and is_extend attributes on the returned job. """ return super()._get_restart_job(jobs, task='lambda_hopping')
[docs] def restart_subjobs(self, jobs): if not jobs: return job = self._get_restart_job(jobs) outcfg = self._get_outcfg(job) if outcfg is None or not Path(job.dir, self._checkpoint_path(outcfg)).exists(): # The previous lambda-hopping job has NOT finished, and no data of # the job were saved. # We have to rerun the LH job from scratch. self._log( "No data of this subjob were saved from the previous run.") self._log("Will restart it from scratch.") self.crunch() return if not getattr(job, "jlaunch_cmd_scratch", None): job.jlaunch_cmd_scratch = copy.copy(job.jlaunch_cmd) job.status.set(cmj.JobStatus.WAITING) util.chdir(job.dir) job.jlaunch_cmd = [ self.DESMOND_CMD, "-JOBNAME", job.jobname, "-overwrite", "-fep_convergence", str(self.param.fep_convergence.val) ] try: n_replica = len(outcfg.remd.cfg) except AttributeError: n_replica = len(outcfg.remd.graph.edges.nodes) total_proc = self.param.total_proc.val if total_proc is None: param = self._get_working_param() total_proc = param.cpu.val * n_replica job.jlaunch_cmd += ["-P", str(total_proc)] arg_to_drop = [] for i, e in enumerate(job.jlaunch_cmd_scratch): if e == "-P": arg_to_drop.append(i) arg_to_drop.append(i + 1) arg_to_drop.reverse() for i in arg_to_drop: del job.jlaunch_cmd_scratch[i] job.jlaunch_cmd_scratch += ["-P", str(total_proc)] restore_args, cms_fnames = self._get_restore_args( outcfg, job.added_time) job.jlaunch_cmd += restore_args job.input.cms = cms_fnames self.param["replica"] = sea.List() for i in range(n_replica): replica = sea.Map("model_file = '' fep.i_window = 0") replica.fep.i_window.val = i self.param.replica.append(replica) if cmj.GENERAL_LOGLEVEL == "debug": job.jlaunch_cmd.append("-DEBUG") job.jlaunch_cmd_scratch.append("-DEBUG") self._backup_data_files() fname_log = job.jobname + ".log" job.output = cmj.JobOutput() job.output.add(os.path.abspath(fname_log)) job._jctrl_hist = [] job.stage = weakref.proxy(self) self.add_job(job)
[docs] def hook_captured_successful_job(self, job): """ """ Simulate.hook_captured_successful_job(self, job) job.is_output = False outcfg_fname = job.input.outcfg_file() or job.input.incfg_file( ).replace("-in.cfg", "-out.cfg") outcfg = sea.Map(open(outcfg_fname, "r").read()) deltaE_fname = outcfg.remd.deltaE.name.val exchange_dat_fname = outcfg.remd.exchange_dat.name.val dE = {} parent = [job.parent.parent] + job.other_parent replica = config.get_replica_setting(self.param) sea.update_macro_dict({"$JOBNAME": job.jobname}) for e in parent: i = e.i_win sea.update_macro_dict({"$REPLICA": str(i)}) e = copy.deepcopy(e) e.is_output = True # Clear this out! e.input.__dict__['_file'] = {} cfg = replica[i] e.dir = job.dir e.jobname = job.jobname e.output = cmj.JobOutput() e.stage = weakref.proxy(self) # Used for extending lambda hopping jobs e.added_time = getattr(job, "added_time", 0.0) e.is_extend = getattr(job, "is_extend", False) fname_out = cfg.maeff_output.name.val traj_name = cfg.trajectory.name.val fname_dE = config.get_fep_output_filename(cfg) fname_eg = config.get_energy_group_output_filename(cfg) fname_dE = os.path.join(e.dir, fname_dE) if (fname_dE) else None fname_eg = os.path.join(e.dir, fname_eg) if (fname_eg) else None e.input.cms = [job.input.cms[i]] if fname_out: e.output.add(os.path.join(e.dir, fname_out), check_cms_file) if traj_name and (cfg.time.val > cfg.trajectory.first.val): e.output.add(os.path.join(e.dir, traj_name), tag="TRJ", type="dir") if (not isinstance(cfg.eneseq, sea.Atom) and cfg.time.val > cfg.eneseq.first.val): e.output.add(os.path.join(e.dir, cfg.eneseq.name.val), tag="ENESEQ") e.output.add(job.output.log_file()) if exchange_dat_fname: e.output.add(exchange_dat_fname) e.output.add(fname_dE, tag="dE") e.output.add(fname_eg, tag="eg") e.input.add(os.path.abspath(outcfg_fname)) e.status.set(cmj.JobStatus.SUCCESS) self.add_job(e) dE[i] = fname_dE # Because the gibbs plugin does not handle REST scaling, here we are # overwriting the .dE plugin output with deltaE file differences deltaE.gen_dE_files(deltaE_fname, dE)
[docs]class DesmondExtend(cmj.StageBase): """ """ NAME = "desmond_extend" PARAM = cmj._create_param_when_needed([ """ DATA = { added_time = 0 } VALIDATE = { added_time = {type = float+} } """, ])
[docs] def crunch(self): self._print("debug", "In DesmondExtend.crunch") # If restarting a partially complete extend, # set up the stage without changing the job state. if getattr(self, 'should_restart_extend', False): self._print("debug", "In DesmondExtend.crunch restart") self.should_restart_extend = False self._is_shown = False self._PREV_STAGE._is_packed = False # Need to call prestage to set up the stage self._PREV_STAGE.prestage() self._PREV_STAGE.release(True) self._print("debug", "Out DesmondExtend.crunch restart") return if (0 == self.param.added_time.val): self.add_jobs(self.get_prejobs()) else: should_extend = False for pj in self.get_prejobs(): if getattr(pj, "is_extend", False): self.add_job(pj) else: if getattr(pj, "added_time", 0): pj.added_time += self.param.added_time.val else: pj.added_time = self.param.added_time.val # Mark this as an extension job so this stage # will only run once pj.is_extend = True pj.status.set(cmj.JobStatus.WAITING) pj.old = False should_extend = True if should_extend: prev_stage = self._PREV_STAGE self._is_shown = False prev_stage._is_packed = False # Need to call prestage to set up the stage prev_stage.prestage() prev_stage.release(True) self._print("debug", "Out DesmondExtend.crunch")
[docs] def hook_captured_successful_job(self, job): """ Run after successfully completing this stage. :param job: Job object representing the captured job :type job: `FepJob` """ super(DesmondExtend, self).hook_captured_successful_job(job) # Once the lambda hopping job is complete, reset the extend marker # so the job can be extended again by calling the DesmondExtend stage self.should_restart_extend = False for prejob in self.get_prejobs(): prejob.is_extend = False
[docs] def restart_subjobs(self, jobs): self._print("debug", "In DesmondExtend.restart_subjobs") # If no jobs were passed to this stage but the previous stage has # jobs released, it means the previous stage failed or was stopped. # In this case, restart the previous stage without changing the job stage. self.should_restart_extend = len(self.get_prejobs()) == 0 and len( self._PREV_STAGE.filter_jobs(is_incomplete=[True])) self._print("debug", "Out DesmondExtend.restart_subjobs")
[docs]class VrunBase(SimulateBase): PARAM = cmj._create_param_when_needed([ config.VRUN, """ DATA = { trajectory = off checkpt = off maeff_output = off eneseq.interval = 0.0 } VALIDATE = { } """, ]) def _write_cfg(self, jobname, task, add_tag=config.TAG_SPECS.vrun, tag="VRUN", param=None): """ """ return super()._write_cfg(jobname, task, add_tag, tag, param) def _get_additional_in_out_args(self, param: sea.Map, additional_inputs: List[str]): additional_inputs.append(param.vrun_frameset.val) return super()._get_additional_in_out_args(param, additional_inputs) def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map, jobname: str) -> ([str], [str], [cms.Box]): cms_fnames, additional_inputs, boxes = super()._get_input( default_model, pj, param, jobname) traj_name = self._setup_vrun_trajectory(pj, param) return cms_fnames, additional_inputs + [traj_name], boxes def _setup_vrun_trajectory(self, pj: DesmondJob, param: sea.Map): # TODO this is a bit strange - we always symlink the previous # trajectory, but we don't necessarily use it - if param.vrun_frameset # has been set explicitly to another path, we would use that instead. orig_traj = pj.output.get("TRJ") traj_name = os.path.basename(orig_traj) sea.update_macro_dict({"$FRAMESET": traj_name}) # updates param.vrun_frameset to set the traj_name # This will $FRAMESET and set it to traj_name param.vrun_frameset.val = param.vrun_frameset.val # this needs to take place in the directory setup by the super call. util.symlink(orig_traj, traj_name) return traj_name
[docs]class Vrun(VrunBase): """ """ NAME = "vrun" PARAM = cmj._create_param_when_needed([ """ DATA = { skip_intermediates = False } VALIDATE = { skip_intermediates = {type = bool} } """, ])
[docs] def __init__(self, *arg, **kwarg): """ """ super(Vrun, self).__init__(*arg, **kwarg) self._pack = "\\tar cfz"
[docs] def crunch(self): """ """ self._print("debug", "In Vrun.crunch") pre_job = [] jobs_to_vrun = [] if self.param.skip_intermediates.val: for pj_ in self.get_prejobs(): if not isinstance(pj_, FepJob): jobs_to_vrun.append(pj_) self._print( "quiet", "Warning: skip_intermediates option " "has no effect when running Vrun " "with non FepJobs") continue # skip_intermediates means we want only want to keep the jobs # for the first and last lambda windows if pj_.i_win == 0 or pj_.i_win == pj_.n_win - 1: jobs_to_vrun.append(pj_) else: jobs_to_vrun = self.get_prejobs() for pj_ in jobs_to_vrun: pj = copy.deepcopy(pj_) pj.input.cms = copy.copy(pj_.input.cms) cms_fname = pj.output.struct_file() pj.output.remove(cms_fname) pj.output.add(pj.input.cms[0]) pre_job.append(pj) self.add_jobs(self._create_job(pre_job)) self._print("debug", "Out Vrun.crunch") if self.param.skip_intermediates.val: # For skip intermediates workflow, the FepJobs getting passed to # vrun will not generate dE files for job in self.jobs: dE_fname = job.output.get("dE") if dE_fname: job.output.remove(dE_fname)
[docs]class FepVrun(VrunBase): """ """ NAME = "fep_vrun" PARAM = cmj._create_param_when_needed([ """ DATA = { simbox = off fep.output.interval = 0.0 time = inf } VALIDATE = { } """, ])
[docs] def __init__(self, *arg, **kwarg): """ """ super(FepVrun, self).__init__(*arg, **kwarg) # key is (fepid, i_win), value is [forward job, reversed job, current job,] self._fepvrun_job = defaultdict(list) self._parent = {} self._pack = "\\tar cfz"
def _get_parent(self, parent): """ """ if (parent not in self._parent): self._parent[parent] = copy.deepcopy(parent) return self._parent[parent] def _create_job_vrun(self, job_request): """ """ model_file, i_win, direction, parent = job_request pj = self._get_parent(parent) base_dir = cmj.ENGINE.base_dir num_allotted_devices = self._get_num_allotted_devices() # Gets the working param. param = self._get_working_param() # Sets i_window. param.fep.i_window.val = i_win + (1 if (direction == "f") else (-1 if (direction == "r") else 0)) self._print("debug", "Job task: %s" % pj.task) macro_dict = { "$LAMBDA": i_win, } jobname, jobdir = self._get_jobname_and_dir(pj, macro_dict) jobname += "_" + direction jobdir += "_" + direction sea.update_macro_dict({"$JOBNAME": jobname}) model = self._get_model(pj, model_file, param) new_job = self._create_single_job_common(pj, model, jobname, jobdir, param, num_allotted_devices) # TODO create FepVrunJob subclass of FepJob and give it this direction # attribute instead of just adding it here new_job.direction = direction self.add_jobs([new_job, pj])
[docs] def crunch(self): """ """ self._print("debug", "In FepVrun.crunch") fep_job = defaultdict(list) for pj in self.get_prejobs(): # TODO add cms to JobOutput/Input and to deepcopy? It's already # referenced within the class cms = pj.input.cms pj = copy.deepcopy(pj) pj.input.cms = cms if (isinstance(pj, FepJob)): fep_job[pj.fepid].append(pj) else: self._log( "WARNING: Simulation performed by %s seems not a FEP simulation." % str(pj)) self._log("WARNING: Ignore this job for FEP vrun.") if not fep_job: self._log("No FEP job was performed.") else: for fepid in fep_job: pre_fepjob = fep_job[fepid] if (len(pre_fepjob) < 2): self._log( "WARNING: Job %s: No peer jobs for the other windows." % str(pre_fepjob[0])) self._log("WARNING: Ignore this job for FEP vrun.") continue n_win = pre_fepjob[0].n_win index_map = defaultdict(list) to_be_del = [] pre_fepjob.sort(key=lambda x: x.i_win) for i, e in enumerate(pre_fepjob): index_map[e.i_win].append(i) for i_win in index_map: print(i_win, len(index_map[i_win])) new_job = [ ] # A list of (model_file, i_win, "f", parent) tuples for i, pj in enumerate(pre_fepjob): if i < n_win - 1: model_file = pre_fepjob[i + 1].input.cms[0] new_job.append((model_file, i, "f", pj)) if i > 0: model_file = pre_fepjob[i - 1].input.cms[0] new_job.append((model_file, i, "r", pj)) model_file = pre_fepjob[i].input.cms[0] new_job.append((model_file, i, "o", pj)) for e in new_job: self._create_job_vrun(e) self._print("debug", "Out FepVrun.crunch")
[docs] def hook_captured_successful_job(self, job): """ """ if (job.stage._ID != self._ID): return if not hasattr(job, 'direction'): return key = ( job.fepid, job.i_win, ) self._fepvrun_job[key].append(job) alljob = [] if (job.i_win == 0 or job.i_win == job.n_win - 1): if (2 == len(self._fepvrun_job[key])): alljob = self._fepvrun_job[key] else: if (3 == len(self._fepvrun_job[key])): alljob = self._fepvrun_job[key] if (alljob == []): return "dissolved" job_f = None job_r = None job_o = None for e in alljob: if (e.direction == "f"): job_f = e elif (e.direction == "o"): job_o = e elif (e.direction == "r"): job_r = e util.chdir(job_o.dir) pj = job_o.parent enes = [] time_name, pot_e_name = eneseq_col_names = ['time', 'E_p'] for job_d in (job_r, job_o, job_f): if not job_d: enes.append(None) else: fname_eneseq = job_d.output.get("ENESEQ") enes.append(fepana.parse_eneseq(fname_eneseq)[eneseq_col_names]) ene_r, ene_o, ene_f = enes assert ene_o is not None, "Original job must exist for all windows" ene_diffs = [ene_o[time_name]] for ene in (ene_r, ene_f): if ene is not None: assert ene.dtype == ene_o.dtype, \ "ene files have different datatypes, likely resulting " \ "from different columns" assert (ene[time_name] == ene_o[time_name]).all(), \ "ene timestamps differ" ene_diffs.append(ene[pot_e_name] - ene_o[pot_e_name]) else: ene_diffs.append(np.zeros_like(ene_o[pot_e_name])) diff_arr = np.stack(ene_diffs, axis=-1) # Because the gibbs plugin does not handle REST scaling, here we are # overwriting the .dE plugin output and replacing it with vrun energy # differences dE_fname = os.path.basename(pj.output.get("dE")) additional_header = "Output recreated from vrun .ene files" deltaE.write_dE(dE_fname, diff_arr, additional_header=additional_header) pj.output.add(os.path.abspath(dE_fname), tag="dE") return "dissolved"
[docs]class ConcatParam(sea.Map):
[docs] def update(self, ark=None, file=None, tag=set()): # noqa: M511 # because simulate is a list, it gets overwritten during update. # thus, we need to update each list parameter individually if isinstance(ark, sea.Map): if 'simulate' in ark and len(ark.simulate) == len(self.simulate): for ark_param, sim_param in zip(ark.simulate, self.simulate): sim_param.update(ark_param, tag=tag) self.simulate.add_tag(tag, propagate=False) ark = copy.deepcopy(ark) del ark.simulate super(ConcatParam, self).update(ark, file, tag)
[docs]class Concatenate(Simulate): # TODO do we want to allow gcmc in concatenated (relaxation) stages? NAME = 'concatenate' PARAM = cmj._create_param_when_needed([ config.CONCAT, ])
[docs] def __init__(self, *args, **kwargs): self._param = None super(Concatenate, self).__init__(*args, **kwargs)
@property def param(self): return self._param @param.setter def param(self, value): self._param = value if value is None: return self._param = ConcatParam() self._param.simulate = sea.List() # create a new list of blank simulate stages with default params # and initialize self._param.simulate for i in range(len(value.simulate)): self._param.simulate.append(copy.deepcopy(Simulate.PARAM.DATA)) self._param.update(value) for tag in value.tag(): self._param.update(sea.sea_filter(value, tag), tag=tag) last_time = 0.0 for sim_param in self._param.simulate: last_time += sim_param.time.val # handle params that need consistency between top-level and simulate # blocks self._handle_restraints() self._handle_gcmc() self._param.time.val = last_time self._param.time.add_tag('setbyuser') def _get_model(self, job: DesmondJob, fname: os.PathLike, param: sea.Map): # now verify with permanent restraints has_permanent_restrain = bool(job.permanent_restrain) for sim_block in self.param.simulate[1:]: msg = cmj.restraints_incompatible(sim_block, self.param, has_permanent_restrain) if msg: raise ValueError(msg) return super()._get_model(job, fname, param) def _handle_restraints(self): """ Enforce consistency between first simulate block's and top level block's restrain(ts)?$ parameters, and check that the subsequent simulate blocks' restrain(ts)?$ parameters are valid. """ # if first simulate block's restrain(ts)?$ is not 'none', and top level # restrain(ts)?$ is not set, we overwrite top level. This will allow # restraints to be added to the model properly. for name, not_none in (('restrain', lambda r: r.val != 'none'), ('restraints', lambda r: r.existing.val != 'ignore' or r.new)): first = getattr(self._param.simulate[0], name) if not_none(first): stage_level = getattr(self._param, name) if not_none(stage_level) and stage_level != first: raise ValueError( f"Different `{name}` cannot be set on both " "the concatenate stage and its first simulate " "subblock") setattr(self._param, name, first) # the frontend config will use the first restrain(ts)?$ only via # `self._param.restrain(ts)?$`, the backend config will use only # the simulate blocks restrain via # `self._param.simulate[i].restrain(ts)?$. We ensure here # that the first restrain is consistent between the two. setattr(self._param.simulate[0], name, copy.deepcopy(getattr(self._param, name))) def check_restraints(param): # verify for now without knowing if permanent restraints exist, we # will verify with permanent restraints again in `_get_model` msg = cmj.restraints_incompatible(param, self._param, None) if msg: raise ValueError(msg) # check top level param (for example to ensure no explicit restraints) check_restraints(self._param) # ensure that the subsequent restrain blocks are valid, ie they only # differ in force constant from the first block. for sim_block in self._param.simulate: check_restraints(sim_block) def _handle_gcmc(self): """ Require equivalence between the GCMC parameter of the top level block and the simulate blocks. Either the top level block or all simulate blocks can have GCMC parameter unset, they will be updated to match the other, but both cannot define GCMC parameter unless they are exactly equivalent. """ # all simulate blocks must be consistent get_gcmc_block = lambda param: (param.gcmc if "gcmc" in param.keys( tag="setbyuser") else None) sub_gcmc_blocks = [ get_gcmc_block(sim_param) for sim_param in self.param.simulate ] all_equal = all(x == sub_gcmc_blocks[0] for x in sub_gcmc_blocks) if not all_equal: raise ValueError("GCMC blocks in Concatenate substages must" " be identical") # if top block and subblocks define GCMC, enforce consistency # if top block or subblocks exclusively define GCMC, copy from defined # to undefined block top_gcmc_block = get_gcmc_block(self.param) if top_gcmc_block: for sim_param, sub_gcmc_block in zip(self.param.simulate, sub_gcmc_blocks): if not sub_gcmc_block: sim_param.gcmc = copy.deepcopy(top_gcmc_block) elif sub_gcmc_block != top_gcmc_block: raise ValueError("Top level GCMC block in Concatenate " "stage conflicts with substage GCMC block") elif sub_gcmc_blocks and sub_gcmc_blocks[0]: self.param.gcmc = copy.deepcopy(sub_gcmc_blocks[0])
[docs] def check_param(self): ev = super(Concatenate, self).check_param() if not ev.is_ok: return ev for sim_param in self.param.simulate: sim_stage = Simulate() sim_stage.param = sim_param ev = sim_stage.check_param() if not ev.is_ok(): return ev return ev
def _write_cfg(self, jobname, task, add_tag=config.TAG_SPECS.concat, tag="CONCAT", param=None): if param is None: # removes all tags param = copy.deepcopy(config.get_default_setting( self.param)).update(self.param) i_win = param.fep.i_window.val # necessary for getting proper tags on simulate params # use self.param to set param on substages, then they will create their # own param inside _write_cfg for i, sim_param in enumerate(self.param.simulate): sim_stage = Simulate() sim_stage.param = sim_param sim_stage.param.fep.i_window.val = i_win # necessary for GCMC substages sim_stage.check_param() sim_cfg = sim_stage._write_cfg(jobname, task) with open(sim_cfg) as f: sim_param = sea.Map(f.read()) param.simulate[i] = sim_param return super(Concatenate, self)._write_cfg(jobname, task, add_tag, tag, param) def _effect(self, param): if 'simulate' in param: for i, sim_param in enumerate(param.simulate): if isinstance(sim_param.effect_if, sea.List): param.simulate[i] = super(Concatenate, self)._effect(sim_param) return super(Concatenate, self)._effect(param)
[docs] def migrate_param(self, param: sea.Map): if 'simulate' in param: for sim_param in param.simulate: super().migrate_param(sim_param) return super().migrate_param(param)
[docs]class ReInit(SimulateBase): NAME = 'reinit' PARAM = cmj._create_param_when_needed([config.REINIT]) def _write_cfg(self, jobname, task, add_tag=config.TAG_SPECS.reinit, tag="REINIT", param=None): return super()._write_cfg(jobname, task, add_tag, tag, param) def _get_input(self, default_model: cms.Cms, pj: DesmondJob, param: sea.Map, jobname: str) -> ([str], [str], [cms.Box]): """ Create input files as needed and return a list of the filenames created This function can modify `param`, so should be executed before anything that depends on `param`. :param default_model: The base model from previous job, can be superseded in subclasses :param pj: The previous job :param param: Settings for this job. Can be updated :param jobname: The jobname of this job """ cms_fnames, additional_inputs, boxes = super()._get_input( default_model, pj, param, jobname) DOLLAR_RCF = "$REINIT_CONFIGURATIONS_FILE" if param.reinit_frameset.configurations_file.val == DOLLAR_RCF: # use parent job trajectory prev_traj = pj.output.get("TRJ") traj_name = os.path.basename(prev_traj) sea.update_macro_dict({DOLLAR_RCF: traj_name}) # this replaces $REINIT_CONFIGURATIONS_FILE with `traj_name` param.reinit_frameset.configurations_file.val = \ param.reinit_frameset.configurations_file.val util.symlink(prev_traj, traj_name) additional_inputs.append(traj_name) return (cms_fnames, additional_inputs, boxes)
[docs] def collect_inputfile(self) -> [str]: """ Called from `cmj.collect_inputfile()` during startup. :return: List of input file names. """ return []
[docs] def crunch(self): self._print("debug", "in reinit.crunch") pre_jobs = [] for prejob in self.get_prejobs(): new_prejob = DesmondBackendJob( task=prejob.task, gid=prejob.gid, permanent_restrain=prejob.permanent_restrain, permanent_group=prejob.permanent_group, jobname=prejob.jobname, parent=prejob.parent, stage=prejob.stage, dir=prejob.dir, jlaunch_cmd=None) new_prejob.output.set_struct_file(prejob.output.struct_file()) new_prejob.output.add(prejob.output.get('TRJ'), tag='TRJ') pre_jobs.append(new_prejob) self.add_jobs(self._create_job(pre_jobs)) self._print("debug", "out reinit.crunch")