Source code for schrodinger.application.desmond.packages.topo

"""
Functionalities to handle molecular topologies

Copyright Schrodinger, LLC. All rights reserved.
"""

import copy
import os
from collections.abc import Iterable

import numpy

import schrodinger.application.desmond.packages.msys.pfx as pfx_module
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond.packages import msys
from schrodinger.infra import mm

# - We face two topology models: cms and msys.
# - The cms topology model is more compatible with all Schrodinger stuff,
#   whereas the msys model is with the Desmond backend and the trajectory data
#   model.
# - A major goal of this module is to bridge the two models.


[docs]class DuckFrame: """ A duck-type frame with limited interface. We can duck-type a msys model (i.e., a `msys.System` object) into a `DuckFrame` object, example:: dfr = DuckFrame(msys_model) Changes on the `DuckFrame' object should NOT affect the original `model` object. """ # FIXME: I only implemented the interface for what is needed now. We may # need enhancements down the road, but that should be easy. # -YW (Jun 2, 2016)
[docs] def __init__(self, model): """ Copy the coordinates, the velocities, and the box matrix of the original model. FIXME: We only support `msys.System` type of `model` for now. """ if isinstance(model, msys.System): self._pos = model.getPositions() self._vel = model.getVelocities() self._box = model.getCell() self._natoms = model.natoms self._nactive = model.nactive else: NotImplementedError("Unsupported model type: %s" % type(model))
[docs] def pos(self, i=None): """ Return the position vector(s). i can be any of the following types: - `None`: Return the whole position-vector array; - `int` : `i` should be a GID, and this function return position vector of the particle `i`. - `Iterable`: `i` should be a "list" of GIDs, and this function returns a NEW numpy array of position-vectors of the specified particles. """ i = numpy.asarray(i) if isinstance(i, Iterable) else i return self._pos if i is None else self._pos[i]
[docs] def vel(self): return self._vel
@property def natoms(self): return self._natoms @property def nactive(self): return self._nactive @property def box(self): """ Return a row-majored 3x3 matrix. The primitive cell vectors are the rows of the matrix, which should be consistent with `traj.Frame.box`. """ return self._box
[docs]def cms_atom(cms_model): """ Returns an iterator through all atoms in a CMS model. At each iteration, we get a tuple of `(fsys_atom, comp_atom, comp_ct, ct_index)`, where - fsys_atom: atom in full-system CT - comp_atom: atom in component CT - comp_ct: component CT to which `comp_atom` belong - ct_index: index of the component CT The iterator breaks right before returning any inactive atoms, unlike the function below that returns indices of all atoms (including inactive atoms). """ i = 1 for ct_index, ct in enumerate(cms_model.comp_ct): for a in ct.atom: if i > cms_model.atom_total: # We have iterated through all active atoms. break yield (cms_model.atom[i], a, ct, ct_index) i += 1
[docs]def cms_atom_index(cms_model): """ Returns an iterator through all atom indices in a CMS model. At each iteration, we get a tuple of `(fsys_atom_index, comp_atom_index, comp_ct, ct_index)`, where - fsys_atom_index: atom index in full-system CT - comp_atom_index: atom index in component CT - comp_ct: component CT to which `comp_atom_index` belong - ct_index: index of the component CT This function differs from the above in that it returns indices of all atoms including inactive atoms. Note that the `fsys_atom_index` for inactive atoms is merely a "projected" index as the fullsystem CT doesn't really contain any inactive atoms. """ fsys_idx = 1 for ct_index, ct in enumerate(cms_model.comp_ct): for comp_atom_index in range(1, ct.atom_total + 1): yield (fsys_idx, comp_atom_index, ct, ct_index) fsys_idx += 1
[docs]def aid_match(cms_model): """ :rtype: 1D '1-indexed' `numpy.ndarray` of ints :return: Returns an array of atom indices. Index = 1-based atom index as in the full-system CT, value = index of the matched atom (0 means not matched). """ match = numpy.zeros(cms_model.comp_atom_total + 1, dtype=int) if cms_model.ref_ct and cms_model.mut_ct: aid_delta = [ 0, ] for ct in cms_model.comp_ct: aid_delta.append(aid_delta[-1] + ct.atom_total) ref_ct = cms_model.ref_ct mut_ct = cms_model.mut_ct ref_ct_index = cms_model.comp_ct.index(ref_ct) mut_ct_index = cms_model.comp_ct.index(mut_ct) ref_aid_delta = aid_delta[ref_ct_index] mut_aid_delta = aid_delta[mut_ct_index] for atom in mut_ct.atom: aid = mut_aid_delta + atom.index matched_aid = atom.property[constants.FEP_MAPPING] if matched_aid: matched_aid += ref_aid_delta match[aid] = matched_aid match[matched_aid] = aid return match
[docs]def pseudoatom_match(msys_model, cms_model): """ This function will find the match between pseudoatoms and return it as two lists of pseudoatom indices. The first list is for the reference CT, and the second for the mutant CT. :: match = pseudoatom_match(msys_model, cms_model) j = match[0][i] where `j` is the matched pseudoatom's index in the ffio_pseudo block of the mutant CT, `i` is the pseudoatom's index in that of the reference CT. If i-th pseudoatom is not matched, j's value is zero. `match[0][0]` is always 0, which is junk and should be ignored. :: j = match[1][i] is similar, except that `i` is of the mutant CT, and `j` is of the reference CT. For non-alchemical-FEP systems, this function returns `[]`. :type msys_model: `msys.System` :type cms_model: `cms.Cms`. This object should be created by the `read_cms(...)` function (see below). :rtype: `[list[int], list[int]]` or `[]` """ if cms_model.fep_ct is None: return [] # The system may contain many more pseudoatoms than what we care about; for # example, the system may use TIP4P water model and then have millions of # pseudoatoms, and also the protein molecules may have pseudoatoms. We # must NOT include those in our returned list. So we need to first figure # out a rough range of the pseudoatoms and then get pseudatom match for # those within the range. ref_ct = cms_model.ref_ct mut_ct = cms_model.mut_ct ref_ct_index = cms_model.comp_ct.index(ref_ct) mut_ct_index = cms_model.comp_ct.index(mut_ct) # We should NOT assume which CT comes first. first_ct_index = min(ref_ct_index, mut_ct_index) last_ct_index = max(ref_ct_index, mut_ct_index) + 1 atom_totals = [ct.atom_total for ct in cms_model.comp_ct] first_aid = 1 + sum(atom_totals[:first_ct_index]) first_gid = cms_model.gid(first_aid) if len(cms_model.comp_ct) <= last_ct_index: # If `ref_ct' or `mut_ct' is the last CT (e.g., in vacuum alchemical # FEP simulations), this is the correct way to get `last_gid'. last_gid = msys_model.natoms else: last_aid = 1 + sum(atom_totals[:last_ct_index]) last_gid = cms_model.gid(last_aid) raw_match = [ msys_model.atom(i)["fep_mapping"] for i in range(first_gid, last_gid) if msys_model.atom(i).atomic_number == 0 ] mut_pseudomatch = [0] * (len(mut_ct.ffio.pseudo) + 1) ref_pseudomatch = [0] for i in range(len(ref_ct.ffio.pseudo)): j = raw_match[i] ref_pseudomatch.append(j) if j > 0: mut_pseudomatch[j] = i + 1 return [ref_pseudomatch, mut_pseudomatch]
# FIXME: this action should happen somewhere before using the protein-ligand # interaction analyzers if constants.ORIGINAL_INDEX is missing in cms_model
[docs]def set_original_index_property(cms_model): """ For each atom in the full-system CT, add an atom-level property: constants.ORIGINAL_INDEX, and set its value to the atom's ID. """ for a in cms_model.fsys_ct.atom: a.property[constants.ORIGINAL_INDEX] = a.index for a in cms_model._raw_fsys_ct.atom: a.property[constants.ORIGINAL_INDEX] = a.index
[docs]def find_traj_path(cms_model, base_dir=None): """ Return path of the trajectory file or dir, whose name was saved into a CT-level property in the output cms file. Return `None` is valid path cannot be found. Note that this function is temporary. In the long run the coupling between trajectory directory and cms file will be removed. The user should be explicit about where the trajectory file/folder is. If you are developing a new command line tool, please do not rely on this function to get the trajectory path. Instead, explicitly pass in the trajectory path. See an example below: $SCHRODINGER/run analyze_simulation.py -h :type cms_model: `cms.Cms` :type base_dir: `str` :rtype: `str` or `None` """ try: traj_path = cms_model.property['s_chorus_trajectory_file'] except KeyError: return None traj_path = cms.fix_idx_traj_path(traj_path) if base_dir: traj_path = os.path.join(base_dir, os.path.basename(traj_path)) if os.path.exists(traj_path): return traj_path
[docs]def find_traj_path_from_cms_path(cms_path): """ Return path of the trajectory directory/file that is "associated" with the given CMS file. This works only if both are in the same parent directory. :param cms_path: Path to the CMS file :type cms_path: `str` :return: Path to the trajectory directory or file. :rtype: `str` or `None` """ if not os.path.isfile(cms_path): return None st = cms.structure.StructureReader.read(cms_path) return find_traj_path(st, os.path.dirname(cms_path))
[docs]def read_cms(fname=None, from_string=None, remove_inactive_atoms_in_fsys=True): """ Read a .cms file from the given file name `fname`, or from a string buffer. How does this differ from `cms.Cms(fname)`? - Here, two models will be created for the given .cms file. So you will get 2-tuple return value: The first member is the msys model, and the second is the cms model. - The cms model returned by this function will give you correct gids, whereas the model returned by `cms.Cms(fname)` might not do so when there are virtual sites. - The cms model returned by this function will have three extra attributes: - `pseudoatoms` This is a map from the gid of a physical atom to that of its pseudoatom(s). - `pseudomatch` This is pseudoatom match between the two alchemical molecules. (See the docstring of `pseudoatom_match` for detail) Will we unite this with `cms.Cms(fname)`? Maybe. But I don't see a strong motivation right now, and I don't want to overengineer. -YW (May 26, 2016) FIXME: `msys.LoadMAE(...)` is unbelievably slow and may take up to 10 sec for typically-sized alchemical FEP systems. """ assert bool(fname) ^ bool(from_string) if fname: model0 = msys.LoadMAE(path=fname) model1 = cms.Cms( file=fname, remove_inactive_atoms_in_fsys=remove_inactive_atoms_in_fsys) else: model0 = msys.LoadMAE(buffer=from_string.encode()) model1 = cms.Cms( string=from_string, remove_inactive_atoms_in_fsys=remove_inactive_atoms_in_fsys) if model1.nactive_gids >= 0: model0.nactive = model1.nactive_gids # The gids in `model1' is incorrect when there are pseudoatoms, and it's # unfortunately very difficult and error-prone to get the correct gids. # Here we use the msys model to help us to fix the gids in the cms model. # The fix here is a lot simpler and more robust. cms_comp_atom_total = model1.comp_atom_total gid = numpy.ones(cms_comp_atom_total + 1, dtype=int) * numpy.iinfo(int).min # `gid[0]` is junk and ignored. match = aid_match(model1) curr_gid = 0 pseudoatoms = {} # Value = GIDs of pseudoatoms; key = GID of host atom for aid in range(1, cms_comp_atom_total + 1): if gid[aid] >= 0: continue a = model0.atom(curr_gid) while a.atomic_number < 1: # skip pseudoatoms for b in a.bonded_atoms: pseudoatoms.setdefault(b.id, []).append(a.id) curr_gid += 1 a = model0.atom(curr_gid) gid[aid] = curr_gid matched = match[aid] if matched: gid[matched] = curr_gid curr_gid += 1 while curr_gid < model0.natoms: a = model0.atom(curr_gid) if a.atomic_number < 1: for b in a.bonded_atoms: pseudoatoms.setdefault(b.id, []).append(a.id) else: raise RuntimeError("Inconsistent number of atoms between cms and" " msys models") curr_gid += 1 pseudoatoms = {k: sorted(v) for k, v in pseudoatoms.items()} model1.pseudoatoms = pseudoatoms # don't let anyone else muck it up since it's directly exposed gid.flags.writeable = False model1.gid_map = gid # Must be called after the `gid' is correctly set for `model1'. model1.pseudomatch = pseudoatom_match(model0, model1) # a list that contains all AIDs which also have pseudoatoms. model1.aids_with_vs = get_aids_with_virtuals(model1) set_original_index_property(model1) return model0, model1
[docs]def extract_subsystem(cms_model, asl): """ We need to clarify on what we call `subsystem` here. First, let's review the hierarchical structure of a chemical system in the cms model: - At the most top level is the whole system. - Under that are a number of CTs (connectivity tables). Each CT contains one molecule (usually in the case of macromolecules), or more (the most obvious example is the water CT, which include thousands of water molecules). - Under that are chains, which belong to the same molecule but not necessarily covalently bonded. - Under that are residues, groups, atoms, pseudoatoms, ..., which we don't have to go into the detail. Despite of the fact that there are already a lot of concepts to grasp, it's still sometimes inadequate to describe a portion of the system. The problem arises mainly due to the fact that the cms model contains force field data: When we do atom selections, we have to keep that in mind, we won't be able to create a valid .cms file if we select a number of atoms that don't match the force field data. To address that problem, we introduce an extra concept that sits between the CT and the molecule levels in the hierarchy. We can call it `instance`, and it refers to a single complete substructure that matches the ffio. For example, in a protein CT, the ffio describe the whole protein structure, and so the instance there is the whole protein, and the CT contains only 1 instance; whereas in a water CT, the ffio describes only a single water molecule, and thus the instance is a single water molecule, and the CT contains a number of instances. With that, we now define a subsystem as follows: A subsystem is a portion of the original system, containing 1 or more instances of any types as defined in the original system. By this definition, a subsystem should always be a valid cms model. We allow users to specify a subsystem using ASL, but how do we deal with cases where the ASL expression doesn't include a complete set of atoms in their respective instances? For now, we will simply automatically expand the selection to the whole instance. This is good enough for most cases, I think. Of course, in future we should have no problem to be more sophisticated in deciding to expand or shrink the selection. :type cms_model: `cms.Cms` :param cms_model: A cms model generated by `read_cms` (see above). Won't be mutated. :type asl: `str` :param asl: ASL expression to specify the subsystem. :rtype: (new-cms-model, seletion-in-aid) :return: Returns a new `cms.Cms` object (the input `cms_model` will not be mutated), and an unsorted list of aids of the selected atoms. The new `cms.Cms` is guaranteed to contain no inactive atoms. """ # Basically, we make a copy of the original cms model and then delete the # unselected atoms. We cannot call the L{structure.Structure.extract} # function to create a new L{structure.Structure} object because that # doesn't copy the ffio_ff and fepio_fep blocks. cms_model = copy.copy(cms_model) asl = "fillmol(%s)" % asl sel = cms_model.select_atom_comp(asl) atom_index_fix = [] accu_index = 0 for comp_ct in cms_model.comp_ct: atom_index_fix.append(accu_index) accu_index += comp_ct.atom_total expanded_sel = [] new_comp_ct = [] for i, (atom_list, ct) in enumerate(zip(sel, cms_model.comp_ct)): if atom_list: new_comp_ct.append(ct) # The following 3 scenarios can all happen: # 1. atom_total == num_site # 2. atom_total > num_site # 3. atom_total < num_site # Also, `atom_total' might or not be divisible by `num_site'. # Sigh, this is a big mess, we have to clear it first: What we will # do is to not count the pseudo atoms in the ffio.site into # `num_site', and then we make it true that `atom_total' is # divisible by `num_site', which simplifies things a LOT! num_atom = len(atom_list) num_site = len([e for e in ct.ffio.site if e.type != "pseudo"]) num_instance = ct.atom_total // num_site selected_instances = set([0]) if 1 == num_instance else \ set((e - 1) // num_site for e in atom_list) if num_instance == len(selected_instances): first = atom_index_fix[i] + 1 last = first + ct.atom_total selected_atoms = list(range(first, last)) else: selected_atoms = [] for j in selected_instances: first = j * num_site + 1 last = first + num_site selected_atoms.extend(list(range(first, last))) del_atoms = set(range(1, ct.atom_total + 1)) - \ set(selected_atoms) ct.deleteAtoms(del_atoms) # Finds out the pseudoatoms associated with the deleted atoms. # These pseudoatoms need to be deleted, too. num_pseudo_per_instance = len(ct.ffio.site) - num_site pseudo = [] if num_pseudo_per_instance == 1: # A much faster algorithm for the special case pseudo = list( set((e - 1) // num_site + 1 for e in del_atoms)) elif num_pseudo_per_instance > 1: # Slower general algorithm # Finds out the deleted instances. del_instances = {(ai - 1) // num_site for ai in del_atoms} # From the deleted instances, we figure out the deleted # pseudoatom indices in the ffio.pseudo block. pseudo_indices_per_instance = numpy.arange( 1, 1 + num_pseudo_per_instance, dtype=numpy.int32) for instance_index in del_instances: index_delta = instance_index * num_pseudo_per_instance pseudo.extend( list(index_delta + pseudo_indices_per_instance)) pseudo.sort(reverse=True) for j in pseudo: del ct.ffio.pseudo[j] selected_atoms = [ ai + atom_index_fix[i] for ai in selected_atoms ] expanded_sel.extend(selected_atoms) # Saves a reference for later clean up. fep_ct = cms_model.fep_ct cms_model.fep_ct = None cms_model.ref_ct = None cms_model.mut_ct = None cms_model.comp_ct = new_comp_ct orig_active_total = cms_model.active_total # This model does not contain inactive atoms as its component CTs come from the # selection of cms_model.select_atom_comp(), which filters out inactive atoms. cms_model.active_total = cms_model.comp_atom_total # This is to avoid syncing twice since `active_total` setter may sync. # It also triggers sync when comp_atom_total after extraction happens to be # equal to active_total (it is the reverse of the Cms.active_total setter # logic). if cms_model._show_inactive or orig_active_total == cms_model.comp_atom_total: cms_model.synchronize_fsys_ct() cms_model.ffio_refresh() # Ensures correctness of attributes: `.fep_ct', `.ref_ct', and `.mut_ct'. if cms_model.mut_ct and \ (None, None) == (cms_model.fep_ct, cms_model.ref_ct): # Total FEP # Let's check if `.mut_ct' has "i_ffio_grp_ligand" = 1 atom property. num_atom = len(a for a in cms_model.mut_ct.atom if 1 == a.property[constants.FEP_ABSOLUTE_LIGAND]) if num_atom == 0: # No, then it's not really a CT for total free energy FEP. cms_model.mut_ct = None elif None in [cms_model.ref_ct, cms_model.mut_ct]: # Alchemical FEP cms_model.mut_ct = None cms_model.ref_ct = None if cms_model.fep_ct: cms_model.fep_ct.fepio = None cms_model.fep_ct = None if cms_model.mut_ct is None: # For a non-FEP model, ensures it doesn't contain FEP-specific CT # properties. for ct in cms_model.comp_ct: ct.property.pop(constants.FEP_FRAGNAME, None) ct.property.pop(constants.FEPIO_STAGE, None) if cms_model.fep_ct is None and fep_ct is not None: cms.delete_fepio(fep_ct) # FIXME: gid_refresh() may NOT work for models with virtual sites. cms_model.gid_refresh() return cms_model, expanded_sel
[docs]def matched_gids(cms_model, sub_cms, sub_msys): """ Return GIDs of `cms_model` that match the GIDs of `sub_cms` and preserve the ordering, which is essential for trajectory extraction/reduction. :param cms_model: Original system where `subsystem` was extracted from :type cms_model: `cms.Cms` :param sub_cms: A subsystem extracted from `cms_model`. The atoms are expected to have a property called `i_des_orig_aid`, and the values should be the AIDs in `cms_model`. :type sub_cms: `cms.Cms` :param sub_msys: This object must match `sub_cms`. :type sub_msys: `msys.System` @rtype: `list` of `int` """ # All this trouble originates from the fact that the AID to GID map is # neither onto (pseudoatoms), nor 1-to-1 (FEP core atoms). # Here the (retrospective) strategy is to use the reloaded subsytem which # contains the desired GID order to figure out the original cms_model's GID # order for extraction. sub_gids_count = sub_msys.natoms # There are three mappings involved: # subsystem GID # -> (subsystem AID, idx) map1: onto but not 1-to-1 # -> (original-system AID, idx) map2: 1-to-1 and onto # -> original-system GID map3: 1-to-1 but most likely not onto # Here idx is to identify pseudoatoms associtated with the AID. # There are two situations when map1 is not 1-to-1: # (Yuqing confirmed that pseudoatoms associated with dummy atoms are not # mapped, i.e., same GID, different parent AIDs, and different parent GIDs # does not occur.) # 1. FEP core physical atoms (same GID, different AIDs) # 2. FEP core mapped pseudoatoms (same GID, different parent AIDs, # but same parent GID) # To make it 1-to-1, we pick either AID. This does not affect the uniqueness # of the output of map3. # Another assumption here is that sibling pseudoatoms' GID order does not # change after extraction. # FIXME: Use a variable (constant) for 'i_des_orig_aid', no string literal. sub_gid_2_orig_aid = { gid: sub_cms.atom[aid].property['i_des_orig_aid'] for aid, gid in enumerate(sub_cms.allaid_gids, start=1) } pseudoatom_gid_2_parent_gid = {} for parent_gid, pseudoatom_gids in sub_cms.pseudoatoms.items(): for idx, gid in enumerate(pseudoatom_gids): pseudoatom_gid_2_parent_gid[gid] = (cms_model.gid( sub_gid_2_orig_aid[parent_gid]), idx) orig_gids = [] for sub_gid in range(sub_gids_count): try: orig_gid = cms_model.gid(sub_gid_2_orig_aid[sub_gid]) except KeyError: # sub_gid must be a pseudoatom orig_parent_gid, idx = pseudoatom_gid_2_parent_gid[sub_gid] orig_gid = cms_model.pseudoatoms[orig_parent_gid][idx] orig_gids.append(orig_gid) return orig_gids
[docs]def comp_atoms(cms_model): """ A coroutine to iterate through all atoms in all component CTs of the `cms_model`. :type cms_model: `cms.Cms` """ for ct in cms_model.comp_ct: for a in ct.atom: yield a
[docs]def update_ct_box(ct, box): """ Given a `ct`, set the following CT-level properties using the values from `box`:: "r_chorus_box_ax", "r_chorus_box_ay", "r_chorus_box_az", "r_chorus_box_bx", "r_chorus_box_by", "r_chorus_box_bz", "r_chorus_box_cx", "r_chorus_box_cy", "r_chorus_box_cz", :type box: Any type that supports double subscriptions like `box[0][1]`, where both indices are zero-based. :rtype: `structure.Structure` :return: The same input `ct` object, with its "r_chorus_box_*" CT-level properties updated. """ PREFIX = "r_chorus_box_" for i, a in enumerate("abc"): for j, x in enumerate("xyz"): ct.property[PREFIX + a + x] = box[i][j] return ct
[docs]def update_ct(ct, cms_model, fr, allaid_gids=None, is_fullsystem=True, update_vel=False): """ Updates coordinates and simulation-box-matrix of the input `ct` with the given trajectory frame. Here `cms_model` and `fr` should correspond to the same simulation, and `ct` could be full system CT, component CT, or other subsystems extracted from the system. Note GCMC systems with multiple types of solvent are not supported. :type ct: `structure.Structure` :param ct: structure to be updated :type fr: `traj.Frame` or `DuckFrame` :type cms_model: `cms.Cms` :param cms_model: It should be generated by the `read_cms` function. :param allaid_gids: GIDs of all AIDs :type allaid_gids: `numpy.ndarray`, or `list` of `int` or `None`. The caller needs to make sure that its length and order match the atoms in `ct`. `None` means `cms_model.allaid_gids` is used. :param is_fullsystem: whether or not this represents a fullsystem CT. Knowing this can keep us from doing extra work :type is_fullsystem: `bool` :param update_vel: Update the atom velocities. :type update_vel: `bool` :rtype: `structure.Structure` :return: The same input `ct` object, with the atom coordinates and simulation box matrix updated from the frame. Example: fsys_ct = cms_model.fsys_ct.copy() update_ct(fsys_ct, cms_model, tr[i]) """ if allaid_gids is not None: assert len(allaid_gids) == ct.atom_total, \ "Atom index array does not match atom count." gids = allaid_gids else: # Only works for single solvent block with inactive atoms gids = cms_model.allaid_gids[:ct.atom_total] if is_fullsystem and fr.natoms == cms_model.comp_atom_total and \ cms_model.ref_ct is None: # For simulation systems without pesudo-atoms and without FEP, the AIDs # and the GIDs have one-to-one correspondence. This enables a much # faster atom coordinate update. ct.setXYZ(fr.pos()[:ct.atom_total]) else: if allaid_gids is None and not is_fullsystem: raise ValueError("allaid_gids must be supplied when ct is not a " "full system ct") ct.setXYZ(fr.pos(gids)) update_ct_box(ct, fr.box) vel = fr.vel() if update_vel and vel is not None: allaid_vel = vel[gids] for a, v in zip(ct.atom, allaid_vel): set_atom_velocity(a, v) return ct
[docs]def update_fsys_ct_from_frame_GF(fsys_ct, cms_model, fr, frames_to_smooth=None, aids_to_smooth=None, update_vel=False): """ This function is the future version of `update_fsys_ct_from_frame` and the difference is that `fsys_ct` will be inactive-free. """ orig_pos = None # temporally write the smoothed positions into the frame if frames_to_smooth and len(frames_to_smooth) > 1: assert fr in frames_to_smooth, \ "The frames for smoothing should contain the frame of interest." non_solvent_gids = asl2gids(cms_model, 'not solvent', False) if aids_to_smooth is None: gids = non_solvent_gids else: # FIXME: The input cms_model is not in sync with the input fr. # It may be an overkill to call cms_model.set_nactive(fr) # here though. Maybe we just dis-allow averaging on solvent # and filter them out here? gids = aids2gids(cms_model, aids_to_smooth, False) if set(gids) - set(non_solvent_gids): print( 'WARNING: Solvent atoms are selected for smoothing. It may not work for GCMC trajectories.' ) if gids: allpos = numpy.empty([len(frames_to_smooth), len(gids), 3]) for i, fr_s in enumerate(frames_to_smooth): allpos[i] = fr_s.pos(gids) pos = fr.pos() orig_pos = pos[gids] pos[gids] = allpos.mean(axis=0) # TODO: Maybe all logic below should go into update_ct() at later point active_total = cms_model.active_total_from_nactive_gids( fr.nactive, fr.natoms) fsys_ct_atom_total = fsys_ct.atom_total comp_atom_total = cms_model.comp_atom_total if fsys_ct_atom_total != active_total: solvent_cts = cms_model.get_solvent_cts() if len(solvent_cts) > 1: raise ValueError("Only one gcmc solvent is currently supported") solvent_ct = solvent_cts[0] if fsys_ct_atom_total < active_total: other_atom_total = comp_atom_total - solvent_ct.atom_total solvent_active_total = active_total - other_atom_total fsys_ct_atom_total_in_solvent = fsys_ct_atom_total - other_atom_total solvent_extraction_indices = range(solvent_active_total, fsys_ct_atom_total_in_solvent, -1) extracted = solvent_ct.extract(solvent_extraction_indices, copy_props=True) # perform the equivalent of mark_fullsystem_ct in cms.py on new # atoms only solvent_ct_type = solvent_ct.property[constants.CT_TYPE] solvent_ct_index = cms_model.comp_ct.index(solvent_ct) + 1 for atom in extracted.atom: atom.property[constants.CT_TYPE] = solvent_ct_type atom.property[constants.CT_INDEX] = solvent_ct_index fsys_ct.extend(extracted) elif fsys_ct_atom_total > active_total: fsys_ct.deleteAtoms(range(fsys_ct_atom_total, active_total, -1)) update_ct(fsys_ct, cms_model, fr, allaid_gids=cms_model.gid_map[1:active_total + 1], update_vel=update_vel) if orig_pos is not None: # remove side effect on the input fr pos[gids] = orig_pos
[docs]def update_fsys_ct_from_frame(fsys_ct, cms_model, fr, frames_to_smooth=None, aids_to_smooth=None): """ Update a full system CT using a frame object, including atom positions, simulation box, atom-level properties i_des_atom_domain and i_m_visibility. If inactive atoms are present, they are labelled and set invisible. If `frames_to_smooth` is given, the smoothed coordinates are used to update the full system CT's coordinates instead. :type frames_to_smooth: `list` of `traj.Frame` :param frames_to_smooth: Frames whose atom coordinates are to be smoothed to update the atom coordinates in `fsys_ct`. Note it should contain `fr`. :type aids_to_smooth: `list` of `int` or `None` :param aids_to_smooth: AIDs of atoms whose coordinates are to be smoothed. If `None`, default to non-solvent atoms. """ if not cms_model._show_inactive: update_fsys_ct_from_frame_GF(fsys_ct, cms_model, fr, frames_to_smooth, aids_to_smooth) return orig_pos = None # temporally write the smoothed positions into the frame if frames_to_smooth and len(frames_to_smooth) > 1: assert fr in frames_to_smooth, \ "The frames for smoothing should contain the frame of interest." if aids_to_smooth is None: gids = asl2gids(cms_model, 'not solvent', False) else: gids = aids2gids(cms_model, aids_to_smooth, False) if gids: allpos = numpy.empty([len(frames_to_smooth), len(gids), 3]) for i, fr_s in enumerate(frames_to_smooth): allpos[i] = fr_s.pos(gids) pos = fr.pos() orig_pos = pos[gids] pos[gids] = allpos.mean(axis=0) update_ct(fsys_ct, cms_model, fr) if orig_pos is not None: # remove side effect on the input fr pos[gids] = orig_pos # label inactive atoms if fr.natoms != fr.nactive: active_total = cms_model.active_total_from_nactive_gids( fr.nactive, fr.natoms) first_inactive_aid = active_total + 1 # Only works for single solvent first_solvent_aid = cms_model.comp_atom_total - \ cms_model.comp_ct[-1].atom_total + 1 for i in range(first_solvent_aid, first_inactive_aid): fsys_ct.atom[i].property[ constants. DES_ATOM_DOMAIN] = constants.DES_ATOM_DOMAIN.VAL.SOLVENT for i in range(first_inactive_aid, cms_model.comp_atom_total + 1): fsys_ct.atom[i].property[ constants. DES_ATOM_DOMAIN] = constants.DES_ATOM_DOMAIN.VAL.INACTIVE
# FIXME: this function will become obsolete when inactive-free fsys_ct is default
[docs]def get_active_fsys_ct_from_frame(fsys_ct, cms_model, fr): """ Return a `Structure` object that contains only the active atoms from the given frame `fr`. Note that the atom coordinates in the input `fsys_ct` are updated with those from the frame. For GCMC simulations, this active full system CT is extracted from the input full system CT to exclude inactive (inactive) atoms. A new Structure object will be returned in this case. For normal MD simulations, the active full system CT is the entire system, and the updated `fsys_ct` object will be returned. This function has the same interface as `update_ct`. :param fsys_ct: full system CT :type fr: `traj.Frame` """ if not cms_model._show_inactive: update_fsys_ct_from_frame_GF(fsys_ct, cms_model, fr) return fsys_ct update_ct(fsys_ct, cms_model, fr) if fr.natoms != fr.nactive: cms_model.set_nactive_gids(fr.nactive, fr.natoms) active_aids = numpy.arange(1, cms_model.active_total + 1) return fsys_ct.extract(active_aids) else: return fsys_ct
[docs]def set_atom_velocity(atom, vel): """ This function will set the following atom properties:: r_ffio_x_vel r_ffio_y_vel r_ffio_z_vel with `vel[0]`, `vel[1]`, and `vel[2]`, respectively. """ atom.property["r_ffio_x_vel"] = vel[0] atom.property["r_ffio_y_vel"] = vel[1] atom.property["r_ffio_z_vel"] = vel[2]
[docs]def update_cms_physical(cms_model, pos, vel, box): """ Update the physical data of a cms model. The physical data include all atom positions and velocities and also the box matrix. :type cms_model: `cms.Cms` :param cms_model: The cms model to be updated. This model should be created by the `read_cms` function, and its atom count and order should match to the `pos`, `vel` inputs. :type pos: `numpy.ndarray`. 3xN matrix, where N is number of atoms. :type vel: `numpy.ndarray`. 3xN matrix, where N is number of atoms. :type box: `numpy.ndarray`. 3x3 matrix whose ROWS are primitive cell vectors. """ allaid_pos = pos[cms_model.allaid_gids] cms_model.setXYZ(allaid_pos[:cms_model.atom_total]) cms_model._raw_fsys_ct.setXYZ(allaid_pos[:cms_model.atom_total]) start = 0 for ct in cms_model.comp_ct: end = start + ct.atom_total ct.setXYZ(allaid_pos[start:end]) start = end if vel is not None: atom0 = cms_model.atom atom1 = cms_model._raw_fsys_ct.atom atom2 = comp_atoms(cms_model) allaid_vel = vel[cms_model.allaid_gids] # zip truncates to smallest length so we needn't worry about mismatch # between comp_atom_total and atom_total for a0, a1, a2 in zip(atom0, atom1, atom2): v = allaid_vel[a0.index - 1] set_atom_velocity(a0, v) set_atom_velocity(a1, v) set_atom_velocity(a2, v) for ct in [cms_model, cms_model._raw_fsys_ct] + cms_model.comp_ct: update_ct_box(ct, box) cms_model.box = box.flatten()
[docs]def check_consistency(cms_model, frame): """ Return `None` if `cms_model` and `frame` are consistent, or a `str` object describing the inconsistency if they are not consistent. :type cms_model: `cms.Cms` :param cms_model: It should be generated by the `read_cms` function. :type frame: `traj.Frame` """ all_gids = aids2gids(cms_model, range(1, cms_model.comp_atom_total + 1), include_pseudoatoms=True) # FEP systems can have multiple AIDs sharing the same GID. num_gids = len(set(all_gids)) if num_gids != frame.natoms: return f"Number of GIDs is {num_gids} in CMS, whereas {frame.natoms} " \ f"in frame"
[docs]def update_cms(cms_model, frame, update_pseudoatoms=True): """ Update the given `cms_model` with the atom coordinates and the simulation box matrix from a simulation frame `frame`. For GCMC systems, it also updates the CT-level property 'i_des_active_total'. N.B.: If you call this function for every frame of a long trajectory, you might find this function is quite slow. Also keep in mind that its performance has a strong dependency on the number of atoms in the system. If all you want is a full-system CT with atom coordinates updated by the trajectory frame, consider using the `get_active_fsys_ct_from_frame` or `update_ct` functions above, which are much faster. The input `cms_model` and `frame` objects should match. When in doubt, call `check_consistency`. :type cms_model: `cms.Cms` :param cms_model: It should be generated by the `read_cms` function. :type frame: `traj.Frame` or duck type (see `DuckFrame` for the interface requirements). :rtype: `structure.Structure` :return: The same input `cms_model` object, with the atom coordinates and the simulation box matrix updated from the frame. """ # FIXME: Can we make this function run faster? pos = frame.pos() vel = frame.vel() # `vel' could be `None'. box = frame.box update_cms_physical(cms_model, pos, vel, box) cms_model.set_nactive_gids(frame.nactive, frame.natoms) if not update_pseudoatoms: return cms_model # Updates the ffio_pseudo block. Damn, it's complicated. # First, let's figure out number of pseudo atoms in each component CT. has_vel = vel is not None pseudo_counts = [] for ct in cms_model.comp_ct: # Solvent block's ct.ffio.site has only 1 molecule to reduce redundancy. num_site = len([e for e in ct.ffio.site if e.type != "pseudo"]) num_instance = ct.atom_total // num_site pseudo_counts.append((len(ct.ffio.site) - num_site) * num_instance) # Next, let's get the coordinates of all pseudoatoms from the trajectory # frame. To do this, we first get the gids of all pseudoatoms and then use # it as an index-array to get the coordinates. pseudo_gids = sorted(sum(cms_model.pseudoatoms.values(), [])) pseudo_pos = pos[pseudo_gids] pseudo_vel = has_vel and vel[pseudo_gids] # Now, update the ffio_pseudo block with `pseudo_pos' and `pseudo_vel'. # - We have to treat an alchemical FEP system specially because of the # potential matches of pseudoatoms. if cms_model.fep_ct: comp_ct = cms_model.comp_ct i_ref = comp_ct.index(cms_model.ref_ct) i_mut = comp_ct.index(cms_model.mut_ct) mut2ref_pseudomatch = cms_model.pseudomatch[1] num_matched = len([_f for _f in mut2ref_pseudomatch if _f]) num_unmatched_in_mut_ct = pseudo_counts[i_mut] - num_matched k = 0 k_mut_ct = None pseudo_pos_in_ref_ct = ["0th-elem-is-junk"] pseudo_vel_in_ref_ct = ["0th-elem-is-junk"] for i, ct in enumerate(cms_model.comp_ct): if cms_model.fep_ct: if i == i_ref: # Only updates the pseudoatoms in `ref_ct'. for j in range(1, pseudo_counts[i] + 1): a = ct.ffio.pseudo[j] a.x_coord = pseudo_pos[k][0] a.y_coord = pseudo_pos[k][1] a.z_coord = pseudo_pos[k][2] if has_vel: a.x_vel = pseudo_vel[k][0] a.y_vel = pseudo_vel[k][1] a.z_vel = pseudo_vel[k][2] pseudo_vel_in_ref_ct.append(pseudo_vel[k]) pseudo_pos_in_ref_ct.append(pseudo_pos[k]) k += 1 # Now a few elements after the `k'-th are coordinates of the # pseudoatoms in `mut_ct'. Skip them. k_mut_ct = k k += num_unmatched_in_mut_ct elif i == i_mut: # We cannot update for `mut_ct' right now. Because its # pseudoatoms' coordinates might depend on those of the matched # pseudoatoms in `ref_ct', which may be behind `mut_ct' in the # component CT list and thus hasn't been updated at this moment. # We will update `mut_ct' at the end. continue else: for j in range(1, pseudo_counts[i] + 1): a = ct.ffio.pseudo[j] a.x_coord = pseudo_pos[k][0] a.y_coord = pseudo_pos[k][1] a.z_coord = pseudo_pos[k][2] if has_vel: a.x_vel = pseudo_vel[k][0] a.y_vel = pseudo_vel[k][1] a.z_vel = pseudo_vel[k][2] k += 1 if cms_model.fep_ct: k = k_mut_ct ct = cms_model.mut_ct for j in range(1, pseudo_counts[i_mut] + 1): matched_index = mut2ref_pseudomatch[j] if matched_index > 0: pos = pseudo_pos_in_ref_ct[matched_index] vel = has_vel and pseudo_vel_in_ref_ct[matched_index] else: pos = pseudo_pos[k] vel = has_vel and pseudo_vel[k] k += 1 a = ct.ffio.pseudo[j] a.x_coord = pos[0] a.y_coord = pos[1] a.z_coord = pos[2] if has_vel: a.x_vel = vel[0] a.y_vel = vel[1] a.z_vel = vel[2] return cms_model
[docs]def update_msys(msys_model, frame): """ Updates the given msys model `msys_model` with the atom coordinates and velocities and the simulation box matrix from a simulation frame `frame`. :type msys_model: `msys.System` :type frame: `traj.Frame` or duck type (see `DuckFrame` for the interface requirements). :rtype: `msys.System` :return: The same input `msys_model` object, with the atom coordinates, velocities and the simulation box matrix updated from the frame. """ msys_model.setPositions(frame.pos()) msys_model.setVelocities(frame.vel()) msys_model.setCell(frame.box) msys_model.nactive = frame.nactive return msys_model
[docs]def get_aids_with_virtuals(cms_model): """ :type cms_model: `cms.Cms` :return: a set of full-system AIDs that have virtual sites attached to them. :rtype: set of `int` """ atom_ids_with_virtuals = {} for ict, comp_ct in enumerate(cms_model.comp_ct): component_atom_ids = set() if len(comp_ct.ffio.virtual) > 0: # unroll ffio_sites if needed natom = len(comp_ct.ffio.site) - len(comp_ct.ffio.virtual) ncomponent = comp_ct.atom_total // natom for v in comp_ct.ffio.virtual: for offset in range(ncomponent): component_atom_ids.add(v.ai + (offset * natom)) atom_ids_with_virtuals[ict] = component_atom_ids # Full system atom ids that have virtual sites. aids_with_virtuals = set() for full_system_idx, component_idx, _, ct_index in cms_atom_index( cms_model): if component_idx in atom_ids_with_virtuals[ct_index]: aids_with_virtuals.add(full_system_idx) return aids_with_virtuals
[docs]def aids2gids(cms_model, aids, include_pseudoatoms=True): """ Convert a list of atom IDs (`aids`) into a list of gids. If any selected atoms have pseudoatoms associated to them, the pseudoatoms will be included into the list of gids. It puts all pseudoatoms' GIDs after all physical atoms' GIDs with arbitrary ordering. :type cms_model: `cms.Cms` :param cms_model: A cms model generated by `read_cms` (see above). :rtype: `list` of `int` :return: A list of gids of the selected particles """ gids = [cms_model.gid(i) for i in aids] if include_pseudoatoms: gids_set = set(gids) g2a = {k: v for k, v in zip(gids, aids)} # Includes gids of the involved pseudoatoms. for gid_phys, gid_pseudo in cms_model.pseudoatoms.items(): if gid_phys in gids_set and g2a[gid_phys] in cms_model.aids_with_vs: gids.extend(gid_pseudo) return gids
[docs]def asl2gids(cms_model, asl, include_pseudoatoms=True): """ Evaluate an ASL expression, and return a list of gids of particles selected by `asl`. If any selected atoms have pseudoatoms associated to them, the pseudoatoms will be included into the list of gids. :type cms_model: `cms.Cms` :param cms_model: A cms model generated by `read_cms` (see above). :type asl: `str` :param asl: An ASL expression :rtype: `list` of `int` :return: An unsorted list of gids of the selected particles """ aids = cms_model.select_atom(asl) return aids2gids(cms_model, aids, include_pseudoatoms)
def _glue_action(pfx, gids): pfx.glue(gids) def _align_action(pfx, gids, ref_pos=None, weights=None): pfx.align(gids, coords=ref_pos, weights=weights)
[docs]def make_glued_topology(msys_model, cms_model): """ Add glued topology to msys_model, which contains with extra bonds based on proximity of the solute molecules. It works with `_pfx_apply` to properly center disjoint solute molecules (e.g., proteins with missing segments, dimers) The assumption is that the input cms_model has the correct spatial configurations. """ gid_pairs = cms.get_gluepoints(cms_model) msys_model.glue(gid_pairs)
def _pfx_apply(msys_model, tr, *actions): """ Return modified trajectory. Possible actions are `_glue_action` and `_align_action`. All actions utilize {msys_model.glued_topology}. """ pfx = pfx_module.Pfx(msys_model.glued_topology, fixbonds=True) for a in actions: a(pfx) for e in tr: pfx.apply(e.pos(), e.box, e.vel()) return tr
[docs]def make_whole(msys_model, tr): """ In MD simulation, molecules can be broken due to the periodic boundary condition, which makes some atoms be at one side of the simulation box and the other atoms at the opposite side. This function will edit the atom coordinates so to make broken molecules whole again for each frame of the MD trajectory `tr`. :type msys_model: msys.System :param msys_model: The msys model, which must be consistent with the trajectory in terms of the molecular topology. This object will not be mutated. :type tr: `list` :param tr: The simulation trajectory to be modified :rtype: `list` :return: Modified simulation trajectory """ return tr and _pfx_apply(msys_model, tr)
[docs]def glue(msys_model, gids, tr): """ First, make-whole all molecules in the simulation system, and then glue the selected molecules together. :type msys_model: msys.System :param msys_model: The msys model, which must be consistent with the trajectory in terms of the molecular topology. This object will not be mutated. :type gids: `list` :param gids: A list of gids to specify the molecules to be glued :type tr: `list` :param tr: The simulation trajectory to be modified :rtype: `list` :return: Modified simulation trajectory """ return tr and _pfx_apply(msys_model, tr, lambda pfx: _glue_action(pfx, gids))
[docs]def center(msys_model, gids, tr, dims=None): """ This function will do what `glue` does, but it will do one more thing: It will translate the coordinates of all atoms so that the specified molecules will be placed at the center (origin) of the simulation box. :type msys_model: msys.System :param msys_model: The msys model, which must be consistent with the trajectory in terms of the molecular topology. This object will not be mutated. :type gids: `list` :param gids: A list of gids to specify the molecules to be glued and centered :type tr: `list` :param tr: The simulation trajectory to be modified :type dim: `None` or `list` of `int`, e.g., [2] for z axis, [0, 1] for x-y plane. If set to `None`, center on all three spatial dimensions :param dim: dimensions to be centered on :rtype: `list` :return: Modified simulation trajectory """ if dims: all_pos = [fr.pos() for fr in tr] dims_to_keep = list({0, 1, 2} - set(dims)) cached = [pos[:, dims_to_keep] for pos in all_pos] tr and _pfx_apply(msys_model, tr, lambda pfx: _glue_action(pfx, gids), lambda pfx: _align_action(pfx, gids)) if dims: for i, pos_dims_to_keep in enumerate(cached): all_pos[i][:, dims_to_keep] = pos_dims_to_keep return tr
[docs]def superimpose(msys_model, gids, tr, ref_pos, weights=None): """ This function will do what `center` does, but in addition it will align and superimpose all the coordinates in the frame with respect to a reference coordinates (ref_pos). This operation will be applied to all frames in the trajectory. :type msys_model: msys.System :param msys_model: The msys model, which must be consistent with the trajectory in terms of the molecular topology. This object will not be mutated. :type gids: `list` :param gids: A list of gids to specify the molecules to be glued and centered :type tr: `list` :param tr: The simulation trajectory to be modified :type ref_pos: `numpy.array`. 3xN matrix, where N is length of `gids`. :param ref_pos: Reference coordinates used for alignments :type weights: `list` or None :param weights: Atom weights used for alignments. If it's `None`, all atoms will be weighted equally. :rtype: `list` :return: Modified simulation trajectory """ return tr and _pfx_apply( msys_model, tr, lambda pfx: _glue_action(pfx, gids), lambda pfx: _align_action(pfx, gids, ref_pos=ref_pos, weights=weights))
[docs]def make_whole_cms(msys_model, cms_model): """ Similar to `make_whole` (see above), but on a cms model instead of a simulation trajectory. Both `msys_model` and `cms_model` must be previously obtained through the `read_cms` function. They both should have the same atom coordinates and the same simulation box matrix. :rtype: `structure.Structure` :return: Modified cms model """ fr = DuckFrame(msys_model) _pfx_apply(msys_model, [fr]) update_cms(cms_model, fr) return cms_model
[docs]def glue_cms(msys_model, gids, cms_model): """ Similar to `glue` (see above), but on a cms model instead of a simulation trajectory. Both `msys_model` and `cms_model` must be previously obtained through the `read_cms` function. They both should have the same atom coordinates and the same simulation box matrix. :rtype: `structure.Structure` :return: Modified cms model """ fr = DuckFrame(msys_model) _pfx_apply(msys_model, [fr], lambda pfx: _glue_action(pfx, gids)) update_cms(cms_model, fr) return cms_model
[docs]def center_cms(msys_model, gids, cms_model, dims=None): """ Similar to `center` (see above), but on a cms model instead of a simulation trajectory. Both `msys_model` and `cms_model` must be previously obtained through the `read_cms` function. They both should have the same atom coordinates and the same simulation box matrix. :param dim: dimensions to be centered on :type dim: `None` or `list` of `int`, e.g., [2] for z axis, [0, 1] for x-y plane. If set to `None`, center on all three spatial dimensions :rtype: `structure.Structure` :return: Modified cms model """ fr = DuckFrame(msys_model) if dims: pos = fr.pos() dims_to_keep = list({0, 1, 2} - set(dims)) cached = pos[:, dims_to_keep] _pfx_apply(msys_model, [fr], lambda pfx: _glue_action(pfx, gids), lambda pfx: _align_action(pfx, gids)) if dims: pos[:, dims_to_keep] = cached update_cms(cms_model, fr) return cms_model
[docs]def superimpose_cms(msys_model, gids, cms_model, ref_pos, weights=None): """ Similar to `superimpose` (see above), but on a cms model instead of a simulation trajectory. Both `msys_model` and `cms_model` must be previously obtained through the `read_cms` function. They both should have the same atom coordinates and the same simulation box matrix. :rtype: `structure.Structure` :return: Modified cms model """ fr = DuckFrame(msys_model) _pfx_apply( msys_model, [fr], lambda pfx: _glue_action(pfx, gids), lambda pfx: _align_action(pfx, gids, ref_pos=ref_pos, weights=weights)) update_cms(cms_model, fr) return cms_model
[docs]def is_dynamic_asl(cms_model, asl) -> bool: """ Return True if ASL could evaluate to different results on different frames. :type cms_model: `cms.Cms` :type asl: `str` """ asl = cms.Cms.META_ASL.get(asl, asl) mm.mmss_initialize(mm.error_handler) if mm.mmasl_is_dynamic_expression(asl): return True if cms_model.active_total != cms_model.comp_atom_total: aids = cms_model.select_atom(f'({asl}) and solvent') return bool(aids) return False