Source code for schrodinger.application.desmond.packages.traj_util
"""
Advanced utilities for trajectory handling
Copyright Schrodinger, LLC. All rights reserved.
"""
import glob
import os
from typing import List
from schrodinger.application.desmond.packages import topo
from schrodinger.application.desmond.packages import traj
from schrodinger.utils import fileutils
def _atom_selector(cms_model, selector):
if isinstance(selector, str):
# `selector' must be an ASL expression.
return cms_model.select_atom(selector)
# `selector' must be a callable.
return selector(cms_model)
[docs]def extract_atoms(cms_model,
selector,
tr,
format="maestro",
is_subsystem=False):
"""
A generator to extract the specified atoms from the trajectory.
:type cms_model: `cms.Cms`
:param cms_model: This must be consistent with the trajectory `tr` and
created from `topo.read_cms`.
:type selector: `str` or a callable
:param selector: If the value is a `str`, it must a valid ASL expression
consistent with `cms_model`. The expression will be
repeatedly applied to each frame, thus the selected atoms
may differ from frame to frame. If the value is a callable,
it should take a full system CT as the argument and return
a list of selected atom indices.
:type tr: `list` of `traj.Frame` objects
:param tr: Trajectory
:type format: `str`
:param format: Currently, we support only "maestro" (or "mae" in short)
and `None`. If it's "maestro", this function will return
a `schrodinger.structure.Structure` object for the
subsystem, or it will return a `traj.Frame` object for
the subsystem.
:type is_subsystem: `bool`
:param is_subsystem:
- If true, this function will extract the specified subsystem from the
trajectory. It's important to understand what we mean by "subsystem",
for this, refer to the docstring of the `topo.extract_subsystem`
function.
- If false, this function will extract an arbitrary set of atoms as
specified by `selector`.
:rtype: A tuple.
The first element is a `schrodinger.structure.Structure` object
if `format="maestro"`, or a `traj.Frame` object if `format=None`
The second element is the corresponding aids. If no atoms are
selected for a frame, the first element is `None` and the second
element is an empty list.
"""
for fr in tr:
if is_subsystem:
if cms_model.need_msys:
topo.update_cms(cms_model, fr)
else:
cms_model.update_with_frame(fr)
else:
topo.update_ct(cms_model, cms_model, fr)
cms_model.set_nactive_gids(fr.nactive, fr.natoms)
aids = _atom_selector(cms_model, selector)
if not aids:
yield None, []
continue
if is_subsystem:
asl = "atom.num %s" % str(aids)[1:-1]
if cms_model.need_msys:
ct, aids = topo.extract_subsystem(cms_model, asl)
else:
(ct, gids), aids = cms_model.extract_with_asl(asl,
with_aid=True)
if format in ["maestro", "mae"]:
if is_subsystem:
ct.fix_filenames()
else:
ct = cms_model.extract(aids, True)
for prop in [
"s_m_original_cms_file", "s_chorus_trajectory_file"
]:
if prop in ct.property:
del ct.property[prop]
ct.title = "%s @%g ps" % (ct.title, fr.time)
yield ct, aids
elif format is None:
if cms_model.need_msys:
yield fr.reduce(topo.aids2gids(cms_model, aids)), aids
else:
yield fr.reduce(cms_model.convert_to_gids(aids)), aids
else:
raise NotImplementedError("unsupported format")
[docs]def read_traj(cms_model, cms_file):
"""
Read the trajectory associated with cms_model, and return it.
:param cms_model: cms.Cms object.
:type cms_model: cms.Cms
:param cms_file: CMS file path.
:type cms_file: str
:return: list of frames.
:rtype: list of traj.Frame
:raises TrajectoryUnreadableError: If there is an error reading the files.
"""
trj_path = topo.find_traj_path(cms_model, os.path.dirname(cms_file))
if trj_path is None:
raise TrajectoryUnreadableError(
"No trajectory found associated with CMS file")
try:
trj = traj.read_traj(trj_path)
except (ValueError, NotImplementedError) as e:
raise TrajectoryUnreadableError(
'Failed to read trajectory file: %s %s' % (type(e).__name__, e))
return trj
[docs]def read_cms_and_traj(cms_file):
"""
Read the .cms file and the associated trajectory, and return the msys
model, the cms model, and the trajectory. The associated trajectory is
assumed to be in the same directory as the given .cms file.
:param cms_file: CMS file path.
:type cms_file: str
:return: msys System, CMS, and list of frames.
:rtype: (msys.System, cms.Cms, list of traj.Frame)
:raises TrajectoryUnreadableError: If there is an error reading the files.
"""
if not os.path.isfile(cms_file):
raise TrajectoryUnreadableError('CMS file not found: %s' % cms_file)
if not fileutils.is_cms_file(cms_file):
raise TrajectoryUnreadableError(
'Invalid format/extension for CMS file: %s' % cms_file)
try:
msys_model, cms_model = topo.read_cms(cms_file)
except RuntimeError as e:
raise TrajectoryUnreadableError('Failed to read the CMS file: %s' % e)
trj = read_traj(cms_model, cms_file)
return msys_model, cms_model, trj
[docs]def find_trajectories(fname_pattern="*", *, recursive=False) -> List[str]:
"""
Finds all trajectory files that match the specified file name pattern
(`fname_pattern`), and returns a list of names of the found trajectory
files.
`fname_pattern` follows the `glob` syntax. If the pattern doesn't contain
a supported trajectory extension name, it will be treated as the pattern of
the base name.
`recursive` will lead to searching into all subdirectories if
`fname_pattern` contains the `**/` pattern.
To find all trajectories that have a specified base name in only the
current directory, simply call this function:
`find_trajectories(<basename>)`.
This function returns an empty list if no matches found.
"""
try:
traj.get_fmt(fname_pattern)
fnames = glob.glob(fname_pattern, recursive=recursive)
except ValueError:
fnames = []
for extname in traj.STANDARD_TRAJECTORY_EXTNAMES:
fnames.extend(
glob.glob(fname_pattern + extname, recursive=recursive))
return fnames
[docs]def pretty_filesize(size_in_byte):
K = 1024
M = K * 1024
G = M * 1024
T = G * 1024
if size_in_byte > T:
return f"{(size_in_byte + T/2)// T} T"
if size_in_byte > G:
return f"{(size_in_byte + G/2)// G} G"
if size_in_byte > M:
return f"{(size_in_byte + M/2)// M} M"
if size_in_byte > K:
return f"{(size_in_byte + K/2)// K} K"
return f"{size_in_byte} B"