import glob
import os
import shutil
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple
import numpy as np
from rdkit import Chem
from rdkit.Chem.rdFMCS import FindMCS
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import move_alchem_ions
from schrodinger.application.desmond import solubility_utils
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import system_builder_inp as sbi
from schrodinger.application.desmond import util
from schrodinger.application.desmond.stage.jobs import DesmondJob
from schrodinger.infra import mm
from schrodinger.structutils import ringspear
from schrodinger.structutils import transform
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess
__all__ = [
'BuildGeometry', 'ExtractStructures', 'ExtractSoluteStructure',
'HashStructureTitle', 'DisorderedSystemBuilder', 'ProteinMutationGenerator',
'ReplicateStructure', 'GroupWaters'
]
[docs]class BuildGeometry(cmj.StageBase):
"""
"""
NAME = "build_geometry"
BG_CMD = os.path.join(envir.CONST.SCHRODINGER_UTILITIES, "system_builder")
SOLVENT = {
"SPC": "spc.box.mae",
"SPCFW": "spcfw.box.mae",
"TIP4P": "tip4p.box.mae",
"TIP4PEW": "tip4pew.box.mae",
"TIP3P": "tip3p.box.mae",
"WATER": "spc.box.mae",
"OCTANOL": "octanol.box.mae",
"METHANOL": "methanol.box.mae",
"DMSO": "dmso.box.mae",
"TIP5P": "tip5p.box.mae",
"TIP4PD": "tip4pd.box.mae",
"SPCE": "spce.box.mae",
"TIP4P2005": "tip4p2005.box.mae"
}
DEFAULT_ALCHEMICAL_EXCLUDE_DIST = 5
PARAM = cmj._create_param_when_needed("""
DATA = {
csb_file = ""
solvent = water
solvent_file = ?
distil_solute = true
solvate_system = true
neutralize_system = true
add_alchemical_ions = false
buffer_width = 10.0
ion_awayfrom = []
ion_awaydistance = 10.0
rezero_system = true
minimize_volume = false
scale_solvent_vdw = 1.0 # or 0.75 for GCMC/Solvate jobs
box_shape = cubic
preserve_ffio = true
preserve_box = false
only_merge_ct = []
rebuild_cms = off
remove_overlapped_crystal_water = false
remove_overlapped_solvent = true
override_forcefield = OPLS_2005
box = {
shape = "orthorhombic" # cubic | triclinic | orthorhombic
size = [10.0 10.0 10.0] # more or less numbers depending on the shape
size_type = "buffer" # "absolute" | "buffer"
}
membrane_box = {
lipid = POPC
size = [10.0 10.0 10.0]
}
salt = {
positive_ion = Na
negative_ion = Cl
concentration = 0.15
}
add_counterion = {
ion = Na
number = 0
where = []
}
}
VALIDATE = {
csb_file = {type = str _check = multisim_file_exists}
solvent = [
{type = enum range = [WATER SPC TIP4P TIP4PEW TIP5P TIP3P TIP4PD SPCE TIP4P2005 OCTANOL METHANOL DMSO FILE]}
]
solvent_file = [
{type = str _check = multisim_file_exists}
{type = none}
]
distil_solute = [
{type = bool}
{type = str }
]
solvate_system = {type = bool}
preserve_ffio = {type = bool}
preserve_box = {type = bool}
neutralize_system = {type = bool}
add_alchemical_ions = [
{type = bool}
# FIXME: The alchemical water approach has been evolving. Once the basic
# method is mature, we should clean up the parameters.
# defaults are defined in move_alchem_ions.py, these only need to be
# set to change the defaults
{map_to_neutral_atoms = {type = bool}
use_alchemical_water = {type = bool}
}
]
buffer_width = [
{type = float range = [0.0 inf]}
{type = list size = 3
elem = {type = float range = [0.0 inf]}}
]
ion_awayfrom = {type = list size = 0 elem = {type = int1}}
ion_awaydistance = {type = float range = [0.0 inf]}
rezero_system = {type = bool}
minimize_volume = {type = bool}
box_shape = {type = enum range = [cubic orthorhombic]}
scale_solvent_vdw = {type = float+}
only_merge_ct = {type = list size = 0 elem = {type = int1}}
rebuild_cms = [
{type = bool}
{membrane = {type = str}
solvent = {type = str}
}
{membrane = {type = str}
}
{solvent = {type = str}
}
]
remove_overlapped_crystal_water = {type = bool}
remove_overlapped_solvent = {type = bool}
override_forcefield = {type = str _check = check_forcefield}
box = [
{type = enum range = [read]}
{
shape = {type = enum range = [orthorhombic cubic triclinic dodecahedron_hexagon dodecahedron_square
truncated_octahedron]}
size = [{type = list size = 3 elem = {type = float+}}
{type = float+}
{type = list size = 6 elem = {type = float+}}]
size_type = {type = enum range = [buffer absolute]}
_mapcheck = check_box_size
}
]
membrane_box = {
lipid = {type = enum range = [DPPC POPC DMPC POPE]}
size = {type = list size = 3 elem = {type = float+}}
}
salt = {
positive_ion = {type = enum range = [Ca2 Fe2 Fe3 K Li Mg2 Na Zn2]}
negative_ion = {type = enum range = [Br Cl F I]}
concentration = {type = float+}
}
add_counterion = {
ion = {type = enum range = [Ca2 Fe2 Fe3 K Li Mg2 Na Zn2 Br Cl F I]}
number = [{type = int0}
{type = enum range = ["neutralize_system"]}]
where = {type = list size = 0 elem = {type = list size = 0 elem = {type = int1}}}
}
}
""")
[docs] def __init__(self, *arg, **kwarg):
"""
"""
cmj.StageBase.__init__(self, *arg, **kwarg)
#
self._csb = sbi.SystemBuilderInput()
# __init__
@staticmethod
def _write_ffio_structures(cts: List[cms.ffiostructure.FFIOStructure],
fname: str):
"""
Write FFIOStructures to the file
"""
cts[0].write(fname, format="CMS")
for ct in cts[1:]:
ct.append(fname, format="CMS")
def _set_csb(self, pj: cmj.Job):
"""
"""
csb = self._csb
def _csb_file(param):
fname = param.val
if (fname != ""):
try:
fh = open(fname, "r")
s = fh.read()
fh.close()
except IOError:
raise IOError("Reading the .csb file failed: %s" % fname)
csb.read_text(s)
def _buffer_width(param):
bw = param
if (isinstance(bw, sea.List)):
csb.a, csb.b, csb.c = bw[0].val, bw[1].val, bw[2].val
else:
csb.a, csb.b, csb.c = bw.val, bw.val, bw.val
def _box(param):
if param.val == 'read':
output = pj.output.struct_file()
box = struc.find_box(structure.StructureReader(output))
if box is None:
raise ValueError("Could not read box info from input "
"structures")
box = np.reshape(box, (3, 3))
a = transform.get_vector_magnitude(box[0])
b = transform.get_vector_magnitude(box[1])
c = transform.get_vector_magnitude(box[2])
alpha = np.degrees(
transform.get_angle_between_vectors(box[1], box[2]))
beta = np.degrees(
transform.get_angle_between_vectors(box[0], box[2]))
gamma = np.degrees(
transform.get_angle_between_vectors(box[0], box[1]))
size = [a, b, c, alpha, beta, gamma]
# All boxes can be represented by a triclinic box
# but the config code will treat orthorhombic and
# triclinic boxes differently, so check for it here.
if np.allclose([alpha, beta, gamma], [90, 90, 90]):
bc = "orthorhombic"
else:
bc = 'triclinic'
else:
bc = param.shape.val
size = param.size.val
if (bc == "cubic"):
csb.setBoundaryCondition(boundary_condition="cubic", a=size)
elif (bc == "orthorhombic"):
csb.setBoundaryCondition(boundary_condition="orthorhombic",
a=size[0],
b=size[1],
c=size[2])
elif (bc == "triclinic"):
csb.setBoundaryCondition(boundary_condition="triclinic",
a=size[0],
b=size[1],
c=size[2],
alpha=size[3],
beta=size[4],
gamma=size[5])
elif (bc == "dodecahedron_square"):
csb.setBoundaryCondition(
boundary_condition="dodecahedron_square", a=size)
elif (bc == "dodecahedron_hexagon"):
csb.setBoundaryCondition(
boundary_condition="dodecahedron_hexagon", a=size)
elif (bc == "truncated_octahedron"):
csb.setBoundaryCondition(
boundary_condition="truncated_octahedron", a=size)
if param.val == 'read' or param.size_type.val == "absolute":
csb.setBoundaryCondition(use_buffer=0)
else:
csb.setBoundaryCondition(use_buffer=1)
def _membrane_box(param):
size = param.size.val
csb.setMembranize()
csb.setMembrane(param.lipid.val + ".mae.gz")
csb.setBoundaryCondition(boundary_condition="orthorhombic",
a=0.0,
b=0.0,
c=size[2])
csb.setMembranePatch(size[0], size[1])
csb.setBoundaryCondition(use_buffer=1)
def _salt(param):
csb.setSaltPositiveIon(param.positive_ion.val + ".mae")
csb.setSaltNegativeIon(param.negative_ion.val + ".mae")
csb.addSalt(param.concentration.val)
def _add_counterion(param):
ion = param.ion.val
num = param.number.val
where = param.where.val
if (ion in [
"Ca2",
"Fe2",
"Fe3",
"K",
"Li",
"Mg2",
"Na",
"Zn2",
]):
csb.setPositiveIon(ion + ".mae")
if (isinstance(num, str) and num == "neutralize_system"):
csb.setNeutralize(True)
else:
csb.addIons("positive", num)
elif (ion in [
"Br",
"Cl",
"F",
"I",
]):
csb.setNegativeIon(ion + ".mae")
if (isinstance(num, str) and num == "neutralize_system"):
csb.setNeutralize(True)
else:
csb.addIons("negative", num)
if (where):
csb.setIonLocation(where)
self._set("csb_file", _csb_file)
self._set("buffer_width", _buffer_width)
# if solvent is in the SOLVENT dictionary then use it's value,
# else (it must be FILE) use param.solvent_file.val
solvent_file_abs_path = os.path.abspath(
self.param.solvent_file.val
) if self.param.solvent_file.val else None
self._set(
"solvent", lambda param: csb.setSolvent(
BuildGeometry.SOLVENT.get(param.val, solvent_file_abs_path)))
self._set("solvate_system", lambda param: csb.setSolvate(param.val))
self._set("neutralize_system",
lambda param: csb.setNeutralize(param.val))
self._set(
"box_shape",
lambda param: csb.setBoundaryCondition(boundary_condition=param.val,
use_buffer=1))
# this is done here because we need to know if this is an fep task
if pj.task == 'fep' and self.param.add_alchemical_ions.val:
self._csb.setAddAlchemicalIons(1)
_, alchem_water = self._alchemical_ion_params()
self._csb.setUseAlchemicalWater(alchem_water)
# if not set explicitly, override
if not self.param.ion_awayfrom.has_tag('setbyuser'):
self.param.ion_awaydistance.val = \
self.DEFAULT_ALCHEMICAL_EXCLUDE_DIST
# -1 interpreted as all solute
self.param.ion_awayfrom.val = [-1]
self.param.ion_awayfrom.add_tag('setbyuser')
self._set(
"ion_awayfrom", lambda param: csb.setExcludeLocation(
param.val, self.param.ion_awaydistance.val))
self._set("box", _box)
self._set("scale_solvent_vdw",
lambda param: csb.setVdwScalingFactor(param.val))
self._set("membrane_box", _membrane_box)
self._set("salt", _salt)
self._set("add_counterion", _add_counterion)
self._set("override_forcefield",
lambda param: csb.setOplsaaVersion(param.val))
[docs] def crunch(self):
"""
"""
self._print("debug", "In BuildGeometry.crunch")
solvent_name = self.param.solvent.val.upper()
if self.param.override_forcefield.val.upper() == "OPLS_2005" and \
solvent_name in constants.S_OPLS_WATER_MODELS:
raise ValueError(f"OPLS_2005 cannot be run with {solvent_name}")
for pj in self.get_prejobs():
self._set_csb(pj)
jobname, dir = self._get_jobname_and_dir(pj)
fname_in = jobname + "-in.mae"
fname_in2 = jobname + "-in.csb"
fname_out = jobname + "-out.mae"
fname_log = jobname + "-in.log"
if not os.path.isdir(dir):
os.makedirs(dir)
util.chdir(dir)
prev_output = pj.output.struct_file()
if self.param.only_merge_ct.val:
ct_index = self.param.only_merge_ct.val
cts = list(cms.ffiostructure.CMSReader(prev_output))
merged_ct = cts[ct_index[0] - 1]
for i in ct_index[1:]:
merged_ct = cms.ffiostructure.merge_ct(
merged_ct, cts[i - 1])
new_cts = []
for i, e in enumerate(cts, start=1):
if i not in ct_index:
new_cts.append(e)
elif i == ct_index[0]:
new_cts.append(merged_ct)
self._write_ffio_structures(new_cts, fname_out)
new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain,
pj.permanent_group, jobname, pj, self,
None, dir)
new_job.need_host = False
new_job.status.set(cmj.JobStatus.SUCCESS)
new_job.output.add(os.path.abspath(fname_out))
self.add_job(new_job)
continue
if self.param.rebuild_cms.val:
import schrodinger.application.desmond.repair_cms as repair_cms
membrane_asl = ""
solvent_asl = ""
if isinstance(self.param.rebuild_cms.val, sea.Map):
membrane = self.param.rebuild_cms.membrane.val
if membrane:
if membrane[:4].lower() == "asl:":
membrane_asl = membrane[4:]
else:
raise ValueError(
"ASL expression should start with 'asl:'")
solvent = self.param.rebuild_cms.solvent.val
if solvent:
if solvent[:4].lower() == "asl:":
solvent_asl = solvent[4:]
else:
raise ValueError(
"ASL expression should start with 'asl:'")
repair_cms.create_cms_from_mae(prev_output, fname_out,
membrane_asl, solvent_asl)
new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain,
pj.permanent_group, jobname, pj, self,
None, dir)
new_job.need_host = False
new_job.status.set(cmj.JobStatus.SUCCESS)
new_job.output.add(os.path.abspath(fname_out))
self.add_job(new_job)
continue
cts = []
if self.param.preserve_ffio.val:
# Marks CTs that have ffio blocks.
cts = list(cms.ffiostructure.CMSReader(prev_output))
# with prserve_ffio set as True and given cms input, we should
# ignore full_system CT as it duplicates component CTs.
if len(cts) > 1 and cts[0].property.get(
constants.CT_TYPE) == constants.CT_TYPE.VAL.FULL_SYSTEM:
try:
cts = cms.Cms(prev_output).comp_ct
self._print(
"quiet", "WARNING: Input was a valid "
"CMS, removing full_sys CT from inputs.")
except ValueError:
# ValueError is thrown if the input for Cms is invalid
pass
for i, e in enumerate(cts):
if e.hasFfio():
e.property["i_systembuilder_ffio_ct"] = i
elif "i_systembuilder_ffio_ct" in e.property:
del e.property["i_systembuilder_ffio_ct"]
if self.param.preserve_box.val:
# Marks CTs that have ffio blocks.
cts = cts or list(cms.ffiostructure.CMSReader(prev_output))
for i, e in enumerate(cts):
for f in constants.SIM_BOX:
if f not in e.property:
break
else:
for f in constants.SIM_BOX:
e.property[f + "_preserved"] = e.property[f]
if cts:
cts = struc.fixup_duplicate_properties(cts)
self._write_ffio_structures(cts, fname_in)
else:
cts = struc.fixup_duplicate_properties(
struc.read_all_structures(prev_output))
struc.write_structures(cts, fname_in)
self._csb.setSolute(fname_in)
self._csb.setWriteMaeFile(fname_out)
# Separates solvent from the solute CT.
distil_solute = self.param.distil_solute.val
if distil_solute:
self._csb.fixSolute()
if isinstance(distil_solute, int):
self._csb.treatSolventFromSolute()
else:
if distil_solute[:4].lower() == "asl:":
self._csb.treatSolventFromSolute(asl=distil_solute[4:])
else:
raise ValueError(
"ASL expression should start with 'asl:'")
self._csb.setRemoveOverlappedCrystalWater(
int(self.param.remove_overlapped_crystal_water.val))
self._csb.setRemoveOverlappedSolvent(
int(self.param.remove_overlapped_solvent.val))
self._csb.write(fname_in2)
# Constructs the command line for job-launching.
jlaunch_cmd = [
BuildGeometry.BG_CMD,
"-HOST",
"localhost",
fname_in2,
]
if self.param.minimize_volume.val:
jlaunch_cmd.append("-minimize_volume")
if self.param.rezero_system.val:
jlaunch_cmd.append("-rezero")
self._print("debug", " %s" % subprocess.list2cmdline(jlaunch_cmd))
new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain,
pj.permanent_group, jobname, pj, self,
jlaunch_cmd, dir)
new_job.need_host = False
new_job.output.add(os.path.abspath(fname_out))
new_job.output.add(os.path.abspath(fname_log))
new_job.input.add(os.path.abspath(fname_in))
self.add_job(new_job)
self._print("debug", "Out BuildGeometry.crunch")
def _alchemical_ion_params(self):
map_to_neutral = move_alchem_ions.MAP_TO_NEUTRAL_DEFAULT
alchem_water = move_alchem_ions.ALCHEMICAL_WATER_DEFAULT
if isinstance(self.param.add_alchemical_ions, sea.Map):
if 'map_to_neutral_atoms' in self.param.add_alchemical_ions:
map_to_neutral = \
self.param.add_alchemical_ions.map_to_neutral_atoms.val
if 'use_alchemical_water' in self.param.add_alchemical_ions:
alchem_water = \
self.param.add_alchemical_ions.use_alchemical_water.val
return map_to_neutral, alchem_water
[docs] def hook_captured_successful_job(self, job):
"""
"""
fname_out = job.output.struct_file()
if job.task == 'fep' and self.param.add_alchemical_ions.val:
shutil.copyfile(fname_out, fname_out + '.pre-alchemion')
self._print("quiet", "") # add empty line for prettier log
def print_fcn(msg):
self._print("quiet", msg)
map_to_neutral, use_alchemical_water = self._alchemical_ion_params()
move_alchem_ions.move_ions_in_file(fname_out, fname_out,
map_to_neutral,
use_alchemical_water, print_fcn)
if (not (self.param.preserve_ffio.val or self.param.preserve_box.val) or
self.param.only_merge_ct.val or self.param.rebuild_cms.val):
return
# Restores the stripped ffio blocks.
input_ct = list(cms.ffiostructure.CMSReader(job.input.struct_file()))
output_ct = list(cms.ffiostructure.CMSReader(fname_out))
is_stripped = False
for e in output_ct:
if (constants.CT_TYPE in e.property and
constants.CT_TYPE.VAL.FULL_SYSTEM
== e.property[constants.CT_TYPE]):
if ("i_systembuilder_ffio_ct" in e.property):
del e.property["i_systembuilder_ffio_ct"]
continue
if ("i_systembuilder_ffio_ct" in e.property):
i = e.property["i_systembuilder_ffio_ct"]
e.ffio = input_ct[i].ffio
e.ffhandle = input_ct[i].ffhandle
del e.property["i_systembuilder_ffio_ct"]
input_ct[i].ffio = None
input_ct[i].ffhandle = None
is_stripped = True
# Restores the original box.
orig_box = {}
property_box = constants.SIM_BOX
for e in output_ct:
for f in property_box:
if (f + "_preserved" in e.property):
orig_box[f] = e.property[f + "_preserved"]
else:
break
if orig_box:
for e in output_ct:
for k, v in orig_box.items():
e.property[k] = v
try:
del e.property[k + "_preserved"]
except KeyError:
pass
is_mutated = self._remove_ringspears(output_ct)
if is_stripped or orig_box or is_mutated:
shutil.copyfile(fname_out, fname_out + ".stripped")
self._write_ffio_structures(output_ct, fname_out)
@staticmethod
def _remove_ringspears(cts: List[structure.Structure]):
"""
Check whether any atoms from the (optionally present) membrane CT spear
any rings from solute CTs by calling `ringspear.check_for_spears`.
If any spearing atoms found they are deleted from the membrane and
fullsystem CTs, and True is returned to represent that the
system was mutated. Otherwise False is returned.
:param cts: a list of structures, possibly including a membrane CT and
various solute CTs.
:return: whether the system was mutated as a result of spearing atoms
that were found.
"""
type_vals = constants.CT_TYPE.VAL
fsys_ct = cts[0]
assert fsys_ct.property[constants.CT_TYPE] == type_vals.FULL_SYSTEM
types_to_check = [
type_vals.SOLUTE, type_vals.LIGAND, type_vals.RECEPTOR
]
cts_to_check = [
e for e in cts if e.property[constants.CT_TYPE] in types_to_check
]
num_previous = 0
spear_ct = None
for e in cts[1:]:
if e.property[constants.CT_TYPE] == type_vals.MEMBRANE:
if spear_ct is None:
spear_ct = e
else:
raise ValueError("Output structure should only have one "
"membrane ct")
elif spear_ct is None:
num_previous += e.atom_total
is_mutated = False
if spear_ct:
for e in cts_to_check:
spears = ringspear.check_for_spears(e, spear_ct)
molecules_to_remove = set()
for spear in spears:
is_mutated = True
for atom in spear.spear_indexes:
mol = spear_ct.atom[atom].getMolecule()
molecules_to_remove.add(mol)
atoms_to_remove = []
for mol in molecules_to_remove:
atoms_to_remove.extend(mol.getAtomIndices())
spear_ct.deleteAtoms(atoms_to_remove)
fsys_ct.deleteAtoms([a + num_previous for a in atoms_to_remove])
return is_mutated
[docs]class HashStructureTitle(cmj.StructureStageBase):
"""
This stores the structure title and corresponding hash id for one or more
structures from an input mae file and passes it to the next stage.
"""
NAME = "hash_structure_title"
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]:
out_fname = f"{jobname}-out.mae"
cts = list(structure.StructureReader(mae_fname))
self._print("quiet", "Hashing structure title:")
for ct in cts:
# Clear FEP properties
fep_keys = filter(lambda x: '_fep_' in x, ct.property)
struc.delete_structure_properties(ct, fep_keys)
# Add title and hash
hash_id = util.str2hexid(ct.title)
ct.property[constants.FEP_ORIGINAL_TITLE] = ct.title
ct.property[constants.FEP_HASH_ID] = hash_id
self._print("quiet", f"{hash_id} # {ct.title}")
struc.write_structures(cts, out_fname)
return out_fname
[docs]class DisorderedSystemBuilder(cmj.StructureStageBase):
"""
This sets up and runs the disordered system
builder, which takes in a ct and creates a
disordered solid system for it.
"""
NAME = "disordered_system_builder"
N_MOLECULES = 64
PARAM = cmj._create_param_when_needed("""
DATA = {
molecules = %d
density = 0.25
scale_vdw = 0.80
seed = 645
forcefield = OPLS_2005
fep_preparation = true
solvent_template_file = ?
composition = []
build_tangled_chain = false
build_cell_constant = 'vdw'
}
VALIDATE = {
molecules = {type = int1}
density = {type = "float+"}
scale_vdw = {type = "float+"}
seed = {type = int}
forcefield = {type = str _check = check_opls_forcefield}
fep_preparation = {type = bool}
solvent_template_file = [{type = str} {type = none}]
composition = {type = list size = 0 elem = {type = int1}}
build_tangled_chain = {type = bool}
build_cell_constant = {type = enum range = [vdw density]}
}
""" % (N_MOLECULES))
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]:
# Load the structure and save the original coordinates
solute_ct = structure.Structure.read(mae_fname)
struc.set_ct_reference_coordinates(solute_ct)
if self.param.solvent_template_file.val:
# solvent template file is in the main job dir
solvent_mae_fname = Path(
cmj.ENGINE.base_dir,
self.param.solvent_template_file.val).absolute()
solvent_cts = list(structure.StructureReader(solvent_mae_fname))
cts = solvent_cts + [solute_ct]
else:
cts = [solute_ct]
out_fname = f"{jobname}-out.cms"
# Run job in a tmpdir to make it easier to cleanup
tmpdir = 'tmpdir'
if not os.path.isdir(tmpdir):
os.makedirs(tmpdir)
if self.param.composition.val:
composition = ':'.join([str(v) for v in self.param.composition.val])
self.param.molecules = sea.Atom(sum(self.param.composition.val))
else:
composition = str(self.param.molecules.val)
with fileutils.chdir(tmpdir):
# Write the input
tmp_in_fname = f"{jobname}-in.mae"
struc.write_structures(cts, tmp_in_fname)
# Regularize force field name
ff_name = mm.opls_version_to_name(
mm.opls_name_to_version(self.param.forcefield.val))
cmd = [
os.path.join(envir.CONST.SCHRODINGER, "run"),
os.path.join("disordered_system_builder_gui_dir",
"disordered_system_builder_driver.py"),
"-molecules",
str(self.param.molecules.val), "-forcefield", ff_name,
"-scale_vdw",
str(self.param.scale_vdw.val), "-composition", composition,
"-seed",
str(self.param.seed.val), "-density",
str(self.param.density.val), "-obey",
self.param.build_cell_constant.val, tmp_in_fname, jobname
]
if self.param.fep_preparation.val:
cmd.append('-fep_preparation')
if self.param.build_tangled_chain.val:
cmd.append('-grow')
proc_ret = subprocess.call(cmd)
# Check the results
tmp_out_fname = os.path.join(tmpdir, f"{jobname}_system-out.cms")
if proc_ret or not os.path.exists(tmp_out_fname):
# Job Failed
self._print("quiet",
"ERROR: disordered_system_builder_driver failed.")
# Print the log if the job fails.
for log_fname in glob.glob(os.path.join(tmpdir, "*.log")):
if os.path.exists(log_fname):
with open(log_fname, 'r') as f:
for line in f.readlines():
self._print("quiet", line)
return None
# Job Succeeded
# Add output to list
model = cms.Cms(file=tmp_out_fname)
# Restore ct properties
for k, v in solute_ct.property.items():
model.fsys_ct.property[k] = v
model.sanitize_for_viparr()
if self.param.solvent_template_file.val:
_write_updated_model(out_fname, model, solute_ct)
else:
model.write(out_fname)
# Clean up temporary files
if not self.param.solvent_template_file.val:
fileutils.force_rmtree(tmpdir, ignore_errors=True)
return out_fname
def _write_updated_model(out_fname: str, model: cms.Cms,
input_solute_ct: structure.Structure):
"""
Write the full system, solvent and solute cts for the given model and input solute structure.
This will preserve custom charges in the `input_solute_ct` if any.
"""
mol_total = len(model.fsys_ct.molecule)
solvent_ct = model.fsys_ct.extract(
model.select_atom(f"m.n 1-{mol_total-1}"), copy_props=True)
solute_ct = model.fsys_ct.extract(model.select_atom(f"m.n {mol_total}"),
copy_props=True)
# In order to preserve custom charges, need to use the original ct
# and just update the coords
input_solute_ct.setXYZ(solute_ct.getXYZ())
# Copy the box to the original ct
for k, v in model.fsys_ct.property.items():
if k in constants.SIM_BOX:
input_solute_ct.property[k] = v
# Need to set the ct_type so assign forcefield can recognize the input
input_solute_ct.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLUTE
solvent_ct.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLVENT
with structure.MaestroWriter(out_fname) as w:
for ct in [model.fsys_ct, solvent_ct, input_solute_ct]:
w.append(ct)
[docs]class ProteinMutationGenerator(cmj.StageBase):
"""
"""
NAME = "protein_mutation_generator"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
mutation_file = ""
cpu = 1
mutations = []
residue_file = ""
graph_file = ""
structure_file = ""
}
VALIDATE = {
mutation_file = {type = str}
cpu = {type = int1}
mutations = {type = list size = 0 elem = {type = str range = [0 10000000000]}}
residue_file = {type = str}
graph_file = {type = str}
structure_file = {type = str}
}
""",
])
[docs] def crunch(self):
"""
We will either read the mutations from a file or directly from the msj
where mutations are defined as a list of space seperated strings
mutations = ["C:23->ALA" "C:23->SER"]
"""
import schrodinger.application.scisol.packages.fep.protein_mutation_generator as pmg
from schrodinger.application.scisol.packages.fep.graph import Graph
if os.path.isfile(self.param.mutation_file.val):
mutation_filename = os.path.join(cmj.ENGINE.base_dir,
self.param.mutation_file.val)
else:
mutation_filename = os.path.join(cmj.ENGINE.base_dir,
"mutations.txt")
self.create_mutation_file(mutation_filename)
residue_file = ''
if os.path.isfile(self.param.residue_file.val):
residue_file = os.path.join(cmj.ENGINE.base_dir,
self.param.residue_file.val)
graph_file = ''
if os.path.isfile(self.param.graph_file.val):
graph_file = os.path.join(cmj.ENGINE.base_dir,
self.param.graph_file.val)
self._print("debug", "In ProteinMutationGenerator.crunch")
for pj in self.get_prejobs():
structure_file = pj.output.struct_file()
if os.path.isfile(self.param.structure_file.val):
structure_file = os.path.join(cmj.ENGINE.base_dir,
self.param.structure_file.val)
jobname, dir = self._get_jobname_and_dir(pj)
if (not os.path.isdir(dir)):
os.makedirs(dir)
util.chdir(dir)
existing_files = os.listdir(dir)
# Expand mutations, multiple mutations may be present
with open(mutation_filename, 'r') as f:
mutations = f.readlines()
expanded_mutations = pmg.expand_mutations(mutations)
# Write the expanded mutations
expanded_mutation_filename = "expanded_mutations.txt"
with open(expanded_mutation_filename, 'w') as f:
f.write('\n'.join(expanded_mutations))
# Generate the mutants
num_procs = int(self.param.cpu.val)
# Expanding map: get list of new mutations.
updated_mutation_filename = expanded_mutation_filename
if graph_file:
# Read in mutations file (which may include original mutations)
mutations = set()
with open(updated_mutation_filename, 'r') as f:
for line in f.readlines():
line = line.strip()
if line:
# Sort multiple mutations to canonicalize them
mutations.add(' '.join(sorted(line.split(' '))))
# Read in mutations from the graph
graph_mutations = set()
g = Graph.deserialize(graph_file)
for n in g.nodes_iter():
node_mutations = n.struc.property.get(
constants.BIOLUMINATE_MUTATION, 'NONE')
# WT nodes have mutation of 'NONE'
if node_mutations == 'NONE':
continue
node_mutation_names = []
# Handle multiple mutations
for node_mutation in node_mutations.split(','):
res_site, _, mutant = pmg.parse_mutation(node_mutation)
node_mutation_names.append(f'{res_site}->{mutant}')
graph_mutations.add(' '.join(sorted(node_mutation_names)))
# Get new mutations. If there are none, skip stage without error.
new_mutations = mutations - graph_mutations
if not new_mutations:
new_job = cmj.Job(None, pj, self, None, dir)
new_job.status.set(cmj.JobStatus.SUCCESS)
self.add_job(new_job)
continue
# Only need to generate structures for the new mutations.
new_mutation_filename = 'new_mutations.txt'
with open(new_mutation_filename, 'w') as f:
for new_mutation in new_mutations:
f.write(f'{new_mutation}\n')
updated_mutation_filename = new_mutation_filename
try:
prime_mutants_fname = pmg.generate_protein_mutation(
jobname, structure_file, updated_mutation_filename,
num_procs, residue_file)
except EnvironmentError as e:
new_job = cmj.Job(None, pj, self, None, dir)
for file in os.listdir(dir):
if file not in existing_files:
new_job.output.add(
file,
type="file" if os.path.isfile(file) else "dir")
new_job.status.set(cmj.JobStatus.BACKEND_ERROR)
self.add_job(new_job)
self._log("ERROR: %s" % e)
else:
# Prepare new job
new_job = cmj.Job(None, pj, self, None, dir)
new_job.output.add(os.path.join(os.getcwd(),
prime_mutants_fname),
tag="PRIME")
new_job.status.set(cmj.JobStatus.SUCCESS)
self.add_job(new_job)
self._print("debug", "Out ProteinMutationGenerator.crunch")
[docs] def create_mutation_file(self, mutation_file):
"""
"""
with open(mutation_file, 'w') as fh:
for e in self.param.mutations.val:
fh.write(e)
fh.write("\n")
[docs]class ReplicateStructure(cmj.StructureStageBase):
"""
This takes the output structure from the previous stage
and replicates it, so it has the same number of molecules
as the input mae_file. Then it copies the coordinates
and structure properties from mae_file.
"""
PARAM = cmj._create_param_when_needed("""
DATA = {
mae_file = ""
}
VALIDATE = {
mae_file = {type = str1}
}
""")
NAME = "replicate_structure"
[docs] def run(self, jobname: str, input_fname: str) -> str:
# Relative to main job dir
mae_path = Path('..', self.param.mae_file.val).absolute()
mol_st = structure.Structure.read(input_fname)
index = mol_st.property.get(constants.CT_INDEX, 1)
repeated_st = structure.Structure.read(mae_path, index=index)
# Tile mol_st `n_molecules` times
n_molecules = repeated_st.atom_total // mol_st.atom_total
new_st = struc.struc_merge([mol_st] * n_molecules)
# Match the repeated_ct molecules with the new_st molecules
# and update the coordinates
repeated_coords = repeated_st.getXYZ()
new_coords = new_st.getXYZ(copy=False)
for repeated_mol, new_mol in zip(repeated_st.molecule, new_st.molecule):
for i, j in self._get_atom_pairs(repeated_mol, new_mol):
new_coords[j - 1, :] = repeated_coords[i - 1, :]
# Update the properties
new_st.property.update(mol_st.property)
out_fname = f"{jobname}-out.mae"
new_st.write(out_fname)
return out_fname
def _get_atom_pairs(self, mol1: structure._Molecule,
mol2: structure._Molecule) -> List[Tuple[int, int]]:
"""
Return a list of atom idxs for mol1 and the
corresponding atom index for mol2.
"""
mol1_atom_idxs = sorted(mol1.getAtomIndices())
mol2_atom_idxs = sorted(mol2.getAtomIndices())
r_mol1 = rdkit_adapter.to_rdkit(mol1.extractStructure())
r_mol2 = rdkit_adapter.to_rdkit(mol2.extractStructure())
mcs_smarts = FindMCS([r_mol1, r_mol2]).smartsString
r_mol1_results = r_mol1.GetSubstructMatches(
Chem.MolFromSmarts(mcs_smarts))
r_mol2_results = r_mol2.GetSubstructMatches(
Chem.MolFromSmarts(mcs_smarts))
atom_mapping = [[[i + 1 for i in j] for j in r_mol1_results],
[[i + 1 for i in j] for j in r_mol2_results]]
# Shift atom idx based on the corresponding molecule atom idx
atom_mapping_shifted = []
for i, j in zip(atom_mapping[0][0], atom_mapping[1][0]):
atom_mapping_shifted.append(
(mol1_atom_idxs[i - 1], mol2_atom_idxs[j - 1]))
return atom_mapping_shifted
[docs]class GroupWaters(cmj.StructureStageBase):
"""
Set the `FEP_ABSOLUTE_ENERGY` property on waters, optionally selecting
only non-crystal waters
"""
NAME = "group_waters"
PARAM = cmj._create_param_when_needed("""
DATA = {
skip_crystal_waters = false
}
VALIDATE = {
skip_crystal_waters = {type = bool}
}
""")
[docs] def run(self, jobname, input_fname):
st_list = list(structure.StructureReader(input_fname))
component_types = [st.property.get(constants.CT_TYPE) for st in st_list]
output_fname = jobname + '-in.cms'
solv_ct = st_list[component_types.index(constants.CT_TYPE.VAL.SOLVENT)]
for a in solv_ct.atom:
if not (self.param.skip_crystal_waters and
a.property.get(constants.CRYSTAL_WATER_PROP)):
a.property[constants.FEP_ABSOLUTE_ENERGY] = 1
struc.write_structures(st_list, output_fname)
return output_fname