import base64
import os
import shutil
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import DefaultDict
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Tuple
from typing import Union
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import license as desmond_license
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.constants import ABSOLUTE_BINDING_LEGS
from schrodinger.application.desmond.constants import FEP_DDG_PREFIX
from schrodinger.application.desmond.constants import FEP_RESTRAINT_CORRECTION
from schrodinger.application.desmond.constants import GCMC_LIGAND
from schrodinger.application.desmond.measurement import Measurement
from schrodinger.application.desmond.stage import launcher
from schrodinger.application.desmond.stage.prepare import forcefield
from schrodinger.infra import mm
from schrodinger.structure import Structure
from schrodinger.structutils import analyze
from schrodinger.utils import license
from schrodinger.utils import sea
from schrodinger.utils import subprocess
from . import restraint as ligand_restraint
from . import struc as absolute_struc
if TYPE_CHECKING:
from schrodinger.application.desmond.packages import msys
from schrodinger.application.desmond.packages import traj
from schrodinger.application.scisol.packages.fep import graph
__all__ = [
'FepAbsoluteBindingStructurePrimer',
'FepAbsoluteBindingFepPrimer',
'FepAbsoluteBindingMdLauncher',
'FepAbsoluteBindingFepLauncher',
'FepAbsoluteBindingAnalysis',
# Deprecated
'FepAbsoluteBindingMdPrimer',
'FepAbsoluteBindingPrimer',
'FepAbsoluteBindingLauncher',
]
def _get_ligand_st(cms_model: cms.Cms) -> Optional[structure.Structure]:
"""
Find the ligand structure from the given `cms_model`.
Return None if the structure could not be found
"""
# Find the ligand structure
for ct in cms_model.comp_ct:
if ct.property.get(
constants.FEP_STRUC_TAG) == constants.FEP_STRUC_TAG.VAL.LIGAND:
return ct
# ############################## SUBJOB STAGES #################################
[docs]class FepAbsoluteBindingStructurePrimer(cmj.StructureStageBase):
"""
For the ligand with the matching ligand_hash_id,
prepare the Absolute Binding structures for MD
if the `leg` is not specified, otherwise prepare the structures
for FEP. All other ligands will be ignored.
"""
NAME = "fep_absolute_binding_structure_primer"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
ligand_hash_id = ""
leg = ""
}
VALIDATE = {
ligand_hash_id = {type = str1}
leg = {type = str}
}
"""
])
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]:
cts = list(structure.StructureReader(mae_fname))
if self.param.leg.val and self.param.ligand_hash_id.val:
self._print("quiet",
"ERROR: Specify leg or ligand_hash_id not both.")
return None
if self.param.leg.val:
# Prepare for FEP
processed_cts = absolute_struc.filter_fep_structures(
cts, self.param.leg.val)
elif self.param.ligand_hash_id.val:
# Prepare for MD
# DESMOND-10214: Temporary patch for DESMOND-10213 and DESMOND-10159
# FIXME: Remove when the underlying problems have been fixed
ligand_hash_id = base64.b64decode(
self.param.ligand_hash_id.val).decode()
processed_cts = absolute_struc.prepare_md_structures(
cts, ligand_hash_id)
else:
# Do nothing
processed_cts = cts
if not processed_cts:
self._print("quiet",
"ERROR: No structures found, stopping workflow.")
return None
out_path = Path(f"{jobname}-out.mae")
struc.write_structures(processed_cts, out_path)
return str(out_path)
[docs]class FepAbsoluteBindingFepPrimer(cmj.StructureStageBase):
NAME = "fep_absolute_binding_fep_primer"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
ligand_asl = "%s"
ligand_restraint = {
enable = false
force_constants = [40.0 0.0]
sigmas = [40.0 40.0]
lambda_schedule_name = "pose_dihedral_restraint"
}
adaptive_ligand_restraint = {
enable = false
force_constants = [40.0 0.0]
lambda_schedule_name = "pose_dihedral_restraint"
}
cross_link_restraint = {
force_constants = [1.0 40.0 40.0]
sigmas = [0.0 0.0 0.0]
lambda_schedule_name = "alchemical_boresch"
lambda_schedule_dihed_name = "alchemical_boresch_dihed"
receptor_asl = "protein and (not atom.att 1) and backbone"
temperature_for_free_energy_correction = 300.0
min_angle=60
r_clone=5
use_bonded_atoms=true
use_pl_interactions=false
use_centroid=false
}
use_representative_structure=false
}
VALIDATE = {
ligand_asl = {type = str1}
ligand_restraint = {
enable = {type = bool}
force_constants = {type = list size = 2 elem = {type = float+}}
sigmas = {type = list size = 2 elem = {type = float+}}
lambda_schedule_name = {type = str1}
}
adaptive_ligand_restraint = {
enable = {type = bool}
force_constants = {type = list size = 2 elem = {type = float+}}
lambda_schedule_name = {type = str1}
}
cross_link_restraint = {
force_constants = {type = list size = 3 elem = {type = float+}}
sigmas = {type = list size = 3 elem = {type = float+}}
lambda_schedule_name = {type = str1}
receptor_asl = {type = str1}
temperature_for_free_energy_correction = {type = float+}
min_angle = {type = float range = [0 90]}
r_clone = {type = float+}
use_bonded_atoms = {type = bool}
use_pl_interactions = {type = bool}
use_centroid = {type = bool}
}
use_representative_structure = {type = bool}
}
""" % (f'a.{constants.FEP_ABSOLUTE_BINDING_LIGAND} 1',)
])
[docs] def run(self, jobname: str, cms_fname: str) -> Optional[str]:
from schrodinger.application.desmond.packages import restraint
from schrodinger.application.desmond.packages import topo
from schrodinger.application.desmond.packages import traj_util
# read input files
msys_model = None
cms_model = cms.Cms(cms_fname)
if cms_model.need_msys:
msys_model, cms_model = topo.read_cms(cms_fname)
trj = traj_util.read_traj(cms_model, cms_fname)
# Update the cms_model with the coordinates from
# the representative frame
if self.param.use_representative_structure.val:
ligand_restraint.use_representative_frame(msys_model, cms_model,
trj, self._ligand_asl)
# Add restraint marker
ligand_st = _get_ligand_st(cms_model)
forcefield.clear_restraints(ligand_st)
forcefield.add_restraint_atom_marker(ligand_st, self._ligand_asl)
# Get adaptive ligand restraint if enabled
adaptive_ligand_restraint_str = None
if self.param.adaptive_ligand_restraint.enable.val:
adaptive_ligand_restraint_str = ligand_restraint.prepare_adaptive_ligand_restraints(
msys_model, cms_model, trj, self._ligand_asl,
self.param.adaptive_ligand_restraint.force_constants.val,
self.param.adaptive_ligand_restraint.lambda_schedule_name.val)
try:
# Create the input structures
sts = self._create_complex_fep_strucs(msys_model, cms_model, trj, adaptive_ligand_restraint_str) + \
self._create_solvent_fep_strucs(cms_model, adaptive_ligand_restraint_str)
out_fname = f'{jobname}-out.mae'
struc.write_structures(sts, out_fname)
return out_fname
except restraint.CrossLinkGenerationError as ex:
# When no crosslink restraints can be found
self._print("quiet", str(ex))
return None
def _create_complex_fep_strucs(
self,
msys_model: "msys.System", # noqa: F821
cms_model: cms.Cms,
trj: List["traj.TrajFrame"],
additional_restraints: Optional[str] = None
) -> List[structure.Structure]:
"""
Create the FEP structures for the complex leg. This will set up the cross
link restraints between the ligand and receptor, set up the simulation
as a relative calculation to a dummy ligand and optionally add
additional restraints on the ligand.
:param msys_model: Msys model read in with
`traj_util.read_cms_and_traj`.
:param cms_model: Cms model used to generate the complex FEP
structures. Read in with `traj_util.read_cms_and_traj`.
:param trj: Trajectory read in with `traj_util.read_cms_and_traj`. Used
to create the cross link restraints.
:param additional_restraints: Optional, additional restraints to set on
the ligand.
:return: The list of complex structures.
"""
# get the complex structures and set fep properties
complex_with_dummy_st, complex_st = self._prepare_complex_structures(
cms_model)
# Set up cross link restraints
str_crosslink_restr, dg_correction = self._set_up_cross_link_restraints(
msys_model,
cms_model,
trj,
aid_offset=complex_with_dummy_st.atom_total)
complex_st.property[FEP_RESTRAINT_CORRECTION] = dg_correction
self._extend_restraints(complex_st, f'[{str_crosslink_restr}]')
# Set up the ligand restraints if enabled
if self.param.ligand_restraint.enable.val:
lr_param = self.param.ligand_restraint
restraints_str = ligand_restraint.prepare_ligand_restraints(
complex_st, self._ligand_asl, lr_param.force_constants.val,
lr_param.sigmas.val, lr_param.lambda_schedule_name.val)
self._extend_restraints(complex_st, restraints_str)
# Set up the additional restraints
self._extend_restraints(complex_st, additional_restraints)
component_struc = struc.component_structures(cms_model)
sts = list(
struc.struc_iter([
complex_with_dummy_st, complex_st, component_struc.solvent,
component_struc.membrane
]))
for st in sts:
st.property[constants.FEP_HASH_ID] = complex_st.property[
constants.FEP_HASH_ID]
st.property[
constants.
ABSOLUTE_BINDING_LEGS] = constants.ABSOLUTE_BINDING_LEGS.VAL.COMPLEX
# build_geometry stage will read box from the solvent
if st is not component_struc.solvent:
struc.delete_structure_properties(st, constants.SIM_BOX)
return sts
def _create_solvent_fep_strucs(self,
cms_model: cms.Cms,
additional_restraints: Optional[str] = None
) -> List[structure.Structure]:
"""
Create the FEP structures for the solvent leg.
This will set up the simulation as a relative calculation
to a dummy stage and optionally add additional
restraints on the ligand.
:param cms_model: Cms model used to generate the solvent FEP structures.
Read in with `traj_util.read_cms_and_traj`.
:param additional_restraints: Optional, additional restraints to set on the ligand.
:return: The list of solvent structures.
"""
lig_st = _get_ligand_st(cms_model)
# Deletes unwanted atom- and/or CT-level properties.
# - Original coordinates atom property: DESMOND-9562
struc.delete_atom_properties(lig_st,
constants.REFERENCE_COORD_PROPERTIES)
lig_aids = analyze.evaluate_asl(lig_st, self._ligand_asl)
dummy_st = lig_st.copy()
mm.mmffld_removeCustomChargesFromCT(dummy_st)
dummy_st.deleteAtoms(lig_aids)
absolute_struc.make_dummy(dummy_st)
dummy_st.property[constants.FEP_FRAGNAME] = 'none'
lig_st.property[constants.FEP_FRAGNAME] = 'mutant'
lig_st.property[
ABSOLUTE_BINDING_LEGS] = ABSOLUTE_BINDING_LEGS.VAL.SOLVENT
# Set atom mapping for mutant ligand
has_restraint = self.param.ligand_restraint.enable.val or self.param.adaptive_ligand_restraint.enable.val
for i in lig_aids:
atm = lig_st.atom[i]
atm.property[GCMC_LIGAND] = 1
atm.property[constants.FEP_MAPPING] = 0
atm.property[constants.REST_HOTREGION] = 0 if has_restraint else 1
# Set up the ligand restraints if enabled
if self.param.ligand_restraint.enable.val:
lr_param = self.param.ligand_restraint
restraints_str = ligand_restraint.prepare_ligand_restraints(
lig_st, self._ligand_asl, lr_param.force_constants.val,
lr_param.sigmas.val, lr_param.lambda_schedule_name.val)
self._extend_restraints(lig_st, restraints_str)
# Set up the additional restraints
self._extend_restraints(lig_st, additional_restraints)
sts = [dummy_st, lig_st]
for st in sts:
st.property[constants.FEP_HASH_ID] = lig_st.property[
constants.FEP_HASH_ID]
st.property[
constants.
ABSOLUTE_BINDING_LEGS] = constants.ABSOLUTE_BINDING_LEGS.VAL.SOLVENT
return sts
def _prepare_complex_structures(
self, cms_model: cms.Cms
) -> Tuple[structure.Structure, structure.Structure]:
"""
Given a cms model, prepare the input structures for the
complex fep leg.
"""
complex_st = struc.struc_merge(
struc.component_structures(cms_model).solute)
# complex_st is build from an empty structure with
# all existing properties deleted. Need to add properties back.
if cms_model.use_titration:
complex_st.property[constants.USE_TITRATION] = True
# Find the ligand structure and preserve the original title/hash id.
for ct in cms_model.comp_ct:
if ct.property.get(constants.FEP_STRUC_TAG
) == constants.FEP_STRUC_TAG.VAL.LIGAND:
complex_st.property[
constants.FEP_ORIGINAL_TITLE] = ct.property.get(
constants.FEP_ORIGINAL_TITLE)
complex_st.property[constants.FEP_HASH_ID] = ct.property.get(
constants.FEP_HASH_ID)
break
# Deletes unwanted atom- and/or CT-level properties.
# - Original coordinates atom property: DESMOND-9562
struc.delete_atom_properties(complex_st,
constants.REFERENCE_COORD_PROPERTIES)
complex_with_dummy_st = complex_st.copy()
complex_with_dummy_st.property[constants.FEP_FRAGNAME] = 'none'
complex_st.property[constants.FEP_FRAGNAME] = 'mutant'
complex_st.property[
ABSOLUTE_BINDING_LEGS] = ABSOLUTE_BINDING_LEGS.VAL.COMPLEX
# delete the ligand in the reference and replace with
# a dummy atom
mm.mmffld_removeCustomChargesFromCT(complex_with_dummy_st)
lig0_ids = analyze.evaluate_asl(complex_with_dummy_st, self._ligand_asl)
complex_with_dummy_st.deleteAtoms(lig0_ids)
absolute_struc.make_dummy(complex_with_dummy_st)
# Set atom mapping for non-ligand
not_ligand_asl = f'(all) AND NOT (\"{self._ligand_asl}\")'
prot0_ids = analyze.evaluate_asl(complex_with_dummy_st, not_ligand_asl)
prot1_ids = analyze.evaluate_asl(complex_st, not_ligand_asl)
for i, j in zip(prot0_ids, prot1_ids):
complex_with_dummy_st.atom[i].property[constants.FEP_MAPPING] = j
complex_st.atom[j].property[constants.FEP_MAPPING] = i
complex_with_dummy_st.atom[i].property[GCMC_LIGAND] = 0
complex_st.atom[j].property[GCMC_LIGAND] = 0
complex_with_dummy_st.atom[i].property[constants.REST_HOTREGION] = 0
complex_st.atom[j].property[constants.REST_HOTREGION] = 0
# Set atom mapping for mutant ligand
lig1_ids = analyze.evaluate_asl(complex_st, self._ligand_asl)
for i in lig1_ids:
atm = complex_st.atom[i]
atm.property[GCMC_LIGAND] = 1
atm.property[constants.FEP_MAPPING] = 0
atm.property[constants.REST_HOTREGION] = 0
return complex_with_dummy_st, complex_st
def _set_up_cross_link_restraints(
self,
msys_model: "msys.System", # noqa: F821
cms_model,
trj: List["traj.Frame"],
aid_offset: int) -> Tuple[str, float]:
"""
Given a cms model and corresponding trajectory, set up
the cross link restraints on the next simulate stage.
If the use_pl_interactions parameter is true, this will
first attempt to generate restraints using the
Hydrogen bond/Salt Bridge interactions. If that fails
or is not enabled, but use_centroid is enabled, this will
attempt to generate restraints using the centroid method.
If both are not enabled, this will fall back to using the
variance of the terms for all atoms in a certain distance
to the ligand.
:param msys_model: Msys model read in with `traj_util.read_cms_and_traj`.
:param cms_model: Cms model used to set up the cross link restraints.
This must be read in by `read_cms_and_traj` and not `cms.Cms`.
:param trj_fname: Path to the trajectory to analyze.
:param aid_offset: Offset to add to the cross link restraints.
:return:
Tuple of the restraints encoded as a string and
the dg correction term from the restraints.
"""
from schrodinger.application.desmond.packages import restraint
heavy_ligand_asl = f'({self._ligand_asl}) and (not a.e H)'
use_centroid = self.param.cross_link_restraint.use_centroid.val
if self.param.cross_link_restraint.use_pl_interactions.val:
try:
crosslink_restr = restraint.gen_cross_link_restraint_interaction(
msys_model,
cms_model,
trj,
ligand_asl=self.
_ligand_asl, # Not heavy ligand asl, need hydrogens to find interactions
receptor_asl=self._receptor_asl,
min_angle=self.param.cross_link_restraint.min_angle.val)
# Don't use centroid if restraint found
use_centroid = False
except restraint.CrossLinkGenerationError as err:
# Fall back to centroid if set, otherwise propagate the error.
if not use_centroid:
raise err
else:
print(
'Could not find cross link restraints using ligand interactions.\n'
'Falling back to using the centroid method.')
if use_centroid:
crosslink_restr = restraint.gen_cross_link_restraint_centroid(
cms_model,
trj,
ligand_asl=self._ligand_asl,
receptor_asl=self._receptor_asl,
min_angle=self.param.cross_link_restraint.min_angle.val)
# Fall back to original method
if not self.param.cross_link_restraint.use_centroid.val and \
not self.param.cross_link_restraint.use_pl_interactions.val:
crosslink_restr = restraint.gen_cross_link_restraint(
cms_model,
trj,
ligand_asl=heavy_ligand_asl,
receptor_asl=self._receptor_asl,
r_clone=self.param.cross_link_restraint.r_clone.val,
min_angle=self.param.cross_link_restraint.min_angle.val,
use_bonded_atoms=self.param.cross_link_restraint.
use_bonded_atoms.val)
crosslink_restr.A += aid_offset
crosslink_restr.B += aid_offset
crosslink_restr.C += aid_offset
crosslink_restr.a += aid_offset
crosslink_restr.b += aid_offset
crosslink_restr.c += aid_offset
# Print cross-link restraints
self._print("quiet",
'Cross-link restraints: \n %s' % str(crosslink_restr))
# Calculate free energy correction
dg_correction = fepana.calc_free_energy_correction_due_to_restraint(
crosslink_restr,
self.param.cross_link_restraint.force_constants.val, self.param.
cross_link_restraint.temperature_for_free_energy_correction.val)
self._print("quiet", f'Free energy correction: {dg_correction}')
# Update next simulation stage with cross-link restraints
str_crosslink_restr = crosslink_restr.asMsjSetting(
self.param.cross_link_restraint.force_constants.val,
self.param.cross_link_restraint.sigmas.val,
self.param.cross_link_restraint.lambda_schedule_name.val,
1,
schedule_dihed_name=self.param.cross_link_restraint.
lambda_schedule_dihed_name.val)
return str_crosslink_restr, dg_correction
@property
def _ligand_asl(self) -> str:
ligand_asl = self.param.ligand_asl.val
if ligand_asl.startswith('asl:'):
ligand_asl = ligand_asl[4:]
return ligand_asl
@property
def _receptor_asl(self) -> str:
receptor_asl = self.param.cross_link_restraint.receptor_asl.val
if receptor_asl.startswith('asl:'):
receptor_asl = receptor_asl[4:]
return receptor_asl
def _extend_restraints(self, ct: structure.Structure, restraints_str: str):
"""
Extend the restraints as encoded on the given structure.
:param ct: Store encoded restraints onto this structure.
:param restraint_str: Restraints encoded as an msj setting.
"""
if not restraints_str:
return
msj_setting = f'restraint = {restraints_str}'
self._print("quiet", f'Extending restraints: {msj_setting}')
new_restraints = sea.Map(msj_setting).restraint
existing_restraints = forcefield.decode_restraints(ct)
if existing_restraints is not None:
new_restraints.extend(existing_restraints)
self._print("quiet", f'Updated restraints: {new_restraints}')
forcefield.encode_restraints(ct, new_restraints)
# ############################# MAIN JOB STAGES ##############################
[docs]class FepAbsoluteBindingLauncherBase(launcher.FepLauncher):
NAME = "fep_absolute_binding_launcher_base"
PARAM = cmj._create_param_when_needed("""
DATA = {
compress = ""
input_graph_file = ""
}
VALIDATE = {
compress = {type = str}
input_graph_file = {type = str1}
}
""")
def _get_fmp_fname(self, _) -> str:
return os.path.join(cmj.ENGINE.base_dir,
self.param.input_graph_file.val)
def _get_edge(self, pj: cmj.Job, g: 'graph.Graph') -> 'graph.Edge':
fname_in = pj.output.struct_file()
return self.get_edge_from_struct_file(fname_in, g)
[docs] def get_edge_from_struct_file(self, struct_file: Union[str, Path],
g: 'graph.Graph'):
from schrodinger.application.scisol.packages.fep import utils
cts = list(structure.StructureReader(struct_file))
# Only the ligand has the FEP_HASH_ID, not the receptor
short_id = next(
filter(lambda x: x,
[ct.property.get(constants.FEP_HASH_ID) for ct in cts]))
for edge in g.edges_iter():
ligand_id = utils.get_ligand_node(edge).id
if ligand_id.startswith(short_id):
return edge
else:
raise ValueError(
f"Could not find edge corresponding to given job: {short_id}")
[docs]class FepAbsoluteBindingMdLauncher(FepAbsoluteBindingLauncherBase):
"""
Launches the absolute binding FEP workflow.
"""
NAME = "fep_absolute_binding_md_launcher"
[docs] def check_param(self):
ev = super().check_param()
if "md_job" in self.param:
ev.record_error(
err="MSJ contains stages generated with an "
"outdated release. Please regenerate the files using "
"21-1 or later")
return ev
def _crunch_prejobs(self, pre_jobs):
"""
Set up the simulation by extracting the structures for each job.
"""
self._print("debug", "FepAbsoluteBindingMdLauncher::crunch")
from schrodinger.application.scisol.packages.fep import utils
from schrodinger.application.scisol.packages.fep import graph
if self._fail_if_license_not_available(license.FEP_GPGPU):
return
# One pre_job per input structure.
new_pre_job = []
for pj in pre_jobs:
_, jobdir = self._get_jobname_and_dir(pj)
if not os.path.isdir(jobdir):
os.makedirs(jobdir)
g = graph.Graph.deserialize(self.param.input_graph_file.val)
ligand_sts = [
utils.get_ligand_node(e).struc for e in g.edges_iter()
]
env_sts, ligand_sts = absolute_struc.prepare_input_structures(
g.environment_struc + ligand_sts)
# Create one job for each ligand
for i, st in enumerate(ligand_sts):
ligand_hash_id = st.property[constants.FEP_HASH_ID]
subjobname = f"$MAINJOBNAME_{ligand_hash_id}"
# Write input structure
mae_fname = str(Path(jobdir, sea.Atom(f"{subjobname}.mae").val))
struc.write_structures(env_sts + [st], mae_fname)
# Create new job
new_pj = cmj.Job(subjobname, pj, self, None, jobdir)
new_pj.output.add(str(Path(mae_fname).absolute()))
# Set the tag which translates to $JOBTAG in the command
new_pj.tag = ligand_hash_id
new_pre_job.append(new_pj)
return super()._crunch_prejobs(new_pre_job)
def _should_skip_leg_for_edge(self, edge: 'graph.Edge', leg: str) -> bool:
"""
Return True if the leg should be skipped.
MD should only run if no fep legs have run
for expanding a map that already has dg values.
"""
if super()._should_skip_leg_for_edge(edge, leg):
return True
if leg == 'md' and (getattr(edge, 'complex_dg') or
getattr(edge, 'solvent_dg')):
self._print("quiet",
f"INFO: Skip completed {leg} leg for {edge} edge.")
return True
return False
[docs]class FepAbsoluteBindingFepLauncher(FepAbsoluteBindingLauncherBase):
"""
Launches the absolute binding FEP workflow.
"""
NAME = "fep_absolute_binding_fep_launcher"
def _crunch_prejobs(self, pre_jobs):
"""
Set up the simulation by extracting the structures for each job.
"""
if self._fail_if_license_not_available(license.FEP_GPGPU):
return
# One pre_job per input structure.
new_pre_job = []
for pj in pre_jobs:
# Set the tag which translates to $JOBTAG in the command
ct = structure.StructureReader.read(pj.output.struct_file())
hash_id = ct.property[constants.FEP_HASH_ID]
pj.tag = hash_id
new_pre_job.append(pj)
return super()._crunch_prejobs(new_pre_job)
[docs]@dataclass
class AbsoluteBindingResult:
ligand_st: structure.Structure = None
complex_st: structure.Structure = None
complex_dg: Measurement = None
complex_dg_correction: float = None
solvent_dg: Measurement = None
@property
def is_complete(self) -> bool:
return bool(self.solvent_dg is not None and
self.complex_dg is not None and
self.complex_dg_correction is not None and
self.complex_st is not None)
@property
def final_dg(self) -> Optional[Measurement]:
if self.is_complete:
return self.solvent_dg - self.complex_dg - self.complex_dg_correction
else:
return None
def __getitem__(self, item):
"""
Allow accessing values through indexing, to support backwards
compatibility with GraphDB
"""
return getattr(self, item)
[docs]def get_results_by_ligand(
sts_list: List[List[Structure]]
) -> DefaultDict[str, AbsoluteBindingResult]:
""" Given a list of list of structures (each list corresponds to output structure
from each FEP leg), return the result for each ligand.
:return: dict with keys of
["complex_st", "complex_dg", "solvent_dg", "complex_dg_correction", "final_dg"].
final_dg is None if any values are missing.
"""
def _st_is_ligand(st):
# must check that atom_total is not 1 to avoid getting dummy ligand
leg_is_solvent = (st.property.get(constants.ABSOLUTE_BINDING_LEGS) ==
constants.ABSOLUTE_BINDING_LEGS.VAL.SOLVENT)
st_is_solute = (st.property.get(
constants.CT_TYPE) == constants.CT_TYPE.VAL.SOLUTE)
return st.atom_total != 1 and leg_is_solvent and st_is_solute
ABSOLUTE_BINDING_LEGS = constants.ABSOLUTE_BINDING_LEGS
results_by_ligand = defaultdict(AbsoluteBindingResult)
for sts in sts_list:
ligand_title = sts[0].property[constants.FEP_ORIGINAL_TITLE]
result = results_by_ligand[ligand_title]
leg = None
dg = None
dg_correction = None
for st in sts:
leg = leg or st.property.get(constants.ABSOLUTE_BINDING_LEGS)
dg = dg or st.property.get(constants.FEP_DG_PREFIX)
dg_correction = dg_correction or st.property.get(
constants.FEP_RESTRAINT_CORRECTION)
if _st_is_ligand(st):
result.ligand_st = st
if leg == ABSOLUTE_BINDING_LEGS.VAL.COMPLEX:
result.complex_st = sts[
0] # sts[0] is the fsys_ct from the complex leg
result.complex_dg = Measurement.from_string(dg) if dg else None
result.complex_dg_correction = float(
dg_correction) if dg_correction else None
elif leg == ABSOLUTE_BINDING_LEGS.VAL.SOLVENT:
result.solvent_dg = Measurement.from_string(dg) if dg else None
for ligand_title, result in results_by_ligand.items():
if result.is_complete:
complex_st = result.complex_st
complex_st.property[FEP_DDG_PREFIX] = str(result.final_dg)
complex_st.title = complex_st.property[constants.FEP_ORIGINAL_TITLE]
return results_by_ligand
[docs]def get_csv(results_by_ligand: dict) -> str:
"""Return the csv content report energies for each ligand.
:param: results_by_ligand is got from `get_results_by_ligand`
"""
report_csv = get_csv_header()
for ligand_title, result in results_by_ligand.items():
if result.is_complete:
complex_st = result.complex_st
ligand_st = result.ligand_st
smiles = _get_ligand_smiles(ligand_st) if ligand_st else ""
report_csv += get_csv_row(complex_st.title, result.complex_dg,
result.solvent_dg,
result.complex_dg_correction,
result.final_dg, smiles)
return report_csv
def _get_ligand_smiles(ligand_st: structure.Structure) -> str:
"""
Get the smiles for the ligand, first removing any atoms which were not
part of the original ligand such as counterions
"""
aids_to_delete = [
a.index
for a in ligand_st.atom
if not a.property.get(constants.FEP_ABSOLUTE_BINDING_LIGAND)
]
ligand_st = ligand_st.copy()
ligand_st.deleteAtoms(aids_to_delete)
return analyze.generate_smiles(ligand_st)
[docs]def get_csv_row(ligand: str, complex_dg: Measurement, solvent_dg: Measurement,
complex_dg_correction: float, final_dg: Measurement,
smiles: str) -> str:
"""
Given the results for a ligand, return a formatted row.
"""
return ','.join((
ligand,
f'{complex_dg.val:g}',
f'{complex_dg.unc:g}',
f'{solvent_dg.val:g}',
f'{solvent_dg.unc:g}',
f'{complex_dg_correction:g}',
f'{final_dg.val:g}',
f'{final_dg.unc:g}',
smiles,
)) + '\n'
[docs]class FepAbsoluteBindingAnalysis(cmj.StageBase):
NAME = "fep_absolute_binding_analysis"
TAG = NAME.upper()
previous_compound_ids = set(), cmj.Picklable
PARAM = cmj._create_param_when_needed("""
DATA = {
report = "$MAINJOBNAME_report.csv"
output_structure = "$MAINJOBNAME-out.mae"
input_graph_file = ""
}
VALIDATE = {
report = {type = str}
output_structure = {type = str1}
input_graph_file = {type = str1}
}
""")
[docs] def check_param(self):
ev = super().check_param()
if "graph_file" in self.param:
ev.record_error(
err="MSJ contains stages generated with an "
"outdated release. Please regenerate the files using "
"20-4 or later")
return ev
[docs] def crunch(self):
self._print("debug", "In FepAbsoluteBindingAnalysis.crunch")
sts_list = [
struc.read_all_structures(pj.output.struct_file())
for pj in self.get_prejobs()
]
results_by_ligand = get_results_by_ligand(sts_list)
if len(results_by_ligand) == 0:
self._print(
"quiet", "Missing jobs for analysis. "
"Please check for failed jobs and try restarting the workflow.")
failed_job = cmj.Job('job', None, self, None, '.')
failed_job.status.set(cmj.JobStatus.BACKEND_ERROR)
self.add_job(failed_job)
return
analysis_failed = False
jobname = cmj.ENGINE.jobname
checkpoint = f'{jobname}-multisim_checkpoint'
cmd = [
'run', '-FROM', 'scisol', "fep_mapper_cleanup.py", checkpoint,
'-force', '-fmp', self.param.input_graph_file.val, '-out-fmp',
f"{jobname}_out.fmp"
]
self._print("quiet", " ".join(cmd))
exit_status = subprocess.call(cmd)
if exit_status != 0:
analysis_failed = True
self._print("quiet",
"ERROR: Unable to complete fep_mapper_cleanup.py")
else:
self._files4copy.append(f"{jobname}_out.fmp")
self._files4copy.append(f"{jobname}_out.fmpdb")
pj0 = self.get_prejobs()[0]
jobname, jobdir = self._get_jobname_and_dir(pj0)
if not os.path.isdir(jobdir):
os.makedirs(jobdir)
util.chdir(jobdir)
new_job = cmj.Job(pj0.jobname, pj0, self, None, jobdir)
complex_sts = []
hash_ids = set()
for ligand_title, result in results_by_ligand.items():
final_dg = result.final_dg
if final_dg is not None:
complex_st = result.complex_st
hash_ids.add(complex_st.property[constants.FEP_HASH_ID])
complex_sts.append(complex_st)
self._print("quiet", f"Results for {complex_st.title}:")
self._print("quiet", f"Complex dG: {result.complex_dg}")
self._print(
"quiet",
f"Complex correction dG: {result.complex_dg_correction}")
self._print("quiet", f"Solvent dG: {result.solvent_dg}")
self._print("quiet", f"Final dG: {final_dg}")
else:
self._print("quiet", "Missing values for dg calculation:")
self._print("quiet", f"Result: {result}")
analysis_failed = True
if complex_sts:
out_fname = f'{jobname}-out.mae'
struc.write_structures(complex_sts, out_fname)
new_job.output.set_struct_file(out_fname)
# Copy the output back to the root directory
out_fname_copy = Path(cmj.ENGINE.base_dir,
self.param.output_structure.val)
shutil.copy(out_fname, out_fname_copy)
self._files4copy.append(out_fname_copy)
# Write out report
report_csv = get_csv(results_by_ligand)
if report_csv:
out_fname = f'{jobname}-out.csv'
Path(out_fname).write_text(report_csv)
new_job.output.add(out_fname, tag="CSV")
report_copy = Path(cmj.ENGINE.base_dir, self.param.report.val)
shutil.copy(out_fname, report_copy)
self._files4copy.append(report_copy)
# Check out the new compounds
num_new_compounds = len(
hash_ids - FepAbsoluteBindingAnalysis.previous_compound_ids)
FepAbsoluteBindingAnalysis.previous_compound_ids.update(hash_ids)
self._print(
"quiet",
f'Number of new compounds completed in the current job: {num_new_compounds}'
)
if num_new_compounds:
desmond_license.checkout_model2_compounds(
num_new_compounds, product=constants.PRODUCT.FEP)
if analysis_failed:
new_job.status.set(cmj.JobStatus.BACKEND_ERROR)
else:
new_job.status.set(cmj.JobStatus.SUCCESS)
self.add_job(new_job)
self._print("debug", "Done FepAbsoluteBindingAnalysis.crunch")
# these are deprecated classes for pickled files
[docs]class FepAbsoluteBindingMdPrimer:
pass
[docs]class FepAbsoluteBindingPrimer:
pass
[docs]class FepAbsoluteBindingLauncher:
pass