Source code for schrodinger.application.desmond.replica_sid_generator

import os
from typing import Set
from typing import Tuple
from typing import Union

import numpy

import schrodinger
import schrodinger.application.desmond.automatic_analysis_generator as aag
import schrodinger.application.desmond.cms as cms
import schrodinger.structure as structure
import schrodinger.structutils.analyze as analyze
import schrodinger.structutils.build as build
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import util
from schrodinger.application.desmond.replica_dE import replica_container
from schrodinger.application.desmond.replica_dE import replicas_monitor
from schrodinger.application.desmond.struc import get_reference_ct
from schrodinger.application.desmond.util import parse_res
from schrodinger.structutils import createfragments
from schrodinger.structutils import smiles as smiles_mod
from schrodinger.utils import subprocess

from . import config
from .util import get_traj_filename

try:
    from schrodinger.application.scisol.packages.fep import \
        protein_mutation_generator as pmg
except ImportError:
    pmg = None


[docs]def get_cov_lig_info(cms_st): """ Find ligand residue ID for covalent ligand job. The inputs should always be a complex system/complex leg. :param cms_st: Desmond system structure :type cms_st: `cms.Cms` :rtype: tuple(`str`, `str`) or tuple(None, None) :return: (chain, resnum) information of the covalent ligand """ custom_tag = cms_st.ref_ct.property.get(constants.COVALENT_LIGAND, None) if custom_tag is not None: return parse_res(custom_tag)[:2] else: from schrodinger.application.scisol.packages.fep.fepmae import \ find_covalent_residue_asl cov_res_asl = find_covalent_residue_asl(cms_st.ref_ct) cov_res_idx = analyze.evaluate_asl(cms_st.ref_ct, cov_res_asl) if cov_res_idx: cov_lig_atm = cms_st.ref_ct.atom[cov_res_idx[0]] return (cov_lig_atm.chain, cov_lig_atm.resnum) return (None, None)
[docs]class AlchemAsl(object):
[docs] def __init__(self, ref_asl, mut_asl, ref_solv_asl=None, mut_solv_asl=None): self._ref_asl = ref_asl self._mut_asl = mut_asl self._ref_solv_asl = ref_solv_asl self._mut_solv_asl = mut_solv_asl
@property def ref_asl(self): return self._ref_asl @property def mut_asl(self): return self._mut_asl @property def ref_solv_asl(self): return self._ref_solv_asl @property def mut_solv_asl(self): return self._mut_solv_asl
[docs]def setup_alchem_properties(cms_st, alchem_asl_obj, perturbation_type, leg_type): """ This method sets up all alchemical selections for different types of FEPs and respected perturbation legs. :type cms_st: `cms.Cms` :type alchem_asl_obj: `AlchemAsl` :param alchem_asl_obj: AlchemAsl object :type perturbation_type: `str` :param perturbation_type: FEP_TYPE as defined in constants.FEP_TYPES :type leg_type: `str` :param leg_type: either a 'solvent' or 'complex' :rtype: (`SmallMoleculeReport`, `SmallMoleculeReport`), (`str`, `str`) :return: two tuples of pairs: SmallMoleculeReport and full protein ASL strings """ full_protein_asl = (f"protein and not ({alchem_asl_obj.mut_asl})", f"protein and not ({alchem_asl_obj.ref_asl})") template_frag_asl = "protein and (chain.name %s and res.num %i-%i) and not (%s)" ref_metal_asl = None mut_metal_asl = None if constants.FEP_TYPES.COVALENT_LIGAND == perturbation_type: if leg_type == constants.FepLegTypes.COMPLEX: chain_name, resid = get_cov_lig_info(cms_st) templ_covlig_asl = f"(chain.name {chain_name} and res.num {resid}) and not (%s) and not " \ f"(atom.{constants.FEP_COVALENT_PROTEIN_BB_ATOM} 1 or res.ptype ACE NMA)" ref_asl = templ_covlig_asl % alchem_asl_obj.mut_asl mut_asl = templ_covlig_asl % alchem_asl_obj.ref_asl # exclude covalent ligand from protein selection tmpl = f" and not (chain.name {chain_name} and res.num {resid} and not " \ f"atom.{constants.FEP_COVALENT_PROTEIN_BB_ATOM} 1)" full_protein_asl = (full_protein_asl[0] + tmpl, full_protein_asl[1] + tmpl) else: ref_asl = alchem_asl_obj.ref_asl mut_asl = alchem_asl_obj.mut_asl full_protein_asl = (None, None) elif constants.FEP_TYPES.PROTEIN_STABILITY == perturbation_type: full_protein_asl = (leg_type == constants.FepLegTypes.COMPLEX) and \ full_protein_asl or (None, None) chain_name, resid, ins_code = parse_prm_tag(cms_st) if chain_name is None and resid is None: return (None, None), full_protein_asl ref_asl = _get_alchem_asl(chain_name, resid, ins_code, alchem_asl_obj.mut_asl) mut_asl = _get_alchem_asl(chain_name, resid, ins_code, alchem_asl_obj.ref_asl) elif constants.FEP_TYPES.SMALL_MOLECULE == perturbation_type: ref_asl = alchem_asl_obj.ref_asl mut_asl = alchem_asl_obj.mut_asl full_protein_asl = full_protein_asl \ if leg_type == constants.FepLegTypes.COMPLEX else (None, None) elif constants.FEP_TYPES.METALLOPROTEIN == perturbation_type: template_metal_ligand_asl = 'ligand and ({})' template_metal_protein_asl = 'protein and ({})' template_metal_metal_asl = '((ions) or (metals) or (metalloids)) and ({})' ref_asl = template_metal_ligand_asl.format(alchem_asl_obj.ref_asl) mut_asl = template_metal_ligand_asl.format(alchem_asl_obj.mut_asl) ref_prot_asl = template_metal_protein_asl.format(alchem_asl_obj.ref_asl) mut_prot_asl = template_metal_protein_asl.format(alchem_asl_obj.mut_asl) ref_metal_asl = template_metal_metal_asl.format(alchem_asl_obj.ref_asl) mut_metal_asl = template_metal_metal_asl.format(alchem_asl_obj.mut_asl) full_protein_asl = (ref_prot_asl, mut_prot_asl) \ if leg_type == constants.FepLegTypes.COMPLEX else (None, None) elif constants.FEP_TYPES.PROTEIN_SELECTIVITY == perturbation_type: chain_name, resid, ins_code = parse_prm_tag(cms_st) if chain_name is None and resid is None: return (None, None), full_protein_asl ref_asl = _get_alchem_asl(chain_name, resid, ins_code, alchem_asl_obj.mut_asl) mut_asl = _get_alchem_asl(chain_name, resid, ins_code, alchem_asl_obj.ref_asl) elif constants.FEP_TYPES.LIGAND_SELECTIVITY == perturbation_type: """ In complex leg we want ligand-protein interactions. In solvent leg, there is no ligand, so we're looking at fragment-protein interactions. """ if leg_type == constants.FepLegTypes.COMPLEX: lig_aids = analyze.evaluate_asl( cms_st.comp_ct[0], '(not water and not (ions) and not (metals))') ref_asl = mut_asl = analyze.generate_asl(cms_st.comp_ct[0], lig_aids) else: chain_name, resid, ins_code = parse_prm_tag(cms_st) if chain_name is None and resid is None: return (None, None), full_protein_asl ref_asl = _get_alchem_asl(chain_name, resid, ins_code, alchem_asl_obj.mut_asl) mut_asl = _get_alchem_asl(chain_name, resid, ins_code, alchem_asl_obj.ref_asl) elif perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING: # In ABFEP, even though the ref_ct is the dummy atom while the # mut_ct is the ligand, lambda=0 is where the ligand interacts # with the protein ref_asl = f'({alchem_asl_obj.mut_asl}) and a.{constants.FEP_ABSOLUTE_BINDING_LIGAND} 1' mut_asl = "res.ptype 'DU '" # Define the protein asl, making sure there is only one copy of the # protein in the analysis full_protein_asl = (full_protein_asl[1], full_protein_asl[0]) \ if leg_type == constants.FepLegTypes.COMPLEX else (None, None) alchem_asl_obj._ref_solv_asl, alchem_asl_obj._mut_solv_asl = \ alchem_asl_obj.mut_solv_asl, alchem_asl_obj.ref_solv_asl elif perturbation_type == constants.FEP_TYPES.SOLUBILITY: full_protein_asl = (None, None) ref_asl = alchem_asl_obj.ref_asl mut_asl = alchem_asl_obj.mut_asl ref_st = cms_st.extract(cms_st.select_atom(ref_asl)) mut_st = cms_st.extract(cms_st.select_atom(mut_asl)) # get ASL/structures for alchemical solvent ref_solvent_st = cms_st.extract(cms_st.select_atom(alchem_asl_obj.ref_solv_asl)) \ if alchem_asl_obj.ref_solv_asl else None mut_solvent_st = cms_st.extract(cms_st.select_atom(alchem_asl_obj.mut_solv_asl)) \ if alchem_asl_obj.mut_solv_asl else None ref = SmallMoleculeReport(ref_st, perturbation_type, leg_type, 1, ref_asl, ref_solvent_st, alchem_asl_obj.ref_solv_asl, ref_metal_asl) mut = SmallMoleculeReport(mut_st, perturbation_type, leg_type, 2, mut_asl, mut_solvent_st, alchem_asl_obj.mut_solv_asl, mut_metal_asl) alchem_info_list = (ref, mut) return alchem_info_list, full_protein_asl
[docs]def parse_prm_tag(cms_model: cms.Cms) -> \ Union[Tuple[None, None, None], Tuple[str, int, str]]: """ Given a cms model, get the chain, resnum, and inscode of the mutated residue. Mutated sites are parsed from the s_bioluminate_Mutations property which is a string in the format of A:33B(ALA->VAL) where A is the chain, 33 is the residue number, B in the insertion code (optional), and the mutation is from an alanine to a valine. In the case of 1) a multisite+multistep mutation (e.g. WT -> A-ALA41ILE,A-ALA43GLY) 2) there is no s_bioluminate_Mutations property we must skip ligand analysis, so return (None, None, None) """ mutated_sites = list(_get_cms_mutated_sites(cms_model)) if not mutated_sites: # no mutation sites were found return None, None, None elif len(mutated_sites) > 1: # sid analysis does not support multiple ligands, and hence must be # skipped for multisite mutations return None, None, None chain, resnum, inscode = mutated_sites[0] return chain, resnum, inscode
def _get_cms_mutated_sites(cms_model: cms.Cms) -> Set[Tuple[str, int, str]]: """ Given a cms, return a set of residues that were mutated between the reference and mutant cts. If the BIOLUMINATE_MUTATION cannot be found (on cms created before 20-2) return an empty set. """ reference_mutations = cms_model.ref_ct.property.get( constants.BIOLUMINATE_MUTATION) mutant_mutations = cms_model.mut_ct.property.get( constants.BIOLUMINATE_MUTATION) if reference_mutations is None or mutant_mutations is None: return set() return pmg.get_mutated_sites(reference_mutations, mutant_mutations) def _get_alchem_asl(chain_name: str, resnum: int, inscode: str, asl_to_exclude: str) -> str: """ Get asl containing residues on the given chain with the given residue numbers (plus and minus 1) and insertion code while exclusing `asl_to_exclude` """ asl = (f'protein and (chain.name "{chain_name}" ' f'and res.num {resnum-1}-{resnum+1}') if inscode: asl += f' and res.i "{inscode}"' asl += f') and not ({asl_to_exclude})' return asl
[docs]def write_recentered_traj(traj_fn: str, asl: str) -> Tuple[str, str]: """ Center the cms/trajectory to the provided asl string. We also need to rename the CT_TYPE of 'solute' to 'solvent' to ensure proper centering. Return the new cms and trj filenames. """ from schrodinger.application.desmond.packages import topo from schrodinger.application.desmond.packages import traj from schrodinger.application.desmond.packages import traj_util from schrodinger.application.desmond.packages import pfx msys_model, cms_model, tr = traj_util.read_cms_and_traj(traj_fn) for ct in cms_model.comp_ct: if ct.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.SOLUTE: ct.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLVENT break center_aids = cms_model.select_atom(asl) if cms_model.need_msys: gids = topo.aids2gids(cms_model, center_aids, True) topo.center_cms(msys_model, gids, cms_model) topo.center(msys_model, gids, tr) else: gids = cms_model.convert_to_gids(center_aids, True) pfx.center_cms(gids, cms_model) pfx.center(cms_model.glued_topology, gids, tr) out_traj_fn = 'centered_solubility-out.cms' out_traj_path = 'centered_solubility.xtc' cms_model.fix_filenames(out_traj_fn, out_traj_path) cms_model.write(out_traj_fn) traj.write_traj(tr, out_traj_path) return out_traj_fn, out_traj_path
[docs]class FEPReport(object):
[docs] def __init__(self, basename, energy_output, task_type='lambda_hopping', n_win=12, perturbation_type=None): self.basename = basename self.replicas_monitor = None self.nreplicas = n_win self.task_type = task_type self.prm_tag = None self.prot_info = None self.alchem_info_list = [] self.lambda0_result = None self.lambda1_result = None self.replica_energies = None self.prot_info = None self.perturbation_type = perturbation_type self.full_protein_asl = [None, None] self.alchem_info_list = [None, None] self.simulation_details = FEPSimulationReport(basename, task_type, perturbation_type) self.cms_st = self.simulation_details.get_cms() self.replica_energies = replica_container(basename, energy_output, task_type=task_type, n_win=n_win) if self.replica_energies: self.nreplicas = len(self.replica_energies.replica_list) if self.simulation_details.cfg and task_type in [ 'lambda_hopping', 'afep' ]: self.replicas_monitor = replicas_monitor( basename, self.simulation_details.cfg, task_type) self.setup_alchem_properties() self.prot_info = ProteinReport(self.cms_st, self.full_protein_asl[0], mutation_tag=self.prm_tag) self.lambda0_result = self.get_analysis(fep_lambda=0) self.lambda1_result = self.get_analysis(fep_lambda=1)
[docs] def setup_alchem_properties(self): self._determine_fep_leg() self.alchem_asl = self._get_alchemical_asls() self.alchem_info_list, self.full_protein_asl = \ setup_alchem_properties(self.cms_st, self.alchem_asl, self.perturbation_type, self.leg_type) self.simulation_details.perturbation_type = self.perturbation_type
def _determine_fep_leg(self): """Given a cms file we determine its component and the FEP leg""" if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY: self.leg_type = constants.FepLegTypes.HYDRATION if self.cms_st.comp_ct[ 0].mol_total == 1 else constants.FepLegTypes.SUBLIMATION return ref_ctid = self.cms_st.comp_ct.index(self.cms_st.ref_ct) leg_type = constants.FepLegTypes.COMPLEX if self.perturbation_type in [ constants.FEP_TYPES.SMALL_MOLECULE, constants.FEP_TYPES.LIGAND_SELECTIVITY, constants.FEP_TYPES.PROTEIN_SELECTIVITY ] and ref_ctid == 0: leg_type = constants.FepLegTypes.SOLVENT elif self.perturbation_type in [ constants.FEP_TYPES.PROTEIN_STABILITY, constants.FEP_TYPES.COVALENT_LIGAND ]: if self.cms_st.ref_ct.atom_total < constants.LIGAND_TOTAL_ATOMS_LIMIT: leg_type = constants.FepLegTypes.SOLVENT elif self.perturbation_type == constants.FEP_TYPES.METALLOPROTEIN: if self.cms_st.ref_ct.atom_total < constants.LIGAND_TOTAL_ATOMS_LIMIT: leg_type = constants.FepLegTypes.SOLVENT elif self.perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING: leg_type = self.cms_st.ref_ct.property[ constants.ABSOLUTE_BINDING_LEGS] self.leg_type = leg_type def _get_alchemical_asls(self): """ For each lambda value (0 or 1), return the ligand asl and then the alchemical solvent (can be either ion or water) asl. """ def _get_asl(_ct): ctid = self.cms_st.comp_ct.index(_ct) atom_list = self.cms_st.get_fullsys_ct_atom_index_range(ctid) lig_struc = self.cms_st.extract(atom_list) ions_or_waters = analyze.evaluate_asl( lig_struc, f'atom.{constants.ALCHEMICAL_ION}') # if no alchemical waters or ions if not ions_or_waters: return (analyze.generate_asl(self.cms_st, atom_list), None) ions_or_waters_idx = [ lig_struc.atom[i].property[constants.ORIGINAL_INDEX] for i in ions_or_waters ] lig_idx = list(set(atom_list) - set(ions_or_waters_idx)) return (analyze.generate_asl(self.cms_st, lig_idx), analyze.generate_asl(self.cms_st, ions_or_waters_idx)) if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY: alchem_prop = f'atom.{constants.FEP_ABSOLUTE_LIGAND} 1' sel_atoms = analyze.evaluate_asl(self.cms_st, alchem_prop) mol_number = set( [self.cms_st.atom[i].molecule_number for i in sel_atoms]) # this property should be set only on a single molecule if len(mol_number) != 1: raise RuntimeError("More than one alchemical molecule set.") mol_asl = f'mol.num {list(mol_number)[0]}' return AlchemAsl(mol_asl, mol_asl, mol_asl, mol_asl) ref_asl, ref_solvent_asl = _get_asl(self.cms_st.ref_ct) mut_asl, mut_solvent_asl = _get_asl(self.cms_st.mut_ct) return AlchemAsl(ref_asl, mut_asl, ref_solvent_asl, mut_solvent_asl)
[docs] def get_ark_results(self): """ Function organizes and returns ARK abject """ kw_list = sea.List() m = sea.Map() kw_list.append(self.simulation_details.export()) temp_m = self.prot_info and self.prot_info.export() if temp_m: kw_list.append(temp_m) for lig in self.alchem_info_list: if lig is not None: kw_list.append(lig.export()) if self.replicas_monitor: kw_list.append(self.replicas_monitor.export()) if self.replica_energies: kw_list.append(self.replica_energies.export()) if self.lambda0_result: res_m1 = sea.Map() res_m1['ResultLambda0'] = self.lambda0_result kw_list.append(res_m1) if self.lambda1_result: res_m2 = sea.Map() res_m2['ResultLambda1'] = self.lambda1_result kw_list.append(res_m2) m['Keywords'] = kw_list m['BaseFilename'] = self.basename if self.prm_tag: m["ProteinMutation"] = self.prm_tag return m
[docs] def export(self, filename=None): """ Writes a file with SID results in them, so they can be read into SID gui """ m = self.get_ark_results() if not filename: filename = self.basename + '.sid' with open(filename, 'w') as f: f.write(self.ark_str(m))
[docs] def ark_str(self, str_in): """Sanitize ARK string, by removing the doubleqoutes""" return str(str_in).strip("\"")
[docs] def launch_SID(self, traj_fn, st2_fn, eaf_fn): """ This method launches analyze_simulation.py, a backend for SID analysis """ from schrodinger.application.desmond.packages import topo analyzeTrajExec = os.path.join(os.environ.get("SCHRODINGER"), "run") # assuming the trajectory and cms file are in the same folder traj_path = topo.find_traj_path_from_cms_path(traj_fn) or \ get_traj_filename(traj_fn.replace('-out.cms', '').replace('.gz', '')) if traj_path is None: print( 'Could not locate a trajectory directory for given CMS file: {}.' .format(traj_fn)) return # For solubility FEP, we must rename the 'solute' CT type and re-center # trajectory on the molecule of interest. This is to prevent the glue to # be applied to 'solute' block for reliable RMSD nad SASA calculations. if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY: traj_fn, traj_path = write_recentered_traj( traj_fn, self.alchem_info_list[1].asl) cmd = [ analyzeTrajExec, "analyze_simulation.py", traj_fn, traj_path, eaf_fn, st2_fn, ] print(f"Post-process INFO: {' '.join(cmd)}") result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) if result.returncode: print(f'ERROR: SID job failed: {result.stdout}, {result.stderr}')
[docs] def get_analysis(self, fep_lambda): """ This method generates an analysis input file, submits the analysis, and returns an ARK object with results. :type fep_lambda: int :rtype `ARK object` """ # Save the original pose as reference structure for later analysis ref_st = get_reference_ct(self.cms_st.fsys_ct) # Use the analysis reference coords if present ref_st = get_reference_ct( ref_st, prop_names=constants.ANALYSIS_REFERENCE_COORD_PROPERTIES) self.ref_ct_fname = os.path.join(os.getcwd(), 'reference_structure.mae') ref_st.write(self.ref_ct_fname) # protein ligand interaction want_rmsd = want_prmsf = want_ppi = bool(self.prot_info.prot_st) if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY: want_ltorsion = want_lrmsf = want_lprops = True if fep_lambda == 1 else False alchem_st = self.alchem_info_list[fep_lambda].st alchem_asl = self.alchem_info_list[fep_lambda].asl want_pli = False alchem_metal_asl = "" elif self.alchem_info_list[fep_lambda] is not None: want_pli = bool( self.prot_info.prot_st) and self.alchem_info_list[0].asl want_ltorsion = (self.perturbation_type == constants.FEP_TYPES.LIGAND_SELECTIVITY) want_lrmsf = want_lprops = True alchem_st = self.alchem_info_list[fep_lambda].st alchem_asl = self.alchem_info_list[fep_lambda].asl alchem_metal_asl = self.alchem_info_list[fep_lambda].metal_asl else: # see parse_prm_tag for cases when alchem_info may be None want_pli = want_ltorsion = want_lrmsf = want_lprops = False alchem_st, alchem_asl, alchem_metal_asl = None, "", "" if self.perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING and fep_lambda == 1: want_ltorsion = want_lrmsf = False alchem_st, alchem_asl = None, '' protein_fep = self.perturbation_type in [ constants.FEP_TYPES.LIGAND_SELECTIVITY, constants.FEP_TYPES.PROTEIN_STABILITY, constants.FEP_TYPES.COVALENT_LIGAND, constants.FEP_TYPES.PROTEIN_SELECTIVITY ] ark_kw = aag.getPLISKWList( self.cms_st, alchem_st, alchem_asl, self.ref_ct_fname, protein_asl=self.full_protein_asl[fep_lambda], want_rmsd=want_rmsd, want_ppi=want_ppi, want_prmsf=want_prmsf, want_pli=want_pli, want_lrmsf=want_lrmsf, want_ltorsion=want_ltorsion, protein_fep=protein_fep, metal_asl=alchem_metal_asl, want_lprops=want_lprops) # get torsions: if self.perturbation_type in [ constants.FEP_TYPES.SMALL_MOLECULE, constants.FEP_TYPES.COVALENT_LIGAND, constants.FEP_TYPES.METALLOPROTEIN ]: lig1_st = self.alchem_info_list[0].st.copy() lig2_st = self.alchem_info_list[1].st.copy() if self.perturbation_type == constants.FEP_TYPES.COVALENT_LIGAND: asl = f"not (atom.{constants.FEP_COVALENT_PROTEIN_BB_ATOM} 1 " \ "or res.ptype ACE NMA)" lig1_st = lig1_st.extract(analyze.evaluate_asl(lig1_st, asl)) lig2_st = lig2_st.extract(analyze.evaluate_asl(lig2_st, asl)) tors_ark_kw = aag.getFEPTorsionKeywords( lig1_st, lig2_st, self.alchem_info_list[0].asl, self.alchem_info_list[1].asl, fep_lambda=fep_lambda, is_covalent=self.perturbation_type == constants.FEP_TYPES.COVALENT_LIGAND) ark_kw += tors_ark_kw elif self.perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING and fep_lambda == 0: ark_kw += sea.List([ t for t in aag.getTorsionKeywords( self.alchem_info_list[0].st.copy(), self.alchem_info_list[0].asl) ]) elif (self.perturbation_type == constants.FEP_TYPES.SOLUBILITY and fep_lambda == 1 and self.leg_type == constants.FepLegTypes.SUBLIMATION): ark_kw.append( aag.getSolubilityKeywords( self.alchem_info_list[fep_lambda].asl)) replica = 0 if fep_lambda == 1: replica = self.nreplicas - 1 suffix = self.basename + '_replica%s' % replica directory = '' # in case of regular FEP, where all the lambda windows/replicas # are in a separate directory if self.task_type == 'fep': basename = self.basename.split('_lambda')[0] directory = basename + '_lambda%i/' % (replica) suffix = basename + '_lambda%i' % (replica) temp_ark = sea.Map() temp_ark["Keywords"] = ark_kw cms_filename = directory + suffix + '-out.cms' if self.task_type in ['lambda_hopping', 'afep']: temp_ark["Trajectory"] = f'{self.basename}_replica{replica}-out.cms' if self.task_type == 'fep': temp_ark["Trajectory"] = f'{directory}{suffix}-out.cms' if self.alchem_info_list[fep_lambda]: temp_ark["LigandASL"] = self.alchem_info_list[fep_lambda].asl input_fn = directory + suffix + '-in.st2' with open(input_fn, 'w') as st2: st2.write(str(temp_ark)) temp_ark["Trajectory"].val = util.gz_fname_if_exists( temp_ark["Trajectory"].val) cms_filename = util.gz_fname_if_exists(cms_filename) # setup output file output_fn = suffix + '-out.eaf' self.launch_SID(cms_filename, input_fn, output_fn) if os.path.isfile(output_fn): with open(output_fn) as fh: return sea.Map(fh.read()) return None
[docs]class FEPSimulationReport(object):
[docs] def __init__(self, basename, task_type, perturbation_type, cfg=None): self.salt = [] self.salt_concentration = [] self.basename = basename self.task_type = task_type self.perturbation_type = perturbation_type self.cms_st = self.read_cms(self.basename) if cfg is None: self.cfg = self.read_cfg(self.basename) else: self.cfg = cfg self.ncpu, self.ngpu = self.get_cpu_gpu_info() self.job_name = self.basename self.job_type = self.get_job_type() self.temperature = self.get_temperature() self.ensemble = self.get_ensemble() self.total_atoms = self.cms_st.atom_total self.total_charge = self.cms_st.formal_charge self.nwaters = self.get_nwaters() self.entry_title = self.get_entry_title() self.force_fields = self.get_ff() self.sim_time = self.get_sim_time_ns() self.process_salt_and_ions()
[docs] def export(self): m = sea.Map() m['FEPSimulation'] = sea.Map() m['FEPSimulation']['NumberCPU'] = self.ncpu m['FEPSimulation']['JobName'] = self.job_name m['FEPSimulation']['EntryTitle'] = self.entry_title m['FEPSimulation']['JobType'] = self.job_type m['FEPSimulation']['TemperatureK'] = self.temperature m['FEPSimulation']['Ensemble'] = self.ensemble m['FEPSimulation']['TotalAtoms'] = self.total_atoms m['FEPSimulation']['TotalCharge'] = self.total_charge m['FEPSimulation']['NumberWaters'] = self.nwaters m['FEPSimulation']['ForceFields'] = self.force_fields m['FEPSimulation']['TotalSimulationNs'] = self.sim_time m['FEPSimulation']['PerturbationType'] = self.perturbation_type m['FEPSimulation']['ReleaseVersion'] = schrodinger.get_release_name() if self.salt: m['SaltInformation'] = sea.Map() m['SaltInformation']['SaltMolecules'] = self.salt m['SaltInformation']['SaltConcentration'] = self.salt_concentration return m
[docs] def process_salt_and_ions(self): salt = [] salt_concentration = [] IONS_FFIO_TYPES = [ constants.CT_TYPE.VAL.POSITIVE_SALT, constants.CT_TYPE.VAL.NEGATIVE_SALT, constants.CT_TYPE.VAL.ION ] for ct in self.cms_st.comp_ct: if ct.property[constants.CT_TYPE] not in IONS_FFIO_TYPES: continue salt.extend([m.atom[1].element for m in ct.molecule]) if salt: wat = self.nwaters * 55. # molarity of pure water. for ion in set(salt): concent_str = "%.1f mM" % ((salt.count(ion) / wat) * 1e6) salt_concentration.append((ion, concent_str)) self.salt = salt self.salt_concentration = salt_concentration
[docs] def get_cms(self): return self.cms_st
[docs] def get_cpu_gpu_info(self): ncpus = 1 ngpus = 0 if not self.cfg: return ncpus, ngpus cpus = self.cfg.ORIG_CFG.cpu.val if isinstance(cpus, int): ncpus = cpus else: ncpus = numpy.product(cpus) return ncpus, ngpus
[docs] def get_sim_time_ns(self): sim_time = 0.0 if not self.cfg: return sim_time if 'time' in list(self.cfg): sim_time = self.cfg.time.val elif 'mdsim' in list(self.cfg) and \ 'last_time' in list(self.cfg.mdsim): sim_time = self.cfg.mdsim.last_time.val elif 'remd' in list(self.cfg) and \ 'last_time' in list(self.cfg.remd): sim_time = self.cfg.remd.last_time.val return sim_time / 1000.0
[docs] def get_job_type(self): if self.task_type == 'lambda_hopping': if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY: return 'Solubility FEP' else: return 'FEP/REST' elif self.task_type == 'fep': return 'FEP' else: return 'Unknown'
[docs] def get_ensemble(self): ensemble = 'Unknown*' if not self.cfg: return ensemble cfg = self.cfg if 'ensemble' in list(cfg.ORIG_CFG): if cfg.ORIG_CFG.ensemble.has_tag('class_'): ensemble = str(cfg.ORIG_CFG.ensemble.class_.val) else: ensemble = str(cfg.ORIG_CFG.ensemble.val) if config.is_gcmc(cfg.ORIG_CFG): # DESMOND-9447 ensemble = ensemble.replace('N', 'mu') return ensemble
[docs] def get_temperature(self): temp = 300. if not self.cfg: return temp cfg = self.cfg if 'temperature' in list(cfg.ORIG_CFG): temp = cfg.ORIG_CFG.temperature.val elif 'temperature' in list(cfg.integrator): temp = cfg.integrator.temperature.T_ref.val else: temp = 300. return float(temp)
[docs] def read_cms(self, basename): if self.task_type == 'lambda_hopping': cms_filename = basename + '_replica_0-in.cms' elif self.task_type == 'afep': cms_filename = os.path.join(basename, basename + '_replica_0-in.cms') elif self.task_type == 'fep': directory = basename + '_lambda%i/' % 0 cms_filename = directory + basename + '_lambda0-in.cms' else: raise TypeError(f'Unknown task_types: {self.task_type}') cms_filename = util.gz_fname_if_exists(cms_filename) cms_st = cms.Cms(cms_filename) for atom in cms_st.atom: atom.property[constants.ORIGINAL_INDEX] = atom.index return cms_st
[docs] def get_nwaters(self): atom_list = self.cms_st.select_atom('water and not a.e H') return len(atom_list)
[docs] def get_entry_title(self): entry_title = 'Unknown' if 's_m_title' in self.cms_st.property: temp_title = self.cms_st.property['s_m_title'] temp_title = temp_title.replace('(full system)', '') temp_title = temp_title.replace('-out', '').strip() if len(temp_title): entry_title = temp_title return entry_title
[docs] def get_ff(self): if 's_ffio_name' in self.cms_st.comp_ct[0].ffio.property: ff = self.cms_st.comp_ct[0].ffio.property['s_ffio_name'] if ff.count('_REASSIGN'): ff = ff.replace('_REASSIGN', '*') else: ff = 'OPLS_2005' return ff
[docs] def read_cfg(self, basename): cfg_filename = basename + '-out.cfg' if self.task_type == 'fep': cfg_filename = basename + '_lambda0/' + basename + '_lambda0-out.cfg' if os.path.isfile(cfg_filename): with open(cfg_filename) as fh: return sea.Map(fh.read()) else: return None
[docs]class ProteinReport(object):
[docs] def __init__(self, cms_st, prot_asl, mutation_tag=None): self.cms_st = cms_st self.prot_st = self.get_protein(prot_asl) self.mutation_tag = mutation_tag if self.prot_st: self.natoms, self.nheavy = self.get_number_atoms() self.nresidues, self.prot_chains = self.get_residues() self.charge = self.prot_st.formal_charge self.hot_atoms, self.hot_heavy_atoms = self.get_hot_atoms()
[docs] def export(self): if not self.prot_st: return None m = sea.Map() m['ProteinInfo'] = sea.Map() m['ProteinInfo']['NumberAtoms'] = self.natoms m['ProteinInfo']['NumberHeavyAtoms'] = self.nheavy m['ProteinInfo']['NumberResidues'] = self.nresidues m['ProteinInfo']['ChainsNames'] = list(self.prot_chains) m['ProteinInfo']['ChainsResidues'] = list(self.prot_chains.values()) m['ProteinInfo']['FormalCharge'] = self.charge m['ProteinInfo']['NumberHotAtoms'] = len(self.hot_atoms) m['ProteinInfo']['HotHeavyAtoms'] = self.hot_heavy_atoms if self.mutation_tag: m['ProteinInfo']['MutationTag'] = self.mutation_tag return m
[docs] def get_hot_atoms(self): """ Returns atoms in the hot region """ if not self.prot_st: return [], [] # depending how you set up rest region, you may have different # property name hot_atoms = analyze.evaluate_asl( self.prot_st, f'atom.{constants.REST_HOTREGION} 1 or atom.i_user_hotregion 1 ' f'or atom.{constants.REST_PROPERTIES.COMPLEX_HOTREGION} 1 ' f'or atom.{constants.REST_PROPERTIES.SOLVENT_HOTREGION} 1') hot_heavy_atoms = [ i for i in hot_atoms if self.prot_st.atom[i].element != 'H' ] return hot_atoms, hot_heavy_atoms
[docs] def get_residues(self): if not self.prot_st: return 0, 0 prot_residues = 0 prot_chains = {} for chain in self.prot_st.chain: chain_name = chain.name.strip() if not chain_name: chain_name = "NoChainId" rescount = len(chain.residue) prot_residues += rescount prot_chains[chain_name] = rescount return prot_residues, prot_chains
[docs] def get_number_atoms(self): if not self.prot_st: return 0, 0 natoms = self.prot_st.atom_total prot_st_noh = self.prot_st.copy() build.delete_hydrogens(prot_st_noh) natoms_noh = prot_st_noh.atom_total return natoms, natoms_noh
[docs] def get_protein(self, asl): if not asl: return None prot_idx = self.cms_st.select_atom(asl) if len(prot_idx): return self.cms_st.extract(prot_idx) return None
[docs]class SmallMoleculeReport(object):
[docs] def __init__(self, st, perturbation_type, leg_type, ligand_number=0, asl=None, alchem_solvent_st=None, alchem_solvent_asl=None, metal_asl=None): """ :param perturbation_type: one of several perturbation types :type perturbation_type: str :param leg_type: solvent, complex or vacuum :type leg_type: str :param asl: Asl for the ligand :type asl: str :param alchem_solvent_asl: Asl for alchemical solvent, can be either water or ions :type alchem_solvent_asl: str :param alchem_solvent_st: Ct of alchemical solvent, can be either water or ions :type alchem_solvent_st: Structure :param metal_asl: Asl for the metals and ions :type metal_asl: str """ self.st = st self.ligand_number = ligand_number self.asl = asl self.metal_asl = metal_asl self.perturbation_type = perturbation_type self._leg_type = leg_type self.smiles = self.get_smiles() self.nRB = analyze.get_num_rotatable_bonds(self.st, max_size=aag.MAX_RING_SIZE) self.atomic_mass = self.st.total_weight self.charge = self.st.formal_charge self.natoms, self.nheavy_atoms = self.get_natoms() self.mol_formula = self.get_mol_formula() self.fragments = self.getLigandFragments() self.resname = self.get_resname() self.hot_atoms, self.hot_heavy_atoms = self.get_hot_atoms() self._alchem_solvent_st = alchem_solvent_st self._alchem_solvent_asl = alchem_solvent_asl self.alchem_solv_type, self.natoms_alchem_solv = self.get_alchem_solv()
[docs] def export(self): m = sea.Map() k = 'PeptideInfo' if self.perturbation_type in [ constants.FEP_TYPES.LIGAND_SELECTIVITY, constants.FEP_TYPES.SMALL_MOLECULE, constants.FEP_TYPES.METALLOPROTEIN, constants.FEP_TYPES.ABSOLUTE_BINDING, constants.FEP_TYPES.SOLUBILITY ]: k = 'LigandInfo' m[k] = sea.Map() m[k]['LigandNumber'] = self.ligand_number m[k]['NumberAtoms'] = self.natoms m[k]['NumberHeavyAtoms'] = self.nheavy_atoms m[k]['ASL'] = self.asl m[k]['SMILES'] = self.smiles m[k]['NumberRotatableBonds'] = self.nRB m[k]['AtomicMass'] = self.atomic_mass m[k]['Charge'] = self.charge m[k]['MolecularFormula'] = self.mol_formula m[k]['NumberFragments'] = len(self.fragments) m[k]['PDBResidueName'] = self.resname m[k]['NumberHotAtoms'] = len(self.hot_atoms) m[k]['HotHeavyAtoms'] = self.hot_heavy_atoms m[k]['PerturbationType'] = self.perturbation_type m[k]['LegType'] = self._leg_type if self.alchem_solv_type: m[k]['AlchemicalSolvent'] = self.alchem_solv_type m[k]['AlchemicalSolventASL'] = self._alchem_solvent_asl m[k]['AlchemicalSolventAtoms'] = self.natoms_alchem_solv return m
[docs] def get_alchem_solv(self): """ Return a alchemical solvent types and number of atoms of such type """ if not self._alchem_solvent_st: return None, 0 solv_type = ' '.join( [res.pdbres.strip() for res in self._alchem_solvent_st.residue]) return solv_type, len(self._alchem_solvent_st.atom)
[docs] def get_hot_atoms(self): """ Returns number of atoms in the hot region. Depending where the rest region is set up, different property names are used. """ hot_atoms = analyze.evaluate_asl( self.st, f'atom.{constants.REST_HOTREGION} 1 or atom.i_user_hotregion 1 ' f'or atom.{constants.REST_PROPERTIES.COMPLEX_HOTREGION} 1 ' f'or atom.{constants.REST_PROPERTIES.SOLVENT_HOTREGION} 1') hot_heavy_atoms = [ i for i in hot_atoms if self.st.atom[i].element != 'H' ] return hot_atoms, hot_heavy_atoms
[docs] def getLigandFragments(self): """ Fragments the ligand in several fragments using the murcko rules. returns the list of mappings """ frag_creator = createfragments.FragmentCreator( atoms=2, bonds=100, carbon_hetero=True, maxatoms=500, # arbirary choice, which is big enough for cyclic # peptide and small enough to be considered "ligand" removedup=False, murcko=False, recap=True, complete=False, add_h=False, verbose=False) fragments = frag_creator.fragmentStructureForEA(self.st) if len(fragments): return fragments return [self.st]
[docs] def get_resname(self): resname = [] for res in structure.get_residues_by_connectivity(self.st): res_tag = res.pdbres.strip() if res_tag != '' and res_tag not in ['ACE', 'NMA']: resname.append(res_tag) res = list(set(resname)) if len(res) == 1 and not len(res[0]): ligand_resname = ['None'] else: ligand_resname = str(res) return ligand_resname[1:-1]
[docs] def get_mol_formula(self): return analyze.generate_molecular_formula(self.st)
[docs] def get_natoms(self): natoms = self.st.atom_total nheavy_atoms = 0 for atom in self.st.atom: if atom.atomic_number != 1: nheavy_atoms += 1 return natoms, nheavy_atoms
[docs] def get_smiles(self): smiles_gen = smiles_mod.SmilesGenerator() smiles = smiles_gen.getSmiles(self.st) return smiles
if __name__ == '__main__': # coverage: +SKIP import sys fsr = FEPReport(sys.argv[1].replace('.sid', ''), None, task_type=None, n_win=None, perturbation_type=constants.FEP_TYPES.LIGAND_SELECTIVITY) fsr.export() print('done')