import base64
import copy
import glob
import json
import math
import os
import shutil
import string
import warnings
from collections import defaultdict
from itertools import chain
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from pathlib import Path
import numpy as np
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import fep_schedule
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.constants import FEP_ENCODED_RESTRAINTS
from schrodinger.application.desmond.constants import FEP_RESTRAIN
from schrodinger.application.desmond.constants import FEP_RESTRAINT_CORRECTION
from schrodinger.application.desmond.stage import simulate
from schrodinger.application.desmond.stage.jobs import DesmondJob
from schrodinger.application.desmond.stage.utils import SystemBuilder
from schrodinger.application.desmond.stage.utils import check_cms_file
from schrodinger.application.desmond.stage.workflow import \
PREDEFINED_STAGE_FAMILY
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.structutils.analyze import evaluate_asl
from schrodinger.adapter import evaluate_smarts
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess
Restraints = List[sea.Map]
__all__ = [
'AssignCustomCharge', 'AssignForcefield', 'AssignLambdaSchedule',
'LoadRestraintsFromStructure', 'ForcefieldBuilderLauncher'
]
[docs]def is_valid_opls_name(name: str) -> bool:
try:
mm.opls_name_to_version(name)
except IndexError:
return False
return True
[docs]class AssignCustomCharge(cmj.StructureStageBase):
"""
This sets up and runs the custom charge script
for the ligand cts in the input file. Ligands
must be in separate cts, otherwise custom charges
will not be assigned. Other cts will be passed
unchanged. This must be called prior to the
AssignForcefield stage.
"""
NAME = "assign_custom_charge"
PARAM = cmj._create_param_when_needed("""
DATA = {
enabled = false
mode = %s
}
VALIDATE = {
enabled = {type = bool}
mode = {type = enum range = [%s]}
}
""" % (constants.CUSTOM_CHARGE_MODE.KEEP, " ".join(
constants.CUSTOM_CHARGE_MODE)))
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]:
mode = self.param.mode.val
if self.param.enabled.has_tag("setbyuser"):
if self.param.enabled.val:
mode = constants.CUSTOM_CHARGE_MODE.ASSIGN
msg = f'The "enabled = true" option is deprecated. Use "mode = {mode}" instead.'
else:
mode = constants.CUSTOM_CHARGE_MODE.KEEP
msg = f'The "enabled = false" option is deprecated. Use "mode = {mode}" instead.'
warnings.warn(msg, DeprecationWarning, stacklevel=2)
out_fname = f'{jobname}-out.mae'
cts = list(structure.StructureReader(mae_fname))
ligands = []
if mode in [
constants.CUSTOM_CHARGE_MODE.CLEAR,
constants.CUSTOM_CHARGE_MODE.ASSIGN
]:
# This import is not available in fep scholar.
from schrodinger.application.scisol.packages.fep import fepmae
_, _, _, ligands = fepmae.filter_receptors_and_ligands(cts)
custom_cts = []
for i, ct in enumerate(cts):
if mode in [
constants.CUSTOM_CHARGE_MODE.CLEAR,
constants.CUSTOM_CHARGE_MODE.ASSIGN
]:
# Clear existing custom charges
mm.mmffld_removeCustomChargesFromCT(ct.handle)
if ct not in ligands:
custom_cts.append(ct)
continue
# For keep and clear, do not generate custom charges
if mode in [
constants.CUSTOM_CHARGE_MODE.KEEP,
constants.CUSTOM_CHARGE_MODE.CLEAR
]:
custom_cts.append(ct)
continue
# If there is more than one molecule present, skip custom charge assignment
if len(ct.molecule) > 1:
self._print(
"quiet",
f"WARNING: More than one molecule present in {ct.title}, "
"skipping custom charge assignment.")
custom_cts.append(ct)
continue
# Input filename must be unique per ct.
charge_in_fname = "{}_{}-in.mae".format(jobname, i)
charge_out_fname = "{}-out.mae".format(jobname)
ct.write(charge_in_fname)
cmd = [
os.path.join(envir.CONST.SCHRODINGER,
"generate_custom_charges"),
"-imae",
charge_in_fname,
"-omae",
charge_out_fname,
]
# Reference coordinates may not have been set
if constants.REFERENCE_COORD_PROPERTIES[0] in ct.atom[1].property:
cmd.extend([
'-coord_property_basename',
constants.REFERENCE_COORD_BASENAME
])
job = jobcontrol.launch_job(cmd)
job.wait()
if not job.succeeded() or not os.path.exists(charge_out_fname):
self._print("quiet", "ERROR: generate_custom_charges failed:")
for log_fname in glob.glob('*.log'):
self._print("quiet", f"Log file : {log_fname}")
with open(log_fname, 'r') as f:
self._print(
"quiet",
f"Log file content:\n>{'>'.join(f.readlines())}")
self._print("quiet", "(end of log file)\n")
return None
# Add output to list
custom_ct = structure.Structure.read(charge_out_fname)
custom_cts.append(custom_ct)
# Clean up temporary files
fileutils.force_rmtree(charge_in_fname.strip('.mae'),
ignore_errors=True)
fileutils.force_remove(charge_in_fname)
fileutils.force_remove(charge_out_fname)
# Write output cts, potentially with custom charge.
# May be missing if a generate_custom_charges job failed.
if len(custom_cts) == len(cts):
with structure.StructureWriter(out_fname) as w:
for ct in custom_cts:
w.append(ct)
return out_fname
[docs]class AssignForcefield(cmj.StageBase):
"""
"""
NAME = "assign_forcefield"
FFLD_WATER = constants.FFLD_WATER
parameter_string = string.Template("""
DATA = {
forcefield = OPLS_2005
water = SPC
humble = no
fepio_mode = 2
restrain = none
restraints = {
new = []
existing = $restraint_ignore
}
print_restraint = false
atom_group = retain
fep_retain_angle = yes
core_hopping_fepio = on
hydrogen_mass_repartition = off
make_alchemical_water = on
fep_enhance_sampling_dihedral = off
assign_is_infinite = off
fail_on_lewis_failure = on
use_zob_property = on
pose_conf_restraint = {
enable = false
name = harm
schedule = $pose_conf_schedule
fc = 50.0
sigma = 10.0
alpha = 1.0
dihedral_schedule = ""
}
macromolecule_conf_restraint = {
enable = false
dihedral_schedule = ""
$backbone = {
name = harm
schedule = ""
fc = 50.0
sigma = 10.0
alpha = 1.0
}
$sidechain = {
name = harm
schedule = ""
fc = 50.0
sigma = 10.0
alpha = 1.0
}
$calpha_rung = {
name = harm
schedule = ""
fc = 5.0
alpha = 1.0
}
}
}
VALIDATE = {
forcefield = {type = str _check = check_forcefield}
water = {type = enum range = [$ffld_water none]}
humble = {type = bool}
fepio_mode = {type = int range = [1 3]}
atom_group = [
{type = enum range = [retain none]}
{atom = {type = str}
name = {type = str}
index = {type = int range = [0 7]}
}
{type = list size = 0
elem = {atom = {type = str}
name = {type = str}
index = {type = int range = [0 7]}
}
}
]
restrain = [
{type = enum range = [retain none]}
{_mapcheck = check_restrain _skip = all}
{type = list size = -1
elem = {_mapcheck = check_restrain _skip = all}
}
]
restraints = {
existing = {type = enum range = [$existing_restraint]}
new = [{type = list} {_skip = all}]
}
print_restraint = {type = bool }
fep_retain_angle = {type = bool}
core_hopping_fepio = {type = bool}
hydrogen_mass_repartition = {type = bool}
make_alchemical_water = {type = bool}
fep_enhance_sampling_dihedral = {type = bool}
assign_is_infinite = {type = bool}
fail_on_lewis_failure = {type = bool}
use_zob_property = {type = bool}
pose_conf_restraint = {
enable = {type = bool}
name = {type = enum range = [harm fbhw soft]}
fc = {type = float+}
sigma = {type = float+}
alpha = {type = float+}
schedule = {type = str}
dihedral_schedule = {type = str}
_mapcheck = check_pose_conf_restraint
}
macromolecule_conf_restraint = {
enable = {type = bool}
dihedral_schedule = {type = str}
$backbone = {
name = {type = enum range = [harm fbhw soft]}
schedule = {type = str}
fc = {type = float+}
sigma = {type = float+}
alpha = {type = float+}
}
$sidechain = {
name = {type = enum range = [harm fbhw soft]}
schedule = {type = str}
fc = {type = float+}
sigma = {type = float+}
alpha = {type = float+}
}
$calpha_rung = {
name = {type = enum range = [harm]}
schedule = {type = str}
fc = {type = float+}
alpha = {type = float+}
}
}
}
""").substitute(ffld_water=" ".join(FFLD_WATER.keys()),
restraint_ignore=constants.EXISTING_RESTRAINT.IGNORE,
existing_restraint=" ".join(constants.EXISTING_RESTRAINT),
pose_conf_schedule=constants.POSE_DIHEDRAL_RESTRAINT,
backbone=constants.ConfRestraintType.BACKBONE,
sidechain=constants.ConfRestraintType.SIDECHAIN,
calpha_rung=constants.ConfRestraintType.CALPHA_RUNG)
PARAM = cmj._create_param_when_needed(parameter_string)
[docs] def crunch(self):
"""
"""
self._print("debug", "In AssignForcefield.crunch")
from schrodinger.application.desmond.packages import viparr
for pj in self.get_prejobs():
jobname, dir = self._get_jobname_and_dir(pj)
fname_in = jobname + "-in.mae"
fname_out = jobname + "-out.cms"
if (not os.path.isdir(dir)):
os.makedirs(dir)
util.chdir(dir)
util.symlink(pj.output.struct_file(), fname_in)
# Constructs the command line for job-launching.
if is_valid_opls_name(self.param.forcefield.val):
def run(job):
if mm.opls_name_to_version(self.param.forcefield.val) < 16:
if self.param.water.val in constants.S_OPLS_WATER_MODELS:
self._print(
"quiet",
f"ERROR: {self.param.water.val} water model uses virtual sites and is not compatible with OPLS_2005."
)
job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
util.chdir(job.dir)
fname_in = job.jobname + "-in.mae"
fname_out = job.jobname + "-out.cms"
tmp_fname = f'{job.jobname}-out.mae'
all_cts = list(
structure.StructureReader(pj.output.struct_file()))
for ct in all_cts:
cms.Cms.clean_ct_properties(ct)
try:
water_ct_index = self.get_water_ct_indices(all_cts)
except ValueError:
job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
if (water_ct_index != []):
water = self.param.water.val
if (water.lower() != "none"):
for idx in water_ct_index:
for a in all_cts[idx].atom:
a.pdbres = AssignForcefield.FFLD_WATER[
water.upper()]
print(f'Writing {len(all_cts)} structures to {tmp_fname}')
with open(tmp_fname, 'w') as fp:
# DESMOND-11261 -- write & flush & fsync
fp.write('{\n s_m_m2io_version\n :::\n 2.0.0\n}\n\n')
fp.write(struc.write_structures(all_cts))
fp.flush()
os.fsync(fp.fileno())
with open(tmp_fname, 'r') as fp:
print(f'wrote : {len(fp.read())} bytes')
# ZOB property is only supported for FFLD 16
use_zob_property = self.param.use_zob_property.val
if mm.opls_name_to_version(self.param.forcefield.val) < 16:
use_zob_property = False
cmd = ["run", "desmond_ff_builder",
"--opls_version", self.param.forcefield.val,
"--fep_mode", str(self.param.fepio_mode.val),
"--fep_retain_angle", str((2 if (self.param.fep_retain_angle.val) else 1)), ] \
+ (["--keep_ffio_block", ] if (self.param.humble.val) else []) \
+ (["--skip_fepio"] if (self.param.core_hopping_fepio.val) else []) \
+ (["--use_zob_prop"] if (use_zob_property) else []) \
+ (["--fail_on_lewis_failure"] if (self.param.fail_on_lewis_failure.val) else []) \
+ [tmp_fname, fname_out, ]
proc_ret = subprocess.call(cmd)
if (proc_ret or not os.path.isfile(fname_out)):
fileutils.force_remove(fname_out)
job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
model = cms.Cms(file=fname_out)
model.sanitize_for_viparr()
model.write(fname_out)
job.status.set(cmj.JobStatus.SUCCESS)
jlaunch_cmd = run
else:
ffmap = {
"CHARMM": [
"-f",
"charmm36_lipids",
"-f",
"charmm36_protein",
],
"AMBER": [
"-f",
"amber03",
],
}
try:
ff = ffmap[self.param.forcefield.val]
except KeyError:
ff = [
"-f",
self.param.forcefield.val,
]
water = self.param.water.val.lower()
def run(job, humble=self.param.humble.val, ff=ff, water=water):
fname_in = job.jobname + "-in.mae"
fname_out = job.jobname + "-out.cms"
c_opt = []
if humble:
cts = [e for e in cms.ffiostructure.CMSReader(fname_in)]
for i, e in enumerate(cts, 1):
if (not e.hasFfio()):
c_opt.extend([
"-c",
str(i),
])
args = c_opt + ff
if water != "none":
args.extend(["-f", water])
args.extend([fname_in, fname_out + "_vp"])
self._print("debug", "viparr args: " + str(args))
viparr.main(args)
if not os.path.isfile(fname_out + "_vp"):
job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
cmd = [
"run",
"-FROM",
"desmond",
"build_constraints.py",
fname_out + "_vp",
fname_out + "_vp_bc",
]
proc_ret = subprocess.call(cmd)
if (proc_ret or not os.path.isfile(fname_out + "_vp_bc")):
job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
cmd = [
"run",
"desmond_ff_builder",
"--fep_mode",
str(self.param.fepio_mode.val),
"--fep_retain_angle",
str((2 if (self.param.fep_retain_angle.val) else 1)),
"--keep_ffio_block",
fname_out + "_vp_bc",
fname_out,
]
proc_ret = subprocess.call(cmd)
if (proc_ret or not os.path.isfile(fname_out)):
job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
job.status.set(cmj.JobStatus.SUCCESS)
jlaunch_cmd = run
new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain,
pj.permanent_group, jobname, pj, self,
jlaunch_cmd, dir)
new_job.need_host = False
new_job.output.add(os.path.abspath(fname_out),
checker=check_cms_file)
self.add_job(new_job)
self._print("debug", "Out AssignForcefield.crunch")
def _postbuild(self, job):
"""
"""
fname = job.output.struct_file()
model = cms.Cms(file=fname)
model = SystemBuilder.set_atom_group(model, self.param.atom_group)
setting = self.param.restrain
if (isinstance(setting, sea.Atom) and setting.val == "retain"):
model = SystemBuilder.set_restrain(model, sea.Atom("none"),
job.permanent_restrain, job)
else:
model = SystemBuilder.set_restrain(model, setting, None, job)
# when empty set permanent_restrain to None
job.permanent_restrain = model.get_restrain() or None
job.permanent_group = model.get_atom_group()
if (job.permanent_group == []):
job.permanent_group = None
self._update_maestro_group_and_title(model)
if self.param.assign_is_infinite.val:
from schrodinger.application.matsci import desmondutils
model.remove_cts_property(constants.IS_INFINITE)
is_infinite = desmondutils.is_infinite(model)
model.set_cts_property(constants.IS_INFINITE, is_infinite)
self._print('debug',
'Set %s to %s' % (constants.IS_INFINITE, is_infinite))
titration_ct = model.get_titration_ct()
if titration_ct is not None:
from schrodinger.application.scisol.packages.lambda_dynamics.interaction_terms import \
generate_titration_states
try:
generate_titration_states(titration_ct,
self.param.forcefield.val)
titration_ct.property[constants.DESMOND_WRITE_JSON] = True
except (ChildProcessError, FileNotFoundError) as err:
self._print('quiet', err)
job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
model.write(fname)
(ref_ct, mut_ct) = model.get_fep_cts()
if self.param.core_hopping_fepio.val and mut_ct and ref_ct:
out_fname = fname + "_"
ffld_name = self.param.forcefield.val
if not is_valid_opls_name(ffld_name):
raise RuntimeError(
"ERROR: Core-hopping FEP only works with OPLS force field.")
if model.use_titration:
import schrodinger.application.scisol.packages.lambda_dynamics.interaction_terms as interaction_terms
from schrodinger.application.desmond.packages.restraint import \
b64_encode
from schrodinger.application.desmond.packages.restraint import \
get_encoded_restraints
from schrodinger.application.desmond.packages.restraint import \
set_encoded_restraints
# offset comp_ct index by 1 (starting number for ct) + 1 (first cms ct is always fullsystem)
ct_numbers = tuple(
model.comp_ct.index(ct) + 2 for ct in (ref_ct, mut_ct))
alchemical_changes = interaction_terms.combine_pairwise_int_match(
(ref_ct, mut_ct),
opls_version=mm.opls_name_to_version(ffld_name),
restraint_fc=10,
conf_restraint=self.param.macromolecule_conf_restraint.val,
dihedral_sampling=self.param.fep_enhance_sampling_dihedral.
val,
pose_restraint=self.param.pose_conf_restraint.val,
ct_numbers=ct_numbers)
pose_conf_restraints = alchemical_changes.pop(
constants.CONF_POST_RESTRAINTS, None)
ref_ct.property[constants.FEP_ALCHEMICAL_CHANGE] = b64_encode(
json.dumps(alchemical_changes))
if pose_conf_restraints is not None:
# This should be the first time the restraint property is set in model.
if get_encoded_restraints(model):
self._print(
"quiet",
"Warning: existing restraints found, these will be ignored and overwritten."
)
set_encoded_restraints(
model, b64_encode(pose_conf_restraints.toJson()))
ref_ct.property[constants.DESMOND_WRITE_JSON] = True
# FEP-stage number used in msys for alchemical changes
# Ideally they should be imported from desmond
ref_ct.property[constants.FEPIO_STAGE] = 1
mut_ct.property[constants.FEPIO_STAGE] = 2
model.write(out_fname)
shutil.move(out_fname, fname)
else:
import schrodinger.application.scisol.packages.core_hopping.int_fepio as fepio
fepio.write_fepio_from_int_map(
fname,
out_fname,
mm.opls_name_to_version(ffld_name),
conf_restraint=self.param.macromolecule_conf_restraint.val,
dihedral_sampling=self.param.fep_enhance_sampling_dihedral.
val,
pose_restraint=self.param.pose_conf_restraint.val)
shutil.move(out_fname, fname)
if cmj.has_explicit_restraints(self.param):
if job.permanent_restrain:
raise RuntimeError(
"ERROR: old-style (permanent) restrain and 'restraints' "
"cannot be used simultaneously")
if "restrain" in self.param and self.param.restrain.val != "none":
raise RuntimeError(
"ERROR: 'restrain' and 'restraints' are mutually exclusive")
from schrodinger.application.desmond.packages.restraint import \
RestraintBuilder
model = cms.Cms(file=fname)
builder = RestraintBuilder(
restraint_terms=self.param.restraints.new,
existing=self.param.restraints.existing.val,
cms_sys=model,
persistent=True)
builder.addRestraints()
out_fname = fname + "_restraints"
model.write(out_fname)
shutil.move(out_fname, fname)
if self.param.print_restraint.val:
self._print("quiet", builder.getString(compact=True))
if self.param.hydrogen_mass_repartition.val:
model = cms.Cms(file=fname)
out_cms_filename = fname + "__"
for ct in model.comp_ct:
ct.ffio.property[
'b_ffio_hmr'] = self.param.hydrogen_mass_repartition.val
model.write(out_cms_filename)
shutil.move(out_cms_filename, fname)
if self.param.make_alchemical_water.val and mut_ct and ref_ct:
fname_alchem_water = fname + "_prealchemical_water.cms"
shutil.copyfile(fname, fname_alchem_water)
self._make_alchemical_water(fname_alchem_water)
shutil.move(fname_alchem_water, fname)
[docs] def hook_captured_successful_job(self, job):
"""
"""
try:
self._postbuild(job)
except RuntimeError as e:
self._print("quiet", e)
return "dissolved"
@staticmethod
def _update_maestro_group_and_title(model: cms.Cms):
component = struc.component_structures(model)
subgroup = (component.fep_ref and
"FEP: %s" % cmj.ENGINE.jobname or
"MD: %s" % cmj.ENGINE.jobname) # yapf: disable
struc_iter = struc.struc_iter(component.solute, component.membrane,
component.cosolvent)
solvent_iter = struc.struc_iter(component.solvent)
model.fsys_ct.title = " + ".join(e.title for e in struc_iter) or \
" + ".join(e.title for e in solvent_iter) or \
'Untitled'
for ct in ([model.fsys_ct] + model.comp_ct):
ct.property[constants.M_SUBGROUP_TITLE] = ct.property[
constants.M_SUBGROUPID] = subgroup
ct.property[constants.M_SUBGROUP_COLLAPSED] = 0
model.synchronize_fsys_ct()
@staticmethod
def _make_alchemical_water(cms_fname: str):
"""
Reads a CMS file, and moves one or more water molecules out of the
"solvent" component structure(s) into a new component structure whose
CT type is `constants.CT_TYPE.VAL.ALCHEMICAL_WATER`. We call this
component structure "alchemical water".
The number of the moved water molecules is determined by the difference
in formal charge between the reference and mutant component structures
such that once the alchemical water molecules are mutated to ions the
mutant structure would have the same formal charge as the reference. If
there is no difference, then there is no need for alchemical water, and
this function will have no effects.
The moved water molecules are selected such that the oxygen atoms are
at least 10 Angstroms away from any atoms in the "solute" component
structure. If no such water molecules can be found, this function will
try to find water molecules closer to the "solute" atoms, but not closer
than 2 angstroms. If still no water molecules can be found, a
`ValueError` exception will be raised.
The new "alchemical water" component structure is put right after the
reference and the mutant component structures. A new `Cms` object will
be constructed and is written out, overwriting the original CMS file
`cms_fname`.
"""
cms_model = cms.Cms(file=cms_fname)
charge_diff = (cms_model.ref_ct.formal_charge -
cms_model.mut_ct.formal_charge)
if 0 == charge_diff:
return
num_water_wanted = abs(charge_diff)
bulk_water_oxygen_asl = f"(water and (atom.ele O)) and (beyond %d " \
f"(atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.SOLUTE}'))"
min_distance_away = 10
water_oxygen_aids = []
while num_water_wanted > len(water_oxygen_aids):
if min_distance_away <= 1:
raise ValueError("Not enough water molecules to pick for "
"alchemical water.")
water_oxygen_aids = cms_model.select_atom(
bulk_water_oxygen_asl % min_distance_away)[:num_water_wanted]
min_distance_away -= 1
# Converts `water_oxygen_aids` to atom indices in the component
# structure.
atom_index_fix = {}
index_delta = 0
for i, comp_ct in enumerate(cms_model.comp_ct):
atom_index_fix[i] = index_delta
index_delta += comp_ct.atom_total
water_oxygen_indices = defaultdict(list)
# Key = comp-ct-index, value = atom-index-in-comp-ct
for aid in water_oxygen_aids:
atom = cms_model.fsys_ct.atom[aid]
ct_index = atom.property[
constants.CT_INDEX] - 1 # `CT_INDEX` property is 1-based.
water_oxygen_indices[ct_index].append(aid -
atom_index_fix[ct_index])
# We will extract the selected water molecules out of their respective
# component structures and put them into a new component water
# structure.
# First, we prepare an empty new structure with the proper ffio block
# for water molecules. We achieve that by copying an existing water
# structure and deleting all atoms in it.
alchem_water_structure = cms_model.comp_ct[ct_index].copy()
alchem_water_structure.title = "Alchemical Water"
alchem_water_structure.deleteAtoms(
range(1, alchem_water_structure.atom_total + 1))
# Then we move the selected water molecules into the new component
# structure.
alchem_water_structure.ffio.deletePseudos(
list(range(1,
len(alchem_water_structure.ffio.pseudo) + 1)))
alc_pseudo_idx = 1
for ct_index, oxygen_indices in water_oxygen_indices.items():
comp_ct = cms_model.comp_ct[ct_index]
atom_indices = evaluate_asl(
comp_ct,
"fillmol (a.n %s)" % " ".join(map(str, oxygen_indices)))
alchem_water_structure.extend(
comp_ct.extract(atom_indices, copy_props=True))
comp_ct.deleteAtoms(atom_indices)
n_virtuals = len(comp_ct.ffio.virtual)
if n_virtuals:
del_pseudo = []
for oxygen_idx in oxygen_indices:
water_number = (oxygen_idx - 1) // 3
del_pseudo.extend([
water_number * n_virtuals + vir_idx
for vir_idx in range(1, n_virtuals + 1)
])
alchem_water_structure.ffio.addPseudos(len(del_pseudo))
for pseudo_idx in del_pseudo:
src_pseudo = comp_ct.ffio.pseudo[pseudo_idx]
alc_pseudo = alchem_water_structure.ffio.pseudo[
alc_pseudo_idx]
(alc_pseudo.x_coord, alc_pseudo.y_coord,
alc_pseudo.z_coord) = (src_pseudo.x_coord,
src_pseudo.y_coord,
src_pseudo.z_coord)
alc_pseudo_idx += 1
comp_ct.ffio.deletePseudos(del_pseudo)
# Sets a structure-level property `constants.ALCHEMICAL_WATER_CHARGE` to
# the desired value (+1 or -1). Eventually in the backend, this value
# will become the `chargeB` of the alchemical water's oxygen atoms.
alchem_charge = math.copysign(1, charge_diff)
alchem_water_structure.property[constants.ALCHEMICAL_WATER_CHARGE] = \
alchem_charge
# Puts the new structure right after the FEP structures.
alchem_water_structure.property[constants.CT_TYPE] = \
constants.CT_TYPE.VAL.ALCHEMICAL_WATER
ct_index = max(cms_model.comp_ct.index(cms_model.ref_ct),
cms_model.comp_ct.index(cms_model.mut_ct)) + 1
cms_model.comp_ct.insert(ct_index, alchem_water_structure)
cms_model.synchronize_fsys_ct()
cms_model.write(cms_fname)
[docs] @staticmethod
def get_water_ct_indices(ct_list):
water_ct_index = []
for i, ct in enumerate(ct_list):
if ct.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.SOLVENT:
# If num component is 1, speed up evaluate_smarts by just
# checking first molecule (MATSCI-12463)
if ct.property.get(constants.NUM_COMPONENT) == 1:
ct = ct.molecule[1].extractStructure()
list_of_list = evaluate_smarts(ct, "[H]O[H]", True, 1000000000)
atom = list(chain(*list_of_list))
if atom != []:
if len(atom) != ct.atom_total:
msg = ("ERROR: Failed to set force field parameters "
"for solvent CT (%i), which contains mixture of "
"water and nonwater molecules. %i does not match"
" asl atom count using (SMARTS. \"[H]O[H]\")" %
(ct.atom_total, len(atom)))
print(msg)
raise ValueError(msg)
water_ct_index.append(i)
return water_ct_index
[docs]class AssignLambdaScheduleError(Exception):
pass
[docs]class AssignLambdaSchedule(cmj.StageBase):
"""
"""
NAME = "assign_lambda_schedule"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
}
VALIDATE = {
}
""",
])
[docs] def crunch(self) -> None:
"""
"""
# Read conformation and compute electric dipole moments of the
# original and perturbed systems.
for pj in self.get_prejobs():
jobname, dir = self._get_jobname_and_dir(pj)
if not os.path.isdir(dir):
os.makedirs(dir)
util.chdir(dir)
input_cms = pj.output.struct_file()
try:
norm_dipole_per_atom = self._compute_norm_dipole_per_atom(
input_cms)
except AssignLambdaScheduleError:
self._print(
'quiet', 'ERROR: The structure was not set up '
'properly for alchemical FEP simulations')
status = cmj.JobStatus.BACKEND_ERROR
else:
status = cmj.JobStatus.SUCCESS
self._print('debug',
f'norm_dipole_per_atom: {norm_dipole_per_atom}')
new_job = copy.deepcopy(pj)
new_job.stage = cmj.weakref.proxy(self)
new_job.output = cmj.JobOutput()
new_job.need_host = False
new_job.status.set(status)
new_job.output.set_struct_file(os.path.abspath(input_cms))
self.add_job(new_job)
# Compute lambda schedule and update subsequent task stages.
if status == cmj.JobStatus.SUCCESS:
current_stage = self._NEXT_STAGE
while current_stage:
if current_stage.NAME in PREDEFINED_STAGE_FAMILY['md']:
if current_stage.NAME == 'concatenate':
for sim_param in current_stage.param.simulate:
sim_stage = simulate.Simulate()
sim_stage.param = sim_param
if sim_stage.NAME in PREDEFINED_STAGE_FAMILY[
'md']:
self._update_fep_lambda(
norm_dipole_per_atom, sim_stage)
else:
self._update_fep_lambda(norm_dipole_per_atom,
current_stage)
current_stage = current_stage._NEXT_STAGE
[docs] @staticmethod
def get_n_vdw_and_charge_div(norm_dipole_per_atom: float, n_win: int) \
-> Tuple[int, float]:
"""Compute number of van der Waals windows and fraction of charges.
Uses a "broken stick" function to determine the partition of the
lambda windows into van der Waals and electrostatic interactions.
"""
x = norm_dipole_per_atom
p1 = np.poly1d([-932.51920742, 94.84779172])
p2 = np.poly1d([-20.80999859, 69.48773835])
x0 = -(p1[0] - p2[0]) / (p1[1] - p2[1]) # Intersection point.
alpha0 = p1[0] + p1[1] * x0
alpha1 = (p1[1] + p2[1]) / 2
alpha2 = -(p1[1] - p2[1]) / 2
percent_vdw = (alpha0 + alpha1 * (x - x0) + alpha2 *
(x - x0) * np.sign(x - x0))
# The number of van der Waals windows (n_vdw) and the inverse of
# the fraction of Coulomb windows (charge_div) are set so that
# there always are at least two windows assigned to each
# interaction.
n_vdw = max(2, int(percent_vdw / 100 * n_win))
charge_div = n_vdw if n_win - n_vdw == 2 else n_win / (n_win - n_vdw)
return n_vdw, charge_div
def _update_fep_lambda(self, norm_dipole_per_atom: Tuple[Optional[float],
Optional[float]],
target_stage: cmj.StageBase) -> None:
"""Change the lambda schedule for the stage `target_stage`.
"""
param = target_stage.param
fep_lambda = param.fep.lambda_
scheme, n_win = config.parse_lambda(fep_lambda.val)
self._print('debug', f'scheme = {scheme!r}\tn_win = {n_win}')
if scheme not in {'default', 'charge'}:
warnings.warn(f'Scheme {scheme!r} is currently unsupported')
return
FepSchedule = fep_schedule.get_fep_schedule_class(scheme=scheme)
norm_dipole_per_atom_disappearing = norm_dipole_per_atom[0]
if norm_dipole_per_atom_disappearing is not None:
n_vdwA, charge_divA = self.get_n_vdw_and_charge_div(
norm_dipole_per_atom_disappearing, n_win)
self._print('debug',
f'n_vdwA = {n_vdwA}\tcharge_divA = {charge_divA}')
self._log(
f'Assigning ({n_vdwA} out of {n_win}) lambda windows '
f'to van der Waals A interactions in stage {target_stage.NAME!r}'
)
scheduleA = FepSchedule(n_win, charge_divA)
else:
scheduleA = FepSchedule(n_win)
norm_dipole_per_atom_appearing = norm_dipole_per_atom[1]
if norm_dipole_per_atom_appearing is not None:
n_vdwB, charge_divB = self.get_n_vdw_and_charge_div(
norm_dipole_per_atom_appearing, n_win)
self._print('debug',
f'n_vdwB = {n_vdwB}\tcharge_divB = {charge_divB}')
self._log(
f'Assigning ({n_vdwB} out of {n_win}) lambda windows '
f'to van der Waals B interactions in stage {target_stage.NAME!r}'
)
scheduleB = FepSchedule(n_win, charge_divB)
else:
scheduleB = FepSchedule(n_win)
param.fep.lambda_ = sea.Map()
param.fep.lambda_.vdwA = sea.List(scheduleA.vdwA)
param.fep.lambda_.vdwB = sea.List(scheduleB.vdwB)
param.fep.lambda_.chargeA = sea.List(scheduleA.chargeA)
param.fep.lambda_.chargeB = sea.List(scheduleB.chargeB)
param.fep.lambda_.bondedA = sea.List(scheduleA.bondedA)
param.fep.lambda_.bondedB = sea.List(scheduleB.bondedB)
param.add_tag('setbyuser', propagate=True)
def _compute_norm_dipole_per_atom(self, cms_fname: str) \
-> Tuple[Optional[float], Optional[float]]:
from schrodinger.application.desmond.packages import analysis
from schrodinger.application.desmond.packages import topo
class Dipole(analysis.Dipole):
"""Electric dipole moment of the selected atoms."""
def __init__(self, msys_model, cms_model, aids, charges=None):
super().__init__(msys_model, cms_model, aids)
if charges:
# msys_model.atom(gid).charge always returns zero for the
# atoms belonging only to cms_model.mut_ct, so we replace
# the zeroed charges by the appropriate values here.
self._charges[:len(charges)] = charges
def norm_dipole_per_atom(msys_model, cms_model, atoms, charges=None):
dipole_analyzer = Dipole(msys_model, cms_model, atoms, charges)
frames = [topo.DuckFrame(msys_model)]
electric_dipole = analysis.analyze(frames, dipole_analyzer)
return np.linalg.norm(electric_dipole) / len(atoms)
msys_model, cms_model = topo.read_cms(cms_fname)
fep_ct = cms_model.fep_ct
if fep_ct is None:
raise AssignLambdaScheduleError
atommap = fep_ct.fepio.atommap
atoms_disappearing = [e.ai for e in atommap if e.aj < 0]
atoms_appearing = [e.aj for e in atommap if e.ai < 0]
aids_disappearing = [
fsys_atom.index
for fsys_atom, comp_atom, comp_ct, ct_index in topo.cms_atom(
cms_model)
if comp_ct == cms_model.ref_ct and
comp_atom.index in atoms_disappearing
]
aids_appearing = [
fsys_atom.index
for fsys_atom, comp_atom, comp_ct, ct_index in topo.cms_atom(
cms_model)
if comp_ct == cms_model.mut_ct and
comp_atom.index in atoms_appearing
]
if len(aids_disappearing) > 2:
norm_dipole_per_atom_disappearing = norm_dipole_per_atom(
msys_model, cms_model, aids_disappearing)
else:
norm_dipole_per_atom_disappearing = None
if len(aids_appearing) > 2:
site = fep_ct.ffio.site
charges_appearing = [site[i].charge for i in atoms_appearing]
norm_dipole_per_atom_appearing = norm_dipole_per_atom(
msys_model, cms_model, aids_appearing, charges_appearing)
else:
norm_dipole_per_atom_appearing = None
return norm_dipole_per_atom_disappearing, norm_dipole_per_atom_appearing
[docs]def encode_restraints(ct: structure.Structure, restraints: Restraints):
"""
Encode the restraints in the `FEP_ENCODED_RESTRAINTS` ct property.
:param ct: Structure to modify in place.
:param restraints: List of restraints as sea.Map objects.
"""
encoded_restraint = base64.b64encode(str(restraints).encode())
ct.property[FEP_ENCODED_RESTRAINTS] = encoded_restraint.decode()
[docs]def decode_restraints(ct: structure.Structure) -> Restraints:
"""
Decode the restraints in the `FEP_ENCODED_RESTRAINTS` ct property.
:param ct: Structure to read the restraints from.
:return: If found, restraints that can be passed to
`schrodinger.application.desmond.packages.restraint.RestraintsBuilder`.
Otherwise, return `None`.
"""
encoded_restraints = ct.property.get(FEP_ENCODED_RESTRAINTS)
if encoded_restraints is not None:
new_restraints = base64.b64decode(encoded_restraints).decode()
return sea.Map("new = " + new_restraints).new
[docs]def clear_restraints(ct: structure.Structure):
"""
Clear the `FEP_ENCODED_RESTRAINTS` ct property and
`FEP_RESTRAIN` atom property from a given structure.
:param ct: Structure to modify in place.
"""
struc.delete_structure_properties(ct, FEP_ENCODED_RESTRAINTS)
struc.delete_atom_properties(ct, FEP_RESTRAIN)
[docs]def add_restraint_reference(st: structure.Structure, restraints: Restraints):
"""
Add the reference values for Boresch type distance/angle/dihedral restraints.
:param st: Reference structure.
:param restraints: List of restraints, updated in place.
"""
# Can be given as aid or asl, so convert to aid if needed.
convert_atom = lambda a: a if isinstance(a, int) else evaluate_asl(
st,
a.split('asl:')[-1].strip())[0]
for restraint in restraints:
atoms = restraint.atoms.val
ref_value = st.measure(*[convert_atom(a) for a in atoms])
ref_name = ''
if len(atoms) == 2:
ref_name = 'r0'
elif len(atoms) == 3:
ref_name = 'theta0'
elif len(atoms) == 4:
ref_name = 'phi0'
else:
continue
for ref_ext in 'AB':
ref_key = f'{ref_name}{ref_ext}'
if ref_key not in restraint:
restraint[ref_key] = ref_value
[docs]def calculate_restraint_correction_term(restraints: Restraints,
temperature: float) -> float:
"""
Calculate the correction to the free energy due to the restraints.
:param restraints: List of restraints.
:param temperature: The temperature for the simulation.
:return: The correction term in kcal/mol.
"""
# Convert restraint format
converted_restraints = []
for restraint in restraints:
atoms = restraint.atoms.val
k = max(restraint.force_constants.val)
ref = None
if len(atoms) == 2:
if hasattr(restraint, 'r0A'):
ref = restraint.r0A.val
elif hasattr(restraint, 'r0'):
ref = restraint.r0.val
elif len(atoms) == 3:
if hasattr(restraint, 'theta0A'):
ref = restraint.theta0A.val
elif hasattr(restraint, 'theta0'):
ref = restraint.theta0.val
elif len(atoms) == 4:
if hasattr(restraint, 'phi0A'):
ref = restraint.phi0A.val
elif hasattr(restraint, 'phi0'):
ref = restraint.phi0.val
if ref is None:
print(
f'Could not find reference value for restraint {restraint}, skipping this term for the correction.'
)
continue
converted_restraints.append(fepana.Restraint(atoms, ref, k))
return fepana.calc_free_energy_for_abfe_cross_link_xu(
converted_restraints, temperature)
[docs]def add_restraint_atom_marker(st: structure.Structure,
ligand_asl: str) -> Dict[int, int]:
"""
Update `st` to mark the ligand atoms that could be part of a restraint.
Returns a dictionary mapping the `FEP_RESTRAIN` values to
the structure atom indicies.
:param st: This will be updated in place to add atom properties to mark
the ligand atoms.
:param ligand_asl: ASL to identify the ligand.
"""
ligand_aids = sorted(evaluate_asl(st, ligand_asl))
restraint_to_atom_idx = {}
for i, aid in enumerate(ligand_aids):
# This is used to create restraints that are independent
# of the absolute atom index. Number the ligand atoms
# as if the ligand was by itself.
st.atom[aid].property[FEP_RESTRAIN] = i + 1
restraint_to_atom_idx[i + 1] = aid
return restraint_to_atom_idx
[docs]class LoadRestraintsFromStructure(cmj.StructureStageBase):
"""
Load the restraints encoded in the structure using `encode_restraints`
and store to the cms. By default this stage will append to any existing
restraints, set 'load_restraints_from_structure.existing = ignore'
to ignore existing restraints in the structure.
The restraints can be used by setting 'restrain.existing = retain' in the
subsequent simulate stage.
"""
NAME = "load_restraints_from_structure"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
override = ""
temperature = 300.0
existing = %s
}
VALIDATE = {
override = {type = list size = 0}
temperature = {type = float+}
existing = {type = enum range = [%s]}
}
""" % (constants.EXISTING_RESTRAINT.RETAIN, ' '.join(
constants.EXISTING_RESTRAINT))
])
[docs] def run(self, jobname: str, cms_fname: str) -> Optional[str]:
from schrodinger.application.desmond.packages.restraint import \
RestraintBuilder
model = cms.Cms(file=cms_fname)
has_restraints = any(decode_restraints(ct) for ct in model.comp_ct)
for ct in model.comp_ct:
new_restraints = decode_restraints(ct)
if self.param.override.val:
# Need to add restraint correction to same ct that has the
# restraint.
if has_restraints and new_restraints is None:
continue
new_restraints = self.param.override
# Users can specify restraints without the reference
# values set, so determine the values here.
add_restraint_reference(model.fsys_ct, new_restraints)
dg_correction = calculate_restraint_correction_term(
new_restraints, self.param.temperature.val)
ct.property[FEP_RESTRAINT_CORRECTION] = dg_correction
self._print(
"quiet",
f"Updated restraint correction term: {dg_correction}")
if new_restraints is not None:
# This will update the model in place
restraint_builder = RestraintBuilder(new_restraints,
self.param.existing.val,
model)
restraint_builder.addRestraints()
self._print(
"quiet",
restraint_builder.getString(skip_tables={'posre_harm'},
compact=True))
break
out_fname = f"{jobname}-out.cms"
model.write(out_fname)
return out_fname
[docs]class ForcefieldBuilderLauncher(cmj.StageBase):
NAME = "forcefield_builder_launcher"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
structure_file = ""
additional_args = []
host = "localhost"
output_opls_name = "$MAINJOBNAME-out.opls"
}
VALIDATE = {
additional_args = {type = list size = 0}
structure_file = {type = str1}
host = {type = str1}
output_opls_name = {type = str1}
}
"""
])
[docs] def crunch(self):
self._print("debug", "In ForcefieldBuilderLauncher.crunch")
for pj in self.get_prejobs():
jobname, jobdir = self._get_jobname_and_dir(pj)
jobpath = Path(jobdir)
new_job = cmj.Job(jobname, pj, self, None, jobdir)
self.add_job(new_job)
if not jobpath.exists():
jobpath.mkdir()
# Set up input structure
structure_path = Path(self.param.structure_file.val).absolute()
util.chdir(jobdir)
shutil.copyfile(structure_path, structure_path.name)
# Set up command
cmd = [
'ffbuilder', '-HOST', 'localhost', '-SUBHOST',
self.param.host.val, structure_path.name, '-JOBNAME',
'ffbuilder'
] + self.param.additional_args.val
# Propagate input opls
if oplsdir := os.getenv(mm.OPLS_DIR_ENV):
cmd += ['-OPLSDIR', oplsdir]
self._print("debug", f'Launching job: {cmd}')
job = jobcontrol.launch_job(cmd)
job.wait()
# Copy back output files
out_opls_path = Path('ffbuilder_oplsdir')
if not job.succeeded():
self._print("quiet", "ERROR: forcefield_builder failed:")
if not out_opls_path.exists():
self._print("quiet",
f"Could not find output path {out_opls_path}.")
if not job.succeeded():
self._print("quiet", "Job failed.")
for log_fname in glob.glob('*.log'):
self._print("quiet", f"Log file : {log_fname}")
with open(log_fname, 'r') as f:
log_contents = '>'.join(f.readlines())
self._print("quiet",
f"Log file content:\n>{log_contents}")
self._print("quiet", "(end of log file)\n")
new_job.status.set(cmj.JobStatus.BACKEND_ERROR)
return
if out_opls_path.exists():
self._print(
"quiet",
"The following OPLSDIR will be used for all fep_plus "
"calculations and copied back to the launch "
f"directory: {self.param.output_opls_name.val}")
opls_copy_path = Path(cmj.ENGINE.base_dir,
self.param.output_opls_name.val)
# Find the opls file in the output
out_fname = next(out_opls_path.glob('*.opls'))
shutil.copy(out_fname, opls_copy_path)
self._files4copy.append(opls_copy_path)
# Reset the OPLS_DIR env var
os.environ[mm.OPLS_DIR_ENV] = str(opls_copy_path.absolute())
else:
self._print(
"quiet", "No additional parameters determined by the "
"Force Field Builder workflow.")
# Pass the output since this is the input for the next stage
new_job.status.set(cmj.JobStatus.SUCCESS)
new_job.output = copy.deepcopy(pj.output)
return
self._print("debug", "out ForcefieldBuilderLauncher.crunch")