Source code for schrodinger.application.desmond.cms

"""
Classes and functions for dealing with `*.cms` files.

Copyright Schrodinger, LLC. All rights reserved.

"""

# Contributors: Yujie Wu

import base64
import copy
import json
import math
import os
from collections import defaultdict
from collections import namedtuple
from itertools import combinations
from itertools import product
from itertools import repeat
from past.utils import old_div
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import TypeVar
from typing import Union

import numpy as np

import schrodinger.application.desmond.ffiostructure as ffiostructure
import schrodinger.infra.mm as mm
import schrodinger.structure as structure
import schrodinger.structutils.analyze as analyze
import schrodinger.structutils.measure as measure
from schrodinger.application.desmond import constants
from schrodinger.application.desmond.constants import RestrainTypes

ACTIVE_TOTAL = "i_des_active_total"
NACTIVE_GIDS = "i_des_nactive_gids"
Box = List[float]


[docs]def fix_idx_traj_path(path: str) -> str: """ Return the trajectory path for path string with .idx extension. """ if path.endswith('_trj.idx'): # This is an older CMS file; look for trj directory in the same # location as the idx file (most common case) path = path[:-4] elif path.endswith('-out.idx'): # Another older CMS file format path = path[:-8] + '_trj' return path
[docs]def check_sanity(struc): """ """ fsys_ct = None comp_ct = [] for st in struc: try: if (st.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.FULL_SYSTEM): if (fsys_ct is not None): raise ValueError( f'More than one "{constants.CT_TYPE.VAL.FULL_SYSTEM}" CTs were found' ) fsys_ct = st else: comp_ct.append(st) except KeyError: raise KeyError( f'"{constants.CT_TYPE}" not defined for the CT titled: "{st.title}"' ) if fsys_ct is None: raise ValueError( f'"{constants.CT_TYPE.VAL.FULL_SYSTEM}" CT not found\n') if comp_ct == []: raise ValueError('Component CT(s) not found\n') for st in comp_ct: if (not has_ffio(st)): raise ValueError( '"mmffio" block not defined for a component CT titled: "%s"' % st.title) return [fsys_ct] + comp_ct
[docs]def mark_fullsystem_ct(struc): """ Marks atoms in the `constants.CT_TYPE.VAL.FULL_SYSTEM` CT with the `constants.CT_TYPE` and `constants.CT_INDEX` properties, and returns the marked full_system CT as a 'Structure' object. :param struc: A list of 'Structure' objects. The first object should be the "full_system" CT, followed by the component CTs in the same order as in the .cms file. Note that the returned 'Structure' object will be a new one, not the same one as in 'struc'. """ if struc[0].property[ constants.CT_TYPE] != constants.CT_TYPE.VAL.FULL_SYSTEM: raise ValueError("First CT is not a full_system CT") fsys_ct = struc[0] # Creates and sets the `constants.CT_TYPE` and `constants.CT_INDEX` properties # for the 'full_system' CT. mm.mmct_atom_property_set_string(fsys_ct.handle, constants.CT_TYPE, mm.MMCT_ATOMS_ALL, "") mm.mmct_atom_property_set_int(fsys_ct.handle, constants.CT_INDEX, mm.MMCT_ATOMS_ALL, 0) i_atom = 1 fsys_atom_total = fsys_ct.atom_total for ct_index, ct in enumerate(struc[1:]): ct_type = ct.property[constants.CT_TYPE] # this is necessary for GCMC systems with no inactive atoms in the # fullsystem ct n_remaining_fsys_atoms = fsys_atom_total - (i_atom - 1) for i in range(min(ct.atom_total, n_remaining_fsys_atoms)): fsys_ct.atom[i_atom].property[constants.CT_TYPE] = ct_type fsys_ct.atom[i_atom].property[constants.CT_INDEX] = ct_index + 1 i_atom += 1 return fsys_ct
[docs]def get_box(ct) -> Box: """ Given a CT, extract the following properties' values:: "r_chorus_box_ax", "r_chorus_box_ay", "r_chorus_box_az", "r_chorus_box_bx", "r_chorus_box_by", "r_chorus_box_bz", "r_chorus_box_cx", "r_chorus_box_cy", "r_chorus_box_cz", and returns them as a list of float values. The order of the values in the list is the same as written above. A 'KeyError' exception will be raised if any property is missing in the CT. """ return [ct.property[prop] for prop in constants.SIM_BOX]
[docs]def get_boxsize(box: Box): """ Given a simulation box in the form of a 3x3 matrix, this function returns the size of the box. """ a = [ box[0], box[1], box[2], ] b = [ box[3], box[4], box[5], ] c = [ box[6], box[7], box[8], ] size_x = math.sqrt(a[0] * a[0] + a[1] * a[1] + a[2] * a[2]) size_y = math.sqrt(b[0] * b[0] + b[1] * b[1] + b[2] * b[2]) size_z = math.sqrt(c[0] * c[0] + c[1] * c[1] + c[2] * c[2]) return [ size_x, size_y, size_z, ]
[docs]def dotprod(v, u): """ Returns the dot product of two 3-D vectors. """ return v[0] * u[0] + v[1] * u[1] + v[2] * u[2]
[docs]def crossprod(v, u): """ Returns the cross product of two 3-D vectors. """ a = v[1] * u[2] - v[2] * u[1] b = v[2] * u[0] - v[0] * u[2] c = v[0] * u[1] - v[1] * u[0] return [ a, b, c, ]
[docs]def norm(v): """ Returns the norm of the 3-D vector 'v'. """ return math.sqrt(dotprod(v, v))
[docs]def get_boxvolume(box): a = [ box[0], box[1], box[2], ] b = [ box[3], box[4], box[5], ] c = [ box[6], box[7], box[8], ] return abs(dotprod(a, crossprod(b, c)))
[docs]def aslselect_atom(struc, asl, should_mark_fullsystem=True): """ Similar as 'aslselect_atom', but the 'struc' must be a list of CTs ('Structure' objects) in the same order as in a .cms file (i.e., the first one must be a "full_system" CT, followed by component CTs). The periodic boundary condition will be taken into account. The `constants.CT_TYPE` and `constants.CT_INDEX` atom properties are recognized. This function returns a 'dict' object. The keys are CT index ('full_system' CT's index is 0, component CT's index starts from 1), and the values are list of selected atoms of the corresponding component CT. """ if (should_mark_fullsystem): fsys_ct = mark_fullsystem_ct(struc) else: fsys_ct = struc[0] atom_list_fsys = analyze.evaluate_asl(fsys_ct, asl) atom_list_comp = {} atom_index_fix = {} accu_index = 0 for i, ct in enumerate(struc[1:]): atom_list_comp[i + 1] = [] atom_index_fix[i + 1] = accu_index accu_index += ct.atom_total for i in atom_list_fsys: ct_index = fsys_ct.atom[i].property[constants.CT_INDEX] atom_list_comp[ct_index].append(i - atom_index_fix[ct_index]) return atom_list_comp
[docs]def has_ffio(ct): """ Returns 1 if 'ct' has an mmffio block, or 0 if it does not. 'ct' should be either a FFIOStructure or a Structure object. """ if (isinstance(ct, ffiostructure.FFIOStructure)): return ct.hasFfio() uh = mm.mmct_ct_m2io_get_unrequested_handle(ct.handle) return mm.m2io_inquire_block_name(uh, "ffio_ff")
[docs]def delete_fepio(ct): """ Delete the fepio_fep block from the input structure, which should have the block (or the behavior is undefined). :type ct: Structure """ handle = mm.mmct_ct_m2io_get_unrequested_handle(ct) mm.m2io_delete_named_block(handle, "fepio_fep") ct.fephandle = None
[docs]def has_fepio(ct): """ Returns True if any of the 'ct' have a fepio_fep block, or False if they do not. :param ct: Either a FFIOStructure or a Structure object """ if (isinstance(ct, ffiostructure.FFIOStructure)): return ct.hasFepio() uh = mm.mmct_ct_m2io_get_unrequested_handle(ct.handle) return mm.m2io_inquire_block_name(uh, "fepio_fep")
[docs]def get_model_system_type(struc): """ Given structures, return the simulation type. :param struc: `structure.Structure` objects. :type struc: iterable of `structure.Structure` objects. :rtype: constants.SystemType """ for ct in struc: if has_fepio(ct) or constants.FEP_ALCHEMICAL_CHANGE in ct.property: return constants.SystemType.ALCHEMICAL ret_code = constants.SystemType.OTHER for ct in struc: atom_property = list(ct.atom[1].property) if (constants.FEP_ABSOLUTE_LIGAND in atom_property): if (constants.FEP_ABSOLUTE_ENERGY in atom_property): lig_atom = set( analyze.evaluate_asl( ct, f"atom.{constants.FEP_ABSOLUTE_LIGAND} 1")) ene_atom = set( analyze.evaluate_asl( ct, f"atom.{constants.FEP_ABSOLUTE_ENERGY} 1")) if (lig_atom != ene_atom): print( f"error: Atom properties \"{constants.FEP_ABSOLUTE_ENERGY}\" and \"{constants.FEP_ABSOLUTE_LIGAND}\" do NOT match with " "each other.") print( " The model system might be setup for ligand-binding FEP calculations but atom " "properties were not set correctly for that purpose.") ret_code = constants.SystemType.OTHER break if (lig_atom != set()): ret_code = constants.SystemType.BINDING else: print( f"error: Atom property \"{constants.FEP_ABSOLUTE_ENERGY}\" is missing while \"{constants.FEP_ABSOLUTE_LIGAND}\" is defined." ) print( " The model system might be setup for ligand-binding FEP calculations but atom properties " "were not set correctly for that purpose.") ret_code = constants.SystemType.OTHER break return ret_code
[docs]def decomp_rawfep_structure(struc): """ """ env_ct = [] ref_ct = [] mut_ct = [] for st in struc: try: if st.property[constants.FEP_FRAGNAME] == "none": ref_ct.append(st) else: mut_ct.append(st) except KeyError: env_ct.append(st) return env_ct, ref_ct, mut_ct
T = TypeVar('T') AtomIdxType = List[Tuple[int, int]] ChargeMassType = List[Tuple[float, float]] SequenceOrValue = Union[T, Sequence[T]] KType = SequenceOrValue[float] RefType = SequenceOrValue[float] SigmaType = Optional[SequenceOrValue[float]]
[docs]class Restrain: """ """
[docs] def __init__(self, atom_idx: AtomIdxType, k: KType, ref: RefType, sigma: SigmaType = None): """ :param atom_idx: List of (ct index, atom index) tuples. """ # position harmonic: 1 atom, 3 k, 3 ref, sigma = None # position fbhw : 1 atom, 1 k, 3 ref, 1 sigma # stretch fbhw : 2 atom, 1 k, 1 ref, 1 sigma # angle fbhw : 3 atom, 1 k, 1 ref, 1 sigma # torsion fbhw : 4 atom, 1 k, 1 ref, 1 sigma # NOE : 2 atom, 1 k, 2 ref, 2 sigma self.atom_idx = atom_idx self.k = k self._ref = ref self.sigma = tuple(sigma) if isinstance(sigma, list) else sigma if len(self.atom_idx) == 1 and not self.sigma: if not isinstance(self.k, list): self.k = [self.k] * 3 elif len(self.k) != 3: raise ValueError( "wrong force constant setting for position restraint") is_NOE = (len(self.atom_idx) == 2 and not isinstance(self.k, list) and isinstance(self._ref, list) and len(self._ref) == 2 and isinstance(self.sigma, tuple) and len(self.sigma) == 2) if is_NOE: raise ValueError("NOE restraints are not supported") # create a key that is not order-dependent self.key = self._get_key(self.atom_idx, self.sigma, self.ref)
def __str__(self): """ """ return "atom = %s, k = %s, ref = %s, sigma = %s" % ( str(self.atom_idx), str(self.k), str(self.ref), str(self.sigma), ) def _get_atomkey(self, atom_idx: AtomIdxType): """ Create a key which can be used in a dictionary such that all restrains with 'equivalent' atom indices will have the same key. This allows for rearranging the order of atom indices when it does not affect the resulting restrain, such as exchanging the first and last atom in an angle restrain. :rtype: A combination of nested tuples and frozensets depending on the restrain type """ # use frozenset when order doesn't matter, or tuple otherwise if len(atom_idx) == 1: return atom_idx[0] elif len(atom_idx) == 2: # 2 atom groups -> set of sets if isinstance(atom_idx[0], list): return frozenset(frozenset(a) for a in atom_idx) # 2 atom indices -> set else: return frozenset(atom_idx) elif len(atom_idx) == 3: # central atom and set of 2 bonded atoms return atom_idx[1], frozenset([atom_idx[0], atom_idx[2]]) elif len(atom_idx) == 4: # set of two planes plane0 = frozenset(atom_idx[0:3]) plane1 = frozenset(atom_idx[1:4]) return frozenset([plane0, plane1]) def _get_key(self, atom_idx: AtomIdxType, sigma: SigmaType, ref: RefType): """ Get the total key consisting of the atom indices and sigma, if used, such that all 'equivalent' restrains will have the same key. :rtype: A combination of nested tuples and frozensets depending on the restrain type """ atomkey = self._get_atomkey(atom_idx) if len(atom_idx) == 2 and isinstance(ref, list) and len(ref) == 3: # when len(ref) == 3, sigma is ignored when setting ffio # block, and the round-trip value is always dummy 1E20, # so we ignore it here. # (Not sure why there are two types of stretch, with and # without sigma) return atomkey else: return atomkey, sigma @property def ref(self): return self._ref @ref.setter def ref(self, value): self._ref = value self.key = self._get_key(self.atom_idx, self.sigma, self.ref)
[docs] def merge_with(self, a: 'Restrain'): """ Merge this restrain with another, keeping the maximum force constant and copying reference coordinates from the other restrain. """ if isinstance(self.k, list): assert len(self.k) == 3 # Position restraint self.k = list(map(max, zip(self.k, a.k))) else: self.k = max(self.k, a.k) if a.ref: self._ref = a.ref
[docs]class RestrainGroup: """ Acts as a 'default' class, whose methods either no-op or in the case of `merge_with` return concrete subclasses. """
[docs] def merge_with(self, other: 'RestrainGroup') -> 'RestrainGroup': """ Merge this group with another group """ # return other so we can promote this abstract RestrainGroup to a # concrete subclass when merging return other
[docs] def retain_refs(self, other: 'RestrainGroup'): """ Copy reference values from any duplicate entries in another group to this group """ pass
[docs] def retain_ref_single(self, other: Restrain): """ If other is a duplicate of an entry in this group, copy reference value from this group's entry to other """ pass
[docs] def merge_single(self, other: Restrain) -> Restrain: """ Merge a single Restrain into this group, returning this group The default implementation creates a new DictRestrainGroup with only other and returns it """ res = DictRestrainGroup() res.add(other) return res
[docs] def values(self) -> Iterator[Any]: """ An iterator over the values in this group. The type of the values is determined by the subclass """ return ()
def __bool__(self): return False
[docs]class DictRestrainGroup(dict, RestrainGroup):
[docs] def merge_with( self, other: Union[RestrainGroup, 'DictRestrainGroup']) -> 'DictRestrainGroup': """ Subclasses are not interoperable - other is assumed to be either a 'null' RestrainGroup or a DictRestrainGroup, but not a PosRestrainArray subclass """ if other: # bool(RestrainGroup) == False assert isinstance(other, DictRestrainGroup) # merge other DictRestrainGroup into self other_copy = other.copy() # merge values for duplicate keys for other_k, other_v in other.items(): if other_k in self: self_v = self[other_k] self_v.merge_with(other_v) del other_copy[other_k] # update with any new keys self.update(other_copy) return self
[docs] def merge_single(self, other: Restrain) -> 'DictRestrainGroup': # merge single restrain into self if other.key in self: self_v = self[other.key] self_v.merge_with(other) else: self.add(other) return self
[docs] def add(self, restrain: Restrain): self[restrain.key] = restrain
[docs] def retain_refs(self, other: Union[RestrainGroup, 'DictRestrainGroup']): """ Subclasses are not interoperable - other is assumed to be either a 'null' RestrainGroup or a DictRestrainGroup, but not a PosRestrainArray subclass """ if other: # bool(RestrainGroup) == False assert isinstance(other, DictRestrainGroup) # pull refs onto self for other_k, other_v in other.items(): if other_k in self: self[other_k].ref = other_v.ref
[docs] def retain_ref_single(self, other: Restrain): # push ref onto other if other.key in self: other._ref = self[other.key].ref
def __bool__(self): return bool(len(self))
[docs]class PosRestrainArray(RestrainGroup): """ Stores a group of position restraints as a numpy structured array for faster creation (if from existing arrays) and merging """ AIDX_TYPE = [('ct', 'i8'), ('atom', 'i8')] DTYPE = [("atom_idx", AIDX_TYPE), ("k", "f8", (3,)), ("ref", "f8", (3,))] INTERSECT_KEYS = "atom_idx"
[docs] def __init__(self, atom_idxs, k=None, ref=None): """ Atoms can be an iterable of tuples with type corresponding to `self.DTYPE` such that they can be passed directly to numpy's structured array constructor, or a numpy array of atom indices, in which case k and ref must also be numpy arrays of the same length. In the latter case, the array fields will be filled in with each of the subarrays. :param atom_idxs: The atom indices or a list of tuples with all of the parameters :param k: The force constants :param ref: The reference values """ if k is None and ref is None and len(atom_idxs): # construction by tuples self.arr = np.array(atom_idxs, dtype=self.DTYPE) else: # construction by subarrays self.arr = np.empty(shape=len(atom_idxs), dtype=self.DTYPE) if len(atom_idxs): self.arr['atom_idx'] = atom_idxs self.arr['ref'] = ref self.arr['k'] = k
def _intersect_with(self, other) -> (np.array, np.array): """ Find the duplicate entries in self and other and return their indices, respectively """ _, self_idx, other_idx = np.intersect1d(self.arr[self.INTERSECT_KEYS], other.arr[self.INTERSECT_KEYS], assume_unique=True, return_indices=True) return self_idx, other_idx
[docs] def merge_with( self, other: Union[RestrainGroup, 'PosRestrainArray']) -> 'PosRestrainArray': """ Subclasses are not interoperable - other is assumed to be either a 'null' RestrainGroup or a PosRestrainArray, but not a DictRestrainArray subclass """ if other: # bool(RestrainGroup) == False assert isinstance(other, PosRestrainArray) self_idx, other_idx = self._intersect_with(other) other_k = other.arr['k'][other_idx] self.arr['k'][self_idx] = np.maximum(self.arr['k'][self_idx], other_k) self.arr['ref'][self_idx] = other.arr['ref'][other_idx] mask = np.ones(other.arr.size, dtype=bool) mask[other_idx] = False self.arr = np.concatenate([self.arr, other.arr[mask]]) return self
[docs] def merge_single(self, other): raise NotImplementedError
[docs] def retain_refs(self, other: Union[RestrainGroup, 'PosRestrainArray']): """ Subclasses are not interoperable - other is assumed to be either a 'null' RestrainGroup or a PosRestrainArray, but not a DictRestrainArray subclass """ if other: # bool(RestrainGroup) == False assert isinstance(other, PosRestrainArray) self_idx, other_idx = self._intersect_with(other) self.arr['ref'][self_idx] = other.arr['ref'][other_idx]
[docs] def retain_ref_single(self, other): raise NotImplementedError
[docs] def values(self) -> np.array: return self.arr
[docs] def __len__(self): return len(self.arr)
def __bool__(self): return bool(len(self))
[docs]class FBHWPosRestrainArray(PosRestrainArray): DTYPE = [("atom_idx", PosRestrainArray.AIDX_TYPE), ("k", "f8"), ("ref", "f8", (3,)), ("sigma", "f8")] INTERSECT_KEYS = ["atom_idx", "sigma"]
[docs] def __init__(self, atom_idxs=None, k=None, ref=None, sigma=None): super().__init__(atom_idxs=atom_idxs, k=k, ref=ref) # not necessary when constructing from tuples if sigma: # will broadcast as needed self.arr['sigma'] = sigma
[docs]class RestrainGroupContainer(defaultdict): """ Collection of `RestrainGroups` corresponding to the different `constants.RestrainTypes` """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.default_factory = RestrainGroup
[docs] def merge_group(self, key: RestrainTypes, val: RestrainGroup): """ Merges a new restrain group with the existing value and updates the result. """ self[key] = self[key].merge_with(val)
[docs] def merge_single(self, key: RestrainTypes, val: Restrain): """ Merges a new restrain into the corresponding restrain group value and updates the result. """ self[key] = self[key].merge_single(val)
def __bool__(self): return any(map(bool, self.values()))
[docs]class ModelRestrainList(list): def __bool__(self): return any(map(bool, self))
[docs]class AtomGroup(object): """ """
[docs] def __init__(self, atom=None, name=None, index=None): """ """ self.atom = atom # List of lists of atom indices self.name = name self.index = index
[docs]class Site(object): """ """
[docs] def __init__(self, type, charge, mass, vdwtype): """ """ self.type = type self.charge = charge self.mass = mass self.vdwtype = vdwtype
# for unit tests def __eq__(self, other): return (self.type == other.type and self.charge == other.charge and self.mass == other.mass and self.vdwtype == other.vdwtype)
[docs]class Pseudo(object): """ """
[docs] def __init__(self, x, y, z): """ """ self.x = x self.y = y self.z = z
[docs]class Constraint(object): """ """ NUM_CONSTRAINT = { "AH1": 1, "AH2": 2, "AH3": 3, "AH4": 4, "AH5": 5, "HOH": 3, }
[docs] def __init__(self, func, atom_i=0, atom_j=0, atom_k=0, atom_l=0, atom_m=0, c1=0, c2=0, c3=0, c4=0, c5=0, c6=0): """ """ self.func = func self.atom_i = atom_i self.atom_j = atom_j self.atom_k = atom_k self.atom_l = atom_l self.atom_m = atom_m self.c1 = c1 self.c2 = c2 self.c3 = c3 self.c4 = c4 self.c5 = c5 self.c6 = c6
[docs] def num_constraint(self): """ """ return Constraint.NUM_CONSTRAINT[self.func]
[docs]class Vdw(object): """ A class for handling VDW parameters. """
[docs] def __init__(self, atom_type, func, c): """ Constructor. :param atom_type: A pair of strings for the names of the two atom types. If only one string is given, the other will be considered as the same as the first one. If more than 2 strings are given, only the first two will be used, and the other ones will be ignored. :param func: A string telling the particular VDW potential type. :param c: A tuple of floating numbers for the parameters. """ t1 = atom_type[0] if (1 == len(atom_type)): t2 = atom_type[0] else: t2 = atom_type[1] self.atom_type = ( t1, t2, ) self.func = func self.c = c
# __init__
[docs] def c6(self): """ c6 = 4 * epsilon * sigma**6 """ return 4 * self.c[1] * math.pow(self.c[0], 6.0)
[docs]def combine_vdw(v1, v2, comb_rule="geometric"): """ Given two homogenous `Vdw` objects 'v1' and 'v2' and a combination rule 'comb_rule', this function returns a `Vdw` object as a combination of 'v1' and 'v2'. """ if (v1.atom_type[0] != v1.atom_type[1] or v2.atom_type[0] != v2.atom_type[1]): raise ValueError("Cannot combine inhomogenous VDW types") # FIXME: Implement checking comb_rule # FIXME: Implement checking VDW function c1 = math.sqrt(v1.c[0] * v2.c[0]) c2 = math.sqrt(v1.c[1] * v2.c[1]) return Vdw(( v1.atom_type[0], v2.atom_type[0], ), v1.func, ( c1, c2, ))
[docs]def calc_average_vdw_coeff(struc): """ Calculates and returns the average dispersion coefficient. The unit of the coefficient is in the unit of the original ffio block. :param struc: A list of `schrodinger.structure.Structure` objects that have been pre-treated by the `prep_struc` function. """ # Checks the CT-level property `constants.CT_TYPE` to get the solvent CTs. sol = [] for ct in struc: try: ent = ct.property[constants.CT_TYPE] if (ent in [ constants.CT_TYPE.VAL.ION, constants.CT_TYPE.VAL.MEMBRANE, constants.CT_TYPE.VAL.SOLVENT, ]): sol.append(ct) except KeyError: print( f"warning: '{constants.CT_TYPE}' property not found in all CTs." ) print( "warning: Average dispersion coefficient can NOT be set automatically." ) return None # Checks whether `sol' is empty and whether we have ffio blocks within the ``solvent'' CTs. if ([] == sol): return None else: for ct in sol: comb_rule = mm.mmffio_ff_get_comb_rule(ct.ffhandle) if (None is comb_rule or "geometric" != comb_rule.lower()): print("warning: Unknown combination rule '%s'" % comb_rule) print( "warning: Average dispersion coefficient can NOT be calculated." ) return None # Gets all VDW parameters for solvent atoms. vdw = {} for ct in sol: ct._num_mol = ct.mol_total ct._num_atom = ct.atom_total ct._num_vdw = mm.mmffio_ff_get_num_vdwtypes(ct.ffhandle) for i in range(1, ct._num_vdw + 1): vdwtype = mm.mmffio_vdwtype_get_name(ct.ffhandle, i) func = mm.mmffio_vdwtype_get_funct(ct.ffhandle, i) c1 = mm.mmffio_vdwtype_get_c1(ct.ffhandle, i) c2 = mm.mmffio_vdwtype_get_c2(ct.ffhandle, i) vdw[vdwtype] = Vdw((vdwtype,), func, ( c1, c2, )) vdw[vdwtype]._num_atom = 0 # Calculates the number of atoms for each VDW type. for ct in sol: ct._num_atom_per_mol = old_div(ct._num_atom, ct._num_mol) for i in range(1, ct._num_atom_per_mol + 1): vdwtype = mm.mmffio_site_get_vdwtype(ct.ffhandle, i) vdw[vdwtype]._num_atom += ct._num_mol # Calculates the average dispersion coefficient. tot_sum = 0.0 tot_count = 0 for ct in sol: for i in range(1, ct._num_atom_per_mol + 1): sub_sum = 0.0 sub_count = 0 v1 = vdw[mm.mmffio_site_get_vdwtype(ct.ffhandle, i)] # print v1.atom_type[0], v1._c[0], v1._c[1] for v2 in vdw.values(): # print " ", v2.atom_type[0], v2.c[0], v2.c[1], v = combine_vdw(v1, v2) c1 = v.c[0] c2 = v.c[1] # print "; ", c1, c2 sub_sum += 4 * c2 * math.pow(c1, 6) * v2._num_atom sub_count += v2._num_atom # Substracts the amount for the bonding atoms, including the `i'-th atom itself. bonded_atom = {} # We use a dictionary to avoid duplicates. for atom in ct.atom[i].bonded_atoms: # Loops through atoms that are one bond away from the `i'-th atom. bonded_atom[int(atom)] = 0 for atom2 in atom.bonded_atoms: # Loops through atoms that are two bonds away from the `i'-th atom. # This will include the `i'-th atom itself into the list. bonded_atom[int(atom2)] = 0 for j in list(bonded_atom): if (i <= j): v2 = vdw[mm.mmffio_site_get_vdwtype(ct.ffhandle, j)] v = combine_vdw(v1, v2) c1 = v.c[0] c2 = v.c[1] sub_sum += 4 * c2 * math.pow(c1, 6) sub_count -= 1 tot_sum += sub_sum * ct._num_mol tot_count += sub_count * ct._num_mol # print tot_sum, tot_count # print tot_sum / tot_count return old_div(int(1000 * tot_sum / tot_count), 1000.0)
[docs]class Cms(structure.Structure): """ """ ATOMGROUP_PREFIX = "i_ffio_grp_" MODEL_SYSTEM_TYPE = [ "standard model system", "model system for mutation FEP", "model system for total free energy FEP", ] META_ASL = { "heavy_atom": "not atom.elem H", constants.CT_TYPE.VAL.SOLUTE: f"atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.SOLUTE}'", constants.CT_TYPE.VAL.SOLVENT: f"atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.SOLVENT}'", constants.CT_TYPE.VAL.COSOLVENT: f"atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.COSOLVENT}'", "solute_heavy_atom": f"atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.SOLUTE}' and not atom.elem H", "solvent_heavy_atom": f"atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.SOLVENT}' and not atom.elem H", constants.CT_TYPE.VAL.MEMBRANE: f"atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.MEMBRANE}'", } PROP_CMS = mm.M2IO_DATA_ORIGINAL_CMS_FILE PROP_TRJ = mm.M2IO_DATA_TRAJECTORY_FILE _IDMAP_FIELDS = ( # gid of the first atom in component ct, # this field has to be set 'start_gid', # parent atom to pseudo-particle gids, optional 'parent_to_pseudo', # pseudo particle to parent-particle gid map to gid, optional 'pseudo_to_parent', # atom id to gid maps of all generated titration states, optional 'titration_gid_maps', # parent atom id to gid maps of all generated titration states, optional 'titration_parent_pseudo_maps', # atom-id to gid map, optional 'to_gid', # connection for all gids in merged fep or titration states, optional 'topology') _IdMap = namedtuple('_IdMap', _IDMAP_FIELDS) def _init_id_maps(self): self.id_maps = list([ self._IdMap(*(None,) * len(self._IdMap._fields)) for i in self.comp_ct ])
[docs] def __init__(self, file=None, string=None, remove_inactive_atoms_in_fsys=True): """ Initialize cms object """ file = file and str(file) # Reads the .cms file or string. struc = [e for e in ffiostructure.CMSReader(file, input_string=string)] # This full_system CT may not contain the `constants.CT_TYPE` and # `constants.CT_INDEX` CT properties. self._raw_fsys_ct = struc[0].copy() self._site_map = None self._unroll = None self.fsys_ct = mark_fullsystem_ct(struc) self.comp_ct = struc[1:] # fep_ct is None, or the CT with the "fepio_fep" block. self.fep_ct = None self.ref_ct = None self.mut_ct = None self.box = get_box(self.fsys_ct) self._init_id_maps() self.titration_states = [] self._charge = None self._mass = None structure.Structure.__init__(self, self.fsys_ct.handle) self._show_inactive = not remove_inactive_atoms_in_fsys # atom_total is the length of the fsys_ct which may or may not be equal # to the combined length of the comp cts, depending on the presence of # inactive atoms and the value of self._show_inactive when the cms file was # written. # Files could be written with _show_inactive = True or False total_with_inactive = self.comp_atom_total has_inactive = self.active_total != total_with_inactive if has_inactive: # check if there is a discrepancy between the _show_inactive mode and # the input fullsystem ct: # atom_total == comp_ct_total corresponds with _show_inactive if self._show_inactive and self.atom_total != total_with_inactive: self.synchronize_fsys_ct() elif not self._show_inactive and self.atom_total == total_with_inactive: self.synchronize_fsys_ct() # Note this gid_map is only correct for non-FEP system without pseudoatoms # To ensure the correct AID2GID mapping, use topo.py::read_cms self.gid_map = np.arange(-1, total_with_inactive, dtype=int) self.gid_map[0] = np.iinfo(self.gid_map.dtype).min # don't let anyone else muck it up since it's directly exposed self.gid_map.flags.writeable = False self.ffio_refresh() self.gid_refresh() self.pseudoatoms = {} self.pseudomatch = [[], []] self.aids_with_vs = set()
def __copy__(self): """ """ # This is just a workround. It may not duplicate all attributes of the # current `Cms' object. s = self.write_to_string() new_model = Cms(string=s, remove_inactive_atoms_in_fsys=not self._show_inactive) new_model.gid_map = self.gid_map.copy() new_model.gid_map.flags.writeable = False new_model.pseudoatoms = copy.deepcopy(self.pseudoatoms) new_model.pseudomatch = copy.deepcopy(self.pseudomatch) new_model.aids_with_vs = copy.deepcopy(self.aids_with_vs) return new_model def __getstate__(self): return { 'string': self.write_to_string(), '_show_inactive': self._show_inactive, 'gid_map': self.gid_map, 'pseudoatoms': self.pseudoatoms, 'pseudomatch': self.pseudomatch, 'aids_with_vs': self.aids_with_vs } def __setstate__(self, state): if isinstance(state, str): # Backwards compatibility for old pkl files return super().__setstate__(state) new_model = Cms( string=state['string'], remove_inactive_atoms_in_fsys=not state['_show_inactive']) for k, v in state.items(): if k in ['string', '_show_inactive']: continue setattr(new_model, k, v) self.__dict__ = new_model.__dict__
[docs] def synchronize_fsys_ct(self): """ Sync the fullsystem CT `fsys_ct` to the component CTs. The handle of the fullsystem CT will remain unchanged. In order to modify the atoms of the cms object, modifications should be made to the component CTs, and then this method called to propagate the changes to the fullsystem CT. """ # A copy of the `fsys_ct`, `raw_fsys_ct`, is also made here directly # after syncing with the component CT, but before adding extra # CT_INDEX and CT_TYPE atom properties corresponding to their original # component CTs. This copy is used for writing to disk, and will not # reflect any changes made to `fsys_ct` hereafter. # The handle of `fsys_ct` never changes, but `_raw_fsys_ct` will # change handles each time this method is called. num_atom = self.fsys_ct.atom_total self.fsys_ct.deleteAtoms(range(1, num_atom + 1)) for ct in self.comp_ct: if not self._show_inactive: # we use min because ACTIVE_TOTAL is not guaranteed to be up to # date, for instance if atoms are deleted after it is set ct_active_total = min( ct.property.get(ACTIVE_TOTAL, ct.atom_total), ct.atom_total) if ct_active_total != ct.atom_total: # for some reason this is faster than extracting ct = ct.copy() ct.deleteAtoms(range(ct_active_total + 1, ct.atom_total + 1)) # We need to use extend since it keeps the handle (unlike merge) self.fsys_ct.extend(ct) self._raw_fsys_ct = self.fsys_ct.copy() self.fsys_ct = mark_fullsystem_ct([self.fsys_ct] + self.comp_ct)
[docs] @staticmethod def clean_ct_properties(ct): """ Replace newlines and square brackets in ct property names and values. This function needs to be call for all cts that is to be read by desmond to guard againt destro failure. """ def sanitize(name): if not isinstance(name, str): return name return name.replace('[', '/').replace(']', '/').replace('\n', ' ') for k, v in list(ct.property.items()): new_k = sanitize(k) new_v = sanitize(v) try: del ct.property[k] except ValueError: ct.property[k] = new_v ct.property[new_k] = new_v
[docs] def sanitize_for_viparr(self): """ Viparr's custom mae file reader does not play well with certain characters like square brackets. This is also true when the these characters are 'escaped'. Thus this method sanitizes offensive property names and their values by replacing them with forward slashes. This method checks if the CT properties are okay for viparr. """ for ct in self.comp_ct + [self.fsys_ct]: self.clean_ct_properties(ct) self.synchronize_fsys_ct()
@property def need_msys(self): return self.id_maps[0].start_gid is None @property # Total (including inactive ones) number of particles in trajactory frame def particle_total(self): ct = self.comp_ct[-1] return self.id_maps[-1].start_gid + ct.atom_total + \ ct.property[constants.FFIO_NUM_PSEUDOS] @property def use_titration(self): for ct in self.comp_ct: if ct.property.get(constants.USE_TITRATION, False): return True return False @property # In general this the property to use for desmond.packages.pfx functions. def glued_topology(self): return self._glued_topology
[docs] def glue(self, glue_pts, start_fresh=False): ''' :type glue_pts: `list` whose entries are `list` of two GIDs ''' if start_fresh: self._glued_topology = self._generate_topology() for gid1, gid2 in glue_pts: self._glued_topology[gid1].append(gid2)
def _generate_topology(self) -> List[List[int]]: """ Generate topology accepted by msys periodic fix constructor msys.pfx.Pfx(topology=topo, fixbonds=True). Topology is represented by a list of lists. Topology[i] consists of all particles connected to the particle i. """ titration_ct = self.get_titration_ct() fep_ref_ct, fep_mut_ct = self.get_fep_cts() topology = [] for ct_index, ct in enumerate(self.comp_ct): ct_id_map = self.id_maps[ct_index] if ct == titration_ct or ct == fep_ref_ct: for atoms in ct_id_map.topology: topology.append(atoms) # fep_mut_ct is combined into fep_ref_ct therefore the connection # is the same for both of them. elif ct == fep_mut_ct and fep_ref_ct is not None: continue # other than titration and fep cts, connection is just the m_bond table # plus virtual sites to parent. Only the virtual sites connection is # serialized by msys. Need to combine the two pieces here. else: for a in ct.atom: topology.append([ ct_id_map.start_gid + a.index - 1 for a in a.bonded_atoms ]) if ct_id_map.parent_to_pseudo is not None: pseudo2parent = [] for k, v in ct_id_map.parent_to_pseudo.items(): for v_id in v: pseudo2parent.append((v_id + ct_id_map.start_gid, k + ct_id_map.start_gid)) for _, parent in sorted(pseudo2parent, key=lambda x: x[0]): topology.append([parent]) return topology
[docs] def get_fep_cts(self): """ find ref and mut cts by fep_fragname property """ ref_ct = None mut_ct = None for ct in self.comp_ct: fragname = ct.property.get(constants.FEP_FRAGNAME) if fragname == "none": ref_ct = ct elif fragname is not None: mut_ct = ct return (ref_ct, mut_ct)
[docs] def get_titration_ct(self): """ find titration ct """ for ct in self.comp_ct: for a in ct.atom: if constants.FEP_TITRATABLE_GROUP in a.property: return ct
[docs] def get_solvent_cts(self): """ find any cts with ffio_ct_type solvent """ return [ ct for ct in self.comp_ct if ct.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.SOLVENT ]
[docs] def get_membrane_cts(self): """ Find and structures with ffio_ct_type=membrane """ return [ ct for ct in self.comp_ct if ct.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.MEMBRANE ]
[docs] def ffio_refresh(self): """ 'atom_index' starts from 1. """ self._site = [] self._unroll = [] for ct in self.comp_ct: site_list = [] for e in ct.ffio.site: # this is only used to populate an atom-indexed site list. # anyone looking for pseudoatoms should use the ffio block # directly if e.type == 'atom': site_list.append(Site(e.type, e.charge, e.mass, e.vdwtype)) if site_list: num_rolled = ct.atom_total // len(site_list) self._unroll.append(num_rolled) self._site.extend(site_list * num_rolled) for ct in self.comp_ct: if (ct.hasFepio()): self.fep_ct = ct break if self.fep_ct or self.use_titration: (self.ref_ct, self.mut_ct) = self.get_fep_cts() for ct in self.comp_ct: ligand_atom = analyze.evaluate_asl( ct, f"atom.{constants.FEP_ABSOLUTE_LIGAND} 1") if len(ligand_atom) > 0: self.mut_ct = ct break
[docs] def get_fullsys_ct_atom_index_range(self, comp_ct_index): """ for a given component CT, return a map of that compontent CT in full_system CT """ # index starts with 1 ntotal_atom = 1 for i in range(0, comp_ct_index): ntotal_atom += self.comp_ct[i].atom_total return list( range(ntotal_atom, ntotal_atom + self.comp_ct[comp_ct_index].atom_total))
[docs] def get_lambda_atom_indices(self, lambda_val): """ :param lambda_val: lambda value, 0 or 1 :return: list of int (for atoms), list of list of int (for virtual sites) """ # find ligand 1 for i, st in enumerate(self.comp_ct): if st.property.get( constants.MOLTYPE) == constants.MOLTYPE.VAL.LIGAND: ligand = i break atom_list = [] virtual_list = [] for i, st in enumerate(self.comp_ct): if i != ligand + (1 - lambda_val): atom_list += self.get_fullsys_ct_atom_index_range(i) virtual_list.append( list( range(st.atom_total + 1, st.atom_total + 1 + len(st.ffio.virtual)))) return atom_list, virtual_list
@staticmethod def _update_pseudo(pseudo_block, pos, vel): for i in range(len(pseudo_block)): a = pseudo_block[i + 1] a.x_coord = pos[i][0] a.y_coord = pos[i][1] a.z_coord = pos[i][2] if vel is not None: a.x_vel = vel[i][0] a.y_vel = vel[i][1] a.z_vel = vel[i][2]
[docs] def update_with_frame(self, frame): """ Given frame object (traj.Frame), update coordinates and velocites ( if they exist in frame) for all particles. All coordinates in the serialized titration states are updated as well if titration ct exists. This also works for the new alchemical msys code for FEP and will replace topo.update_cms in the long run. """ from schrodinger.application.desmond.packages import topo pos = frame.pos() vel = frame.vel() # `vel' could be `None'. box = frame.box topo.update_cms_physical(self, pos, vel, box) for ct_index, ct in enumerate(self.comp_ct): pseudo_block = ct.ffio.pseudo id_map = self.id_maps[ct_index] if id_map.to_gid is not None: pseudo_gids = id_map.to_gid[ct.atom_total + 1:len(pseudo_block) + ct.atom_total + 1] else: pseudo_start = id_map.start_gid + ct.atom_total pseudo_gids = list( range(pseudo_start, len(pseudo_block) + pseudo_start)) pseudo_pos = pos[pseudo_gids] pseudo_vel = None if vel is not None: pseudo_vel = vel[pseudo_gids] Cms._update_pseudo(pseudo_block, pseudo_pos, pseudo_vel) titration_ct = self.get_titration_ct() if titration_ct is not None: titration_ct_index = self.comp_ct.index(titration_ct) all_cts = str() for i, ct in enumerate(self.titration_states): gid_map = self.id_maps[titration_ct_index].titration_gid_maps[i] atom_gids = gid_map[1:ct.atom_total + 1] ct.setXYZ(pos[atom_gids]) topo.update_ct_box(ct, frame.box) if vel is not None: for a, v in zip(ct.atom, vel[atom_gids]): topo.set_atom_velocity(a, v) pseudo_gids = gid_map[ct.atom_total + 1:] pseudo_pos = pos[pseudo_gids] pseudo_vel = None if vel is not None: pseudo_vel = vel[pseudo_gids] Cms._update_pseudo(ct.ffio.pseudo, pseudo_pos, pseudo_vel) mm.mmffio_ff_mmct_put(ct.ffhandle, ct.handle) all_cts += mm.mmct_ct_to_string(ct) titration_ct.property[ constants.FEP_TITRATION_STATES] = base64.b64encode( all_cts.encode()).decode() self.set_nactive_gids(frame.nactive, frame.natoms)
def _set_gid_map_titration_states(self): """ Given mapping properties set in self by desmond, generate gid_map. Properties expected: - constants.FEP_AID2GID for atom/pseudo-atoms mapping to gids. This property exists for the reference ct of FEP job and titration_ct. The keys in this dictionary (constants.IdConversion) consist of: - COMPONENT_TO_COMBINED: mapping from component-particle id to gid. This will be used to set the to_gid property for both FEP ref and mut cts. For titration, the first map is used to set to_gid for the titration ct. The rest of the mapping arrays are stored in titration_gid_maps. All mapping arrays are padded with a zeroth element so that Maestro atom indices (starting from one) do not need to be shifted for gid lookup. - ATOM_TOTAL: number of atoms in generated titration states. - PSEUDO_END: last gid of the combined titraton or FEP ct. - PARENT2PSEUDO: list of pseudo atoms attached to each parent for all titration states and FEP end points. - PSEUDO2PARENT: inverse map of every pseudo atom to its parent. This is gid-to-gid map. - TOPOLOGY: full connection of the merged titration ct or FEP ct - constants.FFIO_NUM_PSEUDOS: number of pseudo atoms - constants.FFIO_PARENT2PSEUDO: pseudo atoms attached to parent - constants.FEP_TITRATION_STATES: all other states for titration """ titration_ct = self.get_titration_ct() fep_ref_ct, fep_mut_ct = self.get_fep_cts() offset = 0 gids = np.array([-1], dtype=int) def _parent_to_pseudo(ct_parent_and_pseudo, id_map=None): """ Convert given list of lists of parent and pseudo particles to dictionary. Optional id_map is used to convert the particle ids in the original structure to ones in the merged structure (FEP or titration). """ parent_to_pseudo = defaultdict(list) for parent_and_pseudo in ct_parent_and_pseudo: pseudos = parent_and_pseudo[1:] if id_map is None else [ id_map[p + 1] for p in parent_and_pseudo[1:] ] parent_to_pseudo[parent_and_pseudo[0]].extend(pseudos) return parent_to_pseudo def _get_parent_to_pseudo(ct): """ Generate pseudo-to-parent mapping. If constants.NUM_COMPONENT is defined the same connection is repeated after constants.NUM_COMPONENT number of molecules, unroll the template connection to all repeated molecules. """ template_p2pseudo = _parent_to_pseudo( json.loads( base64.b64decode( ct.property[constants.FFIO_PARENT2PSEUDO]).decode())) if constants.NUM_COMPONENT in ct.property: n_template_atoms = sum([ len(ct.molecule[m + 1]) for m in range(ct.property[constants.NUM_COMPONENT]) ]) repetition = ct.atom_total // n_template_atoms n_template_pseudo = n_total_pseudos // repetition parent2pseudo = defaultdict(list) parent_offset = 0 pseudo_offset = ct.atom_total - n_template_atoms for _ in range(repetition): for k, v in template_p2pseudo.items(): parent2pseudo[k + parent_offset] = [ a + pseudo_offset for a in v ] parent_offset += n_template_atoms pseudo_offset += n_template_pseudo return parent2pseudo else: return template_p2pseudo def _aid_map_with_offset(input_list, offset): output = np.array(input_list) + offset output[0] = np.iinfo(output.dtype).min return output def _get_other_titration_states(ct): cts_string = base64.b64decode( ct.property[constants.FEP_TITRATION_STATES]).decode() return [ ffiostructure.FFIOStructure(ct.handle) for ct in structure.StructureReader.fromString(cts_string) ] def _get_maps_for_other_states(all_aid2gid, all_parent2pseudo, offset): gid_maps = [] for m in all_aid2gid[1:]: gid_maps.append(_aid_map_with_offset(m, offset)) parent_pseudo_maps = [] for p in all_parent2pseudo[1:]: parent_pseudo_maps.append(_parent_to_pseudo(p)) return gid_maps, parent_pseudo_maps def _update_fep_mut_ct_maps(all_id_maps, fep_mut_ct_index, aid2gid, parent2pseudo, start_gid): fep_mut_id_map = all_id_maps[fep_mut_ct_index] all_id_maps[fep_mut_ct_index] = fep_mut_id_map._replace( start_gid=start_gid, to_gid=_aid_map_with_offset(aid2gid, start_gid), parent_to_pseudo=_parent_to_pseudo(parent2pseudo, id_map=aid2gid)) def _append_charge_mass_from_ffio_site( charge: List[float], mass: List[float], ffio_site: 'ffiostructure.FFIOStructure.site', start: int, end: int): tmp_charge, tmp_mass = zip(*[(ffio_site[idx].charge, ffio_site[idx].mass) for idx in range(start, end)]) charge.extend(tmp_charge) mass.extend(tmp_mass) def _append_charge_mass(charge: List[float], mass: List[float], charge_mass: ChargeMassType): tmp_charge, tmp_mass = zip(*charge_mass) charge.extend(tmp_charge) mass.extend(tmp_mass) def _append_charge_mass_with_maps( charge: List[float], mass: List[float], ct: 'ffiostructure.FFIOStructure', start_gid: int, gid_maps: List[np.array], states: List['ffiostructure.FFIOStructure']): """ Provided explicit maps from FEP or titration, generate charge and mass values for all gids in the component ct """ # real atoms from template _append_charge_mass_from_ffio_site(charge, mass, ct.ffio.site, 1, ct.atom_total + 1) # use maximum mass for matched atoms for m, st in zip(gid_maps, states): ffio_site = st.ffio.site for (idx, gid) in enumerate(m[1:st.atom_total + 1], 1): if gid - start_gid < ct.atom_total: other_mass = ffio_site[idx].mass if mass[gid] < other_mass: mass[gid] = other_mass num_dummy_atoms = 0 # dummy atoms from other states for m, st in zip(gid_maps, states): ffio_site = st.ffio.site charge_mass = list([ (ffio_site[idx].charge, ffio_site[idx].mass) for (idx, gid) in enumerate(m[1:st.atom_total + 1], 1) if gid - start_gid >= ct.atom_total ]) num_dummy_atoms += len(charge_mass) if len(charge_mass): _append_charge_mass(charge, mass, charge_mass) # virtual sites from template num_pseudo = len(ct.ffio.site) - ct.atom_total if num_pseudo > 0: _append_charge_mass_from_ffio_site(charge, mass, ct.ffio.site, ct.atom_total + 1, len(ct.ffio.site) + 1) # dummy virtual sites from other states pseudo_dummy_start = num_dummy_atoms + num_pseudo + ct.atom_total for m, st in zip(gid_maps, states): ffio_site = st.ffio.site charge_mass = list([ (ffio_site[pid].charge, ffio_site[pid].mass) for (pid, gid) in enumerate(m[st.atom_total + 1:], 1) if gid - start_gid >= pseudo_dummy_start ]) if len(charge_mass): _append_charge_mass(charge, mass, charge_mass) def _append_charge_mass_unrolled(charge: List[float], mass: List[float], ct: 'ffiostructure.FFIOStructure'): """ Without explicit id map, unroll charge and mass values for all gids in the component ct """ ffio_site = ct.ffio.site num_template_atoms = ct.atom_total num_sites = len(ffio_site) repetition = 1 if constants.NUM_COMPONENT in ct.property: num_template_atoms = sum([ len(ct.molecule[m + 1]) for m in range(ct.property[constants.NUM_COMPONENT]) ]) repetition = ct.atom_total // num_template_atoms atom_charge = [] atom_mass = [] _append_charge_mass_from_ffio_site(atom_charge, atom_mass, ffio_site, 1, num_template_atoms + 1) num_virtuals = num_sites - num_template_atoms if num_virtuals > 0: virt_charge = [] virt_mass = [] _append_charge_mass_from_ffio_site(virt_charge, virt_mass, ffio_site, num_template_atoms + 1, num_sites + 1) charge.extend(atom_charge * repetition) mass.extend(atom_mass * repetition) if num_virtuals > 0: charge.extend(virt_charge * repetition) mass.extend(virt_mass * repetition) all_charges = [] all_masses = [] for ct_index, ct in enumerate(self.comp_ct): ct_id_map = self.id_maps[ct_index]._replace(start_gid=offset) new_gids = [] if constants.FEP_AID2GID in ct.property: id_maps = json.loads( base64.b64decode( ct.property[constants.FEP_AID2GID]).decode()) all_aid2gid = id_maps[ constants.IdConversion.COMPONENT_TO_COMBINED] all_parent2pseudo = id_maps[ constants.IdConversion.PARENT2PSEUDO] if ct is titration_ct: self.titration_states = _get_other_titration_states(ct) gid_maps, parent_pseudo_maps = _get_maps_for_other_states( all_aid2gid, all_parent2pseudo, offset) ct_id_map = ct_id_map._replace( titration_gid_maps=gid_maps, titration_parent_pseudo_maps=parent_pseudo_maps) _append_charge_mass_with_maps(all_charges, all_masses, ct, offset, gid_maps, self.titration_states) offset += id_maps[constants.IdConversion.PSEUDO_END] # Because each pseudo particle only has one parent, pseudo2parent is a flat list. # The map is just even to odd slots. This is a gid-to-gid map. pseudo_and_parent = id_maps[ constants.IdConversion.PSEUDO2PARENT] self.id_maps[ct_index] = ct_id_map._replace( to_gid=_aid_map_with_offset(all_aid2gid[0], ct_id_map.start_gid), parent_to_pseudo=_parent_to_pseudo(all_parent2pseudo[0], id_map=all_aid2gid[0]), pseudo_to_parent=dict( zip(pseudo_and_parent[::2], pseudo_and_parent[1::2])), topology=id_maps[constants.IdConversion.TOPOLOGY]) new_gids = self.id_maps[ct_index].to_gid[1:ct.atom_total + 1] for atoms in self.id_maps[ct_index].topology: for idx in range(len(atoms)): atoms[idx] += self.id_maps[ct_index].start_gid # Generate fep_mut_ct mapping here if ct is fep_ref_ct and fep_mut_ct is not None: fep_mut_ct_idx = self.comp_ct.index(fep_mut_ct) _update_fep_mut_ct_maps(self.id_maps, fep_mut_ct_idx, all_aid2gid[1], all_parent2pseudo[1], ct_id_map.start_gid) _append_charge_mass_with_maps( all_charges, all_masses, ct, ct_id_map.start_gid, [self.id_maps[fep_mut_ct_idx].to_gid], [fep_mut_ct]) elif ct is fep_mut_ct: new_gids = self.id_maps[ct_index].to_gid[1:ct.atom_total + 1] else: new_gids = np.arange(offset, offset + ct.atom_total) n_total_pseudos = ct.property[constants.FFIO_NUM_PSEUDOS] offset += ct.atom_total + n_total_pseudos if ct.property[constants.FFIO_NUM_PSEUDOS] > 0: self.id_maps[ct_index] = ct_id_map._replace( parent_to_pseudo=_get_parent_to_pseudo(ct)) else: self.id_maps[ct_index] = ct_id_map _append_charge_mass_unrolled(all_charges, all_masses, ct) gids = np.append(gids, new_gids) self.gid_map = gids self.gid_map[0] = np.iinfo(self.gid_map.dtype).min self.gid_map.flags.writeable = False self._glued_topology = self._generate_topology() self._charge = np.array(all_charges) self._charge.flags.writeable = False self._mass = np.array(all_masses) self._mass.flags.writeable = False
[docs] def convert_to_gids(self, aids, with_pseudo=False): """ Given aids, convert them to gids with pseudo particles as required. :type aids: List[int] :param aids: aids to be converted :type with_pseudo: bool :param with_pseudo: switch to include pseudo particles :rtype: List[int] :return: A list of gids of corresponding to aids """ gids = [self.gid(i) for i in aids] if with_pseudo: atom_end = [] accu_index = 0 for ct in self.comp_ct: accu_index += ct.atom_total atom_end.append(accu_index) for i in aids: ct_index = 0 for j, atom_idx in enumerate(atom_end): if i < atom_idx: ct_index = j break offset = 1 if ct_index > 0: offset += atom_end[ct_index - 1] gid_offset = self.id_maps[ct_index].start_gid parent_to_pseudo = self.id_maps[ct_index].parent_to_pseudo if parent_to_pseudo is not None: gids.extend([ _id + gid_offset for _id in parent_to_pseudo[i - offset] ]) return gids
[docs] def gid_refresh(self): """ """ for ct in self.comp_ct: if constants.FFIO_NUM_PSEUDOS in ct.property and \ not ct.property.get(constants.DESMOND_WRITE_JSON, False): self._set_gid_map_titration_states() return if (self.fep_ct is not None): atommap = [] for e in self.fep_ct.fepio.atommap: atommap.append(( e.ai, e.aj, )) # index is the atom index of the full_system CT, value is the gid. self.gid_map = np.arange(self.comp_atom_total + 1, dtype=int) offset1 = 0 offset2 = 0 for ct in self.comp_ct: try: if (ct.property["i_fepio_stage"] == 1): break else: offset1 += ct.atom_total except KeyError: offset1 += ct.atom_total for ct in self.comp_ct: if (ct is self.fep_ct): break else: offset2 += ct.atom_total gid = offset2 for a1, a2 in atommap: if (a2 > 0): if (a1 > 0): # Atom in the target CT is the same atom in the # reference CT. So the gid of the target atom is # the same as that of the reference atom. i1 = offset1 + a1 i2 = offset2 + a2 self.gid_map[i2] = self.gid_map[i1] else: # Atom in the target CT maps to dummy. i2 = offset2 + a2 gid += 1 self.gid_map[i2] = gid # Atoms after the target CT should be shifted. # Gets the shift. i = offset2 + self.fep_ct.atom_total shift = i - self.gid_map[i] self.gid_map[i + 1:] -= shift # Now values in self.gid_map are (gid + 1). # Gets the gid. self.gid_map -= 1 # nobody should ever be accessing 0, it's only there to make # 1-indexing easier self.gid_map[0] = np.iinfo(self.gid_map.dtype).min # don't let anyone else muck it up since it's directly exposed self.gid_map.flags.writeable = False
[docs] def gid(self, atom_index): """ `atom_index` is the index of the atom in the 'full_system' CT. """ return self.gid_map[atom_index]
@property def allaid_gids(self): """ Get the 0-indexed mapping between aids and gids as a view of the underlying numpy array. :return: `np.array` """ return self.gid_map[1:] @property def comp_atom_total(self): """ Get the sum of the atom totals of the component CTs. For systems with inactive atoms, this will be different than the value of `self.atom_total`, as that value corresponds to the number of atoms in the fullsystem CT. """ return sum([ct.atom_total for ct in self.comp_ct])
[docs] def site(self, atom_index): """ `atom_index` starts from 1. """ return self._site[atom_index - 1]
[docs] def model_system_type(self): """ """ return Cms.MODEL_SYSTEM_TYPE[get_model_system_type(self.comp_ct)]
[docs] def get_fragname(self): """ """ if (constants.SystemType.ALCHEMICAL == get_model_system_type( self.comp_ct)): for ct in self.comp_ct: try: fragname = ct.property[constants.FEP_FRAGNAME] if (fragname != "none"): return fragname except KeyError: pass return None
[docs] def select_atom(self, asl): """ Evaluate ASL and meta-ASL. Inactive atoms are removed. Note that inactive atom removal does not work if they are spread across multiple component CTs. :rtype: `list` of `int` """ if (isinstance(asl, int)): asl = "atom.num " + str(asl) else: asl = asl.strip() if (asl == "FEP_REST_hot"): from . import r_group_asl as rga rga.set_hotregion(self.comp_ct) self.synchronize_fsys_ct() asl = f"atom.{constants.REST_HOTREGION} > 0" if (asl in Cms.META_ASL): asl = Cms.META_ASL[asl] if (asl[0].isdigit()): asl = "atom.num " + asl elif ("asl:" == asl[0:4].lower()): asl = asl[4:] aids = analyze.evaluate_asl(self.fsys_ct, asl) # FIXME: this simple approach fails when multiple component CTs contain # inactive atoms active_total = self.active_total if active_total < self.atom_total: aids = [i for i in aids if i <= active_total] return aids
[docs] def select_atom_comp(self, asl): """ Returns a list of lists. Each list element contains a list of atom indices of the corresponding component CT. """ if (asl == "FEP_REST_hot"): from . import r_group_asl as rga rga.set_hotregion(self.comp_ct) self.synchronize_fsys_ct() return self.select_atom_comp(f"atom.{constants.REST_HOTREGION} > 0") atom_list = self.select_atom(asl) atom_list_comp = [] atom_index_fix = {} accu_index = 0 num_comp_ct = len(self.comp_ct) for i in range(num_comp_ct): atom_list_comp.append([]) atom_index_fix[i] = accu_index accu_index += self.comp_ct[i].atom_total for i in atom_list: ct_index = self.fsys_ct.atom[i].property[constants.CT_INDEX] - 1 atom_list_comp[ct_index].append(i - atom_index_fix[ct_index]) return atom_list_comp
[docs] def get_charge(self, gids: List[int]) -> np.array: """ Given gids, return corresponding charge values """ return self._charge[gids]
[docs] def get_mass(self, gids: List[int]) -> np.array: """ Given gids, return corresponding mass values """ return self._mass[gids]
def _get_gids(self, to_gid: np.array) -> List[int]: """ Give explicit mapping for titration ct, to_gid, generate all gids of in all the component cts. to_gid[0] is unsused such that Maestro atom ids, starting from one, can be used directly for gid lookup. This function is meant to be called to extract all the gids of each individual titration state together with all the other components of the system, including FEP cts.. """ gids = [] titration_ct = self.get_titration_ct() for ct_index, ct in enumerate(self.comp_ct): ct_id_map = self.id_maps[ct_index] if ct is titration_ct: new_gids = to_gid elif ct_id_map.to_gid is not None: # FEP cts with explicit mapping new_gids = list(ct_id_map.to_gid[1:]) else: new_gids = list( range( ct_id_map.start_gid, ct_id_map.start_gid + ct.atom_total + ct.property[constants.FFIO_NUM_PSEUDOS])) gids.extend(new_gids) return gids @staticmethod def _set_pseudo_particle_properties(ct: 'ffiostructure.FFIOStructure', num_pseudos: int, parent2pseudo: List[List[int]]): """ Store num_pseudos in ct property FFIO_NUM_PSEUDOS and pseudo-paticles- parent connection inf ct property FFIO_PARENT2PSEUDO. Each element in parent2pseudo is a flattened list of particle ids, i.e. parent atom followed by pseudo particles attached to it. """ ct.property[constants.FFIO_NUM_PSEUDOS] = num_pseudos if num_pseudos > 0: ct.property[constants.FFIO_PARENT2PSEUDO] = base64.b64encode( json.dumps(parent2pseudo).encode()).decode()
[docs] def extract_titration_states(self): """ Generate Cms objects and particle ids for all titration states. The first Cms object is just self with titration properties removed in the titration ct. Then each of the Cms objects is generated by replacing the titration_ct with other titration state. Each Cms object is an independent titration state interacting with the rest of the system. :return: tuple of Cms and gid lists :rtype: (List[Cms], List[List]): """ def _delete_properties(ct): for prop_key in [ constants.FEP_TITRATION_STATES, constants.FEP_STATE_INFO, constants.FEP_AID2GID, constants.FEP_FRAGNAME ]: if prop_key in ct.property: del ct.property[prop_key] for prop_key in [ constants.FEP_TITRATABLE_GROUP, constants.FEP_MAPPING, constants.PPW_ATOM_INDEX, constants.FEP_TMP_IDX ]: ct.deletePropertyFromAllAtoms(prop_key) def _sync_cms(new_cms): new_cms._init_id_maps() new_cms._set_gid_map_titration_states() new_cms.synchronize_fsys_ct() titration_ct = self.get_titration_ct() new_states = None new_gids = None if titration_ct is not None: id_maps = json.loads( base64.b64decode( titration_ct.property[constants.FEP_AID2GID]).decode()) num_atoms = [n for n in id_maps[constants.IdConversion.ATOM_TOTAL]] num_particles = [ len(m) - 1 for m in id_maps[constants.IdConversion.COMPONENT_TO_COMBINED] ] ct_index = self.comp_ct.index(titration_ct) new_states = [] all_gids = [] state0 = self.copy() _delete_properties(state0.comp_ct[ct_index]) self._set_pseudo_particle_properties( state0.comp_ct[ct_index], num_particles[0] - num_atoms[0], id_maps[constants.IdConversion.PARENT2PSEUDO][0]) _sync_cms(state0) new_states.append(state0) all_gids.append( self._get_gids(list(self.id_maps[ct_index].to_gid[1:]))) for i, new_ct in enumerate(self.titration_states): new_cms = self.copy() state_index = i + 1 self._set_pseudo_particle_properties( new_ct, num_particles[state_index] - num_atoms[state_index], id_maps[constants.IdConversion.PARENT2PSEUDO][state_index]) _delete_properties(new_ct) new_cms.comp_ct[ct_index] = new_ct _sync_cms(new_cms) new_states.append(new_cms) all_gids.append( self._get_gids( list(self.id_maps[ct_index].titration_gid_maps[i][1:]))) return (new_states, all_gids)
[docs] def extract_with_comp_selection( self, selections: List[List[int]]) -> ('Cms', List[int]): """ Extract from given atom numbers in each component ct (selections). A new Cms object and the gids corresponding to all particles selected are returned. The caller needs to make sure the atoms in each selection include all atoms in the ffio_sites block. """ def _add_pseudo_to_selection(gid_sel: List[int], atom_list: List[int], gid_offset: int, parent_to_pseudo: Dict[int, List[int]]): """ Given selected atoms (atom_list), expand gid selection (gid_sel) with the pseudo particles attached with the mapping of parent atom to attached pseudo particles (parent_to_pseudo). """ for aid in atom_list: for gid in [ _id + gid_offset for _id in parent_to_pseudo[aid - 1] ]: if gid not in gid_sel: gid_sel.append(gid) def _delete_particles_from_comp_ct(ct: 'ffiostructure.FFIOStructure', atom_list: List[int], num_pseudos: int, parent_to_pseudo: Dict[int, List[int]]): """ Delete atoms and pseudo particles from from ct and update FFIO_NUM_PSEUDOS ct property if necessary. :param atom_list: atoms ids to keep. :param num_pseudos: number of pseudo particles in the original ct :param parent_to_pseudo: mapping parent atom to attached pseudo particles """ pseudo_offset = ct.atom_total - 1 deleted_atoms = set(range(1, ct.atom_total + 1)) - set(atom_list) ct.deleteAtoms(deleted_atoms) if num_pseudos > 0: deleted_pseudos = [] for parent, pseudos in parent_to_pseudo.items(): if parent + 1 in deleted_atoms: deleted_pseudos.extend( [p - pseudo_offset for p in pseudos]) for j in sorted(deleted_pseudos, reverse=True): del ct.ffio.pseudo[j] ct.property[constants.FFIO_NUM_PSEUDOS] = num_pseudos - len( deleted_pseudos) def _delete_fep_ct_properties(ct: 'ffiostructure.FFIOStructure', num_pseudos: int, parent2pseudo: List[List[int]]): """ Delete fepio, constants.FEP_FRAGNAME, constants.FEPIO_STAGE, constants.FEP_AID2GID and constants.FEP_ALCHEMICAL_CHANGE from ct and update pseudo particle properties. """ if has_fepio(ct): delete_fepio(ct) ct.fepio = None Cms._set_pseudo_particle_properties(ct, num_pseudos, parent2pseudo) for p in [ constants.FEP_FRAGNAME, constants.FEPIO_STAGE, constants.FEP_AID2GID, constants.FEP_ALCHEMICAL_CHANGE ]: ct.property.pop(p, None) gid_sel = [] new_comp_ct = [] atom_offset = 0 new_cms = copy.copy(self) fep_cts = self.get_fep_cts() has_fep_cts = None not in fep_cts if has_fep_cts: fep_ref_idx, fep_mut_idx = map(self.comp_ct.index, fep_cts) if selections[fep_ref_idx] and selections[fep_mut_idx]: raise RuntimeError( "Both fep states are selected for extraction.") ct_idx_map = [] all_num_pseudos = [] for i, (atom_list, ct) in enumerate(zip(selections, new_cms.comp_ct)): if atom_list: new_comp_ct.append(ct) ct_idx_map.append(i) gid_offset = self.id_maps[i].start_gid gid_sel.extend( [self.gid(atom_offset + aid) for aid in atom_list]) num_pseudos = ct.property.get(constants.FFIO_NUM_PSEUDOS, 0) if has_fep_cts and i in (fep_ref_idx, fep_mut_idx): num_pseudos = sum([ len(v) for k, v in self.id_maps[i].parent_to_pseudo.items() ]) all_num_pseudos.append(num_pseudos) # Handle the case where user is extracting a subset # of repeated molecules (i.e. solvent) if constants.NUM_COMPONENT in ct.property and len( atom_list) != ct.atom_total: _delete_particles_from_comp_ct( ct, atom_list, num_pseudos, self.id_maps[i].parent_to_pseudo) if num_pseudos > 0: _add_pseudo_to_selection(gid_sel, atom_list, gid_offset, self.id_maps[i].parent_to_pseudo) atom_offset += self.comp_ct[i].atom_total if has_fep_cts: id_maps = json.loads( base64.b64decode( fep_cts[0].property[constants.FEP_AID2GID]).decode()) for ct_idx, ct in enumerate(new_comp_ct): orig_ct_idx = ct_idx_map[ct_idx] if orig_ct_idx == fep_ref_idx: _delete_fep_ct_properties( ct, all_num_pseudos[ct_idx], id_maps[constants.IdConversion.PARENT2PSEUDO][0]) elif orig_ct_idx == fep_mut_idx: _delete_fep_ct_properties( ct, all_num_pseudos[ct_idx], id_maps[constants.IdConversion.PARENT2PSEUDO][1]) new_cms.fep_ct, new_cms.ref_ct, new_cms.mut_ct = [None] * 3 new_cms.comp_ct = new_comp_ct new_cms._init_id_maps() # selection does not include any ghost atoms new_cms.active_total = new_cms.comp_atom_total new_cms.synchronize_fsys_ct() new_cms.ffio_refresh() new_cms.gid_refresh() return new_cms, gid_sel
[docs] def extract_with_asl( self, asl: str, with_aid: Optional[bool] = False ) -> ('Cms', List[int], Optional[List[int]]): """ Extract from given ASL selection. A new Cms object and the gids corresponding to all particles selected are returned. Aids are returned as well, if requested. If the selection contains only partial molecules, it will be extended to entire molecules. The caller needs to make sure the ASL selects all atoms in the ffio block. """ asl = "fillmol(%s)" % asl sel = self.select_atom_comp(asl) if with_aid: offset = 0 sel_aid = [] for ct, comp_sel in zip(self.comp_ct, sel): sel_aid.extend([idx + offset for idx in comp_sel]) offset += ct.atom_total return self.extract_with_comp_selection(sel), sel_aid else: return self.extract_with_comp_selection(sel)
@property def has_velocities(self) -> bool: for a in self.atom: vx, vy, vz = ( a.property.get(v, 0) for v in constants.VELOCITY_PROPERTIES) if vx != 0.0 or vy != 0.0 or vz != 0.0: return True return False
[docs] def get_pos_vel(self) -> Tuple[np.array, Optional[np.array]]: """ Generate positions and velocities (if they exist) in the same layout as in trajectory. """ def _get_pseudo_pos(pseudo: 'ffiostructure._FFIOPseudo'): return [pseudo.x_coord, pseudo.y_coord, pseudo.z_coord] def _get_velocities(ct: 'structure.Structure', atoms: Optional[List[int]] = None, offset=0) -> np.array: if atoms is None: atoms = range(ct.atom_total) offset = 1 return np.array([[ ct.atom[aid + offset].property.get(v, 0.0) for v in constants.VELOCITY_PROPERTIES ] for aid in atoms]) def _append_pos_vel(pos: np.array, vel: Optional[np.array], more_pos: np.array, more_vel: Optional[np.array], has_vel: bool): pos = np.append(pos, more_pos, axis=0) if has_vel: if more_vel is None: more_vel = np.zeros(shape=[len(more_pos), 3]) vel = np.append(vel, more_vel, axis=0) return pos, vel def _get_additional_pos_vel(ct: 'structure.Structure', start_gid: int, num_pseudo: int, gid_maps: List[np.array], states: List['ffiostructure.FFIOStructure'], has_vel: bool): """ Particle layout for merged titration/fep ct: - atoms from reference state - dummy atoms from all other states - pseudo particles from the reference state - dummy pseudo particles from all other states This function packs the coordinates and velocities for all particles added to the atoms in the reference state. """ pos, vel = _create_empty_pos_vel(has_vel) for m, st in zip(gid_maps, states): # dummy atoms have gid greater than or equal to the reference- # structure-atom number dummy_atoms = list([ idx for (idx, gid) in enumerate(m[1:st.atom_total + 1]) if gid - start_gid >= ct.atom_total ]) dummy_pos = st.getXYZ()[dummy_atoms] # dummy_atoms are array indeces starting from zero. # They need to be offset by one for aid used in _get_velocities. dummy_vel = None if has_vel is None else _get_velocities( st, dummy_atoms, 1) pos, vel = _append_pos_vel(pos, vel, dummy_pos, dummy_vel, has_vel) if num_pseudo > 0: pseudo_pos = [] for p in range(1, num_pseudo + 1): pseudo_pos.append(_get_pseudo_pos(ct.ffio.pseudo[p])) pos, vel = _append_pos_vel(pos, vel, np.array(pseudo_pos), None, has_vel) pseudo_dummy_start = len(pos) + ct.atom_total for m, st in zip(gid_maps, states): pseudo_dummy_pos = [ _get_pseudo_pos(st.ffio.pseudo[pid]) for (pid, gid) in enumerate(m[st.atom_total + 1:], 1) if gid - start_gid >= pseudo_dummy_start ] if pseudo_dummy_pos: pos, vel = _append_pos_vel(pos, vel, np.array(pseudo_dummy_pos), None, has_vel) return pos, vel if has_vel else None def _create_empty_pos_vel( has_vel: bool) -> Tuple[np.array, Optional[np.array]]: pos = np.empty(shape=[0, 3]) vel = np.empty(shape=[0, 3]) if has_vel else None return pos, vel has_vel = self.has_velocities pos, vel = _create_empty_pos_vel(has_vel) fep_ref, fep_mut = self.get_fep_cts() titration_ct = self.get_titration_ct() for i, ct in enumerate(self.comp_ct): id_map = self.id_maps[i] num_pseudos = ct.property.get(constants.FFIO_NUM_PSEUDOS) # fep_mut is dealt with in fep_ref, no need to add # atom pos/vel for it if ct is not fep_mut: ct_vel = None if has_vel is None else _get_velocities(ct) pos, vel = _append_pos_vel(pos, vel, ct.getXYZ(), ct_vel, has_vel) if ct is titration_ct: more_pos, more_vel = _get_additional_pos_vel( ct, id_map.start_gid, num_pseudos, id_map.titration_gid_maps, self.titration_states, has_vel) pos, vel = _append_pos_vel(pos, vel, more_pos, more_vel, has_vel) elif ct is fep_ref: fep_mut_idx = self.comp_ct.index(fep_mut) more_pos, more_vel = _get_additional_pos_vel( ct, id_map.start_gid, num_pseudos, [self.id_maps[fep_mut_idx].to_gid], [fep_mut], has_vel) pos, vel = _append_pos_vel(pos, vel, more_pos, more_vel, has_vel) elif ct is fep_mut: # nothing to do because all fep particles are dealt # with in fep_ref continue else: if num_pseudos > 0: all_pseudo_pos = [] for parent in sorted(id_map.parent_to_pseudo.keys()): for pseudo in id_map.parent_to_pseudo[parent]: all_pseudo_pos.append( _get_pseudo_pos( ct.ffio.pseudo[pseudo - ct.atom_total + 1])) pos, vel = _append_pos_vel(pos, vel, np.array(all_pseudo_pos), None, has_vel) return pos, vel
[docs] def get_restrain(self) -> ModelRestrainList: """ Create a portable list of restraints representing the ffio restraints for each CT. The output is a list of `RestrainGroupContainer`s ( essentially a dict), indexed by each of the `RestraintTypes`, with `RestrainGroup` subclasses as values. These values represent all the restrains of the same type for a given CT. """ restr = ModelRestrainList() for (ct_idx, ct) in enumerate(self.comp_ct, 1): ct_restr = RestrainGroupContainer() def add_array_if_present(restrain_type, ffio_subblock, make_tuple, array_cls): restrain_tuples = [] for e in ffio_subblock: restrain_tuples.append(make_tuple(e)) if restrain_tuples: ct_restr[restrain_type] = array_cls(restrain_tuples) def make_pos_tuple(e): return ((ct_idx, e.ai), (e.c1, e.c2, e.c3), (e.t1, e.t2, e.t3)) def make_fbhw_pos_tuple(e): return ((ct_idx, e.ai), e.fc, (e.x0, e.y0, e.z0), e.sigma) add_array_if_present(RestrainTypes.POS, ct.ffio.restraint, make_pos_tuple, PosRestrainArray) add_array_if_present(RestrainTypes.POS_FBHW, ct.ffio.posfbhw, make_fbhw_pos_tuple, FBHWPosRestrainArray) def add_if_present(restrain_type, ffio_subblock, make_restrain): if len(ffio_subblock): restrain_group = DictRestrainGroup() for e in ffio_subblock: restrain_group.add(make_restrain(e)) ct_restr[restrain_type] = restrain_group def make_stretchfbhw(e): atoms1 = list(zip(repeat(ct_idx), map(int, e.group1.split()))) atoms2 = list(zip(repeat(ct_idx), map(int, e.group2.split()))) # Negative value indicates it was not set in the cms if e.sigma > 9.9E19 or e.sigma < 0: # 'normal' stretch if e.beta <= 0: # ref input as single value sigma = (e.upper - e.lower) / 2 ref = (e.upper + e.lower) / 2 return Restrain([atoms1, atoms2], e.fc, ref, sigma) else: # ref input as list of 3 values return Restrain([atoms1, atoms2], e.fc, [e.lower, e.upper, e.beta], 0) else: # NOE return Restrain([atoms1, atoms2], e.fc, [e.lower, e.upper], [e.sigma, e.beta]) def make_anglefbhw(e): return Restrain(list(zip(repeat(ct_idx), (e.ai, e.aj, e.ak))), e.fc, e.theta0, e.sigma) def make_improperfbhw(e): return Restrain( list(zip(repeat(ct_idx), (e.ai, e.aj, e.ak, e.al))), e.fc, e.phi0, e.sigma) add_if_present(RestrainTypes.STRETCH_FBHW, ct.ffio.stretchfbhw, make_stretchfbhw) add_if_present(RestrainTypes.ANGLE_FBHW, ct.ffio.anglefbhw, make_anglefbhw) add_if_present(RestrainTypes.IMPROPER_FBHW, ct.ffio.improperfbhw, make_improperfbhw) restr.append(ct_restr) return restr
def _ct_set_restrain(self, ct, ct_restrain: RestrainGroupContainer): # Sets restraints. for key, val in ct_restrain.items(): if key == RestrainTypes.POS: # Harmonic position restraint for e in val.values(): if (e['k'] == 0).all(): continue a = ct.ffio.addRestraint() (_, atom_idx), k, ref = e a.ai = atom_idx a.c1, a.c2, a.c3 = k a.t1, a.t2, a.t3 = ref a.funct = "harm" elif key == RestrainTypes.POS_FBHW: for e in val.values(): # position fbhw a = ct.ffio.addPosfbhw() (_, atom_idx), k, ref, sigma = e if k == 0: continue a.ai = atom_idx a.fc = k a.x0, a.y0, a.z0 = ref a.sigma = sigma elif key == RestrainTypes.STRETCH_FBHW: for e in val.values(): if e.k == 0: continue if isinstance(e.ref, list) and len(e.ref) == 2: # NOE a = ct.ffio.addStretchfbhw() a.fc = e.k a.lower, a.upper = e.ref a.sigma, a.beta = e.sigma #(" ".join(map(str, atom)) for atom in e.atom_idx) a.group1, a.group2 = (" ".join( map(str, (at[1] for at in atom))) for atom in e.atom_idx) else: # stretch fbhw a = ct.ffio.addStretchfbhw() a.fc = e.k a.sigma = 1E20 if isinstance(e.ref, list): a.lower, a.upper, a.beta = e.ref else: a.lower = e.ref - e.sigma a.upper = e.ref + e.sigma a.beta = 0 if isinstance(e.atom_idx[0], str): a.group1 = e.atom_idx[0] else: a.group1 = " ".join( map(str, (at[1] for at in e.atom_idx[0]))) if isinstance(e.atom_idx[1], str): a.group2 = e.atom_idx[1] else: a.group2 = " ".join( map(str, (at[1] for at in e.atom_idx[1]))) elif key == RestrainTypes.ANGLE_FBHW: for e in val.values(): if e.k == 0: continue # angle fbhw a = ct.ffio.addAnglefbhw() (_, a.ai), (_, a.aj), (_, a.ak) = e.atom_idx a.fc = e.k a.theta0 = e.ref a.sigma = e.sigma elif key == RestrainTypes.IMPROPER_FBHW: for e in val.values(): if e.k == 0: continue # torsion fbhw a = ct.ffio.addImproperfbhw() (_, a.ai), (_, a.aj), (_, a.ak), (_, a.al) = e.atom_idx a.fc = e.k a.phi0 = e.ref a.sigma = e.sigma
[docs] def set_restrain(self, restrain_list: List[Union[RestrainGroupContainer, List[Restrain]]]): """ `restrain_list` must be a list. Its members should be RestrainGroupContainers or for compatibility, lists of Restrain objects. """ self.clear_restrain() for ct_restr, ct in zip(restrain_list, self.comp_ct): if isinstance(ct_restr, list): raise ValueError("unsupported/obsolete restraints format") else: self._ct_set_restrain(ct, ct_restr)
[docs] def clear_restrain(self): """ Deletes all existing restraints. """ for ct in self.comp_ct: ct.ffio.deleteRestraints(list(range(1, len(ct.ffio.restraint) + 1))) ct.ffio.deletePosfbhws(list(range(1, len(ct.ffio.posfbhw) + 1))) ct.ffio.deleteStretchfbhws( list(range(1, len(ct.ffio.stretchfbhw) + 1))) ct.ffio.deleteAnglefbhws(list(range(1, len(ct.ffio.anglefbhw) + 1))) ct.ffio.deleteImproperfbhws( list(range(1, len(ct.ffio.improperfbhw) + 1)))
[docs] def get_atom_group(self): """ """ num_comp_ct = len(self.comp_ct) grp_name = set() for ct in self.comp_ct: for name in ct.atom[1].property: if (-1 < name.find(self.ATOMGROUP_PREFIX)): grp_name.add(name) grp_name = list(grp_name) sel_atom = [] for e in range(len(grp_name)): sel_atom.append({}) for sa, gn in zip(sel_atom, grp_name): for i, ct in enumerate(self.comp_ct): if (gn in ct.atom[1].property): for atom in ct.atom: try: index = atom.property[gn] except KeyError: index = 0 if (index > 0): if (index not in sa): sa[index] = [] for j in range(num_comp_ct): sa[index].append([]) sa[index][i].append(int(atom)) atom_grp = [] i_prefix = len(self.ATOMGROUP_PREFIX) for gn, sa in zip(grp_name, sel_atom): for index in sa: atom_grp.append( AtomGroup(atom=sa[index], name=gn[i_prefix:], index=index)) return atom_grp
[docs] def set_atom_group(self, atom_group): """ """ self.delete_all_atom_group() atom_group_list = [ atom_group, ] if (isinstance(atom_group, AtomGroup)) else (atom_group if (atom_group) else []) ugn = set() # unique group name for atom_group in atom_group_list: gn = self.ATOMGROUP_PREFIX + atom_group.name ugn.add(gn) ugn = list(ugn) for ct in self.comp_ct: for gn in ugn: mm.mmct_atom_property_set_int(ct.handle, gn, mm.MMCT_ATOMS_ALL, 0) for atom_group in atom_group_list: gn = self.ATOMGROUP_PREFIX + atom_group.name for i, ct in enumerate(self.comp_ct): al = atom_group.atom[i] for i_atom in al: ct.atom[i_atom].property[gn] = atom_group.index
[docs] def merge_atom_group(self, atom_group): """ """ atom_group_list = [ atom_group, ] if (isinstance(atom_group, AtomGroup)) else atom_group for atom_group in atom_group_list: gn = self.ATOMGROUP_PREFIX + atom_group.name for i, ct in enumerate(self.comp_ct): if (gn not in ct.atom[1].property): mm.mmct_atom_property_set_int(ct.handle, gn, mm.MMCT_ATOMS_ALL, 0) al = atom_group.atom[i] for i_atom in al: ct.atom[i_atom].property[gn] = atom_group.index
[docs] def set_atom_group_from_asl(self, asl, group_name, group_index): """ """ self.set_atom_group( AtomGroup(self.select_atom_comp(asl), group_name, group_index))
[docs] def merge_atom_group_from_asl(self, asl, group_name, group_index): """ """ self.merge_atom_group( AtomGroup(self.select_atom_comp(asl), group_name, group_index))
[docs] def delete_atom_group(self, group_name): """ """ name = self.ATOMGROUP_PREFIX + group_name for ct in self.comp_ct: if (name in ct.atom[1].property): ct.deletePropertyFromAllAtoms(name)
[docs] def delete_all_atom_group(self, exception=[]): # noqa: M511 """ """ tmp = [exception] if (isinstance(exception, str)) else exception exception = [] for e in tmp: exception.append(self.ATOMGROUP_PREFIX + e) for ct in self.comp_ct: for name in list(ct.atom[1].property): if (name not in exception and -1 < name.find(self.ATOMGROUP_PREFIX)): ct.deletePropertyFromAllAtoms(name)
[docs] def get_vdw(self): """ Returns the Vdw parameters for all atoms. The returned object is a list. Each element of the returned list is a Vdw object for the corresponding atom. """ vdw = [ None, ] for ct in self.comp_ct: ct_vdw = [] vdw_type = {} for e in ct.ffio.vdwtype: vdw_type[e.name] = Vdw((e.name,), e.funct, ( e.c1, e.c2, )) for e in ct.ffio.site: ct_vdw.append(vdw_type[e.vdwtype]) ct_vdw *= old_div(ct.atom_total, len(ct.ffio.site)) vdw.extend(ct_vdw) return vdw
[docs] def get_constraint(self): """ """ constr = [] for ct in self.comp_ct: ct_constr = [] for e in ct.ffio.constraint: ct_constr.append( Constraint(e.funct, e.ai, e.aj, e.ak, e.al, e.am, e.c1, e.c2, e.c3, e.c4, e.c5, e.c6)) constr.append(ct_constr) return constr
[docs] def get_num_constraint(self): """ """ constr = self.get_constraint() num_constraint = 0 for cb, n in zip(constr, self._unroll): num_constraint_ = 0 for c in cb: num_constraint_ += c.num_constraint() num_constraint += num_constraint_ * n return num_constraint
[docs] def set_nactive_gids(self, nactive_gids, ntotal_gids): """ Given a number of active gids, set the number of physical atoms that are active in the fullsystem and solvent CTs, ie `self.active_total`. Also stores the `nactive_gids` so that it will be written to disk and can be used by msys models. If `nactive_gids` == `ntotal_gids`, this will set `self.active_total` to `self.atom_total`, thereby removing the underlying properties from the fullsystem and solvent CTs. :param nactive_gids: the number of active gids :type nactive_gids: int :param ntotal_gids: the total number of atoms (ie gids) in the frame :type ntotal_gids: int """ # Note - if the frame object carries the active atom counts, we can # simplify the logic below - DESMOND-8369 # TODO if we make the cms model aware of its pseudoatoms natively, we # can do this a lot easier, and not require passing ntotal_gids. if nactive_gids == ntotal_gids: self.active_total = self.comp_atom_total return # set the data underlying the `self.nactive_gids` property self.property[NACTIVE_GIDS] = nactive_gids self._raw_fsys_ct.property[NACTIVE_GIDS] = nactive_gids self.active_total = self.active_total_from_nactive_gids( nactive_gids, ntotal_gids)
[docs] def active_total_from_nactive_gids(self, nactive_gids, ntotal_gids): if nactive_gids == ntotal_gids: # no inactive atoms # Ideally, self.active_total, self.atom_total and self.comp_atom_total # should all be equal in this case. However, the first two may be # out of sync. Thus it's safer to return the component atom total. return self.comp_atom_total solvent_cts = self.get_solvent_cts() if len(solvent_cts) != 1: raise NotImplementedError("Only one solvent is currently " "supported") solvent_ct = solvent_cts[0] if solvent_ct is not self.comp_ct[-1]: raise ValueError("Solvent ct must be the last component ct to " "change the number of active waters with this " "method.") prev_total = self.comp_atom_total - solvent_ct.atom_total # the solvent gids should be sorted solvent_aid_gids = self.allaid_gids[prev_total:] # we need nactive_gids - nactive_solvent_pseudos, but we can write # instead as n_total_gids - n_solvent_pseudos - n_real_inactive to # avoid needing to consider the previous pseudo blocks directly sites = solvent_ct.ffio.site n_sites = len(sites) n_real_sites = len( [s for s in sites if s.property['s_ffio_type'] != 'pseudo']) n_pseudos = len(solvent_ct.ffio.pseudo) # all inactive atoms, physical and pseudo n_inactive = ntotal_gids - nactive_gids n_real_inactive = n_inactive * n_real_sites // n_sites boundary = ntotal_gids - n_pseudos - n_real_inactive # since the solvent tail is sorted, we can use searchsorted solvent_active_total = np.searchsorted(solvent_aid_gids, boundary) return prev_total + solvent_active_total
@property def nactive_gids(self): return self.property.get(NACTIVE_GIDS, -1) @property def is_for_gcmc(self): return self.active_total != self.comp_atom_total @property def active_total(self): """ Get the number of active physical atoms :return: The number of active physical atoms :rtype: `int` """ # we need to get comp_atom_total because this is used during # synchronization and thus atom_total may be out of date return self.property.get(ACTIVE_TOTAL, self.comp_atom_total) @active_total.setter def active_total(self, val): """ Set the number of active physical atoms by updating the underlying properties for the fullsystem and solvent CTs. If the value is equal to the total number of atoms, the underlying CT properties are removed to return the file to its original state. :param val: The number of active physical atoms :type val: `int` """ needs_sync = not self._show_inactive and val != self.active_total atom_total_with_inactive = self.comp_atom_total if val > atom_total_with_inactive: raise ValueError("cannot set number of active atoms to be greater " "than total number of atoms!") solvent_cts = self.get_solvent_cts() if val < atom_total_with_inactive: self.property[ACTIVE_TOTAL] = val self._raw_fsys_ct.property[ACTIVE_TOTAL] = val # for now, single solvent is assumed if len(solvent_cts) != 1: raise NotImplementedError("Only one solvent is currently " "supported") solvent_ct = solvent_cts[0] num_inactive = atom_total_with_inactive - val solvent_ct.property[ ACTIVE_TOTAL] = solvent_ct.atom_total - num_inactive else: # this is already multiple solvent-capable for ct in [self.fsys_ct, self._raw_fsys_ct] + solvent_cts: if ACTIVE_TOTAL in ct.property: del ct.property[ACTIVE_TOTAL] if needs_sync: self.synchronize_fsys_ct()
[docs] def get_degrees_of_freedom(self): """ """ # FIXME: This simple formula is incorrect for relative FEP systems. # Note: 1. This formula assumes that there is no frozen atoms (usually that's the case). # 2. It also assumes that translational motion of the whole system is constrained (PBC system). return self.fsys_ct.atom_total * 3 - self.get_num_constraint() - 3
[docs] def get_ff(self) -> str: """ Get the canonical force field name of the model. :return: Canonical force field name or empty string, if unknown """ for ct in self.comp_ct: # See for example MATSCI-3444 name = ct.ffio.name.replace('_REASSIGN', '') try: version = mm.opls_name_to_version(name) except IndexError: # Raises for example for 'LipidFF' # Some components can be unknown, we are looking for the known ones continue return mm.opls_version_to_name(version) return ''
@staticmethod def _is_solvent_ct(ct: 'ffiostructure.FFIOStructure') -> bool: """ Return true if ct is solvent, False otherwise. """ return ct.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.SOLVENT
[docs] def move_solvent_cts_back(self): """ Rearrange components so that solvent components are last. """ # Warning! Rearranging components might break restraints (!!!) self.comp_ct.sort(key=self._is_solvent_ct) # in place sort self.synchronize_fsys_ct()
[docs] def get_cts(self): """ Return system and component cts. :return: raw full system, full system, and component ct :rtype: list """ return [self._raw_fsys_ct, self.fsys_ct] + self.comp_ct
[docs] def set_cts_property(self, propname: str, value: Any): """ Set a property on all cts :param propname: The property to set :param value: The property's value """ for struct in self.get_cts(): struct.property[propname] = value
[docs] def remove_cts_property(self, propname: str): """ Remove a property from all cts :param propname: The property to remove """ for struct in self.get_cts(): struct.property.pop(propname, None)
[docs] def fix_filenames(self, cms_fname=None, trj_fname=None): def set_ct_prop(ct, prop, prop_val): if prop_val: ct.property[prop] = prop_val elif prop in ct.property: del ct.property[prop] cms_fname = cms_fname and os.path.basename(cms_fname) trj_fname = trj_fname and os.path.basename(trj_fname) for ct in self.comp_ct + [self._raw_fsys_ct, self.fsys_ct]: set_ct_prop(ct, self.PROP_CMS, cms_fname) set_ct_prop(ct, self.PROP_TRJ, trj_fname)
[docs] def write(self, fname): """ """ fname = fname and str(fname) self._raw_fsys_ct.write(fname, "maestro") for ct in self.comp_ct: ct.append(fname, "CMS")
[docs] def write_to_string(self): """ """ s = ffiostructure.write_cms_to_string(self._raw_fsys_ct) for ct in self.comp_ct: s += ffiostructure.write_cms_to_string(ct) return s
def _get_gluepoints_impl_old(model: Cms, cutoff: float) -> Set[Tuple[int]]: """ Find GIDs of the solute atoms' glue points. A grid-based implementation. """ # Besides itself, half of its 26 nearest neighbors are examined. cells_to_exam = np.array([ (0, 0, 0), (1, 0, 0), (1, 1, 0), (0, 1, 0), (-1, 1, 0), (-1, -1, 1), (-1, 0, 1), (-1, 1, 1), (0, -1, 1), (0, 0, 1), (0, 1, 1), (1, -1, 1), (1, 0, 1), (1, 1, 1), ]) # Finds the extremes of the space occupied by the solute molecules. # Don't use 'solute' since they may include ions and crystal waters. aids = model.select_atom(f"{model.META_ASL['solute']} " "and (protein or ligand or nucleic_acids)") if not aids: return [] pos = model.getXYZ(copy=False)[[i - 1 for i in aids]] indices = ((pos - np.min(pos, axis=0)) / cutoff).astype(int) nx, ny, nz = np.max(indices, axis=0) + 1 # Each grid point/cell is a dict of molecule number to atoms grid = [[[defaultdict(list) for i in repeat(None, nz)] for j in repeat(None, ny)] for k in repeat(None, nx)] # Bins the solute atoms atom = model.atom for (i, j, k), aid in zip(indices, aids): a = atom[aid] grid[i][j][k][a.molecule_number].append(a) # Loops through the grid to find close atoms between different solute molecules. close_atom = {} cutoff_sq = cutoff**2 def exam(c0, c1): """ Check pair-wise distances for atoms on different molecules in cell c0 and c1. """ mol_num_iter = combinations(c0, 2) if c0 is c1 else product(c0, c1) for mol_n0, mol_n1 in mol_num_iter: if mol_n0 == mol_n1: continue for a0, a1 in product(c0[mol_n0], c1[mol_n1]): d = (a0.x - a1.x)**2 + (a0.y - a1.y)**2 + (a0.z - a1.z)**2 if d > cutoff_sq: continue if mol_n0 < mol_n1: mol_pair = (mol_n0, mol_n1) atom_pair = (a0.index, a1.index, d) else: mol_pair = (mol_n1, mol_n0) atom_pair = (a1.index, a0.index, d) if mol_pair not in close_atom or d < close_atom[mol_pair][2]: close_atom[mol_pair] = atom_pair for i, j, k in product(range(nx), range(ny), range(nz)): cell = grid[i][j][k] for other_cell in cells_to_exam: m, n, q = other_cell + (i, j, k) if m < 0 or n < 0 or q < 0 or m == nx or n == ny or q == nz: continue exam(cell, grid[m][n][q]) gluepoints = set() for i_atom, j_atom, dr in close_atom.values(): # need to use `topo.read_cms()` to load the Cms object i = int(model.gid(i_atom)) j = int(model.gid(j_atom)) # skip the pair with same gid, this usually happens in FEP simulation if i != j: # different AID pairs may become the same GID pair gluepoints.add((i, j)) return gluepoints def _get_gluepoints_impl(model, cutoff): """ A PBC-aware implementation. WARNING: This algorithm is VERY slow. """ # FIXME: analysis imports this module, here it imports analysis # also this function is not used right now from schrodinger.application.desmond.packages.analysis import Pbc cutoff2 = cutoff * cutoff solute_atom = [] for a in model.atom: if (a.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.SOLUTE): solute_atom.append(a) pbc = Pbc(get_box(model)) close_atom = {} for i, a in enumerate(solute_atom[:-1]): a_mol = a.molecule_number for b in solute_atom[i + 1:]: b_mol = b.molecule_number if (a_mol == b_mol): continue v = pbc.calcMinimalDifference(a.xyz, b.xyz) dr2 = v[0] * v[0] + v[1] * v[1] + v[2] * v[2] if (dr2 < cutoff2): if (a_mol < b_mol): mol_pair = ( a_mol, b_mol, ) atom_pair = ( a, b, ) else: mol_pair = ( b_mol, a_mol, ) atom_pair = ( b, a, ) if (mol_pair in close_atom): if (dr2 < close_atom[mol_pair][2]): close_atom[mol_pair] = atom_pair + (dr2,) else: close_atom[mol_pair] = atom_pair + (dr2,) close_atom = list(close_atom.values()) gluepoints = [] for i_atom, j_atom, dr in close_atom: # must convert to int or will raise error in glue c call i = int(model.gid(int(i_atom))) j = int(model.gid(int(j_atom))) # skip the pair with same gid, this usually happens in FEP simulation if i != j: gluepoints.append(( i, j, )) return gluepoints
[docs]def get_gluepoints(model, cutoff=3.0): """ """ return _get_gluepoints_impl_old(model, cutoff)
[docs]def find_prev_residue(ct, residue): """ """ for res in ct.residue: if (res.isConnectedToResidue(residue)): return res
[docs]def find_next_residue(ct, residue): """ """ for res in ct.residue: if (residue.isConnectedToResidue(res)): return res
[docs]def gen_alpha_helix_restraint(model, helix_asl, fc, sigma, ref): """ Given a model ('Cms' object) and an optional 'helix_asl' expression, returns a string specifying the restraint settings. """ print("Generating relative restraints on alpha helix...") # This code was originally by Dima and later was rewritten by YW. # Rewritten to make this function work better with other code and make the code clearer and simpler. k_hbond = fc[0] k_phi = fc[1] k_psi = fc[2] sigma_hbond = sigma[0] sigma_phi = sigma[1] sigma_psi = sigma[2] ref_hbond = ref[0] ref_phi = ref[1] ref_psi = ref[2] ret = "" calpha = model.select_atom('asl:(%s) and (atom.ptype " CA ")' % helix_asl[4:]) if (len(calpha) == 0): raise RuntimeError("No residue found for alpha helix") print(" Indices of CA atom in the to-be-restrained peptide:", calpha) # Generates hydrogen bond restraints. for i in calpha[:-4]: acceptor_res = model.atom[i].getResidue() donor_res = model.atom[i + 4].getResidue() donor_resname = donor_res.pdbres if (donor_resname in [ "PRO ", "NMA ", "ACE ", ]): continue # Finds donor hydrogen atom. donor_atom = None for a in donor_res.atom: if (a.pdbname == " H "): donor_atom = a break else: continue # Finds acceptor oxygen atom. acceptor_atom = None for a in acceptor_res.atom: if (a.pdbname == " O "): acceptor_atom = a break else: continue if (ref_hbond is None): dist = measure.measure_distance(donor_atom, acceptor_atom) if (dist <= 3.7): ret += "{atom = [%d %d] fc = %s sigma = %s}\n" % \ (int(donor_atom), int(acceptor_atom), k_hbond, sigma_hbond,) else: ret += "{atom = [%d %d] fc = %s sigma = %s ref = %s}\n" % \ (int(donor_atom), int(acceptor_atom), k_hbond, sigma_hbond, ref_hbond,) # Generates dihedral restraints. for i in calpha: # Finds atoms for phi and psi torsions. Ca_0, C_0, N_1, Ca_1, C_1, N_2, Ca_2 = None, None, None, model.atom[ i], None, None, None curr_residue = Ca_1.getResidue() prev_residue = find_prev_residue(model, curr_residue) next_residue = find_next_residue(model, curr_residue) for a in curr_residue.atom: if (a.pdbname == " N "): N_1 = a elif (a.pdbname == " C "): C_1 = a if (N_1 is None or C_1 is None): continue if (prev_residue): for a in prev_residue.atom: if (a.pdbname == " C "): C_0 = a elif (a.pdbname == " CA "): Ca_0 = a if (C_0 is not None and Ca_0 is not None and int(Ca_0) in calpha): # Now we find C_0, N_1, Ca_1, and C_1 for phi. ret += '{atom = [%d %d %d %d] fc = %f sigma = %s ref = %s}\n' % \ (int(C_0), int(N_1), int(Ca_1), int(C_1), k_phi, sigma_phi, ref_phi,) if (next_residue): for a in next_residue.atom: if (a.pdbname == " N "): N_2 = a elif (a.pdbname == " CA "): Ca_2 = a if (N_2 is not None and Ca_2 is not None and int(Ca_2) in calpha): # Now we find N_1, Ca_1, C_1, and N_2 for psi. ret += '{atom = [%d %d %d %d] fc = %f sigma = %s ref = %s}\n' % \ (int(N_1), int(Ca_1), int(C_1), int(N_2), k_psi, sigma_psi, ref_psi,) print(" Relative restraints:") print(ret) return ret
if (__name__ == "__main__"): def test_get_gluepoints(): model = Cms(file="benzene_and_methane_glue_pbc.cms") # print len( _get_gluepoints_impl( model ) ) print(get_gluepoints(model)) model = Cms(file="benzene_and_methane.cms") print(get_gluepoints(model)) def test_get_num_constraint(): model = Cms(file="two.cms") print(model.get_num_constraint()) # It should give 19605. def test_clear_restrain(): model = Cms(file="a.cms") model.clear_restrain() model.write("a-out.cms") def test_gen_alpha_helix_restraint(): model = Cms(file="test.cms") restr = gen_alpha_helix_restraint(model) print(restr) def test_get_degrees_of_freedom(): model = Cms(file="a.cms") print("total number of atoms =", model.atom_total) print("number of constraint =", model.get_num_constraint()) print("degrees of freedom =", model.get_degrees_of_freedom()) test_get_degrees_of_freedom()