Source code for schrodinger.application.desmond.fep_scholar_msj_generator

import os
import re
import shutil

import schrodinger.application.desmond.cmj as cmj
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import stage

# Here to make sure import is not removed
_ = stage.Task

MMSHARE_DIR = os.path.dirname(os.path.dirname(os.environ["MMSHARE_EXEC"]))
DESMOND_DATA_DIR = os.path.join(MMSHARE_DIR, "data", "desmond")

ENSEMBLE = {'NVT': "desmond_nvt_fep.msj", 'NPT': "desmond_npt_fep.msj"}


[docs]class FEPScholarMsjGenerator(object): """ This class generates main msj file that manages both solvent and complex legs of the FEP simulation. """
[docs] def __init__(self, jobname, cd_params, structure_fname, force_field="OPLS_2005", ensemble="NPT", sim_time="5000", rand_seed="2015", complex_box_buffer_width=5.0, trj_interval=24.0): """ Valid FF : OPLS_2005 CHARMM AMBER """ self.jobname = jobname self.cd_params = cd_params self.structure_fname = structure_fname self.force_field = force_field self.ensemble = ENSEMBLE[ensemble] self.sim_time = sim_time self.rand_seed = rand_seed self.trj_interval = trj_interval self.from_main_msj = os.path.join(DESMOND_DATA_DIR, "fep_scholar_launcher.msj") # By default we should be running npt self.from_subjob_msj = os.path.join(DESMOND_DATA_DIR, self.ensemble) self.main_msj_fname = self.jobname + ".msj" self.complexmsj_fname = self.jobname + "_complex.msj" self.solventmsj_fname = self.jobname + "_solvent.msj" self.pv_fname = self.jobname + '_pv.maegz' # always use predefined REST selection region self.asl = f"asl: atom.{constants.REST_HOTREGION} 1" self.solvent_box_buffer_width = 10.0 self.complex_box_buffer_width = complex_box_buffer_width
[docs] def write_complex_msj(self): """ """ self.generate_subjob_msj(self.complexmsj_fname, self.complex_box_buffer_width) return self.complexmsj_fname
[docs] def write_solvent_msj(self): """ """ # Note we are using solvent box of 5 angstroms too since it will be a # large protein fragment self.generate_subjob_msj(self.solventmsj_fname, self.solvent_box_buffer_width, solvent_leg=True) return self.solventmsj_fname
[docs] def write_main_msj(self): """ """ shutil.copyfile(self.from_main_msj, self.main_msj_fname) self.generate_main_msj() return self.main_msj_fname
[docs] def generate_subjob_msj(self, fname, buffer_width, solvent_leg=False): with open(self.from_subjob_msj) as fh: subjobmsj_contents = fh.read() if solvent_leg: subjobmsj_contents = re.sub( "buffer_width *= *5.0", " buffer_width = {bf}".format(bf=buffer_width), subjobmsj_contents, count=1) subjobmsj_contents = re.sub( "extract_structures *{", "extract_structures {\n keep = [ligand]", subjobmsj_contents, count=1) with open(fname, 'w') as fh: fh.write(subjobmsj_contents) raw = cmj.msj2sea_full(fname) # Default is 5.0 if it is larger then we set it if float(self.complex_box_buffer_width) > 5.0: self.set_box_buffer_width(raw, self.complex_box_buffer_width) self.set_lambda_hopping(raw) self.set_ff(raw) cmj.write_sea2msj(raw.stage, fname)
[docs] def generate_main_msj(self): """ """ # 'raw' is an object of sea.Sea (or its subclass). raw = cmj.msj2sea_full(self.main_msj_fname) self.modify_multisim_stage(raw) cmj.write_sea2msj(raw.stage, self.main_msj_fname)
[docs] def modify_multisim_stage(self, raw): """ Method modifies 'multisim' stage in the main msj file """ for stage in raw.stage: if stage.__NAME__ == "multisim": multisim_stage = stage break else: raise Exception("no multisim stage in the msj file") multisim_stage.job[0][1].val = self.complexmsj_fname multisim_stage.job[1][1].val = self.solventmsj_fname multisim_stage.job[0][3].val = self.cd_params['processors_per_replica'] multisim_stage.job[1][3].val = self.cd_params['processors_per_replica'] subhost_ncpu = "$SUBHOST:%d" % int(self.cd_params['cpus']) umbrella_options = \ [sea.Atom(x) for x in ["-HOST", subhost_ncpu, "-mode", "umbrella"]] multisim_stage.job[0].extend(umbrella_options) multisim_stage.job[1].extend(umbrella_options)
[docs] def get_total_proc(self): return self.cd_params["cpus"]
[docs] @staticmethod def set_box_buffer_width(raw, width): """ """ for stage in raw.stage: if stage.__NAME__ == "build_geometry": stage["buffer_width"] = width stage.buffer_width.add_tag("setbyuser") break
[docs] def set_ff(self, raw): for stage in raw.stage: if stage.__NAME__ == "assign_forcefield": assign_forcefield_stage = stage break else: raise Exception("No assign_forcefield stage in the msj file") assign_forcefield_stage["forcefield"] = self.force_field assign_forcefield_stage.forcefield.add_tag("setbyuser") assign_forcefield_stage["core_hopping_fepio"] = "off" assign_forcefield_stage.core_hopping_fepio.add_tag("setbyuser")
[docs] def set_lambda_hopping(self, raw): # Finds the last ``simulate'' or ``lambda_hopping'' stage. last_simulate = None for stage in raw.stage: if stage.__NAME__ in ["simulate", "lambda_hopping"]: last_simulate = stage # Renames the last simulate stage to lambda_hopping. If it is # already lambda_hopping, we just assign the same name back to # itself. It doesn't matter. last_simulate.__NAME__ = "lambda_hopping" last_simulate["solute_tempering"] = sea.Map(""" atom = "%s" temperature = { generator = PvdS exchange_probability = 0.3 } """ % self.asl) last_simulate["time"] = self.sim_time last_simulate["trajectory"]["interval"] = self.trj_interval last_simulate["trajectory"].add_tag("setbyuser") last_simulate["randomize_velocity"]["seed"] = self.rand_seed last_simulate["randomize_velocity"].add_tag("setbyuser") last_simulate.solute_tempering.add_tag("setbyuser") last_simulate["time"].add_tag("setbyuser")