Source code for schrodinger.application.watermap.watermap_inp

"""

This is a class that generates watermap input files and runs them through jobcontrol. This also supports all the options in GUI.

Copyright Schrodinger, LLC. All rights reserved.

"""

# Contributors: Byungchan Kim

import argparse
import os
import sys
from typing import Optional

import schrodinger.application.desmond.cmj as cmj
import schrodinger.application.desmond.envir as envir
import schrodinger.application.desmond.smarts as smarts
# stage.py needs to be imported for cmj module to know the stages
import schrodinger.application.desmond.stage as stage  # noqa: F401
import schrodinger.application.desmond.system_builder_util as system_builder_util
import schrodinger.job.jobcontrol as jobcontrol
import schrodinger.structure as structure
import schrodinger.structutils.transform as transform
import schrodinger.utils.cmdline as cmdline
from schrodinger.application.desmond import constants
from schrodinger.infra import mm

FFLD14 = mm.OPLS_NAME_F14
FFLD16 = mm.OPLS_NAME_F16
_SOLVATE_POCKET_TITLE = "GCMC solvate pocket"

# Define water SMARTS patterns
water_smarts = ["[H]O[H]"]
zob_water_smarts = ["[H]O([H])_[*]", "[H]O[H]_[*]"]





[docs]class WaterMapInput:
[docs] def __init__(self, protein_st, ligand_st, **kwargs): self._protein = protein_st self._ligand = ligand_st self._solute = None self._crystal_water = None self.retain_ligand = False self.do_not_truncate = False self.ligand_distance = 10.0 self.truncate_distance = 20.0 self.treat_solvent = 'as solvent' self.simulation_time = 2.0 self.do_not_return_trajectory = False self.solvent_buffer_distance = 5.0 self.has_membrane = False self.turn_off_solvate_pocket = False self.twobody = False self.forcefield = FFLD16 self.num_solvate_pocket = 1 self.extended_gcmc = False self.update(**kwargs)
[docs] def update(self, **kwargs): ''' updates states of options ''' if 'retain_ligand' in kwargs: self.retain_ligand = kwargs['retain_ligand'] if 'do_not_truncate' in kwargs: self.do_not_truncate = kwargs['do_not_truncate'] if 'ligand_distance' in kwargs: self.ligand_distance = kwargs['ligand_distance'] if 'truncate_distance' in kwargs: self.truncate_distance = kwargs['truncate_distance'] if 'simulation_time' in kwargs: self.simulation_time = kwargs['simulation_time'] if 'treat_solvent' in kwargs: self.treat_solvent = kwargs['treat_solvent'] if 'do_not_return_trajectory' in kwargs: self.do_not_return_trajectory = kwargs['do_not_return_trajectory'] if 'solvent_buffer_distance' in kwargs: print( "solvent_buffer_distance is ignored. 'ligand_distance' should be used to set solvent buffer distance." ) if 'has_membrane' in kwargs: self.has_membrane = kwargs['has_membrane'] if 'turn_off_solvate_pocket' in kwargs: self.turn_off_solvate_pocket = kwargs['turn_off_solvate_pocket'] if 'twobody' in kwargs: self.twobody = kwargs['twobody'] if 'forcefield' in kwargs: self.forcefield = kwargs['forcefield'] if 'num_solvate_pocket' in kwargs: self.num_solvate_pocket = kwargs['num_solvate_pocket'] if 'extended_gcmc' in kwargs: self.extended_gcmc = kwargs['extended_gcmc'] if self.do_not_truncate: self.truncate_distance = 0.0
[docs] def reorder_solute(self, st): ''' reorder atoms so that the very first atom is located close to origin to prevent solute to be shifted to other images. ''' new_st = st.copy() reorder_array = [0] for i in range(self._solute.atom_total): reorder_array.append(i + 1) min_dist = 1.0E10 min_index = 0 for a in self._solute.atom: if "b_wmap_ligand" in a.property and a.property["b_wmap_ligand"]: continue dist = a.x * a.x + a.y * a.y + a.z * a.z if dist < min_dist: min_dist = dist min_index = a.index reorder_array[1] = min_index reorder_array[min_index] = 1 mm.mmct_ct_reorder(new_st, st, reorder_array) return new_st
[docs] def createTip4pFfio(self, ct): mm.mmffio_initialize(mm.MMERR_DEFAULT_HANDLER) mmshare_exec = os.environ.get('MMSHARE_EXEC') # FIXME this will not work on Windows: mmshare_data = mmshare_exec + '/../../data/system_builder' tip4p_fname = os.path.join(mmshare_data, 'tip4p.box.mae') st_reader = structure.StructureReader(tip4p_fname) st = next(st_reader) ff = mm.mmffio_ff_mmct_get(st.handle) mm.mmffio_ff_mmct_put(ff, ct) mm.mmffio_terminate()
[docs] def make_canonical_solvent(self, st): ''' Fix atomic order to O H H ''' new_st = st.copy() reorder = [0] for m in st.molecule: for i in m.getAtomIndices(): reorder.append(i) mm.mmct_ct_reorder(new_st, st, reorder) return new_st
[docs] def prepareStructures(self): ''' Prepare -solute, -protein, -ligand.mae files. ''' # tag ligand atoms when it is retained if self.retain_ligand: for a in self._ligand.atom: a.property["b_wmap_ligand"] = True st = system_builder_util.truncateProtein(self._protein, self._ligand, self.retain_ligand, self.truncate_distance) if self.truncate_distance > 0.0: for a in st.atom: if "b_wmap_ligand" in a.property and a.property["b_wmap_ligand"]: continue else: a.property[constants.FF_USE_EXISTING_CHARGE] = 1 if self.retain_ligand: st.property["b_wmap_retain_ligand"] = True # Allow some structural waters to be kept when running WaterMap [Ev:72892] st.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLUTE self._ligand.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.LIGAND self._protein.property[ constants.CT_TYPE] = constants.CT_TYPE.VAL.RECEPTOR water_atoms = smarts.evaluate_net_smarts(st, water_smarts, zob_water_smarts) if len(water_atoms): # Separate water to crystal water CT tmp_water_st = st.extract(water_atoms) self._crystal_water = self.make_canonical_solvent(tmp_water_st) self._crystal_water.property['i_ffio_num_component'] = 1 if self.treat_solvent == 'as solvent': self._crystal_water.property[ constants.CT_TYPE] = constants.CT_TYPE.VAL.CRYSTAL_WATER self.createTip4pFfio(self._crystal_water.handle) for a in self._crystal_water.atom: a.pdbres = 'T4P ' # remove water from protein del_atoms = smarts.evaluate_net_smarts(self._protein, water_smarts, zob_water_smarts) self._protein.deleteAtoms(del_atoms) # remove water from solute st.deleteAtoms(water_atoms) # Restrain solute water elif self.treat_solvent == 'as solute': self._crystal_water = None # Remove water from solute and protein elif self.treat_solvent == 'delete': self._crystal_water = None # remove water from protein del_atoms = smarts.evaluate_net_smarts(self._protein, water_smarts, zob_water_smarts) self._protein.deleteAtoms(del_atoms) # remove water from solute st.deleteAtoms(water_atoms) # set all solute waters including zero-order bonded waters # with T3P resname and restraint water_atoms = smarts.evaluate_net_smarts(st, water_smarts, []) for i in water_atoms: a = st.atom[i] a.pdbres = 'T3P ' if a.atomic_number > 1: a.property['i_wmap_restrain'] = 1 self._solute = st
def _writeStructures(self, jobname): ''' write structures as mae files ''' if not self._solute: try: self.prepareStructures() except Exception: raise if (not self._protein) or (self._protein.atom_total == 0): raise RuntimeError('Protein atoms do not exist.') if (not self._ligand) or (self._ligand.atom_total == 0): raise RuntimeError('Ligand atoms do not exist.') centroid = [0.0, 0.0, 0.0] if not self.has_membrane: centroid = transform.get_centroid(self._ligand) x = -centroid[0] y = -centroid[1] z = -centroid[2] transform.translate_structure(self._ligand, x, y, z) self._ligand.title = "sampling_coordinates" self._ligand.property['r_watermap_trans1'] = -x self._ligand.property['r_watermap_trans2'] = -y self._ligand.property['r_watermap_trans3'] = -z transform.translate_structure(self._solute, x, y, z) self._solute.property['r_watermap_trans1'] = -x self._solute.property['r_watermap_trans2'] = -y self._solute.property['r_watermap_trans3'] = -z if self._crystal_water: transform.translate_structure(self._crystal_water, x, y, z) self._crystal_water.property['r_watermap_trans1'] = -x self._crystal_water.property['r_watermap_trans2'] = -y self._crystal_water.property['r_watermap_trans3'] = -z if (not self._solute) or (self._solute.atom_total == 0): raise RuntimeError( 'Truncation distance is too short to include any protein atoms. Please increase the truncation distance.' ) self._solute = self.reorder_solute(self._solute) self._solute.write(self.in_fname) self._ligand.append(self.in_fname) self._protein.append(self.in_fname) if self._crystal_water: self._crystal_water.append(self.in_fname)
[docs] def writeMSJ(self) -> Optional[str]: """ Write multisim file to the returned str. If there was a problem generating the msj, return None. """ # TODO should we support extended gcmc in the future or remove this # entirely? assert not self.extended_gcmc, "Extended GCMC is currently not " \ "supported" if self.extended_gcmc: template_fname = os.path.join(envir.CONST.MMSHARE_DATA_DESMOND_DIR, "watermap_extended_template.msj") else: template_fname = os.path.join(envir.CONST.MMSHARE_DATA_DESMOND_DIR, "watermap_gpu_template.msj") with open(template_fname) as f: s = f.read() s = s.replace('$SIMULATION_TIME', str(self.simulation_time * 1000.0)) s = s.replace('$LIGAND_DISTANCE', str(self.ligand_distance)) all_stages = cmj.parse_msj(None, msj_content=s) water_model = None for stg in all_stages: if stg.NAME == "system_builder" or stg.NAME == "build_geometry": if self.has_membrane: stg.param.should_skip.add_tag("setbyuser") stg.param.should_skip.val = True if self.treat_solvent == "as solute": stg.param.distil_solute.add_tag("setbyuser") stg.param.distil_solute.val = False if stg.NAME == "build_geometry" and self.forcefield != FFLD14: stg.param.override_forcefield.add_tag("setbyuser") stg.param.override_forcefield.val = self.forcefield if stg.NAME == "assign_forcefield": if self.forcefield != FFLD14: stg.param.forcefield.add_tag("setbyuser") stg.param.forcefield.val = self.forcefield water_model = stg.param.water.val # the GCMC solvate_pocket replacement cannot be identified by stg.NAME # which is simply "simulate" if stg.param.title.val == _SOLVATE_POCKET_TITLE: if self.turn_off_solvate_pocket: stg.param.should_skip.add_tag("setbyuser") stg.param.should_skip.val = True else: water_model = water_model or 'SPC' timestep = int(stg.param.timestep.val[0] * 1000) key = (water_model, timestep) if key not in constants.GCMC_CHEMICAL_POTENTIALS: print( f"ERROR: Water model {water_model} not calibrated for GCMC at a {timestep} fs timestep. " "Turn off solvate pocket to run with this model.") return None mu_excess, solvent_density = constants.GCMC_CHEMICAL_POTENTIALS[ key] stg.param.gcmc.mu_excess.val = mu_excess stg.param.gcmc.mu_excess.add_tag("setbyuser") stg.param.gcmc.solvent_density.val = solvent_density stg.param.gcmc.solvent_density.add_tag("setbyuser") if stg.NAME == "watermap_post_analysis" and self.do_not_return_trajectory: stg.param.do_not_return_trajectory.add_tag("setbyuser") stg.param.do_not_return_trajectory.val = True if stg.NAME == "watermap_cluster": if self.twobody: stg.param.twobody.add_tag("setbyuser") stg.param.twobody.val = 1 if stg.NAME == "simulate" and self.extended_gcmc: try: stg.param.backend.mdsim.plugin.SpatialActiveSite # production stage stg.param.time.val = 200.0 except AttributeError: pass s = cmj.write_msj(all_stages, to_str=True) return s
[docs] def write(self, jobname, suffix="maegz", hostname=None, cpus=None): """ Call all write functions :param jobname: The name of the job in the command example :type jobname: str :param suffix: The file extension for the input file :type suffix: str :param hostname: The name of the host in the command example :type hostname: str :param cpus: The number of cpus in the command example :type cpus: int """ self.in_fname = jobname + "-in." + suffix self.msj_fname = jobname + '.msj' self._writeStructures(jobname) host_and_cpus_args = "" if hostname and cpus: host_and_cpus_args = " -HOST %s -cpu %s" % (hostname, str(cpus)) s = self.writeMSJ() if not s: raise RuntimeError("Could not generate msj for job submission.") s += '# command example:\n' s += '# $SCHRODINGER/watermap -JOBNAME %s%s -m %s %s\n\n' % ( jobname, host_and_cpus_args, self.msj_fname, self.in_fname) fh = open(self.msj_fname, "w") print(s, file=fh) fh.close()
[docs] def run(self, jobname, host, cpu): ''' Run WaterMap Job ''' watermap_cmd = os.path.join(os.environ["SCHRODINGER"], "watermap") cmd_line = [ watermap_cmd, "-JOBNAME", jobname, "-HOST", host, "-cpu", str(cpu), "-m", self.msj_fname, self.in_fname, ] job = jobcontrol.launch_job(cmd_line) if not job: raise RuntimeError( "Desmond submission failed. Please check the accessibility of the cluster machine and of Desmond engine over there." ) return job
[docs]def main(opt): if not os.path.exists(opt.protein): print('%s does not exists.' % opt.protein) print('Use -protein to specify a protein file name.') sys.exit(1) if not os.path.exists(opt.ligand): print('Ligand, %s, does not exists.' % opt.ligand) print('Use -ligand to specify a ligand file name.') sys.exit(1) if opt.extended_gcmc: print('WaterMap does not currently support the extended_gcmc option.') sys.exit(1) protein_st = None for st in structure.StructureReader(opt.protein): if protein_st: protein_st.extend(st) else: protein_st = st ligand_st = None for st in structure.StructureReader(opt.ligand): if ligand_st: ligand_st.extend(st) else: ligand_st = st wm_inp = WaterMapInput( protein_st, ligand_st, retain_ligand=opt.retain_ligand, do_not_truncate=opt.do_not_truncate, ligand_distance=opt.ligand_distance, truncate_distance=opt.truncate_distance, treat_solvent=opt.treat_solvent, simulation_time=opt.simulation_time, do_not_return_trajectory=opt.do_not_return_trajectory, turn_off_solvate_pocket=opt.turn_off_solvate_pocket, twobody=opt.twobody, forcefield=opt.forcefield, num_solvate_pocket=opt.num_solvate_pocket, extended_gcmc=opt.extended_gcmc) jobname = opt.jobname if not jobname: jobname = 'watermap' host_list = jobcontrol.get_command_line_host_list() host = host_list[0][0] if (not host): host = 'localhost' wm_inp.write(jobname) if not opt.do_not_run: job = wm_inp.run(jobname, opt.host, opt.cpu) print(" JobId : %s" % job.job_id)
if (__name__ == "__main__"): usage = ''' $SCHRODINGER/run -FROM watermap %prog -protein protein.mae -ligand ligand.mae [options] Description: %prog prepares and submits WaterMap Jobs Example to retain ligand: %prog -JOBNAME watermap_test -protein protein.mae -ligand ligand.mae -retain_ligand ''' parser = cmdline.SingleDashOptionParser(usage) jc_options = [ cmdline.HOST, ] parser.add_option('-JOBNAME', dest='jobname', default='watermap', help='JOBNAME') parser.add_option('-HOST', dest='host', default='localhost', help='host name') parser.add_option('-cpu', dest='cpu', default=1, help='# of cpus') parser.add_option('-protein', dest='protein', default='', help='protein file name') parser.add_option('-ligand', dest='ligand', default='', help='ligand file name') parser.add_option('-retain_ligand', action='store_true', dest='retain_ligand', default=False, help='to retain ligand') parser.add_option('-do_not_truncate', action='store_true', dest='do_not_truncate', default=False, help='do no truncate protein') parser.add_option('-ligand_distance', dest='ligand_distance', default=10.0, help='binding site definition') parser.add_option('-truncate_distance', dest='truncate_distance', default=20.0, help='protein truncation distance') parser.add_option('-treat_solvent', dest='treat_solvent', default='as solvent', help='"as solvent", "as solute", or "delete"') parser.add_option('-simulation_time', dest='simulation_time', default=2.0, help='simulation time in ns') parser.add_option('-do_not_return_trajectory', action='store_true', dest='do_not_return_trajectory', default=False, help='do not return trajectory') parser.add_option('-solvent_buffer_distance', dest='solvent_buffer_distance', default=10.0, type='float', action='callback', callback=print_deprecate, help="""solvent buffer distance to solvate protein\n (This option has been DEPRECATED, please use 'ligand_distance' option)""" ) parser.add_option( '-do_not_run', action='store_true', dest='do_not_run', default=False, help='do not run a job and stop after writing input files') parser.add_option( '-continuous', action='callback', dest='continuous', default=False, callback=print_deprecate, help= "This option is not needed anymore. Continuous WM runs automatically.") parser.add_option('-twobody', action='store_true', dest='twobody', default=False, help='calculate twobody term') parser.add_option('-forcefield', dest='forcefield', default=FFLD14, help=(FFLD14 + " or " + FFLD16)) parser.add_option('-num_solvate_pocket', dest='num_solvate_pocket', default=1, help='number of solvate_pocket outputs') parser.add_option( '-extended_gcmc', action='store_true', dest='extended_gcmc', default=False, help= 'Turn on extended gcmc protocol. The protocol is set to run more thorough GCMC sampling and generate 20 representative structures. Relaxation and production simulations continue on each output structure to obtain total of 4 ns trajectory and analysis is performed on the trajectory.' ) # This value is ignored, use_gpu is assumed by the code parser.add_option('-use_gpu', action='store_true', help=argparse.SUPPRESS) options, args = parser.parse_args() main(options)