Source code for schrodinger.application.desmond.stage.prepare.structure

import glob
import os
import shutil
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple

import numpy as np
from rdkit import Chem
from rdkit.Chem.rdFMCS import FindMCS

from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import move_alchem_ions
from schrodinger.application.desmond import solubility_utils
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import system_builder_inp as sbi
from schrodinger.application.desmond import util
from schrodinger.application.desmond.stage.jobs import DesmondJob
from schrodinger.infra import mm
from schrodinger.structutils import ringspear
from schrodinger.structutils import transform
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess

__all__ = [
    'BuildGeometry', 'ExtractStructures', 'ExtractSoluteStructure',
    'HashStructureTitle', 'DisorderedSystemBuilder', 'ProteinMutationGenerator',
    'ReplicateStructure', 'GroupWaters'
]


[docs]class BuildGeometry(cmj.StageBase): """ """ NAME = "build_geometry" BG_CMD = os.path.join(envir.CONST.SCHRODINGER_UTILITIES, "system_builder") SOLVENT = { "SPC": "spc.box.mae", "SPCFW": "spcfw.box.mae", "TIP4P": "tip4p.box.mae", "TIP4PEW": "tip4pew.box.mae", "TIP3P": "tip3p.box.mae", "WATER": "spc.box.mae", "OCTANOL": "octanol.box.mae", "METHANOL": "methanol.box.mae", "DMSO": "dmso.box.mae", "TIP5P": "tip5p.box.mae", "TIP4PD": "tip4pd.box.mae", "SPCE": "spce.box.mae", "TIP4P2005": "tip4p2005.box.mae" } DEFAULT_ALCHEMICAL_EXCLUDE_DIST = 5 PARAM = cmj._create_param_when_needed(""" DATA = { csb_file = "" solvent = water solvent_file = ? distil_solute = true solvate_system = true neutralize_system = true add_alchemical_ions = false buffer_width = 10.0 ion_awayfrom = [] ion_awaydistance = 10.0 rezero_system = true minimize_volume = false scale_solvent_vdw = 1.0 # or 0.75 for GCMC/Solvate jobs box_shape = cubic preserve_ffio = true preserve_box = false only_merge_ct = [] rebuild_cms = off remove_overlapped_crystal_water = false remove_overlapped_solvent = true override_forcefield = OPLS_2005 box = { shape = "orthorhombic" # cubic | triclinic | orthorhombic size = [10.0 10.0 10.0] # more or less numbers depending on the shape size_type = "buffer" # "absolute" | "buffer" } membrane_box = { lipid = POPC size = [10.0 10.0 10.0] } salt = { positive_ion = Na negative_ion = Cl concentration = 0.15 } add_counterion = { ion = Na number = 0 where = [] } } VALIDATE = { csb_file = {type = str _check = multisim_file_exists} solvent = [ {type = enum range = [WATER SPC TIP4P TIP4PEW TIP5P TIP3P TIP4PD SPCE TIP4P2005 OCTANOL METHANOL DMSO FILE]} ] solvent_file = [ {type = str _check = multisim_file_exists} {type = none} ] distil_solute = [ {type = bool} {type = str } ] solvate_system = {type = bool} preserve_ffio = {type = bool} preserve_box = {type = bool} neutralize_system = {type = bool} add_alchemical_ions = [ {type = bool} # FIXME: The alchemical water approach has been evolving. Once the basic # method is mature, we should clean up the parameters. # defaults are defined in move_alchem_ions.py, these only need to be # set to change the defaults {map_to_neutral_atoms = {type = bool} use_alchemical_water = {type = bool} } ] buffer_width = [ {type = float range = [0.0 inf]} {type = list size = 3 elem = {type = float range = [0.0 inf]}} ] ion_awayfrom = {type = list size = 0 elem = {type = int1}} ion_awaydistance = {type = float range = [0.0 inf]} rezero_system = {type = bool} minimize_volume = {type = bool} box_shape = {type = enum range = [cubic orthorhombic]} scale_solvent_vdw = {type = float+} only_merge_ct = {type = list size = 0 elem = {type = int1}} rebuild_cms = [ {type = bool} {membrane = {type = str} solvent = {type = str} } {membrane = {type = str} } {solvent = {type = str} } ] remove_overlapped_crystal_water = {type = bool} remove_overlapped_solvent = {type = bool} override_forcefield = {type = str _check = check_forcefield} box = [ {type = enum range = [read]} { shape = {type = enum range = [orthorhombic cubic triclinic dodecahedron_hexagon dodecahedron_square truncated_octahedron]} size = [{type = list size = 3 elem = {type = float+}} {type = float+} {type = list size = 6 elem = {type = float+}}] size_type = {type = enum range = [buffer absolute]} _mapcheck = check_box_size } ] membrane_box = { lipid = {type = enum range = [DPPC POPC DMPC POPE]} size = {type = list size = 3 elem = {type = float+}} } salt = { positive_ion = {type = enum range = [Ca2 Fe2 Fe3 K Li Mg2 Na Zn2]} negative_ion = {type = enum range = [Br Cl F I]} concentration = {type = float+} } add_counterion = { ion = {type = enum range = [Ca2 Fe2 Fe3 K Li Mg2 Na Zn2 Br Cl F I]} number = [{type = int0} {type = enum range = ["neutralize_system"]}] where = {type = list size = 0 elem = {type = list size = 0 elem = {type = int1}}} } } """)
[docs] def __init__(self, *arg, **kwarg): """ """ cmj.StageBase.__init__(self, *arg, **kwarg) # self._csb = sbi.SystemBuilderInput()
# __init__ @staticmethod def _write_ffio_structures(cts: List[cms.ffiostructure.FFIOStructure], fname: str): """ Write FFIOStructures to the file """ cts[0].write(fname, format="CMS") for ct in cts[1:]: ct.append(fname, format="CMS") def _set_csb(self, pj: cmj.Job): """ """ csb = self._csb def _csb_file(param): fname = param.val if (fname != ""): try: fh = open(fname, "r") s = fh.read() fh.close() except IOError: raise IOError("Reading the .csb file failed: %s" % fname) csb.read_text(s) def _buffer_width(param): bw = param if (isinstance(bw, sea.List)): csb.a, csb.b, csb.c = bw[0].val, bw[1].val, bw[2].val else: csb.a, csb.b, csb.c = bw.val, bw.val, bw.val def _box(param): if param.val == 'read': output = pj.output.struct_file() box = struc.find_box(structure.StructureReader(output)) if box is None: raise ValueError("Could not read box info from input " "structures") box = np.reshape(box, (3, 3)) a = transform.get_vector_magnitude(box[0]) b = transform.get_vector_magnitude(box[1]) c = transform.get_vector_magnitude(box[2]) alpha = np.degrees( transform.get_angle_between_vectors(box[1], box[2])) beta = np.degrees( transform.get_angle_between_vectors(box[0], box[2])) gamma = np.degrees( transform.get_angle_between_vectors(box[0], box[1])) size = [a, b, c, alpha, beta, gamma] # All boxes can be represented by a triclinic box # but the config code will treat orthorhombic and # triclinic boxes differently, so check for it here. if np.allclose([alpha, beta, gamma], [90, 90, 90]): bc = "orthorhombic" else: bc = 'triclinic' else: bc = param.shape.val size = param.size.val if (bc == "cubic"): csb.setBoundaryCondition(boundary_condition="cubic", a=size) elif (bc == "orthorhombic"): csb.setBoundaryCondition(boundary_condition="orthorhombic", a=size[0], b=size[1], c=size[2]) elif (bc == "triclinic"): csb.setBoundaryCondition(boundary_condition="triclinic", a=size[0], b=size[1], c=size[2], alpha=size[3], beta=size[4], gamma=size[5]) elif (bc == "dodecahedron_square"): csb.setBoundaryCondition( boundary_condition="dodecahedron_square", a=size) elif (bc == "dodecahedron_hexagon"): csb.setBoundaryCondition( boundary_condition="dodecahedron_hexagon", a=size) elif (bc == "truncated_octahedron"): csb.setBoundaryCondition( boundary_condition="truncated_octahedron", a=size) if param.val == 'read' or param.size_type.val == "absolute": csb.setBoundaryCondition(use_buffer=0) else: csb.setBoundaryCondition(use_buffer=1) def _membrane_box(param): size = param.size.val csb.setMembranize() csb.setMembrane(param.lipid.val + ".mae.gz") csb.setBoundaryCondition(boundary_condition="orthorhombic", a=0.0, b=0.0, c=size[2]) csb.setMembranePatch(size[0], size[1]) csb.setBoundaryCondition(use_buffer=1) def _salt(param): csb.setSaltPositiveIon(param.positive_ion.val + ".mae") csb.setSaltNegativeIon(param.negative_ion.val + ".mae") csb.addSalt(param.concentration.val) def _add_counterion(param): ion = param.ion.val num = param.number.val where = param.where.val if (ion in [ "Ca2", "Fe2", "Fe3", "K", "Li", "Mg2", "Na", "Zn2", ]): csb.setPositiveIon(ion + ".mae") if (isinstance(num, str) and num == "neutralize_system"): csb.setNeutralize(True) else: csb.addIons("positive", num) elif (ion in [ "Br", "Cl", "F", "I", ]): csb.setNegativeIon(ion + ".mae") if (isinstance(num, str) and num == "neutralize_system"): csb.setNeutralize(True) else: csb.addIons("negative", num) if (where): csb.setIonLocation(where) self._set("csb_file", _csb_file) self._set("buffer_width", _buffer_width) # if solvent is in the SOLVENT dictionary then use it's value, # else (it must be FILE) use param.solvent_file.val solvent_file_abs_path = os.path.abspath( self.param.solvent_file.val ) if self.param.solvent_file.val else None self._set( "solvent", lambda param: csb.setSolvent( BuildGeometry.SOLVENT.get(param.val, solvent_file_abs_path))) self._set("solvate_system", lambda param: csb.setSolvate(param.val)) self._set("neutralize_system", lambda param: csb.setNeutralize(param.val)) self._set( "box_shape", lambda param: csb.setBoundaryCondition(boundary_condition=param.val, use_buffer=1)) # this is done here because we need to know if this is an fep task if pj.task == 'fep' and self.param.add_alchemical_ions.val: self._csb.setAddAlchemicalIons(1) _, alchem_water = self._alchemical_ion_params() self._csb.setUseAlchemicalWater(alchem_water) # if not set explicitly, override if not self.param.ion_awayfrom.has_tag('setbyuser'): self.param.ion_awaydistance.val = \ self.DEFAULT_ALCHEMICAL_EXCLUDE_DIST # -1 interpreted as all solute self.param.ion_awayfrom.val = [-1] self.param.ion_awayfrom.add_tag('setbyuser') self._set( "ion_awayfrom", lambda param: csb.setExcludeLocation( param.val, self.param.ion_awaydistance.val)) self._set("box", _box) self._set("scale_solvent_vdw", lambda param: csb.setVdwScalingFactor(param.val)) self._set("membrane_box", _membrane_box) self._set("salt", _salt) self._set("add_counterion", _add_counterion) self._set("override_forcefield", lambda param: csb.setOplsaaVersion(param.val))
[docs] def crunch(self): """ """ self._print("debug", "In BuildGeometry.crunch") solvent_name = self.param.solvent.val.upper() if self.param.override_forcefield.val.upper() == "OPLS_2005" and \ solvent_name in constants.S_OPLS_WATER_MODELS: raise ValueError(f"OPLS_2005 cannot be run with {solvent_name}") for pj in self.get_prejobs(): self._set_csb(pj) jobname, dir = self._get_jobname_and_dir(pj) fname_in = jobname + "-in.mae" fname_in2 = jobname + "-in.csb" fname_out = jobname + "-out.mae" fname_log = jobname + "-in.log" if not os.path.isdir(dir): os.makedirs(dir) util.chdir(dir) prev_output = pj.output.struct_file() if self.param.only_merge_ct.val: ct_index = self.param.only_merge_ct.val cts = list(cms.ffiostructure.CMSReader(prev_output)) merged_ct = cts[ct_index[0] - 1] for i in ct_index[1:]: merged_ct = cms.ffiostructure.merge_ct( merged_ct, cts[i - 1]) new_cts = [] for i, e in enumerate(cts, start=1): if i not in ct_index: new_cts.append(e) elif i == ct_index[0]: new_cts.append(merged_ct) self._write_ffio_structures(new_cts, fname_out) new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain, pj.permanent_group, jobname, pj, self, None, dir) new_job.need_host = False new_job.status.set(cmj.JobStatus.SUCCESS) new_job.output.add(os.path.abspath(fname_out)) self.add_job(new_job) continue if self.param.rebuild_cms.val: import schrodinger.application.desmond.repair_cms as repair_cms membrane_asl = "" solvent_asl = "" if isinstance(self.param.rebuild_cms.val, sea.Map): membrane = self.param.rebuild_cms.membrane.val if membrane: if membrane[:4].lower() == "asl:": membrane_asl = membrane[4:] else: raise ValueError( "ASL expression should start with 'asl:'") solvent = self.param.rebuild_cms.solvent.val if solvent: if solvent[:4].lower() == "asl:": solvent_asl = solvent[4:] else: raise ValueError( "ASL expression should start with 'asl:'") repair_cms.create_cms_from_mae(prev_output, fname_out, membrane_asl, solvent_asl) new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain, pj.permanent_group, jobname, pj, self, None, dir) new_job.need_host = False new_job.status.set(cmj.JobStatus.SUCCESS) new_job.output.add(os.path.abspath(fname_out)) self.add_job(new_job) continue cts = [] if self.param.preserve_ffio.val: # Marks CTs that have ffio blocks. cts = list(cms.ffiostructure.CMSReader(prev_output)) # with prserve_ffio set as True and given cms input, we should # ignore full_system CT as it duplicates component CTs. if len(cts) > 1 and cts[0].property.get( constants.CT_TYPE) == constants.CT_TYPE.VAL.FULL_SYSTEM: try: cts = cms.Cms(prev_output).comp_ct self._print( "quiet", "WARNING: Input was a valid " "CMS, removing full_sys CT from inputs.") except ValueError: # ValueError is thrown if the input for Cms is invalid pass for i, e in enumerate(cts): if e.hasFfio(): e.property["i_systembuilder_ffio_ct"] = i elif "i_systembuilder_ffio_ct" in e.property: del e.property["i_systembuilder_ffio_ct"] if self.param.preserve_box.val: # Marks CTs that have ffio blocks. cts = cts or list(cms.ffiostructure.CMSReader(prev_output)) for i, e in enumerate(cts): for f in constants.SIM_BOX: if f not in e.property: break else: for f in constants.SIM_BOX: e.property[f + "_preserved"] = e.property[f] if cts: cts = struc.fixup_duplicate_properties(cts) self._write_ffio_structures(cts, fname_in) else: cts = struc.fixup_duplicate_properties( struc.read_all_structures(prev_output)) struc.write_structures(cts, fname_in) self._csb.setSolute(fname_in) self._csb.setWriteMaeFile(fname_out) # Separates solvent from the solute CT. distil_solute = self.param.distil_solute.val if distil_solute: self._csb.fixSolute() if isinstance(distil_solute, int): self._csb.treatSolventFromSolute() else: if distil_solute[:4].lower() == "asl:": self._csb.treatSolventFromSolute(asl=distil_solute[4:]) else: raise ValueError( "ASL expression should start with 'asl:'") self._csb.setRemoveOverlappedCrystalWater( int(self.param.remove_overlapped_crystal_water.val)) self._csb.setRemoveOverlappedSolvent( int(self.param.remove_overlapped_solvent.val)) self._csb.write(fname_in2) # Constructs the command line for job-launching. jlaunch_cmd = [ BuildGeometry.BG_CMD, "-HOST", "localhost", fname_in2, ] if self.param.minimize_volume.val: jlaunch_cmd.append("-minimize_volume") if self.param.rezero_system.val: jlaunch_cmd.append("-rezero") self._print("debug", " %s" % subprocess.list2cmdline(jlaunch_cmd)) new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain, pj.permanent_group, jobname, pj, self, jlaunch_cmd, dir) new_job.need_host = False new_job.output.add(os.path.abspath(fname_out)) new_job.output.add(os.path.abspath(fname_log)) new_job.input.add(os.path.abspath(fname_in)) self.add_job(new_job) self._print("debug", "Out BuildGeometry.crunch")
def _alchemical_ion_params(self): map_to_neutral = move_alchem_ions.MAP_TO_NEUTRAL_DEFAULT alchem_water = move_alchem_ions.ALCHEMICAL_WATER_DEFAULT if isinstance(self.param.add_alchemical_ions, sea.Map): if 'map_to_neutral_atoms' in self.param.add_alchemical_ions: map_to_neutral = \ self.param.add_alchemical_ions.map_to_neutral_atoms.val if 'use_alchemical_water' in self.param.add_alchemical_ions: alchem_water = \ self.param.add_alchemical_ions.use_alchemical_water.val return map_to_neutral, alchem_water
[docs] def hook_captured_successful_job(self, job): """ """ fname_out = job.output.struct_file() if job.task == 'fep' and self.param.add_alchemical_ions.val: shutil.copyfile(fname_out, fname_out + '.pre-alchemion') self._print("quiet", "") # add empty line for prettier log def print_fcn(msg): self._print("quiet", msg) map_to_neutral, use_alchemical_water = self._alchemical_ion_params() move_alchem_ions.move_ions_in_file(fname_out, fname_out, map_to_neutral, use_alchemical_water, print_fcn) if (not (self.param.preserve_ffio.val or self.param.preserve_box.val) or self.param.only_merge_ct.val or self.param.rebuild_cms.val): return # Restores the stripped ffio blocks. input_ct = list(cms.ffiostructure.CMSReader(job.input.struct_file())) output_ct = list(cms.ffiostructure.CMSReader(fname_out)) is_stripped = False for e in output_ct: if (constants.CT_TYPE in e.property and constants.CT_TYPE.VAL.FULL_SYSTEM == e.property[constants.CT_TYPE]): if ("i_systembuilder_ffio_ct" in e.property): del e.property["i_systembuilder_ffio_ct"] continue if ("i_systembuilder_ffio_ct" in e.property): i = e.property["i_systembuilder_ffio_ct"] e.ffio = input_ct[i].ffio e.ffhandle = input_ct[i].ffhandle del e.property["i_systembuilder_ffio_ct"] input_ct[i].ffio = None input_ct[i].ffhandle = None is_stripped = True # Restores the original box. orig_box = {} property_box = constants.SIM_BOX for e in output_ct: for f in property_box: if (f + "_preserved" in e.property): orig_box[f] = e.property[f + "_preserved"] else: break if orig_box: for e in output_ct: for k, v in orig_box.items(): e.property[k] = v try: del e.property[k + "_preserved"] except KeyError: pass is_mutated = self._remove_ringspears(output_ct) if is_stripped or orig_box or is_mutated: shutil.copyfile(fname_out, fname_out + ".stripped") self._write_ffio_structures(output_ct, fname_out)
@staticmethod def _remove_ringspears(cts: List[structure.Structure]): """ Check whether any atoms from the (optionally present) membrane CT spear any rings from solute CTs by calling `ringspear.check_for_spears`. If any spearing atoms found they are deleted from the membrane and fullsystem CTs, and True is returned to represent that the system was mutated. Otherwise False is returned. :param cts: a list of structures, possibly including a membrane CT and various solute CTs. :return: whether the system was mutated as a result of spearing atoms that were found. """ type_vals = constants.CT_TYPE.VAL fsys_ct = cts[0] assert fsys_ct.property[constants.CT_TYPE] == type_vals.FULL_SYSTEM types_to_check = [ type_vals.SOLUTE, type_vals.LIGAND, type_vals.RECEPTOR ] cts_to_check = [ e for e in cts if e.property[constants.CT_TYPE] in types_to_check ] num_previous = 0 spear_ct = None for e in cts[1:]: if e.property[constants.CT_TYPE] == type_vals.MEMBRANE: if spear_ct is None: spear_ct = e else: raise ValueError("Output structure should only have one " "membrane ct") elif spear_ct is None: num_previous += e.atom_total is_mutated = False if spear_ct: for e in cts_to_check: spears = ringspear.check_for_spears(e, spear_ct) molecules_to_remove = set() for spear in spears: is_mutated = True for atom in spear.spear_indexes: mol = spear_ct.atom[atom].getMolecule() molecules_to_remove.add(mol) atoms_to_remove = [] for mol in molecules_to_remove: atoms_to_remove.extend(mol.getAtomIndices()) spear_ct.deleteAtoms(atoms_to_remove) fsys_ct.deleteAtoms([a + num_previous for a in atoms_to_remove]) return is_mutated
[docs]class ExtractStructures(cmj.StructureStageBase): """ This extracts one or more structures from an input mae file and passes it to the next stage. """ NAME = "extract_structures" TAG = "EXTRACT_STRUCTURES" PARAM = cmj._create_param_when_needed(""" DATA = { start_index = ? end_index = ? keep = ? } VALIDATE = { start_index = [ {type = int} {type = none} ] end_index = [ {type = int} {type = none} ] keep = [ {type = none} {type = list elem = {type = enum range = [%s]} size = -1 } ] } """ % (" ".join(constants.FEP_STRUC_TAG.VAL),))
[docs] def run(self, jobname: str, input_fname: str) -> Optional[str]: out_fname = f"{jobname}-out.mae" strucs = list(structure.StructureReader(input_fname)) if self.param.keep.val is not None: strucs = struc.get_fep_structures(strucs, struc_tags=self.param.keep.val) # Need to have at least one resulting structure. if not strucs: self._print( "quiet", f"Invalid parameters: keep = {self.param.keep.val}.") return None else: strucs = strucs[self.param.start_index.val:self.param.end_index.val] # Need to have at least one resulting structure. if not strucs: self._print( "quiet", f"Invalid index: start_index = {self.param.start_index.val}, end_index = {self.param.end_index.val}." ) return None strucs = struc.fixup_duplicate_properties(strucs) struc.write_structures(strucs, out_fname) return out_fname
[docs]class ExtractSoluteStructure(cmj.StructureStageBase): """ This extracts a solute structure from the output cms of an MD run, and clears the simulation box if present. """ NAME = "extract_solute_structure"
[docs] def run(self, jobname: str, cms_fname: str) -> Optional[str]: out_fname = f"{jobname}-out.mae" cms_model = cms.Cms(cms_fname) solute_ct = solubility_utils.extract_solute_ct(cms_model) # If there is no solvent, the whole system is the solute. solute_ct = solute_ct or cms_model.fsys_ct # Clear the box properties for prop in constants.SIM_BOX: if prop in solute_ct.property: del solute_ct.property[prop] solute_ct.write(out_fname) return out_fname
[docs]class HashStructureTitle(cmj.StructureStageBase): """ This stores the structure title and corresponding hash id for one or more structures from an input mae file and passes it to the next stage. """ NAME = "hash_structure_title"
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]: out_fname = f"{jobname}-out.mae" cts = list(structure.StructureReader(mae_fname)) self._print("quiet", "Hashing structure title:") for ct in cts: # Clear FEP properties fep_keys = filter(lambda x: '_fep_' in x, ct.property) struc.delete_structure_properties(ct, fep_keys) # Add title and hash hash_id = util.str2hexid(ct.title) ct.property[constants.FEP_ORIGINAL_TITLE] = ct.title ct.property[constants.FEP_HASH_ID] = hash_id self._print("quiet", f"{hash_id} # {ct.title}") struc.write_structures(cts, out_fname) return out_fname
[docs]class DisorderedSystemBuilder(cmj.StructureStageBase): """ This sets up and runs the disordered system builder, which takes in a ct and creates a disordered solid system for it. """ NAME = "disordered_system_builder" N_MOLECULES = 64 PARAM = cmj._create_param_when_needed(""" DATA = { molecules = %d density = 0.25 scale_vdw = 0.80 seed = 645 forcefield = OPLS_2005 fep_preparation = true solvent_template_file = ? composition = [] build_tangled_chain = false build_cell_constant = 'vdw' } VALIDATE = { molecules = {type = int1} density = {type = "float+"} scale_vdw = {type = "float+"} seed = {type = int} forcefield = {type = str _check = check_opls_forcefield} fep_preparation = {type = bool} solvent_template_file = [{type = str} {type = none}] composition = {type = list size = 0 elem = {type = int1}} build_tangled_chain = {type = bool} build_cell_constant = {type = enum range = [vdw density]} } """ % (N_MOLECULES))
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]: # Load the structure and save the original coordinates solute_ct = structure.Structure.read(mae_fname) struc.set_ct_reference_coordinates(solute_ct) if self.param.solvent_template_file.val: # solvent template file is in the main job dir solvent_mae_fname = Path( cmj.ENGINE.base_dir, self.param.solvent_template_file.val).absolute() solvent_cts = list(structure.StructureReader(solvent_mae_fname)) cts = solvent_cts + [solute_ct] else: cts = [solute_ct] out_fname = f"{jobname}-out.cms" # Run job in a tmpdir to make it easier to cleanup tmpdir = 'tmpdir' if not os.path.isdir(tmpdir): os.makedirs(tmpdir) if self.param.composition.val: composition = ':'.join([str(v) for v in self.param.composition.val]) self.param.molecules = sea.Atom(sum(self.param.composition.val)) else: composition = str(self.param.molecules.val) with fileutils.chdir(tmpdir): # Write the input tmp_in_fname = f"{jobname}-in.mae" struc.write_structures(cts, tmp_in_fname) # Regularize force field name ff_name = mm.opls_version_to_name( mm.opls_name_to_version(self.param.forcefield.val)) cmd = [ os.path.join(envir.CONST.SCHRODINGER, "run"), os.path.join("disordered_system_builder_gui_dir", "disordered_system_builder_driver.py"), "-molecules", str(self.param.molecules.val), "-forcefield", ff_name, "-scale_vdw", str(self.param.scale_vdw.val), "-composition", composition, "-seed", str(self.param.seed.val), "-density", str(self.param.density.val), "-obey", self.param.build_cell_constant.val, tmp_in_fname, jobname ] if self.param.fep_preparation.val: cmd.append('-fep_preparation') if self.param.build_tangled_chain.val: cmd.append('-grow') proc_ret = subprocess.call(cmd) # Check the results tmp_out_fname = os.path.join(tmpdir, f"{jobname}_system-out.cms") if proc_ret or not os.path.exists(tmp_out_fname): # Job Failed self._print("quiet", "ERROR: disordered_system_builder_driver failed.") # Print the log if the job fails. for log_fname in glob.glob(os.path.join(tmpdir, "*.log")): if os.path.exists(log_fname): with open(log_fname, 'r') as f: for line in f.readlines(): self._print("quiet", line) return None # Job Succeeded # Add output to list model = cms.Cms(file=tmp_out_fname) # Restore ct properties for k, v in solute_ct.property.items(): model.fsys_ct.property[k] = v model.sanitize_for_viparr() if self.param.solvent_template_file.val: _write_updated_model(out_fname, model, solute_ct) else: model.write(out_fname) # Clean up temporary files if not self.param.solvent_template_file.val: fileutils.force_rmtree(tmpdir, ignore_errors=True) return out_fname
def _write_updated_model(out_fname: str, model: cms.Cms, input_solute_ct: structure.Structure): """ Write the full system, solvent and solute cts for the given model and input solute structure. This will preserve custom charges in the `input_solute_ct` if any. """ mol_total = len(model.fsys_ct.molecule) solvent_ct = model.fsys_ct.extract( model.select_atom(f"m.n 1-{mol_total-1}"), copy_props=True) solute_ct = model.fsys_ct.extract(model.select_atom(f"m.n {mol_total}"), copy_props=True) # In order to preserve custom charges, need to use the original ct # and just update the coords input_solute_ct.setXYZ(solute_ct.getXYZ()) # Copy the box to the original ct for k, v in model.fsys_ct.property.items(): if k in constants.SIM_BOX: input_solute_ct.property[k] = v # Need to set the ct_type so assign forcefield can recognize the input input_solute_ct.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLUTE solvent_ct.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLVENT with structure.MaestroWriter(out_fname) as w: for ct in [model.fsys_ct, solvent_ct, input_solute_ct]: w.append(ct)
[docs]class ProteinMutationGenerator(cmj.StageBase): """ """ NAME = "protein_mutation_generator" PARAM = cmj._create_param_when_needed([ """ DATA = { mutation_file = "" cpu = 1 mutations = [] residue_file = "" graph_file = "" structure_file = "" } VALIDATE = { mutation_file = {type = str} cpu = {type = int1} mutations = {type = list size = 0 elem = {type = str range = [0 10000000000]}} residue_file = {type = str} graph_file = {type = str} structure_file = {type = str} } """, ])
[docs] def crunch(self): """ We will either read the mutations from a file or directly from the msj where mutations are defined as a list of space seperated strings mutations = ["C:23->ALA" "C:23->SER"] """ import schrodinger.application.scisol.packages.fep.protein_mutation_generator as pmg from schrodinger.application.scisol.packages.fep.graph import Graph if os.path.isfile(self.param.mutation_file.val): mutation_filename = os.path.join(cmj.ENGINE.base_dir, self.param.mutation_file.val) else: mutation_filename = os.path.join(cmj.ENGINE.base_dir, "mutations.txt") self.create_mutation_file(mutation_filename) residue_file = '' if os.path.isfile(self.param.residue_file.val): residue_file = os.path.join(cmj.ENGINE.base_dir, self.param.residue_file.val) graph_file = '' if os.path.isfile(self.param.graph_file.val): graph_file = os.path.join(cmj.ENGINE.base_dir, self.param.graph_file.val) self._print("debug", "In ProteinMutationGenerator.crunch") for pj in self.get_prejobs(): structure_file = pj.output.struct_file() if os.path.isfile(self.param.structure_file.val): structure_file = os.path.join(cmj.ENGINE.base_dir, self.param.structure_file.val) jobname, dir = self._get_jobname_and_dir(pj) if (not os.path.isdir(dir)): os.makedirs(dir) util.chdir(dir) existing_files = os.listdir(dir) # Expand mutations, multiple mutations may be present with open(mutation_filename, 'r') as f: mutations = f.readlines() expanded_mutations = pmg.expand_mutations(mutations) # Write the expanded mutations expanded_mutation_filename = "expanded_mutations.txt" with open(expanded_mutation_filename, 'w') as f: f.write('\n'.join(expanded_mutations)) # Generate the mutants num_procs = int(self.param.cpu.val) # Expanding map: get list of new mutations. updated_mutation_filename = expanded_mutation_filename if graph_file: # Read in mutations file (which may include original mutations) mutations = set() with open(updated_mutation_filename, 'r') as f: for line in f.readlines(): line = line.strip() if line: # Sort multiple mutations to canonicalize them mutations.add(' '.join(sorted(line.split(' ')))) # Read in mutations from the graph graph_mutations = set() g = Graph.deserialize(graph_file) for n in g.nodes_iter(): node_mutations = n.struc.property.get( constants.BIOLUMINATE_MUTATION, 'NONE') # WT nodes have mutation of 'NONE' if node_mutations == 'NONE': continue node_mutation_names = [] # Handle multiple mutations for node_mutation in node_mutations.split(','): res_site, _, mutant = pmg.parse_mutation(node_mutation) node_mutation_names.append(f'{res_site}->{mutant}') graph_mutations.add(' '.join(sorted(node_mutation_names))) # Get new mutations. If there are none, skip stage without error. new_mutations = mutations - graph_mutations if not new_mutations: new_job = cmj.Job(None, pj, self, None, dir) new_job.status.set(cmj.JobStatus.SUCCESS) self.add_job(new_job) continue # Only need to generate structures for the new mutations. new_mutation_filename = 'new_mutations.txt' with open(new_mutation_filename, 'w') as f: for new_mutation in new_mutations: f.write(f'{new_mutation}\n') updated_mutation_filename = new_mutation_filename try: prime_mutants_fname = pmg.generate_protein_mutation( jobname, structure_file, updated_mutation_filename, num_procs, residue_file) except EnvironmentError as e: new_job = cmj.Job(None, pj, self, None, dir) for file in os.listdir(dir): if file not in existing_files: new_job.output.add( file, type="file" if os.path.isfile(file) else "dir") new_job.status.set(cmj.JobStatus.BACKEND_ERROR) self.add_job(new_job) self._log("ERROR: %s" % e) else: # Prepare new job new_job = cmj.Job(None, pj, self, None, dir) new_job.output.add(os.path.join(os.getcwd(), prime_mutants_fname), tag="PRIME") new_job.status.set(cmj.JobStatus.SUCCESS) self.add_job(new_job) self._print("debug", "Out ProteinMutationGenerator.crunch")
[docs] def create_mutation_file(self, mutation_file): """ """ with open(mutation_file, 'w') as fh: for e in self.param.mutations.val: fh.write(e) fh.write("\n")
[docs]class ReplicateStructure(cmj.StructureStageBase): """ This takes the output structure from the previous stage and replicates it, so it has the same number of molecules as the input mae_file. Then it copies the coordinates and structure properties from mae_file. """ PARAM = cmj._create_param_when_needed(""" DATA = { mae_file = "" } VALIDATE = { mae_file = {type = str1} } """) NAME = "replicate_structure"
[docs] def run(self, jobname: str, input_fname: str) -> str: # Relative to main job dir mae_path = Path('..', self.param.mae_file.val).absolute() mol_st = structure.Structure.read(input_fname) index = mol_st.property.get(constants.CT_INDEX, 1) repeated_st = structure.Structure.read(mae_path, index=index) # Tile mol_st `n_molecules` times n_molecules = repeated_st.atom_total // mol_st.atom_total new_st = struc.struc_merge([mol_st] * n_molecules) # Match the repeated_ct molecules with the new_st molecules # and update the coordinates repeated_coords = repeated_st.getXYZ() new_coords = new_st.getXYZ(copy=False) for repeated_mol, new_mol in zip(repeated_st.molecule, new_st.molecule): for i, j in self._get_atom_pairs(repeated_mol, new_mol): new_coords[j - 1, :] = repeated_coords[i - 1, :] # Update the properties new_st.property.update(mol_st.property) out_fname = f"{jobname}-out.mae" new_st.write(out_fname) return out_fname
def _get_atom_pairs(self, mol1: structure._Molecule, mol2: structure._Molecule) -> List[Tuple[int, int]]: """ Return a list of atom idxs for mol1 and the corresponding atom index for mol2. """ mol1_atom_idxs = sorted(mol1.getAtomIndices()) mol2_atom_idxs = sorted(mol2.getAtomIndices()) r_mol1 = rdkit_adapter.to_rdkit(mol1.extractStructure()) r_mol2 = rdkit_adapter.to_rdkit(mol2.extractStructure()) mcs_smarts = FindMCS([r_mol1, r_mol2]).smartsString r_mol1_results = r_mol1.GetSubstructMatches( Chem.MolFromSmarts(mcs_smarts)) r_mol2_results = r_mol2.GetSubstructMatches( Chem.MolFromSmarts(mcs_smarts)) atom_mapping = [[[i + 1 for i in j] for j in r_mol1_results], [[i + 1 for i in j] for j in r_mol2_results]] # Shift atom idx based on the corresponding molecule atom idx atom_mapping_shifted = [] for i, j in zip(atom_mapping[0][0], atom_mapping[1][0]): atom_mapping_shifted.append( (mol1_atom_idxs[i - 1], mol2_atom_idxs[j - 1])) return atom_mapping_shifted
[docs]class GroupWaters(cmj.StructureStageBase): """ Set the `FEP_ABSOLUTE_ENERGY` property on waters, optionally selecting only non-crystal waters """ NAME = "group_waters" PARAM = cmj._create_param_when_needed(""" DATA = { skip_crystal_waters = false } VALIDATE = { skip_crystal_waters = {type = bool} } """)
[docs] def run(self, jobname, input_fname): st_list = list(structure.StructureReader(input_fname)) component_types = [st.property.get(constants.CT_TYPE) for st in st_list] output_fname = jobname + '-in.cms' solv_ct = st_list[component_types.index(constants.CT_TYPE.VAL.SOLVENT)] for a in solv_ct.atom: if not (self.param.skip_crystal_waters and a.property.get(constants.CRYSTAL_WATER_PROP)): a.property[constants.FEP_ABSOLUTE_ENERGY] = 1 struc.write_structures(st_list, output_fname) return output_fname