Source code for schrodinger.application.desmond.packages.traj_util

"""
Advanced utilities for trajectory handling

Copyright Schrodinger, LLC. All rights reserved.
"""

import glob
import os
from typing import List

from schrodinger.application.desmond.packages import topo
from schrodinger.application.desmond.packages import traj
from schrodinger.utils import fileutils


def _atom_selector(cms_model, selector):
    if isinstance(selector, str):
        # `selector' must be an ASL expression.
        return cms_model.select_atom(selector)
    # `selector' must be a callable.
    return selector(cms_model)


[docs]def extract_atoms(cms_model, selector, tr, format="maestro", is_subsystem=False): """ A generator to extract the specified atoms from the trajectory. :type cms_model: `cms.Cms` :param cms_model: This must be consistent with the trajectory `tr` and created from `topo.read_cms`. :type selector: `str` or a callable :param selector: If the value is a `str`, it must a valid ASL expression consistent with `cms_model`. The expression will be repeatedly applied to each frame, thus the selected atoms may differ from frame to frame. If the value is a callable, it should take a full system CT as the argument and return a list of selected atom indices. :type tr: `list` of `traj.Frame` objects :param tr: Trajectory :type format: `str` :param format: Currently, we support only "maestro" (or "mae" in short) and `None`. If it's "maestro", this function will return a `schrodinger.structure.Structure` object for the subsystem, or it will return a `traj.Frame` object for the subsystem. :type is_subsystem: `bool` :param is_subsystem: - If true, this function will extract the specified subsystem from the trajectory. It's important to understand what we mean by "subsystem", for this, refer to the docstring of the `topo.extract_subsystem` function. - If false, this function will extract an arbitrary set of atoms as specified by `selector`. :rtype: A tuple. The first element is a `schrodinger.structure.Structure` object if `format="maestro"`, or a `traj.Frame` object if `format=None` The second element is the corresponding aids. If no atoms are selected for a frame, the first element is `None` and the second element is an empty list. """ for fr in tr: if is_subsystem: if cms_model.need_msys: topo.update_cms(cms_model, fr) else: cms_model.update_with_frame(fr) else: topo.update_ct(cms_model, cms_model, fr) cms_model.set_nactive_gids(fr.nactive, fr.natoms) aids = _atom_selector(cms_model, selector) if not aids: yield None, [] continue if is_subsystem: asl = "atom.num %s" % str(aids)[1:-1] if cms_model.need_msys: ct, aids = topo.extract_subsystem(cms_model, asl) else: (ct, gids), aids = cms_model.extract_with_asl(asl, with_aid=True) if format in ["maestro", "mae"]: if is_subsystem: ct.fix_filenames() else: ct = cms_model.extract(aids, True) for prop in [ "s_m_original_cms_file", "s_chorus_trajectory_file" ]: if prop in ct.property: del ct.property[prop] ct.title = "%s @%g ps" % (ct.title, fr.time) yield ct, aids elif format is None: if cms_model.need_msys: yield fr.reduce(topo.aids2gids(cms_model, aids)), aids else: yield fr.reduce(cms_model.convert_to_gids(aids)), aids else: raise NotImplementedError("unsupported format")
[docs]class TrajectoryUnreadableError(Exception): pass
[docs]def read_traj(cms_model, cms_file): """ Read the trajectory associated with cms_model, and return it. :param cms_model: cms.Cms object. :type cms_model: cms.Cms :param cms_file: CMS file path. :type cms_file: str :return: list of frames. :rtype: list of traj.Frame :raises TrajectoryUnreadableError: If there is an error reading the files. """ trj_path = topo.find_traj_path(cms_model, os.path.dirname(cms_file)) if trj_path is None: raise TrajectoryUnreadableError( "No trajectory found associated with CMS file") try: trj = traj.read_traj(trj_path) except (ValueError, NotImplementedError) as e: raise TrajectoryUnreadableError( 'Failed to read trajectory file: %s %s' % (type(e).__name__, e)) return trj
[docs]def read_cms_and_traj(cms_file): """ Read the .cms file and the associated trajectory, and return the msys model, the cms model, and the trajectory. The associated trajectory is assumed to be in the same directory as the given .cms file. :param cms_file: CMS file path. :type cms_file: str :return: msys System, CMS, and list of frames. :rtype: (msys.System, cms.Cms, list of traj.Frame) :raises TrajectoryUnreadableError: If there is an error reading the files. """ if not os.path.isfile(cms_file): raise TrajectoryUnreadableError('CMS file not found: %s' % cms_file) if not fileutils.is_cms_file(cms_file): raise TrajectoryUnreadableError( 'Invalid format/extension for CMS file: %s' % cms_file) try: msys_model, cms_model = topo.read_cms(cms_file) except RuntimeError as e: raise TrajectoryUnreadableError('Failed to read the CMS file: %s' % e) trj = read_traj(cms_model, cms_file) return msys_model, cms_model, trj
[docs]def find_trajectories(fname_pattern="*", *, recursive=False) -> List[str]: """ Finds all trajectory files that match the specified file name pattern (`fname_pattern`), and returns a list of names of the found trajectory files. `fname_pattern` follows the `glob` syntax. If the pattern doesn't contain a supported trajectory extension name, it will be treated as the pattern of the base name. `recursive` will lead to searching into all subdirectories if `fname_pattern` contains the `**/` pattern. To find all trajectories that have a specified base name in only the current directory, simply call this function: `find_trajectories(<basename>)`. This function returns an empty list if no matches found. """ try: traj.get_fmt(fname_pattern) fnames = glob.glob(fname_pattern, recursive=recursive) except ValueError: fnames = [] for extname in traj.STANDARD_TRAJECTORY_EXTNAMES: fnames.extend( glob.glob(fname_pattern + extname, recursive=recursive)) return fnames
[docs]def pretty_filesize(size_in_byte): K = 1024 M = K * 1024 G = M * 1024 T = G * 1024 if size_in_byte > T: return f"{(size_in_byte + T/2)// T} T" if size_in_byte > G: return f"{(size_in_byte + G/2)// G} G" if size_in_byte > M: return f"{(size_in_byte + M/2)// M} M" if size_in_byte > K: return f"{(size_in_byte + K/2)// K} K" return f"{size_in_byte} B"