Source code for schrodinger.application.desmond.gcmc_utils

import itertools
import shutil
from typing import List

import numpy as np

from schrodinger import structure
from schrodinger.structutils import measure

from . import cms
from . import constants

# more than enough for the standard batch size where < (half + 100) of
# the moves are insertions, plus a generous buffer for overall density
# fluctuations (remaining ~1100)
NUM_COPIES_DEFAULT = (constants.GCMC_BATCH_SIZE_DEFAULT // 2 + 100) + 1100


[docs]def add_inactive_solvent_file(ifile: str, solvent_file: str, ofile: str, **kwargs): """ Opens the input file, calls `add_inactive_solvent`, and writes the result to a new file. :param ifile: the input filename :type ifile: `str` :param solvent_file: the solvent filename :type solvent_file: str :param ofile: the output filename :type ofile: `str` :param kwargs: keyword arguments passed to `add_inactive_solvent` """ model = cms.Cms(ifile) if solvent_file == ifile: solvent_model = model.copy() else: solvent_model = cms.Cms(solvent_file) with_inactive_solvent = add_inactive_solvent(model, solvent_model, **kwargs) with_inactive_solvent.write(ofile)
[docs]def add_inactive_solvent(model: cms.Cms, solvent_model: cms.Cms, num_copies: int = NUM_COPIES_DEFAULT, copy: bool = False): """ Appends `num_copies` copies of the molecules in the supplied solvent model to either system's solvent CT if it exists or otherwise a new solvent CT. These changes are propagated to the fullsystem CT as well. :param model: the model to add inactive solvent to :type model: `cms.Cms` :param solvent_model: a cms model containing a solvent CT, can be the same as `model`. NOTE - if `solvent_model` contains different PBC's than `model`, these PBC's will not be honored, which could lead to 'broken' molecules. :type solvent_model: `cms.Cms` :param num_copies: the number of inactive solvent molecules to add :type num_copies: `int` :param copy: whether or not to return a copy of the input structure. Otherwise the input may be modified. :type copy: `bool` :return: the input model with solvent added :rtype: `cms.Cms` """ if copy: model = model.copy() solvent_cts = solvent_model.get_solvent_cts() if not solvent_cts: raise ValueError("No solvent CT found in solvent model") if len(solvent_cts) != 1: raise NotImplementedError("Only one solvent CT is supported") inactive_solvent_ct = solvent_cts[0].copy() if not inactive_solvent_ct.molecule: raise ValueError("No molecules found in solvent CT") # we can't just extract molecules because this is an ffiostructure and we'd # lose the ffio and fepio blocks num_mols = inactive_solvent_ct.mol_total # append copies of self until we have at least as many copies as requested if num_mols < num_copies: inactive_solvent_ct_copy = inactive_solvent_ct.copy() for i in range((num_copies - 1) // num_mols): ffio_extend_ct(inactive_solvent_ct, inactive_solvent_ct_copy) # then strip off any extra copies, if necessary # we leave this as a separate if statement rather than merging into # above for cases where inactive_solvent_ct starts off with more than the requested # num_copies (often true for the case of augmenting the solvent block with # copies of itself) num_extra_mols = inactive_solvent_ct.mol_total - num_copies if num_extra_mols: num_extra = num_extra_mols * len(inactive_solvent_ct.molecule[1]) ffio_delete_atoms(inactive_solvent_ct, num_extra) model_solvent_cts = model.get_solvent_cts() old_total = model.comp_atom_total if not model_solvent_cts: # copy over properties such as PBC box for prop in model.property: if 'chorus_box' in prop: inactive_solvent_ct.property[prop] = model.property[prop] inactive_solvent_ct.title = constants.CT_TYPE.VAL.SOLVENT inactive_solvent_ct.property[ constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLVENT model.comp_ct.append(inactive_solvent_ct) else: if len(model_solvent_cts) != 1: raise NotImplementedError("Only one solvent CT is supported") ffio_extend_ct(model_solvent_cts[0], inactive_solvent_ct) # we only need to synchronize if the cms displays inactive atoms in the # fullsystem ct # this module is basically performing extensions of the cms class so I think # it's okay to use private members. But maybe we should add a method that # notifies the cms object that inactive atoms have been added and it can take # care of this? It just seems a little like splitting this function up # between this module and the object. if model._show_inactive: model.synchronize_fsys_ct() model.active_total = old_total return model
[docs]def remove_inactive_solvent_file(ifile: str, ofile: str): """ Calls remove_inactive_solvent on a file and writes the result to a new file :param ifile: the input filename :type ifile: `str` :param ofile: the output filename :type ofile: `str` """ model = cms.Cms(ifile) pruned_model = remove_inactive_solvent(model) pruned_model.write(ofile)
[docs]def remove_inactive_solvent(model: cms.Cms, copy_if_gcmc: bool = False): """ Remove inactive solvent from the full system CT and the component CTs such that afterward the `comp_atom_total` will equal the `active_total` :param model: the Cms object from which to remove inactive solvent atoms :type model: `cms.Cms` :param copy_if_gcmc: whether or not to return a copy of the input structure to avoid side effects (ie, the removal of inactive solvent molecules). If the input model does not contain inactive solvent, it is always returned uncopied. :type copy_if_gcmc: `bool` :return: the same Cms object or a copy with its inactive solvent atoms removed :rtype: `cms.Cms` """ num_inactive = model.comp_atom_total - model.active_total if not num_inactive: return model if copy_if_gcmc: model = model.copy() solvent_cts = model.get_solvent_cts() if not solvent_cts: raise ValueError("No solvent CTs found to remove inactive atoms from") if len(solvent_cts) != 1: raise NotImplementedError("Only one solvent CT is supported") solvent_ct = solvent_cts[0] ffio_delete_atoms(solvent_ct, num_inactive) if not solvent_ct.atom_total: model.comp_ct.remove(solvent_ct) if model._show_inactive: model.synchronize_fsys_ct() # this will get rid of i_des_active_total properties model.active_total = model.active_total return model
[docs]def tag_gcmc_ligand_file(ifile: str, ofile: str, **kwargs): """ Calls tag_gcmc_ligand on the input file and writes the results to the output file. :param ifile: the input filename :type ifile: str :param ofile: the output filename :type ofile: str :param kwargs: keyword arguments passed to `tag_gcmc_ligand` """ model = cms.Cms(ifile) tag_gcmc_ligand(model, **kwargs) model.write(ofile)
[docs]def tag_gcmc_ligand(model: cms.Cms, tag_only_mutated: bool = False): """ Finds the atoms of the 'ligand molecule(s)' and sets the `GCMC_LIGAND` property to 1. The 'ligand molecule(s)' is(are) defined as the molecule containing the first atom of the ligand ct(s), in order to exclude alchemical ions/water. If no ligand is found, or if atoms in the system contain the `GCMC_LIGAND` property already, the system is returned unchanged. For FEP systems, both the reference and mutant ligands are tagged. :param model: The input cms object :type model: `cms.Cms` :param tag_only_mutated: If false (default), all atoms in the cms model's FEP cts are tagged. Otherwise, only atoms in the FEP cts which are mapped to dummy atoms are tagged. The case of mutated atoms that map to each other rather than to dummy atoms is not supported. :return: The modified input cms object :rtype: `cms.Cms` """ # tag both ref and mut cts lig_cts = model.get_fep_cts() # if they do not exist, we have nothing to tag if not lig_cts[0]: return model # don't add tags if user has already tagged anything for ct in model.comp_ct: if ct.atom_total: # this may be overkill for atom in ct.atom: if constants.GCMC_LIGAND in atom.property: return model for lig_ct in lig_cts: if lig_ct.atom_total: # this may be overkill for lig_atom in lig_ct.atom: if lig_atom.property.get(constants.ALCHEMICAL_ION, False): continue if tag_only_mutated and not lig_atom.property.get( constants.PROT_LIGAND_REGION, 0): continue lig_atom.property[constants.GCMC_LIGAND] = 1 model.synchronize_fsys_ct() return model
[docs]def tag_gcmc_near_ligand_file(ifile: str, ofile: str, lig_file: str, cutoff_distance: float): """ Calls tag_gcmc_near_ligand on the input file and writes it to the output file, using the given ligand file and cutoff distance. :param ifile: the input file :type ifile: str :param ofile: the output file :type ofile: str :param lig_file: the file containing the ligand model :type lig_file: str :param cutoff_distance: atoms within this distance of any ligand atom will be tagged :type cutoff_distance: float """ model = cms.Cms(ifile) with structure.StructureReader(lig_file) as reader: ligand_model_sts = list(reader) assert len(ligand_model_sts) == 1 ligand_model = ligand_model_sts[0] tag_gcmc_near_ligand(model, ligand_model, cutoff_distance) model.write(ofile)
[docs]def tag_gcmc_near_ligand(model: cms.Cms, ligand_model: cms.Cms, cutoff_distance: float): """ Tags solute atoms in the model that are within the cutoff distance of any atom in the ligand model by setting the `GCMC_LIGAND` atom property to 1. :param model: the input cms model :type model: `cms.Cms` :param ligand_model: the input ligand model :type ligand_model: `structure.Structure` :param cutoff_distance: solute atoms within this distance of any ligand atom will be tagged :type cutoff_distance: float :return: the modified input model :rtype: `cms.Cms` """ marked = False for ct in model.comp_ct: # ligand and receptor appear to be watermap-specific ct types based on # schrodinger.application.desmond.systype.Watermaptyper if ct.property[constants.CT_TYPE] in [ constants.CT_TYPE.VAL.SOLUTE, constants.CT_TYPE.VAL.LIGAND, constants.CT_TYPE.VAL.RECEPTOR ]: close_atoms = measure.get_atoms_close_to_structure( ct, ligand_model, cutoff_distance) atom_list = ct.atom for atom_idx in close_atoms: atom = atom_list[atom_idx] atom.property[constants.GCMC_LIGAND] = 1 marked = True if not marked: raise ValueError("No solute molecules found near the ligand") model.synchronize_fsys_ct() return model
[docs]def ffio_delete_atoms(ffio_ct: cms.ffiostructure.FFIOStructure, num_atoms: int): """ Deletes the given number of atoms and a proportional number of pseudoatoms from the input CT. :param ffio_ct: The input ffio CT :type ffio_ct: `cms.ffiostructure.FFIOStructure` :param num_atoms: the number of atoms to delete :type num_atoms: `int` """ # TODO move to FFIOStructure atom_total = ffio_ct.atom_total num_pseudos = len(ffio_ct.ffio.pseudo) new_atom_total = atom_total - num_atoms ffio_ct.deleteAtoms(range(atom_total, new_atom_total, -1)) if num_pseudos: new_num_pseudos = new_atom_total * num_pseudos // atom_total # NOTE - we are assuming the pseudoatoms are laid out by molecule, not # by psuedoatom type. ffio_ct.ffio.deletePseudos(range(num_pseudos, new_num_pseudos, -1))
[docs]def ffio_extend_ct(ffio_ct: cms.ffiostructure.FFIOStructure, other_ffio_ct: cms.ffiostructure.FFIOStructure): """ Extends an ffio CT with the atoms and pseudoatoms of another ffio CT. The CTs should each only contain copies of the same single molecule. :param ffio_ct: The input ffio CT :type ffio_ct: `cms.ffiostructure.FFIOStructure` :param other_ffio_ct: The ffio CT to extend the input CT with :type other_ffio_ct: `cms.ffiostructure.FFIOStructure` """ site, other_site = ffio_ct.ffio.site, other_ffio_ct.ffio.site real_site, other_real_site = [[ s for s in _site if s.property['s_ffio_type'] != 'pseudo' ] for _site in (site, other_site)] if len(site) != len(other_site) or len(real_site) != len(other_real_site): raise ValueError("FFIO CTs' sites do not match") # TODO move this to FFIOStructure ffio_ct.extend(other_ffio_ct) for pseudo in other_ffio_ct.ffio.pseudo: # NOTE - we are assuming the pseudoatoms are laid out by molecule, not # by psuedoatom type. new_pseudo = ffio_ct.ffio.addPseudo() for key in pseudo.property.keys(): new_pseudo.property[key] = pseudo.property[key]
[docs]def equalize_number_waters(struc_files: List[str]): """ Deletes the waters from the marked component CTs of the structure files until each has the same number of waters. :param struc_files: a dictionary mapping the filenames of cms files to a tuple containing the index of their solvent CT and the total number of atoms in the system. :type struc_files: {str: (int, int)} """ # Note, this is currently tested in stage/simulate_test.py and in # gcmc_utils_test.py. # The tests in the former should be moved to the latter. import schrodinger.application.desmond.packages.topo as topo num_water_atoms = [v[1] for k, v in struc_files.items()] minium_water_atoms = min(num_water_atoms) corners = np.array(list(itertools.product(*[[-.5, .5]] * 3))) def eight_corners(box): """ return list of position vectors of eight box corners """ return corners.dot(box) for inp_fname in struc_files: water_ct_idx, num_water_atoms = struc_files[inp_fname] num_water2_del = (num_water_atoms - minium_water_atoms) // 3 if num_water2_del == 0: continue # the cms model shouldn't have any inactive atoms so we needn't worry # about the inactive atoms argument to read_cms msys_model, cms_model = topo.read_cms(inp_fname) first_water_aid = 1 for comp_idx in range(water_ct_idx): first_water_aid += cms_model.comp_ct[comp_idx].atom_total # make sure we only select water molecules in the ct with the given # `water_ct_idx` num_solvent_atoms = cms_model.comp_ct[water_ct_idx].atom_total last_water_aid = first_water_aid + num_solvent_atoms - 1 gid_offset, last_water_gid = topo.aids2gids( cms_model, [first_water_aid, last_water_aid], include_pseudoatoms=False) water_oxygen = [ a.id for a in msys_model.select("water and oxygen") if gid_offset <= a.id <= last_water_gid ] solute = [a.id for a in msys_model.select("(not solvent) or ions")] box = msys_model.cell all_pos = np.array(msys_model.positions, dtype=np.float32) all_corners = eight_corners(box) del_oxygen_gids = set() for i in range(num_water2_del): water_del = None for c_idx in range(8): pos = all_corners[(i + c_idx) % 8] min_dist = 999.0 min_d_oid = msys_model.natoms + 10 d_water_contact = [] if del_oxygen_gids: d_water_contact = msys_model.findContactIds( 4.0, ids=list(del_oxygen_gids), other=water_oxygen) for o_id in set(water_oxygen) - set(d_water_contact): dist = np.linalg.norm(all_pos[o_id] - pos) if dist < min_dist: min_dist = dist min_d_oid = o_id solute_contact = msys_model.findContactIds(10.0, ids=[min_d_oid], other=solute) if len(solute_contact) > 0: continue else: water_del = min_d_oid break if water_del is not None: del_oxygen_gids.add(water_del) water_oxygen.remove(water_del) else: print("ERROR: cannot find water to delete for %s" % inp_fname) return comp_oxygen_idx = [gid - gid_offset + 1 for gid in del_oxygen_gids] water_ct = cms_model.comp_ct[water_ct_idx] n_virtuals = len(water_ct.ffio.virtual) offset_from_end = 3 del_atoms = [] del_pseudo = [] for oxygen_idx in comp_oxygen_idx: del_atoms.extend([oxygen_idx + j for j in range(3)]) water_number = (oxygen_idx + 2) // 3 del_pseudo.extend([ water_number * n_virtuals + vir_idx for vir_idx in range(n_virtuals) ]) offset_from_end += 3 if n_virtuals: water_ct.ffio.deletePseudos(del_pseudo) water_ct.deleteAtoms(del_atoms) cms_model.ffio_refresh() cms_model.synchronize_fsys_ct() cms_model.write(inp_fname + '_') shutil.move(inp_fname + '_', inp_fname)