Source code for schrodinger.application.desmond.stage.app.absolute_binding.stage

import base64
import os
import shutil
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import DefaultDict
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Tuple
from typing import Union

from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import license as desmond_license
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.constants import ABSOLUTE_BINDING_LEGS
from schrodinger.application.desmond.constants import FEP_DDG_PREFIX
from schrodinger.application.desmond.constants import FEP_RESTRAINT_CORRECTION
from schrodinger.application.desmond.constants import GCMC_LIGAND
from schrodinger.application.desmond.measurement import Measurement
from schrodinger.application.desmond.stage import launcher
from schrodinger.application.desmond.stage.prepare import forcefield
from schrodinger.infra import mm
from schrodinger.structure import Structure
from schrodinger.structutils import analyze
from schrodinger.utils import license
from schrodinger.utils import sea
from schrodinger.utils import subprocess

from . import restraint as ligand_restraint
from . import struc as absolute_struc

if TYPE_CHECKING:
    from schrodinger.application.desmond.packages import msys
    from schrodinger.application.desmond.packages import traj
    from schrodinger.application.scisol.packages.fep import graph

__all__ = [
    'FepAbsoluteBindingStructurePrimer',
    'FepAbsoluteBindingFepPrimer',
    'FepAbsoluteBindingMdLauncher',
    'FepAbsoluteBindingFepLauncher',
    'FepAbsoluteBindingAnalysis',
    # Deprecated
    'FepAbsoluteBindingMdPrimer',
    'FepAbsoluteBindingPrimer',
    'FepAbsoluteBindingLauncher',
]


def _get_ligand_st(cms_model: cms.Cms) -> Optional[structure.Structure]:
    """
    Find the ligand structure from the given `cms_model`.
    Return None if the structure could not be found
    """
    # Find the ligand structure
    for ct in cms_model.comp_ct:
        if ct.property.get(
                constants.FEP_STRUC_TAG) == constants.FEP_STRUC_TAG.VAL.LIGAND:
            return ct


# ############################## SUBJOB STAGES #################################


[docs]class FepAbsoluteBindingStructurePrimer(cmj.StructureStageBase): """ For the ligand with the matching ligand_hash_id, prepare the Absolute Binding structures for MD if the `leg` is not specified, otherwise prepare the structures for FEP. All other ligands will be ignored. """ NAME = "fep_absolute_binding_structure_primer" PARAM = cmj._create_param_when_needed([ """ DATA = { ligand_hash_id = "" leg = "" } VALIDATE = { ligand_hash_id = {type = str1} leg = {type = str} } """ ])
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]: cts = list(structure.StructureReader(mae_fname)) if self.param.leg.val and self.param.ligand_hash_id.val: self._print("quiet", "ERROR: Specify leg or ligand_hash_id not both.") return None if self.param.leg.val: # Prepare for FEP processed_cts = absolute_struc.filter_fep_structures( cts, self.param.leg.val) elif self.param.ligand_hash_id.val: # Prepare for MD # DESMOND-10214: Temporary patch for DESMOND-10213 and DESMOND-10159 # FIXME: Remove when the underlying problems have been fixed ligand_hash_id = base64.b64decode( self.param.ligand_hash_id.val).decode() processed_cts = absolute_struc.prepare_md_structures( cts, ligand_hash_id) else: # Do nothing processed_cts = cts if not processed_cts: self._print("quiet", "ERROR: No structures found, stopping workflow.") return None out_path = Path(f"{jobname}-out.mae") struc.write_structures(processed_cts, out_path) return str(out_path)
[docs]class FepAbsoluteBindingFepPrimer(cmj.StructureStageBase): NAME = "fep_absolute_binding_fep_primer" PARAM = cmj._create_param_when_needed([ """ DATA = { ligand_asl = "%s" ligand_restraint = { enable = false force_constants = [40.0 0.0] sigmas = [40.0 40.0] lambda_schedule_name = "pose_dihedral_restraint" } adaptive_ligand_restraint = { enable = false force_constants = [40.0 0.0] lambda_schedule_name = "pose_dihedral_restraint" } cross_link_restraint = { force_constants = [1.0 40.0 40.0] sigmas = [0.0 0.0 0.0] lambda_schedule_name = "alchemical_boresch" lambda_schedule_dihed_name = "alchemical_boresch_dihed" receptor_asl = "protein and (not atom.att 1) and backbone" temperature_for_free_energy_correction = 300.0 min_angle=60 r_clone=5 use_bonded_atoms=true use_pl_interactions=false use_centroid=false } use_representative_structure=false } VALIDATE = { ligand_asl = {type = str1} ligand_restraint = { enable = {type = bool} force_constants = {type = list size = 2 elem = {type = float+}} sigmas = {type = list size = 2 elem = {type = float+}} lambda_schedule_name = {type = str1} } adaptive_ligand_restraint = { enable = {type = bool} force_constants = {type = list size = 2 elem = {type = float+}} lambda_schedule_name = {type = str1} } cross_link_restraint = { force_constants = {type = list size = 3 elem = {type = float+}} sigmas = {type = list size = 3 elem = {type = float+}} lambda_schedule_name = {type = str1} receptor_asl = {type = str1} temperature_for_free_energy_correction = {type = float+} min_angle = {type = float range = [0 90]} r_clone = {type = float+} use_bonded_atoms = {type = bool} use_pl_interactions = {type = bool} use_centroid = {type = bool} } use_representative_structure = {type = bool} } """ % (f'a.{constants.FEP_ABSOLUTE_BINDING_LIGAND} 1',) ])
[docs] def run(self, jobname: str, cms_fname: str) -> Optional[str]: from schrodinger.application.desmond.packages import restraint from schrodinger.application.desmond.packages import topo from schrodinger.application.desmond.packages import traj_util # read input files msys_model = None cms_model = cms.Cms(cms_fname) if cms_model.need_msys: msys_model, cms_model = topo.read_cms(cms_fname) trj = traj_util.read_traj(cms_model, cms_fname) # Update the cms_model with the coordinates from # the representative frame if self.param.use_representative_structure.val: ligand_restraint.use_representative_frame(msys_model, cms_model, trj, self._ligand_asl) # Add restraint marker ligand_st = _get_ligand_st(cms_model) forcefield.clear_restraints(ligand_st) forcefield.add_restraint_atom_marker(ligand_st, self._ligand_asl) # Get adaptive ligand restraint if enabled adaptive_ligand_restraint_str = None if self.param.adaptive_ligand_restraint.enable.val: adaptive_ligand_restraint_str = ligand_restraint.prepare_adaptive_ligand_restraints( msys_model, cms_model, trj, self._ligand_asl, self.param.adaptive_ligand_restraint.force_constants.val, self.param.adaptive_ligand_restraint.lambda_schedule_name.val) try: # Create the input structures sts = self._create_complex_fep_strucs(msys_model, cms_model, trj, adaptive_ligand_restraint_str) + \ self._create_solvent_fep_strucs(cms_model, adaptive_ligand_restraint_str) out_fname = f'{jobname}-out.mae' struc.write_structures(sts, out_fname) return out_fname except restraint.CrossLinkGenerationError as ex: # When no crosslink restraints can be found self._print("quiet", str(ex)) return None
def _create_complex_fep_strucs( self, msys_model: "msys.System", # noqa: F821 cms_model: cms.Cms, trj: List["traj.TrajFrame"], additional_restraints: Optional[str] = None ) -> List[structure.Structure]: """ Create the FEP structures for the complex leg. This will set up the cross link restraints between the ligand and receptor, set up the simulation as a relative calculation to a dummy ligand and optionally add additional restraints on the ligand. :param msys_model: Msys model read in with `traj_util.read_cms_and_traj`. :param cms_model: Cms model used to generate the complex FEP structures. Read in with `traj_util.read_cms_and_traj`. :param trj: Trajectory read in with `traj_util.read_cms_and_traj`. Used to create the cross link restraints. :param additional_restraints: Optional, additional restraints to set on the ligand. :return: The list of complex structures. """ # get the complex structures and set fep properties complex_with_dummy_st, complex_st = self._prepare_complex_structures( cms_model) # Set up cross link restraints str_crosslink_restr, dg_correction = self._set_up_cross_link_restraints( msys_model, cms_model, trj, aid_offset=complex_with_dummy_st.atom_total) complex_st.property[FEP_RESTRAINT_CORRECTION] = dg_correction self._extend_restraints(complex_st, f'[{str_crosslink_restr}]') # Set up the ligand restraints if enabled if self.param.ligand_restraint.enable.val: lr_param = self.param.ligand_restraint restraints_str = ligand_restraint.prepare_ligand_restraints( complex_st, self._ligand_asl, lr_param.force_constants.val, lr_param.sigmas.val, lr_param.lambda_schedule_name.val) self._extend_restraints(complex_st, restraints_str) # Set up the additional restraints self._extend_restraints(complex_st, additional_restraints) component_struc = struc.component_structures(cms_model) sts = list( struc.struc_iter([ complex_with_dummy_st, complex_st, component_struc.solvent, component_struc.membrane ])) for st in sts: st.property[constants.FEP_HASH_ID] = complex_st.property[ constants.FEP_HASH_ID] st.property[ constants. ABSOLUTE_BINDING_LEGS] = constants.ABSOLUTE_BINDING_LEGS.VAL.COMPLEX # build_geometry stage will read box from the solvent if st is not component_struc.solvent: struc.delete_structure_properties(st, constants.SIM_BOX) return sts def _create_solvent_fep_strucs(self, cms_model: cms.Cms, additional_restraints: Optional[str] = None ) -> List[structure.Structure]: """ Create the FEP structures for the solvent leg. This will set up the simulation as a relative calculation to a dummy stage and optionally add additional restraints on the ligand. :param cms_model: Cms model used to generate the solvent FEP structures. Read in with `traj_util.read_cms_and_traj`. :param additional_restraints: Optional, additional restraints to set on the ligand. :return: The list of solvent structures. """ lig_st = _get_ligand_st(cms_model) # Deletes unwanted atom- and/or CT-level properties. # - Original coordinates atom property: DESMOND-9562 struc.delete_atom_properties(lig_st, constants.REFERENCE_COORD_PROPERTIES) lig_aids = analyze.evaluate_asl(lig_st, self._ligand_asl) dummy_st = lig_st.copy() mm.mmffld_removeCustomChargesFromCT(dummy_st) dummy_st.deleteAtoms(lig_aids) absolute_struc.make_dummy(dummy_st) dummy_st.property[constants.FEP_FRAGNAME] = 'none' lig_st.property[constants.FEP_FRAGNAME] = 'mutant' lig_st.property[ ABSOLUTE_BINDING_LEGS] = ABSOLUTE_BINDING_LEGS.VAL.SOLVENT # Set atom mapping for mutant ligand has_restraint = self.param.ligand_restraint.enable.val or self.param.adaptive_ligand_restraint.enable.val for i in lig_aids: atm = lig_st.atom[i] atm.property[GCMC_LIGAND] = 1 atm.property[constants.FEP_MAPPING] = 0 atm.property[constants.REST_HOTREGION] = 0 if has_restraint else 1 # Set up the ligand restraints if enabled if self.param.ligand_restraint.enable.val: lr_param = self.param.ligand_restraint restraints_str = ligand_restraint.prepare_ligand_restraints( lig_st, self._ligand_asl, lr_param.force_constants.val, lr_param.sigmas.val, lr_param.lambda_schedule_name.val) self._extend_restraints(lig_st, restraints_str) # Set up the additional restraints self._extend_restraints(lig_st, additional_restraints) sts = [dummy_st, lig_st] for st in sts: st.property[constants.FEP_HASH_ID] = lig_st.property[ constants.FEP_HASH_ID] st.property[ constants. ABSOLUTE_BINDING_LEGS] = constants.ABSOLUTE_BINDING_LEGS.VAL.SOLVENT return sts def _prepare_complex_structures( self, cms_model: cms.Cms ) -> Tuple[structure.Structure, structure.Structure]: """ Given a cms model, prepare the input structures for the complex fep leg. """ complex_st = struc.struc_merge( struc.component_structures(cms_model).solute) # complex_st is build from an empty structure with # all existing properties deleted. Need to add properties back. if cms_model.use_titration: complex_st.property[constants.USE_TITRATION] = True # Find the ligand structure and preserve the original title/hash id. for ct in cms_model.comp_ct: if ct.property.get(constants.FEP_STRUC_TAG ) == constants.FEP_STRUC_TAG.VAL.LIGAND: complex_st.property[ constants.FEP_ORIGINAL_TITLE] = ct.property.get( constants.FEP_ORIGINAL_TITLE) complex_st.property[constants.FEP_HASH_ID] = ct.property.get( constants.FEP_HASH_ID) break # Deletes unwanted atom- and/or CT-level properties. # - Original coordinates atom property: DESMOND-9562 struc.delete_atom_properties(complex_st, constants.REFERENCE_COORD_PROPERTIES) complex_with_dummy_st = complex_st.copy() complex_with_dummy_st.property[constants.FEP_FRAGNAME] = 'none' complex_st.property[constants.FEP_FRAGNAME] = 'mutant' complex_st.property[ ABSOLUTE_BINDING_LEGS] = ABSOLUTE_BINDING_LEGS.VAL.COMPLEX # delete the ligand in the reference and replace with # a dummy atom mm.mmffld_removeCustomChargesFromCT(complex_with_dummy_st) lig0_ids = analyze.evaluate_asl(complex_with_dummy_st, self._ligand_asl) complex_with_dummy_st.deleteAtoms(lig0_ids) absolute_struc.make_dummy(complex_with_dummy_st) # Set atom mapping for non-ligand not_ligand_asl = f'(all) AND NOT (\"{self._ligand_asl}\")' prot0_ids = analyze.evaluate_asl(complex_with_dummy_st, not_ligand_asl) prot1_ids = analyze.evaluate_asl(complex_st, not_ligand_asl) for i, j in zip(prot0_ids, prot1_ids): complex_with_dummy_st.atom[i].property[constants.FEP_MAPPING] = j complex_st.atom[j].property[constants.FEP_MAPPING] = i complex_with_dummy_st.atom[i].property[GCMC_LIGAND] = 0 complex_st.atom[j].property[GCMC_LIGAND] = 0 complex_with_dummy_st.atom[i].property[constants.REST_HOTREGION] = 0 complex_st.atom[j].property[constants.REST_HOTREGION] = 0 # Set atom mapping for mutant ligand lig1_ids = analyze.evaluate_asl(complex_st, self._ligand_asl) for i in lig1_ids: atm = complex_st.atom[i] atm.property[GCMC_LIGAND] = 1 atm.property[constants.FEP_MAPPING] = 0 atm.property[constants.REST_HOTREGION] = 0 return complex_with_dummy_st, complex_st def _set_up_cross_link_restraints( self, msys_model: "msys.System", # noqa: F821 cms_model, trj: List["traj.Frame"], aid_offset: int) -> Tuple[str, float]: """ Given a cms model and corresponding trajectory, set up the cross link restraints on the next simulate stage. If the use_pl_interactions parameter is true, this will first attempt to generate restraints using the Hydrogen bond/Salt Bridge interactions. If that fails or is not enabled, but use_centroid is enabled, this will attempt to generate restraints using the centroid method. If both are not enabled, this will fall back to using the variance of the terms for all atoms in a certain distance to the ligand. :param msys_model: Msys model read in with `traj_util.read_cms_and_traj`. :param cms_model: Cms model used to set up the cross link restraints. This must be read in by `read_cms_and_traj` and not `cms.Cms`. :param trj_fname: Path to the trajectory to analyze. :param aid_offset: Offset to add to the cross link restraints. :return: Tuple of the restraints encoded as a string and the dg correction term from the restraints. """ from schrodinger.application.desmond.packages import restraint heavy_ligand_asl = f'({self._ligand_asl}) and (not a.e H)' use_centroid = self.param.cross_link_restraint.use_centroid.val if self.param.cross_link_restraint.use_pl_interactions.val: try: crosslink_restr = restraint.gen_cross_link_restraint_interaction( msys_model, cms_model, trj, ligand_asl=self. _ligand_asl, # Not heavy ligand asl, need hydrogens to find interactions receptor_asl=self._receptor_asl, min_angle=self.param.cross_link_restraint.min_angle.val) # Don't use centroid if restraint found use_centroid = False except restraint.CrossLinkGenerationError as err: # Fall back to centroid if set, otherwise propagate the error. if not use_centroid: raise err else: print( 'Could not find cross link restraints using ligand interactions.\n' 'Falling back to using the centroid method.') if use_centroid: crosslink_restr = restraint.gen_cross_link_restraint_centroid( cms_model, trj, ligand_asl=self._ligand_asl, receptor_asl=self._receptor_asl, min_angle=self.param.cross_link_restraint.min_angle.val) # Fall back to original method if not self.param.cross_link_restraint.use_centroid.val and \ not self.param.cross_link_restraint.use_pl_interactions.val: crosslink_restr = restraint.gen_cross_link_restraint( cms_model, trj, ligand_asl=heavy_ligand_asl, receptor_asl=self._receptor_asl, r_clone=self.param.cross_link_restraint.r_clone.val, min_angle=self.param.cross_link_restraint.min_angle.val, use_bonded_atoms=self.param.cross_link_restraint. use_bonded_atoms.val) crosslink_restr.A += aid_offset crosslink_restr.B += aid_offset crosslink_restr.C += aid_offset crosslink_restr.a += aid_offset crosslink_restr.b += aid_offset crosslink_restr.c += aid_offset # Print cross-link restraints self._print("quiet", 'Cross-link restraints: \n %s' % str(crosslink_restr)) # Calculate free energy correction dg_correction = fepana.calc_free_energy_correction_due_to_restraint( crosslink_restr, self.param.cross_link_restraint.force_constants.val, self.param. cross_link_restraint.temperature_for_free_energy_correction.val) self._print("quiet", f'Free energy correction: {dg_correction}') # Update next simulation stage with cross-link restraints str_crosslink_restr = crosslink_restr.asMsjSetting( self.param.cross_link_restraint.force_constants.val, self.param.cross_link_restraint.sigmas.val, self.param.cross_link_restraint.lambda_schedule_name.val, 1, schedule_dihed_name=self.param.cross_link_restraint. lambda_schedule_dihed_name.val) return str_crosslink_restr, dg_correction @property def _ligand_asl(self) -> str: ligand_asl = self.param.ligand_asl.val if ligand_asl.startswith('asl:'): ligand_asl = ligand_asl[4:] return ligand_asl @property def _receptor_asl(self) -> str: receptor_asl = self.param.cross_link_restraint.receptor_asl.val if receptor_asl.startswith('asl:'): receptor_asl = receptor_asl[4:] return receptor_asl def _extend_restraints(self, ct: structure.Structure, restraints_str: str): """ Extend the restraints as encoded on the given structure. :param ct: Store encoded restraints onto this structure. :param restraint_str: Restraints encoded as an msj setting. """ if not restraints_str: return msj_setting = f'restraint = {restraints_str}' self._print("quiet", f'Extending restraints: {msj_setting}') new_restraints = sea.Map(msj_setting).restraint existing_restraints = forcefield.decode_restraints(ct) if existing_restraints is not None: new_restraints.extend(existing_restraints) self._print("quiet", f'Updated restraints: {new_restraints}') forcefield.encode_restraints(ct, new_restraints)
# ############################# MAIN JOB STAGES ##############################
[docs]class FepAbsoluteBindingLauncherBase(launcher.FepLauncher): NAME = "fep_absolute_binding_launcher_base" PARAM = cmj._create_param_when_needed(""" DATA = { compress = "" input_graph_file = "" } VALIDATE = { compress = {type = str} input_graph_file = {type = str1} } """)
[docs] def collect_inputfile(self, *args): fnames = super().collect_inputfile(*args) return fnames + [self.param.input_graph_file.val]
def _get_fmp_fname(self, _) -> str: return os.path.join(cmj.ENGINE.base_dir, self.param.input_graph_file.val) def _get_edge(self, pj: cmj.Job, g: 'graph.Graph') -> 'graph.Edge': fname_in = pj.output.struct_file() return self.get_edge_from_struct_file(fname_in, g)
[docs] def get_edge_from_struct_file(self, struct_file: Union[str, Path], g: 'graph.Graph'): from schrodinger.application.scisol.packages.fep import utils cts = list(structure.StructureReader(struct_file)) # Only the ligand has the FEP_HASH_ID, not the receptor short_id = next( filter(lambda x: x, [ct.property.get(constants.FEP_HASH_ID) for ct in cts])) for edge in g.edges_iter(): ligand_id = utils.get_ligand_node(edge).id if ligand_id.startswith(short_id): return edge else: raise ValueError( f"Could not find edge corresponding to given job: {short_id}")
[docs]class FepAbsoluteBindingMdLauncher(FepAbsoluteBindingLauncherBase): """ Launches the absolute binding FEP workflow. """ NAME = "fep_absolute_binding_md_launcher"
[docs] def check_param(self): ev = super().check_param() if "md_job" in self.param: ev.record_error( err="MSJ contains stages generated with an " "outdated release. Please regenerate the files using " "21-1 or later") return ev
def _crunch_prejobs(self, pre_jobs): """ Set up the simulation by extracting the structures for each job. """ self._print("debug", "FepAbsoluteBindingMdLauncher::crunch") from schrodinger.application.scisol.packages.fep import utils from schrodinger.application.scisol.packages.fep import graph if self._fail_if_license_not_available(license.FEP_GPGPU): return # One pre_job per input structure. new_pre_job = [] for pj in pre_jobs: _, jobdir = self._get_jobname_and_dir(pj) if not os.path.isdir(jobdir): os.makedirs(jobdir) g = graph.Graph.deserialize(self.param.input_graph_file.val) ligand_sts = [ utils.get_ligand_node(e).struc for e in g.edges_iter() ] env_sts, ligand_sts = absolute_struc.prepare_input_structures( g.environment_struc + ligand_sts) # Create one job for each ligand for i, st in enumerate(ligand_sts): ligand_hash_id = st.property[constants.FEP_HASH_ID] subjobname = f"$MAINJOBNAME_{ligand_hash_id}" # Write input structure mae_fname = str(Path(jobdir, sea.Atom(f"{subjobname}.mae").val)) struc.write_structures(env_sts + [st], mae_fname) # Create new job new_pj = cmj.Job(subjobname, pj, self, None, jobdir) new_pj.output.add(str(Path(mae_fname).absolute())) # Set the tag which translates to $JOBTAG in the command new_pj.tag = ligand_hash_id new_pre_job.append(new_pj) return super()._crunch_prejobs(new_pre_job) def _should_skip_leg_for_edge(self, edge: 'graph.Edge', leg: str) -> bool: """ Return True if the leg should be skipped. MD should only run if no fep legs have run for expanding a map that already has dg values. """ if super()._should_skip_leg_for_edge(edge, leg): return True if leg == 'md' and (getattr(edge, 'complex_dg') or getattr(edge, 'solvent_dg')): self._print("quiet", f"INFO: Skip completed {leg} leg for {edge} edge.") return True return False
[docs]class FepAbsoluteBindingFepLauncher(FepAbsoluteBindingLauncherBase): """ Launches the absolute binding FEP workflow. """ NAME = "fep_absolute_binding_fep_launcher" def _crunch_prejobs(self, pre_jobs): """ Set up the simulation by extracting the structures for each job. """ if self._fail_if_license_not_available(license.FEP_GPGPU): return # One pre_job per input structure. new_pre_job = [] for pj in pre_jobs: # Set the tag which translates to $JOBTAG in the command ct = structure.StructureReader.read(pj.output.struct_file()) hash_id = ct.property[constants.FEP_HASH_ID] pj.tag = hash_id new_pre_job.append(pj) return super()._crunch_prejobs(new_pre_job)
[docs]def get_csv_header() -> str: return ",".join(( "Ligand", "Complex free energy", "Complex free energy uncertainty", "Solvent free energy", "Solvent free energy uncertainty", "Complex free energy restraint correction", "Absolute Binding free energy", "Absolute Binding free energy uncertainty", "Canonical Smiles", )) + '\n'
[docs]@dataclass class AbsoluteBindingResult: ligand_st: structure.Structure = None complex_st: structure.Structure = None complex_dg: Measurement = None complex_dg_correction: float = None solvent_dg: Measurement = None @property def is_complete(self) -> bool: return bool(self.solvent_dg is not None and self.complex_dg is not None and self.complex_dg_correction is not None and self.complex_st is not None) @property def final_dg(self) -> Optional[Measurement]: if self.is_complete: return self.solvent_dg - self.complex_dg - self.complex_dg_correction else: return None def __getitem__(self, item): """ Allow accessing values through indexing, to support backwards compatibility with GraphDB """ return getattr(self, item)
[docs]def get_results_by_ligand( sts_list: List[List[Structure]] ) -> DefaultDict[str, AbsoluteBindingResult]: """ Given a list of list of structures (each list corresponds to output structure from each FEP leg), return the result for each ligand. :return: dict with keys of ["complex_st", "complex_dg", "solvent_dg", "complex_dg_correction", "final_dg"]. final_dg is None if any values are missing. """ def _st_is_ligand(st): # must check that atom_total is not 1 to avoid getting dummy ligand leg_is_solvent = (st.property.get(constants.ABSOLUTE_BINDING_LEGS) == constants.ABSOLUTE_BINDING_LEGS.VAL.SOLVENT) st_is_solute = (st.property.get( constants.CT_TYPE) == constants.CT_TYPE.VAL.SOLUTE) return st.atom_total != 1 and leg_is_solvent and st_is_solute ABSOLUTE_BINDING_LEGS = constants.ABSOLUTE_BINDING_LEGS results_by_ligand = defaultdict(AbsoluteBindingResult) for sts in sts_list: ligand_title = sts[0].property[constants.FEP_ORIGINAL_TITLE] result = results_by_ligand[ligand_title] leg = None dg = None dg_correction = None for st in sts: leg = leg or st.property.get(constants.ABSOLUTE_BINDING_LEGS) dg = dg or st.property.get(constants.FEP_DG_PREFIX) dg_correction = dg_correction or st.property.get( constants.FEP_RESTRAINT_CORRECTION) if _st_is_ligand(st): result.ligand_st = st if leg == ABSOLUTE_BINDING_LEGS.VAL.COMPLEX: result.complex_st = sts[ 0] # sts[0] is the fsys_ct from the complex leg result.complex_dg = Measurement.from_string(dg) if dg else None result.complex_dg_correction = float( dg_correction) if dg_correction else None elif leg == ABSOLUTE_BINDING_LEGS.VAL.SOLVENT: result.solvent_dg = Measurement.from_string(dg) if dg else None for ligand_title, result in results_by_ligand.items(): if result.is_complete: complex_st = result.complex_st complex_st.property[FEP_DDG_PREFIX] = str(result.final_dg) complex_st.title = complex_st.property[constants.FEP_ORIGINAL_TITLE] return results_by_ligand
[docs]def get_csv(results_by_ligand: dict) -> str: """Return the csv content report energies for each ligand. :param: results_by_ligand is got from `get_results_by_ligand` """ report_csv = get_csv_header() for ligand_title, result in results_by_ligand.items(): if result.is_complete: complex_st = result.complex_st ligand_st = result.ligand_st smiles = _get_ligand_smiles(ligand_st) if ligand_st else "" report_csv += get_csv_row(complex_st.title, result.complex_dg, result.solvent_dg, result.complex_dg_correction, result.final_dg, smiles) return report_csv
def _get_ligand_smiles(ligand_st: structure.Structure) -> str: """ Get the smiles for the ligand, first removing any atoms which were not part of the original ligand such as counterions """ aids_to_delete = [ a.index for a in ligand_st.atom if not a.property.get(constants.FEP_ABSOLUTE_BINDING_LIGAND) ] ligand_st = ligand_st.copy() ligand_st.deleteAtoms(aids_to_delete) return analyze.generate_smiles(ligand_st)
[docs]def get_csv_row(ligand: str, complex_dg: Measurement, solvent_dg: Measurement, complex_dg_correction: float, final_dg: Measurement, smiles: str) -> str: """ Given the results for a ligand, return a formatted row. """ return ','.join(( ligand, f'{complex_dg.val:g}', f'{complex_dg.unc:g}', f'{solvent_dg.val:g}', f'{solvent_dg.unc:g}', f'{complex_dg_correction:g}', f'{final_dg.val:g}', f'{final_dg.unc:g}', smiles, )) + '\n'
[docs]class FepAbsoluteBindingAnalysis(cmj.StageBase): NAME = "fep_absolute_binding_analysis" TAG = NAME.upper() previous_compound_ids = set(), cmj.Picklable PARAM = cmj._create_param_when_needed(""" DATA = { report = "$MAINJOBNAME_report.csv" output_structure = "$MAINJOBNAME-out.mae" input_graph_file = "" } VALIDATE = { report = {type = str} output_structure = {type = str1} input_graph_file = {type = str1} } """)
[docs] def check_param(self): ev = super().check_param() if "graph_file" in self.param: ev.record_error( err="MSJ contains stages generated with an " "outdated release. Please regenerate the files using " "20-4 or later") return ev
[docs] def collect_inputfile(self): return [self.param.input_graph_file.val]
[docs] def crunch(self): self._print("debug", "In FepAbsoluteBindingAnalysis.crunch") sts_list = [ struc.read_all_structures(pj.output.struct_file()) for pj in self.get_prejobs() ] results_by_ligand = get_results_by_ligand(sts_list) if len(results_by_ligand) == 0: self._print( "quiet", "Missing jobs for analysis. " "Please check for failed jobs and try restarting the workflow.") failed_job = cmj.Job('job', None, self, None, '.') failed_job.status.set(cmj.JobStatus.BACKEND_ERROR) self.add_job(failed_job) return analysis_failed = False jobname = cmj.ENGINE.jobname checkpoint = f'{jobname}-multisim_checkpoint' cmd = [ 'run', '-FROM', 'scisol', "fep_mapper_cleanup.py", checkpoint, '-force', '-fmp', self.param.input_graph_file.val, '-out-fmp', f"{jobname}_out.fmp" ] self._print("quiet", " ".join(cmd)) exit_status = subprocess.call(cmd) if exit_status != 0: analysis_failed = True self._print("quiet", "ERROR: Unable to complete fep_mapper_cleanup.py") else: self._files4copy.append(f"{jobname}_out.fmp") self._files4copy.append(f"{jobname}_out.fmpdb") pj0 = self.get_prejobs()[0] jobname, jobdir = self._get_jobname_and_dir(pj0) if not os.path.isdir(jobdir): os.makedirs(jobdir) util.chdir(jobdir) new_job = cmj.Job(pj0.jobname, pj0, self, None, jobdir) complex_sts = [] hash_ids = set() for ligand_title, result in results_by_ligand.items(): final_dg = result.final_dg if final_dg is not None: complex_st = result.complex_st hash_ids.add(complex_st.property[constants.FEP_HASH_ID]) complex_sts.append(complex_st) self._print("quiet", f"Results for {complex_st.title}:") self._print("quiet", f"Complex dG: {result.complex_dg}") self._print( "quiet", f"Complex correction dG: {result.complex_dg_correction}") self._print("quiet", f"Solvent dG: {result.solvent_dg}") self._print("quiet", f"Final dG: {final_dg}") else: self._print("quiet", "Missing values for dg calculation:") self._print("quiet", f"Result: {result}") analysis_failed = True if complex_sts: out_fname = f'{jobname}-out.mae' struc.write_structures(complex_sts, out_fname) new_job.output.set_struct_file(out_fname) # Copy the output back to the root directory out_fname_copy = Path(cmj.ENGINE.base_dir, self.param.output_structure.val) shutil.copy(out_fname, out_fname_copy) self._files4copy.append(out_fname_copy) # Write out report report_csv = get_csv(results_by_ligand) if report_csv: out_fname = f'{jobname}-out.csv' Path(out_fname).write_text(report_csv) new_job.output.add(out_fname, tag="CSV") report_copy = Path(cmj.ENGINE.base_dir, self.param.report.val) shutil.copy(out_fname, report_copy) self._files4copy.append(report_copy) # Check out the new compounds num_new_compounds = len( hash_ids - FepAbsoluteBindingAnalysis.previous_compound_ids) FepAbsoluteBindingAnalysis.previous_compound_ids.update(hash_ids) self._print( "quiet", f'Number of new compounds completed in the current job: {num_new_compounds}' ) if num_new_compounds: desmond_license.checkout_model2_compounds( num_new_compounds, product=constants.PRODUCT.FEP) if analysis_failed: new_job.status.set(cmj.JobStatus.BACKEND_ERROR) else: new_job.status.set(cmj.JobStatus.SUCCESS) self.add_job(new_job) self._print("debug", "Done FepAbsoluteBindingAnalysis.crunch")
# these are deprecated classes for pickled files
[docs]class FepAbsoluteBindingMdPrimer: pass
[docs]class FepAbsoluteBindingPrimer: pass
[docs]class FepAbsoluteBindingLauncher: pass