Source code for schrodinger.application.desmond.packages.traj

"""
Molecular dynamics trajectory handling module
Define common APIs for various trajectory formats.
Basic trajectory handling algorithms are also implemented here.

Copyright Schrodinger, LLC. All rights reserved.
"""

import contextlib
import copy
import enum
import os
import sys
from collections import OrderedDict
from collections.abc import Iterable
from itertools import chain
from typing import List
from typing import Optional

import numpy

from schrodinger.application.desmond.packages.msys import molfile
from schrodinger.structure import StructureReader

# Design Philosophy:
#
# - Extremely lightweight
#
#   No one wants a big, slow, deeply hierarchical, and often overly-complicated
#   framework to work on. So, try to be lean and be a thin layer on top of the
#   infrastructure, try to provide functionalities orthogonal to each other.
#
# - No trajectory container class!
#
#   And no confusing methods, no little bugs everywhere. Here, a trajectory is
#   a *stupid* list of frames. Do mixing, slicing, splitting, recordering,
#   skipping, trunctating, concatenating, ..., just as you do it with a Python
#   C{list}.
#
# - Do one thing and do it well
#
#   No big superman functions or classes. Have multiple fundamental functions
#   or classes with focused and specific functional goals. Compose them to
#   achieve more advanced functionalities.
#
# - Format agonistic
#
#   Potentially support various formats and mixed format trajectories. Native
#   supports for the XTC format (which is a nice lossy-compression trajectory
#   format from GROMACS) has been in mind.
#
# - Minimum processor and memory overhead
#
#   Don't make expensive copies of frame data until it's absolutely necessary.


[docs]class Fmt(enum.Enum): """ Trajectory format enum """ AUTO = 0 DTR = 1 XTC = 2
_FMT_EXTNAME = {Fmt.DTR: "_trj", Fmt.XTC: ".xtc"}
[docs]def get_fmt(fname, format=Fmt.AUTO): """ Returns the format based on the given trajectory file name (`fname`) and the format specification (`format`). If the format specification is definite (i.e., not `Fmt.AUTO`), this function will trivially return the value of `format`, otherwise it will check the `fname`'s value, determining the format, and return the result. :type fname: `str` :param fname: Trajectory file name :type format: `Fmt` :param format: Trajectory format specification :rtype : `Fmt` :return: The format of the given trajectory `fname` """ if format == Fmt.AUTO: extname = fname.rstrip(os.sep)[-4:] if extname == "_trj": return Fmt.DTR if extname == ".xtc": return Fmt.XTC raise ValueError("Unrecognized trajectory format: %s" % extname) return format
[docs]def get_trajectory_extname(format=Fmt.AUTO, ref_fname=None): if format == Fmt.AUTO: assert isinstance(ref_fname, str) format = get_fmt(ref_fname) return _FMT_EXTNAME[format]
# Enumerates the extension names of all supported trajectory formats. STANDARD_TRAJECTORY_EXTNAMES = tuple( get_trajectory_extname(e) for e in Fmt if e != Fmt.AUTO) def _sizeof(a) -> int: """ :type a: Any type. If `a` is a `numpy.ndarray`, `a.nbytes` will be returned, otherwise `sys.getsizeof(a)` will be returned. :return: Used memory by `a` in bytes """ return a.nbytes if isinstance(a, numpy.ndarray) else sys.getsizeof(a) def _estimate_frame_memory_usage(frame) -> int: """ Estimate the memory space usage of the given frame. :return: Memory in bytes as used by `frame` """ size = _sizeof(frame) try: size += _sizeof(frame.pos()) except AttributeError: pass except TypeError: # `frame` has a `pos` property (as is the case of `molfil.Frame`). size += _sizeof(frame.pos) try: size += _sizeof(frame.vel()) except AttributeError: pass except TypeError: # `frame` has a `vel` property (as is the case of `molfil.Frame`). size += _sizeof(frame.vel) return size
[docs]class Source(object): CACHE_LIMIT_BYTES = 1e9 # 1GB
[docs] def __init__(self, reader, name="sometraj"): """ `reader` specifies where to retrieve a trajectory frame when it's needed but currently NOT in the memory (in other words, it's not cached by this source). A reader could potentially be a file reader, a string reader, a socket reader, a database dealer, etc., as long as it satisfies the duck-typing requirements (see below). And interestingly, the probably simplest reader is another ``trajectory`` (i.e., a list of `Frame` objects) in memory. Duck typing for ``readers``. - Random access `reader[index]` should return the `index`-th frame. Here `index` is zero-based, of Python style (i.e., -1 means the last frame, etc.). - `len` support `len(reader)` should return the total number of frames. If a reader doesn't support the duck typing as specified above, a subclass can be created to bypass the typing requirements. See `DtrSource` below as an example. """ self._reader = reader self._name = name self._frame_memsize = 0 # Least-recently-used cache of (native) frames. The order of the # items in the cache is from least-recently-used to most-recently-used. self._cache = OrderedDict()
def __str__(self): return "traj-source:'%s'" % self._name @property def name(self): return self._name def _retrieve_frame(self, index): return self._reader[index]
[docs] def retrieve_frame(self, index): """ Retrieves the `index`-th frame from the source. `index` is zero-based. This method assumes the source allows random access to frames. :raise: `IndexError` if `index` is out of range, or `IOError` if the trajectory reader is closed. """ frame = self._cache.get(index) if frame is None: if self._reader is None: raise IOError("Trajectory reader is closed.") try: frame = self._retrieve_frame(index) except Exception as e: raise IndexError("Retrieving trajectory frame#%d failed. " "%s, %s" % (index, self, e)) # Maintains the ordering of the cached items. The currently-being-read # frame should be placed to the end of the cache. self._cache.pop(index, None) self._cache[index] = frame self._limit_cache() return frame
def _limit_cache(self): while self.used_space() > self.CACHE_LIMIT_BYTES: # Uncaches the least-recently-used frame. self._cache.popitem(last=False)
[docs] def clear_cache(self): self._cache.clear()
[docs] def used_space(self): if self._frame_memsize == 0: for v in self._cache.values(): self._frame_memsize = _estimate_frame_memory_usage(v) break return self._frame_memsize * len(self._cache)
[docs] def nframes(self): """ Returns the total number of frames in this trajectory source. """ return len(self._reader)
[docs] def close(self): """ Closes down the trajectory reader. A couple of points to note: 1. Once this method is called, this `Source` object can only retrieve frames that are internally cached. Attempting to retrieve uncached frames will incur an `IOError`. Normally we do NOT know which frames are cached, so to be safe this method should be called only when we are done with the trajectory. 2. Normally, only some `Frame` objects reference a `Source` object, if all the `Frame` objects are garbaged, the `Source` object will be garbaged automatically, and there is no need to call this `close` method separately. """ # `self._reader` typically does NOT have a `close` method or something # like that. But if it does, we should call that method. with contextlib.suppress(AttributeError): self._reader.close() self._reader = None
[docs]class DtrSource(Source): def _retrieve_frame(self, index): return self._reader.frame(index)
[docs] def nframes(self): return self._reader.nframes
[docs]class XtcSource(Source):
[docs] def __init__(self, *args, **kwargs): Source.__init__(self, *args, **kwargs) # Data loading is lazy self._nframes = -1 self._next_index = 0 self._iter_frame = self._reader.frames()
def _retrieve_frame(self, index): # Unlike the DTR, the XTC reader is sequential and does NOT support # random access. fr = None while index >= self._next_index: fr = next(self._iter_frame) self._next_index += 1 if fr is None: self._iter_frame = self._reader.frames() self._next_index = 0 return self._retrieve_frame(index) return fr
[docs] def nframes(self): # By default, xtc trajectory doesn't save frame count. Thus we read the # frames once to get it. if self._nframes == -1: # data is not loaded yet i = -1 for i, _ in enumerate(self._reader.frames()): pass self._nframes = i + 1 return self._nframes
[docs]class Frame: """ This class, as its name suggests, represents a single trajectory frame. So far, we allow a frame to be mutated in the following ways: - Assign a new chemical time - Reduce to a subset of particles - Coordinates of all particles are translated We call changes generally as ``decorations``. They might not make into the frame data, or at least not immediately. Why? This avoids making multiple expensive copies of the data. """
[docs] def __init__(self, source: Optional[Source], index: Optional[int]): """ :param source: The object to retrieve the actual frame data from. It can be `None`, which means this `Frame` object itself contains all the frame data. :param index: The index in `source` to retrieve the frame data. It can be `None`, which means this `Frame` object itself contains all the frame data. """ self._source = source self._index = index self._object = None self._time = None # `float` or `None` self._imap = None # `numpy.ndarray` or `None` self._pos = None # Impostor position vector
def __str__(self): return "%s, frame#%d" % (self._source, self._index) def __deepcopy__(self, memo={}): # noqa: M511 return self.copy() def _frame(self): """ Return the raw frame object where decorations haven't be solidified. """ if self._object is None: self._object = self._source.retrieve_frame(self._index) return self._object def _mapped_index(self, i): return (i if self._imap is None else (list(map(self._imap.__getitem__, i)) if isinstance(i, list) else self._imap[i])) def _copy_and_decorate(self, obj, indices): """ Makes a copy of the native trajectory-frame object and decorates the copied object if necessary. :type obj: A native frame type, e.g., `molfile.Frame` :param obj: A native frame """ if isinstance(obj, molfile.Frame): pos, vel = obj.pos, obj.vel n = (pos is not None and len(pos)) or \ (vel is not None and len(vel)) or 0 # FIXME: This is to temporarily work around an exception in Py3 build: # TypeError: No registered converter was able to produce a C++ rvalue of type long from this Python object of type numpy.int64 # This bug would probaby be automatically fixed by a newer # version of Boost.Python where numpy is part of it. # Or, we probably don't need to refix. Don't recall why we # convert `range(n)` to a numpy array. # indices = numpy.array(range(n)) if indices is None else indices indices = range(n) if indices is None else map(int, indices) indices = list(indices) new_obj = obj.select(indices) new_obj.time = obj.time if self._time is None else self._time new_obj.box[:] = obj.box return new_obj raise NotImplementedError("Unsupported trajectory frame type: %s" % type(obj)) def _get_native_object(self, indices=None): if indices is None: indices = self._imap elif self._imap is not None: indices = self._imap[indices] obj = self._frame() if isinstance(obj, Frame): return obj._get_native_object(indices) elif isinstance(obj, molfile.Frame): return (obj, indices) raise NotImplementedError("Unsupported trajectory frame type: %s" % type(obj)) def _make(self, force=False): """ Solidifies decorations. If there are decorations or `force == True`, this function might make a copy of the frame data. """ obj, indices = self._get_native_object() if indices is not None or self._time is not None or force: self._object = self._copy_and_decorate(obj, indices) self._source = None self._index = None self._time = None self._imap = None return self
[docs] def source(self): return self._source
[docs] def copy(self): """ Return a copy of this `Frame` object. The position and velocity arrays will be copied. """ new_obj = Frame(self._source, self._index) new_obj._object = self._object new_obj._time = self._time new_obj._imap = copy.copy(self._imap) new_obj._pos = copy.copy(self._pos) new_obj._make(True) return new_obj
@property def natoms(self): if self._imap is not None: return len(self._imap) obj = self._frame() if isinstance(obj, molfile.Frame): return len(obj.pos) # Adds another `if' statement here here if `obj' is of a native class, # otherwise we expect it to support the duck-type `obj.natoms'. return obj.natoms @property def nactive(self): if self._imap is not None: # We assume a subsystem contains no inactive atoms. return len(self._imap) obj = self._frame() if isinstance(obj, molfile.Frame): # For backward compatibility, if no NACTIVE recorded in the # trajectory frame, `obj.nactive` will return the default value # 0. In this case, we return the `natoms`. This only happens when # the trajectory is in DTR and generated by the old Desmond code # (2018-2 build06 and eailer). return obj.nactive or len(obj.pos) # Adds another `if' statement here here if `obj' is of a native class, # otherwise we expect it to support the duck-type `obj.nactive'. return obj.nactive @property def is_gcmc(self): """ Return whether the frame is from a gcmc simulation """ return self.nactive != self.natoms
[docs] def pos(self, i=None): """ Return the position vector(s). i can be any of the following types: - None: Return the whole position-vector array; - int: `i` should be a GID, and this function return position vector of the particle `i`. - Iterable: `i` should be a "list" of GIDs, and this function returns a NEW numpy array of position-vectors of the specified particles. """ i = list(i) if isinstance(i, Iterable) else i if self._pos is None: obj = self._make()._frame() the_pos = obj.pos else: the_pos = self._pos return the_pos if i is None else the_pos[self._mapped_index(i)]
[docs] def vel(self, i=None): """ Return the velocity vector(s). This method may throw if this frame doesn't have velocity data. i can be any of the following types: - None: Return the whole position-vector array; - int: `i` should be a GID, and this function return position vector of the particle `i`. - Iterable: `i` should be a "list" of GIDs, and this function returns a NEW numpy array of position-vectors of the specified particles. """ obj = self._make()._frame() i = list(i) if isinstance(i, Iterable) else i if isinstance(obj, molfile.Frame): return obj.vel if i is None else obj.vel[self._mapped_index(i)] # Adds another `if' statement here if `obj' is of a native class, # otherwise we expect it to support the duck-type `obj.vel(i)'. return obj.vel(i)
[docs] def hijackPos(self, impos): """ Tentatively replace the internal position array of this `Frame` object with an external numpy array `impos`. We can call the external array "imposter". After calling this method, the `pos` method will return values from the imposter until the imposter is removed (by calling `hijackPos(None)`). The imposter should be a Nx3 numpy array, where N can be of any size. Note the impacts on other methods of this class: - pos Use `impos`. - copy The entire `impos` array will be copied into the copy of this `Frame` object. - reduce Still function in the same way, but note that the `impos` array will not be changed by these methods. - moveby The same as `reduce` - write Still function in the same way, note that it's the internal position array, not `impos`, that will be written out. :type impos: `None`, or `numpy.ndarray` of size Nx3, where N is arbitray number :param impos: The position array to tentatively replac the internal position array in this `Frame` object. If the value is `None`, we remove the "imposter" and recover the internal position array. """ self._pos = impos
def _getTime(self): return self._frame().time if self._time is None else self._time def _setTime(self, value): self._time = value time = property(_getTime, _setTime) @property def box(self): # The box matrix from Desmond trajectory is row-majored. In other # words, the primitive cell vectors are the rows of the box matrix. obj, _ = self._get_native_object() return obj.box
[docs] def transpose_box(self): """ Transpose the simulation box. """ # After DESMOND-8339 (18-4), CPU and GPU trajectories on disk use # row-majored simulation box, i.e., the primitive vectors are the rows. # Before 18-4, the GPU trajectories on disk use column-majored box. box = self.box for i in range(3): for j in range(i + 1, 3): box[i][j], box[j][i] = box[j][i], box[i][j]
@property def orig_index(self): return self._index
[docs] def reduce(self, indices, copy=True): """ Keep only the atoms specified by `indices`, the other atoms will be deleted from this frame. The caller needs to ensure the `indices` order matches with the msys model of the reduced system. :param copy: If true, this function will make a copy of this frame and reduce atoms in this copy, and finally return the copy. """ fr = self.copy() if copy else self fr._imap = numpy.asarray(indices) return fr
[docs] def moveby(self, x, y, z): """ Translate all positions by the given `x`, `y`, and `z`. """ obj = self._make(True)._frame() if isinstance(obj, molfile.Frame): obj.moveby(x, y, z) else: raise NotImplementedError("Unsupported trajectory frame type: %s" % type(obj)) return self
[docs] def write(self, writer): """ Write out this frame using the given `writer`. """ obj = self._make()._frame() if isinstance(obj, molfile.Frame): writer.frame(obj) else: raise NotImplementedError("Unsupported trajectory frame type: %s" % type(obj))
[docs]class MaeFrameReader: """ This class reads a number of MAE files into a sequence of "bare-metal" frame objects. See docstring of `MaeSource` below for more detail. """
[docs] def __init__( self, mae_fnames: List[str], cms_model: "Cms", # noqa: F821 frame_template: Frame): self._mae_fnames = mae_fnames # `cms_model` should be generated by `topo.read_cms` in order to have # `cms_model.allaid_gids` set up correctly. self._allaid_gids = cms_model.allaid_gids self._frame_template = frame_template
def __getitem__(self, i: int): st = list(StructureReader(self._mae_fnames[i]))[0] new_fr = self._frame_template.copy() new_fr.pos()[self._allaid_gids] = st.getXYZ() if new_fr.vel() is not None: # Tries to set velocities if they exist in the mae file. with contextlib.suppress(KeyError): # Reads out the velocities into an array. If there is no # velocities, we'll exit this block immediately. vel = [] for atom in st.atom: vel.append([ atom.property["r_ffio_x_vel"], atom.property["r_ffio_y_vel"], atom.property["r_ffio_z_vel"] ]) new_fr.vel()[self._allaid_gids] = vel else: # We cannot set velocities if `frame_template` does not have # velocities. # FIXME: Enhance the C++ molfile module to lift up this limitation. pass # Sets the box matrix. from schrodinger.application.desmond.cms import get_box new_fr.box[:] = numpy.reshape(get_box(st), newshape=(3, 3)) # Sets the time to the index. new_fr.time = i # Returns the "bare metal" frame type that `self._frame_template` is # using. return new_fr._make()._frame()
[docs] def __len__(self): return len(self._mae_fnames)
[docs]class MaeSource(Source): """ This is to reverse-convert a list of MAE (or CMS) files into a single trajectory file. The molecular structure in each MAE file will become a single frame in the trajectory. If an MAE file contains multiple CTs, only the first one will be used. CAVEATS: This kind of conversions always have some caveats, which in this particular case are the following: 1. Velocities are not always present in either the trajectory frame template or the MAE file. In either case, the velocities will not be saved into the resultant frame. 2. Since we generally don't a priori know the exact force field to be used, it's difficult/impossible to predict about the pseudoatoms. So if they exist in the trajectory frame, their positions will not be set. This is normally not a problem for MD simulations the pseudoatoms' positions are normally determined on the fly in MD simulations. 3. The time of each frame is arbitrarily set to the index of the frame. 4. This will not work properly for GCMC systems where the number of active atoms in the frame template happens to differ from that of the structures in the MAE files. """
[docs] def __init__( self, mae_fnames: List[str], cms_model: "Cms", # noqa: F821 frame_template: Frame): # `cms_model` should be generated by `topo.read_cms`. Source.__init__(self, reader=MaeFrameReader(mae_fnames, cms_model, frame_template), name=",".join(mae_fnames))
[docs]def read_traj(inp, format=Fmt.AUTO, return_iter=False): """ Lazy load trajectory. :type inp: `str` :param inp: Input trajectory's file name :type format: `Fmt` :param format: Trajectory file format :param return_iter: If `True`, return an iterator, otherwise return a `list` of `Frame`. """ if not isinstance(inp, str): raise TypeError("Unsupported trajectory IO type: '%s'" % type(inp)) if isinstance(format, Fmt): fmt = get_fmt(inp, format) elif isinstance(format, str) and "dtr" == format: # Only for backward compatibility # `format' as a C{str} is deprecated. Use `traj.Fmt.<*>' instead. fmt = Fmt.DTR else: raise ValueError("Unsupported trajectory format: %s" % format) if fmt == Fmt.DTR: source = DtrSource(molfile.dtr.read(inp), name=inp) elif fmt == Fmt.XTC: source = XtcSource(molfile.xtc.read(inp), name=inp) if return_iter: return (Frame(source, i) for i in range(source.nframes())) else: return [Frame(source, i) for i in range(source.nframes())]
class _XtcWriterAdapter(object): # XTC writer alters the frame's coordinate data: It scales down each # coordinate by 10 because it uses the nanometers instead of Angstroms. We # have to recover the original coordinates after calling the writer. That's # why we need this adapter. def __init__(self, xtc_writer): self._writer = xtc_writer def frame(self, molfile_frame): pos = numpy.copy(molfile_frame.pos) self._writer.frame(molfile_frame) molfile_frame.pos[:] = pos def close(self): self._writer.close()
[docs]def write_traj(tr, fname, format=Fmt.AUTO): """ :type tr: `list` of `Frame` or an iterator :type fname: `str` :param fname: Output file name :type format: `Fmt` :param format: Trajectory file format """ tr_iter = iter(tr) try: first_fr = next(tr_iter) except StopIteration: # empty trajectory return if isinstance(format, Fmt): fmt = get_fmt(fname, format) elif isinstance(format, str) and "dtr" == format: # Only for backward compatibility # `format' as a C{str} is deprecated. Use `traj.Fmt.<*>' instead. fmt = Fmt.DTR else: raise ValueError("Unsupported trajectory format: %s" % format) fmt2plugin = { Fmt.DTR: molfile.dtr, Fmt.XTC: molfile.xtc, } plugin = fmt2plugin[fmt] writer = plugin.write(fname, natoms=first_fr.natoms) if fmt == Fmt.XTC: writer = _XtcWriterAdapter(writer) first_fr.write(writer) for fr in tr_iter: fr.write(writer) writer.close() if Fmt.DTR == fmt: # Creates the silly C{"clickme.dtr"} file. fh = open(os.path.join(fname, "clickme.dtr"), "w") fh.close()
[docs]def merge(*arg): """ Merge a list of trajectories. If there is any overlapped frames among these trajectories, the ones from the latter trajectory (higher index in the `arg` list) will be kept. """ if not arg: return [] time_frame = sorted({e.time: e for e in sum(arg, [])}.items()) return list(zip(*time_frame))[1]
[docs]def concat(first_time, delta_time, *args): """ Concatenate a list of trajectories or iterators and reassign the chemical time for each frame. Results are yielded. :type first_time: `float` :param first_time: Chemical time of the first frame. :type delta_time: `float` :param delta_time: The chemical time interval between two successive frames. :param args: trajectories as iterables """ time = first_time for e in chain.from_iterable(args): e.time = time time += delta_time yield e
[docs]def extract_subsystem(tr, gids): """ Extract the subsystem as specified by the `gids` for each frame in the given trajectory `tr`, and return a new trajectory for the subsystem. The original trajectory is not mutated in any way by this function. Inactive atoms in GCMC trajectory cannot be extracted. :type tr: `list` of `Frame` :param tr: Original trajectory :type gids: `list` of `int` :param gids: A list of gids that specifies the subsystem. :rtype: `list` :return: A new trajectory for the subsystem """ # Preserve the C{gids} order but remove duplicats. Duplication will cause # incorrect result. gids = sorted(set(gids), key=gids.index) sub_tr = [] for fr in tr: inactive = list(filter(lambda x: x >= fr.nactive, gids)) if inactive: raise RuntimeError( f"Cannot extract inactive atoms with indices: {inactive}.") sub_tr.append(fr.reduce(gids)) return sub_tr
[docs]def is_empty_trajectory(trj_name: str) -> bool: """ Determine whether the specified trajectory is empty (in filesystem sense, ie relevant files exist, and are of size zero). Note that trajectory readers will fail to open a trajectory which is empty in this sense. :param trj_name: path to the trajectory """ trj_fmt = get_fmt(trj_name) if trj_fmt == Fmt.XTC: return 0 == os.path.getsize(trj_name) elif trj_fmt == Fmt.DTR: metadata = os.path.join(trj_name, 'metadata') return os.path.isfile(metadata) and 0 == os.path.getsize(metadata) else: return False