Source code for schrodinger.application.matsci.equilibrium_md

"""
Equilibrium molecular dynamics classes/function shared by transport
property calculations using Einstein and Green-Kubo methods.

Copyright Schrodinger, LLC. All rights reserved.
"""

# Contributor: Teng Zhang
import os
import math
import copy
import glob
import numpy
import random
import shutil
import argparse
import collections
import itertools
import functools
import pandas

from types import SimpleNamespace
from scipy.stats import linregress
from scipy import constants as scipy_const

from schrodinger import structure
from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobdj
from schrodinger.utils import fileutils
from schrodinger.utils import units
from schrodinger.infra import mm
from schrodinger.structutils import analyze
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msconst
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import codeutils
from schrodinger.application.matsci.nano import xtal

topo = codeutils.get_safe_package('desmond.topo')
traj = codeutils.get_safe_package('desmond.traj')

GREEN_KUBO = 'GK'
MSD = 'MSD'

PROGRAM_NAME_DIFFUSION = 'Diffusion Coefficient'
DEFAULT_JOBNAME_DIFFUSION = 'diffusion_coefficient'
PROGRAM_NAME_VISCOSITY = 'Viscosity'
DEFAULT_JOBNAME_VISCOSITY = 'viscosity'

CUBIC_SPLINE = 'cubic spline'
CUMTRAPZ = 'cumtrapz'
PICOSECOND = 1.e-12
FEMTO2PICO = 0.001
NANO2PICO = units.NANO2PICO
BAR2PASCAL = 100000
PAS2CP = 1000.
GPA2BAR = 10000

TAU_START = 'Tau_Start_(ns)'
TAU_END = 'Tau_End_(ns)'
DIFFUSIVITY_UNIT = "m^2/s"
DIFFUSION_UNIT_CONVERT = scipy_const.angstrom**2 / PICOSECOND
MATSCI_DIFFUSION_COEFFICIENT = 'r_matsci_Diffusion_Coefficient'
MATSCI_DIFFUSION_STR_BASE = 's_matsci_Diffusion_Coefficient'
DIFFUSIVITY_TAU_START_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_%s_{TAU_START}'
DIFFUSIVITY_TAU_END_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_%s_{TAU_END}'
DIFFUSIVITY_PROP = MATSCI_DIFFUSION_COEFFICIENT + f'_%s_({DIFFUSIVITY_UNIT})'

DIFFUSIVITY_1D_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + f'_1D_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_2D_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + f'_2D_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_3D_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + f'_3D_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_X_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + f'_X_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_Y_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + f'_Y_%s_({DIFFUSIVITY_UNIT})'
DIFFUSIVITY_Z_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + f'_Z_%s_({DIFFUSIVITY_UNIT})'

DIFFUSIVITY_1D_R2_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + '_1D_%s_R2'
DIFFUSIVITY_2D_R2_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + '_2D_%s_R2'
DIFFUSIVITY_3D_R2_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + '_3D_%s_R2'
DIFFUSIVITY_X_R2_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + '_X_%s_R2'
DIFFUSIVITY_Y_R2_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + '_Y_%s_R2'
DIFFUSIVITY_Z_R2_PROP_BASE = MATSCI_DIFFUSION_COEFFICIENT + '_Z_%s_R2'

VISCOSITY_UNIT = 'cP'
VISCOSITY_UNIT_CONVERT = BAR2PASCAL**2 * PICOSECOND * scipy_const.angstrom**3 * PAS2CP
VISCOSITY_BASE_PROP = 'r_matsci_%s_Viscosity_%s_'
VISCOSITY_PROP = VISCOSITY_BASE_PROP + f'({VISCOSITY_UNIT})'
VISCOSITY_TAU_START_PROP = VISCOSITY_BASE_PROP + TAU_START
VISCOSITY_TAU_END_PROP = VISCOSITY_BASE_PROP + TAU_END
VISCOSITY_FILE_PROP_BASE = 's_matsci_%s_Viscosity_%s_CSV'

MDSIM = 'mdsim'
PLUGIN = 'plugin'
REMOVE_COM_MOTION = 'remove_com_motion'

FIRST = 'first'
START = 'start'
DOT = '_dot_'
TRAJECTORY = 'trajectory'
INTERVAL = 'interval'
OPTIONS = 'options'
TRAJECTORY_INTERVAL = DOT.join([TRAJECTORY, INTERVAL])

MDSIM_PLUGIN_REMOVE_COM_MOTION = f"{MDSIM}.{PLUGIN}.{REMOVE_COM_MOTION}"
MDSIM_PLUGIN_REMOVE_COM_MOTION_FIRST = f"{MDSIM_PLUGIN_REMOVE_COM_MOTION}.{FIRST}"
MDSIM_PLUGIN_REMOVE_COM_MOTION_INTERVAL = f"{MDSIM_PLUGIN_REMOVE_COM_MOTION}.{INTERVAL}"

MAE = 'mae'
CMS = msconst.CMS
CMS_OUT_EXT = msconst.CMS_OUT_EXT
INPUT_MAE = 'input_mae'
INPUT_CMS = 'input_cms'
INPUT_FILE = 'input_file'
TIME = 'time'
TIMESTEP = 'timestep'
ENSEMBLE = 'ensemble'
TEMPERATURE = 'temperature'
PRESSURE = 'pressure'
RANDOM_SEED = 'random_seed'
PROCS = 'procs'
GPU = 'gpu'

MD_OPTIONS_TYPES = {
    INPUT_CMS: str,
    TIME: float,
    TIMESTEP: float,
    ENSEMBLE: str,
    TEMPERATURE: float,
    PRESSURE: float,
    TRAJECTORY_INTERVAL: float,
    RANDOM_SEED: float,
    PROCS: int,
    GPU: bool
}

ENERGY_GROUPS = 'energy_groups'
ENEGRP_OPTIONS_DEFAULTS = {INTERVAL: None, START: None, ENERGY_GROUPS: None}
NONE_TYPE = type(None)
ENEGRP_OPTIONS_TYPES = {
    INTERVAL: (float, NONE_TYPE),
    START: (float, NONE_TYPE),
    ENERGY_GROUPS: (list, NONE_TYPE)
}

MSJ = msconst.MSJ
ENE = 'ene'
MULTISIM_LOG = 'multisim_log'
OCMS = 'ocms'
LOG = 'log'
TRJ = 'trj'
CFG = 'cfg'
WRITE_VELOCITY = 'write_velocity'
ENEGRP = 'enegrp'
LAST = 'last'
VISCOSITY_FILE = 'viscosity_file'
AUTOCORRELATION_FILE = 'autocorrelation_file'
MSD_FILE = 'MSD_file'
ZIP_OUTPUTS = 'zip_outputs'
WAM_TYPE = jobutils.WAM_TYPES.MS_AMORPHOUS_DIELECTRIC

SAVE_OPTIONS_DEFAULTS = {
    MSJ: True,
    ENE: True,
    MULTISIM_LOG: True,
    LOG: True,
    TRJ: False,
    WRITE_VELOCITY: False,
    OCMS: True,
    VISCOSITY_FILE: False,
    ZIP_OUTPUTS: False,
    AUTOCORRELATION_FILE: False,
    LAST: True,
    CFG: True,
    desmondutils.PTENSOR: False
}

TRJ_EXT = TRJ

INPUT_CMS = 'input_cms'
FLAG_ASL = '-asl'
FLAG_SMILES = '-smiles_pattern'

FLAG_ENSEMBLE = parserutils.FLAG_MD_ENSEMBLE
NVE = msconst.ENSEMBLE_NVE
NVT = msconst.ENSEMBLE_NVT
NPT = msconst.ENSEMBLE_NPT
ENSEMBLES = [NVE, NVT, NPT]
FLAG_MD_TEMP = parserutils.FLAG_MD_TEMP
FLAG_MD_PRESSURE = parserutils.FLAG_MD_PRESS

FLAG_MD_TIME = parserutils.FLAG_MD_TIME
FLAG_MD_TIMESTEP = parserutils.FLAG_MD_TIMESTEP
FLAG_MD_SAVE_TRJ = '-md_save_trj'

FLAG_RUN_TYPE = '-run_type'
FULL_RUN = 'full'
POST_RUN = 'post'
MULTI_RUN = 'multi'
FIT_ONLY = 'fit'
ALL_RUN_TYPE = [FULL_RUN, POST_RUN, MULTI_RUN, FIT_ONLY]

FLAG_BASE_FILENAME = '-base_filename'
FLAG_DATA_LAST_FRAC = '-data_last_frac'
FLAG_DATA_START = '-data_start'
FLAG_TRJ_FRAME_NUM = '-trj_frame_num'
FLAG_TAU_START = '-tau_start'
FLAG_TAU_END = '-tau_end'
FLAG_TRAJ_DAT = '-traj_dat'
FLAG_TRJ_FOLDER = '-trj_folder'
BASE_FILENAME = 'BASE_FILENAME'
FLAG_MS_DIFFUSION = '-ms_diffusion'

FLAG_PRESS_DAT = '-press_dat'
FLAG_ENEGRP_DAT = '-enegrp_dat'
FLAG_ENEGRP_START = '-md_enegrp_start'
FLAG_MD_SAVE_ENEGRP = '-md_save_enegrp'

FLAG_GPU = parserutils.FLAG_GPU

FLAG_BLOCK_NUM = '-block_num'
FLAG_VARIATION = '-variation'
STD_FILE_EXT = '_block_std'

FLAG_MD_SIM_NUM = '-md_sim_num'
FLAG_INPUT_SEARCH = '-search_inputs'

MIN_TIME_NS = 0.01
MAX_TIMESTEP_FS = 100
MIN_TIMESTEP_FS = 0.01
MAX_TRJ_FRAME_START = 1000000

ARG_DEFAULTS = {
    FLAG_MD_PRESSURE: 1.01325,
    FLAG_MD_TEMP: 300.0,
    FLAG_MD_TIME: 1.0,
    FLAG_MD_TIMESTEP: 1.0,
    parserutils.FLAG_MD_TRJ_INT: 1.0,
    FLAG_DATA_START: 10,
    FLAG_DATA_LAST_FRAC: 0.8,
    FLAG_TAU_START: 0.01,
    FLAG_TAU_END: 0.99,
    FLAG_ASL: 'all',
    FLAG_BLOCK_NUM: 1,
    FLAG_VARIATION: 0.10,
    parserutils.FLAG_MD_ENEGRP_INT: 0.01,
    FLAG_ENEGRP_START: 0.0,
    FLAG_MD_SIM_NUM: 10
}
DATA_LAST_PCT_RANGE = [0.001, 1.]
ALL_MD_FLAGS = [
    FLAG_ENSEMBLE, FLAG_MD_TEMP, FLAG_MD_PRESSURE, FLAG_MD_TIME,
    FLAG_MD_TIMESTEP, parserutils.FLAG_MD_TRJ_INT, FLAG_MD_SAVE_TRJ,
    parserutils.FLAG_RANDOM_SEED
]

MAX_TIME_NS = 10000.
MIN_DATA_POINT_NUM = 50

# Stub left to avoid removing a public function from a public module
memory_usage_psutil = jobutils.memory_usage_psutil

logger = None


[docs]def log_debug(msg): """ Add a message to debug logger file. :type msg: str :param msg: The message to debug logging """ global logger if logger is None: logger = textlogger.create_debug_logger(__name__) logger.debug(msg)
[docs]def get_core_num(): """ Return the number of the cores specified by users. None means not specified. :return: the number of cores specified by users. :rtype: int or None """ toplvl_host_args = os.environ.get('TOPLEVEL_HOST_ARGS') try: host_args = toplvl_host_args.split('-HOST')[1] except (IndexError, AttributeError): return None try: # Use all CPUs for one multisim job (see PANEL-7891 comment) return int(host_args.split(":")[-1]) except (IndexError, ValueError): # Users pass -HOST localhost instead of -HOST localhost:1 return None
[docs]def get_reserves_cores_state(options): """ Return the state of reserves cores so that some python standard library functions can have access to all CPU resources. :param check_runtype: If true, the FLAG_RUN_TYPE in the options is checked :type check_runtype: bool :return: True, if reserves_cores_state should be set as True to allow some python function to utilize multiple CPUs. :rtype: bool """ if jobutils.get_option(options, FLAG_RUN_TYPE) == FULL_RUN: return False # If core num is specified by users explicitly, reserves_cores of True allows # users to access all CPUs in some python standard library functions return bool(get_core_num())
[docs]def clearProps(cms_model, str_in_key='Diffusion'): """ Clear the diffusion related properties and should be called before new calculations. :param cms_model: A cms model to clear the properties :type cms_model: 'schrodinger.application.desmond.cms.Cms' """ props = [x for x in cms_model.property.keys() if str_in_key in x] for prop in props: cms_model.remove_cts_property(prop)
[docs]def get_files(filename, ext): """ Get all the files with same extension, if ext is provided. :type filename: str :param filename: file to be located and returned :type ext: str or None :param ext: extension to search files of the same type :rtype: list of str :return: list of filenames with path """ if ext: file_path = os.path.dirname(filename) return glob.glob(os.path.join(file_path, '*' + ext)) else: return [filename]
[docs]def check_finite_molecules(struct, atom_ids=None): """ Check whether the molecules in structure are finite. :param struct: the structure whose molecules will be checked :type struct: `schrodinger.structure.Structure` :param atom_ids: molecules sharing atoms with this set will be checked. If None, all the molecules are checked. :type atom_ids: set of int, list of int, list of list, or None :return: message if any molecules are infinite :rtype: string or None """ if atom_ids and not isinstance(atom_ids, set): atom_ids = set(DiffusivityMsdAnalysis.flatten(atom_ids)) for mol in struct.molecule: mol_atom_indices = mol.getAtomIndices() if atom_ids: # Check whether the mol contains the selected atoms shared_atom_ids = atom_ids.intersection(mol_atom_indices) if not shared_atom_ids: continue # Pass the atom indexes of a whole molecule as is_infinite2 extracts if xtal.is_infinite2(struct, atom_indices=mol_atom_indices): ids = sorted(shared_atom_ids) if atom_ids else mol_atom_indices return f"The atoms {', '.join(map(str, ids[:3]))}... are in an infinite molecule." if atom_ids: # Clear the checked selected atom ids atom_ids.difference_update(shared_atom_ids) if not atom_ids: return
[docs]def get_md_results(basename, options, log, log_error, save_opts=None, smart_distribution=True): """ From parsed options, setup and run md simulation. :param basename: basename for md simulation job :type basename: str :param options: The parsed command line option :type options: Named tuple :param log: log information :type log: function :param log_error: log error information and exit :type log_error: function :param smart_distribution: turn on / off the smart_distribution :type smart_distribution: bool :rtype: str, str, str :return: output cms, trj folder, and enegrp_dat """ host = jobutils.get_jobhost_name() jdj = jobdj.JobDJ(verbosity='normal', max_failures=jobdj.NOLIMIT, hosts=[(host, 1)]) if not smart_distribution: jdj.disableSmartDistribution() md_opts = { INPUT_CMS: options.input_cms, TIME: options.md_time, TIMESTEP: options.md_timestep, ENSEMBLE: options.md_ensemble, TEMPERATURE: options.md_temp, PRESSURE: options.md_press, TRAJECTORY_INTERVAL: options.md_trj_int, RANDOM_SEED: options.seed, PROCS: 1, GPU: True } md_opts = SimpleNamespace(**md_opts) md_job_info = setup_md_job(basename, md_opts, save_opts=save_opts) jdj.addJob(md_job_info.cmd) log("Molecular dynamics simulation is running...", pad=True) jdj.run() if jdj.total_failed: log_error("Molecular dynamics simulation failed.") else: log("Molecular dynamics simulation finished.") return md_job_info.ocms_name, md_job_info.trj_folder, md_job_info.enegrp_dat
[docs]def setup_md_job(name_base, md_opts, enegrp_opts=None, save_opts=None, command_dir=None, **kwargs): """ Write the .msj file, form the command, figure out the output filenames, and register files to backend for molecular dynamics simulations. Note: if command_dir is not None, please call this method inside the command_dir as the returned ocms_name doesn't have command_dir in the file path. :param name_base: str :type name_base: base name for the md job :param md_opts: 'type.SimpleNamespace' :type icms: this data struct has attributes: icms for input cms filename, time for simulation time in ps, timestep for md timestep in ps, ensemble for md ensemble (NVE, NVT, etc.), temperature for md temperature in K, pressure for md pressure in bar, trajectory_dot_interval for time interval in ps, random_seed for the velocity thermal randomization, procs for the total number of processors, gpu for the request of GPU resource. :param enegrp_opts: 'type.SimpleNamespace' or None :type enegrp_opts: this data struct has attributes: energy_groups for the list of keywords for desmond energy_groups, interval for output time interval in ps, start for the output starting time in ps. If None, no enegrp file is generated. :param save_opts: 'type.SimpleNamespace' or None :type save_opts: this data struct has boolean attributes to save files likes: msj, ene, multisim_log, ocms, log, trj, cfg, write_velocity, enegrp, last, autocorrelation_file, zip_outputs, viscosity_file. If None, use default settings to decide the files to be copied back. :param command_dir: str :type command_dir: this is the path from the job start dir to the subjob dir. The setup function is expected to run in the subjob setup stage, and thus the subjob dir should be passed. If there is only one master job (no sub dir), current dir is used. In addition, the returned file paths are the the basename, and then users need to form the full file path when they decide to run in subfolder or master folder directly. :rtype: 'type.SimpleNamespace' :return: data struct with cmd command to submit md, output cms, trj folder, enegrp file, jobname """ # Validation of the input params assert isinstance(md_opts, SimpleNamespace) for md_key, md_value_type in MD_OPTIONS_TYPES.items(): if not hasattr(md_opts, md_key): raise ValueError(f"md_opts doesn't have {md_key} as an attribute.") md_value = getattr(md_opts, md_key) isinstance(md_value, md_value_type) if enegrp_opts is None: if save_opts is None: save_opts = SimpleNamespace() setattr(save_opts, ENEGRP, False) else: assert isinstance(enegrp_opts, SimpleNamespace) for enegrp_key, enegrp_value in ENEGRP_OPTIONS_DEFAULTS.items(): if not hasattr(enegrp_opts, enegrp_key): setattr(enegrp_opts, enegrp_key, enegrp_value) for enegrp_key, enegrp_value_type in ENEGRP_OPTIONS_TYPES.items(): enegrp_value = getattr(enegrp_opts, enegrp_key) if not isinstance(enegrp_value, enegrp_value_type): raise TypeError( f"{enegrp_value} as {enegrp_key} is not of {enegrp_value_type}." ) if enegrp_value is None: # None means auto-setting: set it according to MD trj settings trj_attr = DOT.join([TRAJECTORY, enegrp_key]) md_value = getattr(md_opts, trj_attr, None) if md_value is not None: setattr(enegrp_opts, enegrp_key, md_value) if save_opts is not None: assert isinstance(save_opts, SimpleNamespace) # Additional unused attributes for certain driver can be added here. for save_key, save_value in SAVE_OPTIONS_DEFAULTS.items(): if not hasattr(save_opts, save_key): setattr(save_opts, save_key, save_value) save_value = getattr(save_opts, save_key) assert isinstance(save_value, bool) # Filenames are based on jobnames jobname = name_base + '_md' msj_name = jobname + '.msj' multisim_log_file = jobname + '_multisim.log' ocms_name = jobname + '-out.cms' enegrp_dat = jobname + '_enegrp.dat' ptensor_dat = jobname + desmondutils.PTENSOR_EXT trj_folder = jobname + '_trj' log_file = jobname + '.log' ene_file = jobname + '.ene' cfg_file = jobname + '-out.cfg' backend = jobcontrol.get_backend() if backend: # Register output files outfiles_status = { msj_name: save_opts.msj, multisim_log_file: save_opts.multisim_log, ocms_name: save_opts.ocms, log_file: save_opts.log, ene_file: save_opts.ene, cfg_file: save_opts.cfg, trj_folder: save_opts.trj, enegrp_dat: save_opts.enegrp, ptensor_dat: save_opts.ptensor } for output_file, to_save in outfiles_status.items(): if not to_save: continue if command_dir: output_file = os.path.join(command_dir, output_file) backend.addOutputFile(output_file) early_step = [md_opts.timestep, md_opts.timestep, md_opts.timestep * 3] # Every timestep, zero momentum backend_plugin = "{" backend_plugin += f" {MDSIM_PLUGIN_REMOVE_COM_MOTION_FIRST} = 0" backend_plugin += f" {MDSIM_PLUGIN_REMOVE_COM_MOTION_INTERVAL} = 0" if enegrp_opts is not None: if enegrp_opts.start: backend_plugin += f" {MDSIM}.{PLUGIN}.{ENERGY_GROUPS}.{FIRST} = %s" % enegrp_opts.start if enegrp_opts.interval: backend_plugin += ( f" {MDSIM}.{PLUGIN}.{ENERGY_GROUPS}.{INTERVAL} = %s" % enegrp_opts.interval) if enegrp_opts.energy_groups: backend_plugin += ( f" {MDSIM}.{PLUGIN}.{ENERGY_GROUPS}.{OPTIONS} = [%s]" % ' '.join(enegrp_opts.energy_groups)) backend_plugin += "}" stage = desmondutils.MDMSJStringer( time=md_opts.time, temp=md_opts.temperature, pressure=md_opts.pressure, timestep=early_step, ensemble=md_opts.ensemble, random_seed=md_opts.random_seed, backend=backend_plugin, trajectory_dot_interval=md_opts.trajectory_dot_interval, trajectory_dot_write_velocity='true' \ if save_opts.write_velocity else 'false', energy_group='true' if enegrp_opts else 'false', last=save_opts.last, **kwargs) system = cms.Cms(md_opts.input_cms) desmondutils.create_msj([stage], filename=msj_name, check_cg=system) cmd = desmondutils.get_multisim_command(md_opts.input_cms, ocms_name, msj_name=msj_name, job_name=jobname) job_info = SimpleNamespace(cmd=cmd, ocms_name=ocms_name, trj_folder=trj_folder, enegrp_dat=enegrp_dat, ptensor=ptensor_dat, jobname=jobname) return job_info
[docs]def type_model_file(arg, model_formats=None): """ Validate that the argument is a file that contains a model. :param arg: the filename :type arg: str :param model_formats: the supported formats of the model :type model_formats: list of str :raise ArgumentTypeError: when the file cannot be open by cms or structure reader :return: filename with path :rtype: str """ if not model_formats: model_formats = [CMS, MAE] assert set(model_formats).issubset([MAE, CMS]) arg = parserutils.type_file(arg) if CMS in model_formats and fileutils.is_cms_file(arg): try: cms.Cms(arg) except Exception: raise argparse.ArgumentTypeError( f'File {arg} is not a valid cms file.') else: return arg if MAE in model_formats and fileutils.is_maestro_file(arg): try: structure.Structure.read(arg) except Exception: raise argparse.ArgumentTypeError( f'File {arg} is not a valid mae file.') else: return arg raise argparse.ArgumentTypeError( f'{arg} is not in formats: {model_formats}')
[docs]def add_md_basic_argument(parser, model_formats=None): """ Add necessary parser arguments for setting up a MD simulation. :param parser: The parser to add additional options :type parser: `argparse.ArgumentParser` :param model_formats: the supported formats of the model :type model_formats: list of str :rtype: 'argparse._ArgumentGroup' :return: argparse ArgumentGroup with MD setting options """ if model_formats == None: model_formats = [CMS] has_cms = CMS in model_formats help = ['Desmond cms for molecular dynamics simulations'] if has_cms else [] input_file = INPUT_CMS if has_cms else None if MAE in model_formats: help.append('Maestro mae for post analysis.') input_file = INPUT_MAE if input_file is None else INPUT_FILE help = ' or '.join(help) md_basic = parser.add_argument_group('MD Basic Setting') md_basic.add_argument(input_file, metavar=input_file.upper(), type=functools.partial(type_model_file, model_formats=model_formats), help=help) if input_file != INPUT_CMS: md_basic.add_argument(f"-{INPUT_CMS}", help=argparse.SUPPRESS) md_basic.add_argument( FLAG_ENSEMBLE, metavar='MD_ENSEMBLE', default=NVE, choices=ENSEMBLES, help="Molecular dynamics ensemble (%s) for data collection." % ", ".join(ENSEMBLES)) md_basic.add_argument( FLAG_MD_TEMP, metavar='KELVIN', type=parserutils.type_positive_float, default=ARG_DEFAULTS[FLAG_MD_TEMP], help="Temperature (K) of molecular dynamics simulations.") md_basic.add_argument( FLAG_MD_PRESSURE, metavar='BAR', type=float, default=ARG_DEFAULTS[FLAG_MD_PRESSURE], help="Pressure (bar) of NPT molecular dynamics simulations.") md_basic.add_argument(FLAG_MD_TIME, metavar='NANOSECOND', type=parserutils.type_positive_float, default=ARG_DEFAULTS[FLAG_MD_TIME], help="Time (ns) of molecular dynamics simulations.") md_basic.add_argument( FLAG_MD_TIMESTEP, metavar='MD_TIMESTEP', type=parserutils.type_positive_float, default=ARG_DEFAULTS[FLAG_MD_TIMESTEP], help="Timestep (fs) of molecular dynamics simulations.") # MATSCI - 9064: remove cpu desmond support md_basic.add_argument(FLAG_GPU, action="store_true", default=True, help=argparse.SUPPRESS) parserutils.add_random_seed_parser_argument(md_basic) return md_basic
[docs]def add_md_output_argument(parser, extra_options): """ Add parser arguments to record output information for MD simulations. :param parser: The parser to add additional options :type parser: `argparse.ArgumentParser` :param extra_options: list of str :type extra_options: extra options can be added according to these keywords :rtype: 'argparse._ArgumentGroup' :return: argparse ArgumentGroup with MD output recording information """ md_output = parser.add_argument_group('MD Output Setting') md_output.add_argument( parserutils.FLAG_MD_TRJ_INT, metavar='PICOSECONDS', type=parserutils.type_nonnegative_float, default=ARG_DEFAULTS[parserutils.FLAG_MD_TRJ_INT], help="Trajectories will be recorded every FLAG_MD_TRJ_INT (ps) " "during the molecular dynamics simulations.") if FLAG_MD_SIM_NUM in extra_options: # Replica runs for ensemble average may skip the saving of *-out.cms parserutils.add_save_desmond_files_parser_argument(md_output) else: md_output.add_argument( FLAG_MD_SAVE_TRJ, action='store_true', help="Save the trajectory from the molecular dynamics simulation.") # EnergyGroup-based analysis if FLAG_ENEGRP_DAT in extra_options: md_output.add_argument( FLAG_ENEGRP_DAT, metavar='DESMOND_ENEGRP_DAT', help=( "Please provide energy group file with pressure tensor recorded " "if molecular dynamics simulation is skipped.")) md_output.add_argument( parserutils.FLAG_MD_ENEGRP_INT, metavar='PICOSECONDS', type=parserutils.type_nonnegative_float, default=ARG_DEFAULTS[parserutils.FLAG_MD_ENEGRP_INT], help="Pressure tensor will be recorded every FLAG_MD_ENEGRP_INT (ps) " "during the molecular dynamics simulations.") md_output.add_argument( FLAG_MD_SAVE_ENEGRP, action="store_true", help="Pressure tensor outputs (*_enegrp.dat) from molecular dynamics " "simulations will be saved ") if FLAG_MD_SIM_NUM in extra_options: # The molecular simulation before this time is used to reach # equilibrium state (no enegrp output) md_output.add_argument( FLAG_ENEGRP_START, metavar='PICOSECONDS', type=parserutils.type_nonnegative_float, default=ARG_DEFAULTS[FLAG_ENEGRP_START], help="Start to record pressure tensor when simulation time " "reaches ENEGRP_START (ps).") if FLAG_TRJ_FOLDER in extra_options: md_output.add_argument(FLAG_TRJ_FOLDER, metavar='TRJ_FOLDER', help="Analyze this trajectory, if %s %s" % (FLAG_RUN_TYPE, POST_RUN)) return md_output
[docs]def add_analysis_argument(parser, extra_options): """ Add parser arguments for analysis. :param parser: The parser to add additional options :type parser: `argparse.ArgumentParser` :param extra_options: list of str :type extra_options: extra options can be added according to these keywords :rtype: 'argparse._ArgumentGroup' :return: argparse ArgumentGroup with analysis options """ analysis_parameter = parser.add_argument_group('Analysis Parameter') if FLAG_DATA_LAST_FRAC in extra_options: analysis_parameter.add_argument( FLAG_DATA_LAST_FRAC, action="store", metavar='FRACTION', default=ARG_DEFAULTS[FLAG_DATA_LAST_FRAC], type=lambda x: parserutils.type_ranged_num( x, top=DATA_LAST_PCT_RANGE[1], bottom=DATA_LAST_PCT_RANGE[0], top_allowed=True, bottom_allowed=False), help="The last this fraction of the molecular dynamics data will be" " used for analysis.") else: analysis_parameter.add_argument( FLAG_DATA_START, action="store", metavar='DATA_NUMBER', default=int(ARG_DEFAULTS[FLAG_DATA_START]), type=parserutils.type_nonnegative_int, help= "DATA_NUMBER at the beginning of a molecular dynamics data will be" " excluded from analysis.") # ASL/SMILE selection based analysis if FLAG_ASL in extra_options: analysis_parameter.add_argument( FLAG_ASL, action="store", metavar='ASL', default=ARG_DEFAULTS[FLAG_ASL], help="ASL string to define atom selection.") if FLAG_SMILES in extra_options: analysis_parameter.add_argument( FLAG_SMILES, action="store", metavar='SMILES', help="Only analyze molecules matching this SMILES string") # FLAG_TRJ_FOLDER indicate this is a trajectory analysis if FLAG_TRJ_FOLDER in extra_options: analysis_parameter.add_argument( FLAG_TRJ_FRAME_NUM, action="store", metavar='TRJ_NUMBER', type=parserutils.type_positive_int, help="Read this number of trajectory frames or until the end.") # The following should be deprecated if FLAG_BLOCK_NUM in extra_options: analysis_parameter.add_argument( FLAG_BLOCK_NUM, action="store", metavar='BLOCK_NUM', default=ARG_DEFAULTS[FLAG_BLOCK_NUM], type=parserutils.type_positive_int, help= "Data from original single simulation will be divided into BLOCK_NUM" " short successive blocks, and each block will be analyzed independently" " to calculate standard deviation for coefficient of variation check" ) if FLAG_BLOCK_NUM in extra_options or FLAG_MD_SIM_NUM in extra_options: analysis_parameter.add_argument( FLAG_VARIATION, action="store", metavar='VARIATION', type=parserutils.type_positive_float, help="If provided, the tau_end is adjusted according to VARIATION " "(standard deviation over mean) so that Tau region has variation " "less than VARIATION.") return analysis_parameter
[docs]def add_parser_opts(parser, final_prop, mid_prop, use_tau=True, extra_options=None, model_formats=None): """ Add all necessary parser arguments for setting up a MD simulation and optional arguments for certain green-kubo property calculation. :param parser: The parser for error information :type parser: `argparse.ArgumentParser` :param final_prop: str :type final_prop: the final property needs to be calculated :param mid_prop: str :type mid_prop: the property needs to be calculated before the final prop :param use_tau: bool :type use_tau: Tau limits are added, if True :param extra_options: None or list of str :type extra_options: extra options can be added according to these keywords :param model_formats: the supported formats of the model :type model_formats: list of str :rtype: 'argparse._ArgumentGroup', 'argparse._ArgumentGroup', 'argparse._ArgumentGroup' :return: argparse ArgumentGroups with setting options """ if extra_options is None: extra_options = [] if model_formats is None: model_formats = [CMS] md_basic = add_md_basic_argument(parser, model_formats=model_formats) md_output = add_md_output_argument(parser, extra_options) analysis_parameter = add_analysis_argument(parser, extra_options) if FLAG_MD_SIM_NUM in extra_options: analysis_indep_run = parser.add_argument_group('Independent Run') parser.add_argument( FLAG_RUN_TYPE, action='store', metavar='RUN_TYPE', default=FULL_RUN, help= ("Define the type of this calculation. %s: run molecular dynamics " "simulations and post analysis; %s: skip the molecular dynamics " "simulations and perform post-analysis; %s: post-analysis only includes " "multi-run average and curve fitting; %s: only perform curve fitting on " "the averaged data." % (FULL_RUN, POST_RUN, MULTI_RUN, FIT_ONLY))) # Correlation-based analysis if use_tau: analysis_parameter.add_argument( FLAG_TAU_START, action="store", metavar='NANOSECOND', default=ARG_DEFAULTS[FLAG_TAU_START], type=parserutils.type_nonnegative_float, help= "%s data before MD_TAU_START (ns) are excluded for %s calculation." % (mid_prop.capitalize(), final_prop)) analysis_parameter.add_argument( FLAG_TAU_END, action="store", metavar='NANOSECOND', type=parserutils.type_positive_float, default=MAX_TIME_NS, help="%s data after MD_TAU_END (ns) are excluded for %s calculation." % (mid_prop.capitalize(), final_prop)) # Independent runs for ensemble average if FLAG_MD_SIM_NUM in extra_options: analysis_indep_run.add_argument( FLAG_MD_SIM_NUM, action='store', metavar='MD_SIM_NUM', type=parserutils.type_positive_int, default=ARG_DEFAULTS[FLAG_MD_SIM_NUM], help= "MD_SIM_NUM numbers of independent molecular dynamics simulations each" " followed by post-analysis are fired off under jobcontrol.") analysis_indep_run.add_argument( FLAG_INPUT_SEARCH, action='store_true', help= "When %s %s or %s %s, this flag allows users to search the folder " "containing the user-defined file so that multiple files with the " "same extension from previous molecular dynamics simulations " "or post-analyses are used as inputs for this calculation." % (FLAG_RUN_TYPE, POST_RUN, FLAG_RUN_TYPE, MULTI_RUN)) return md_basic, md_output, analysis_parameter
[docs]def check_options(options, parser, option_flag, min_data_point, check_tau=True, check_gpu=False): """ Validate the options. :type options: Named tuple :param options: The parsed command line options :type parser: `argparser.ArgumentParser` :param parser: parser to log error information :type option_flag: str :param option_flag: time interval is defined by this flag :type min_data_point: int :param min_data_point: minimum required data point :type check_tau: bool :param check_tau: validate tau range against available time range :type check_gpu: bool :param check_gpu: check GPU/CPU resource availability. For a remote host, -gpu flag requires a GPU host, and no -gpu flag requires a CPU HOST. For localhost and no job control, only check whether GPU resource is available when -gpu flag exists. """ input_cms = fileutils.get_existing_filepath(options.input_cms) if not input_cms: parser.error( f"The input cms file does not exist. ({options.input_cms})") options.input_cms = input_cms if options.run_type not in ALL_RUN_TYPE: parser.error("%s must be one of the following %s." % (FLAG_RUN_TYPE, ', '.join(ALL_RUN_TYPE))) if check_tau: options.tau_start *= NANO2PICO options.tau_end *= NANO2PICO if options.run_type == FULL_RUN: # in driver time unit is ps, compatible with Desmond options.md_time *= NANO2PICO options.md_timestep *= FEMTO2PICO if check_tau and options.run_type == FULL_RUN: check_tau_options(options, parser, option_flag, min_data_point) if options.run_type == POST_RUN: if option_flag == parserutils.FLAG_MD_TRJ_INT: if not options.trj_folder: msys_model, cms_model = topo.read_cms(options.input_cms) options.trj_folder = topo.find_traj_path( cms_model, base_dir=os.path.dirname(options.input_cms)) if not options.trj_folder: parser.error( "Please provide the trajectory folder path. (-trj_folder)") options.trj_folder = fileutils.get_existing_filepath( options.trj_folder) if not options.trj_folder: parser.error( "The specified trajectory folder path does not exist. (-trj_folder)" ) elif option_flag == parserutils.FLAG_MD_ENEGRP_INT: if not options.enegrp_dat: parser.error( "Please provide the enegrp_dat file via %s flag to run " "post analysis (%s %s)." % (FLAG_ENEGRP_DAT, FLAG_RUN_TYPE, POST_RUN)) options.enegrp_dat = fileutils.get_existing_filepath( options.enegrp_dat) if not options.enegrp_dat: parser.error( "The specified enegrp file does not exist. (-enegrp_dat)") if check_gpu: hostname = jobutils.get_jobhost_name() is_gpu_avail = jobutils.is_jobhost_gpu_available() if hostname in [None, 'localhost']: if options.gpu and not is_gpu_avail: parser.error(f"Please remove {parserutils.FLAG_GPU}, or use a " f"GPU host via -HOST.") elif jobutils.is_hostname_known( hostname) and options.gpu != is_gpu_avail: parser.error(f"Please use {parserutils.FLAG_GPU} for a remote " "GPU host, or remove it for a remote CPU host.")
[docs]def check_tau_options(options, parser, option_flag, min_data_point): """ Validate the Tau options. :type options: Named tuple :param options: The parsed command line options :type parser: `argparser.ArgumentParser` :param parser: parser to log error information :type option_flag: str :param option_flag: time interval is defined by this flag :type min_data_point: int :param min_data_point: minimum required data point """ production_time = options.md_time # FLAG_ENEGRP_START defines when Desmond starts to output enegrp data if option_flag == parserutils.FLAG_MD_ENEGRP_INT: enegrp_start = jobutils.get_option(options, FLAG_ENEGRP_START) if enegrp_start: production_time = options.md_time - enegrp_start time_interval = jobutils.get_option(options, option_flag) available_data_num = math.floor(production_time / time_interval) # Either data_start or data_last_frac. See add_analysis_argument() # Skip data by frame number data_start = jobutils.get_option(options, FLAG_DATA_START) if data_start: available_data_num -= data_start # Skip data using the last percentage data_last_frac = jobutils.get_option(options, FLAG_DATA_LAST_FRAC) if data_last_frac: available_data_num = math.floor(available_data_num * data_last_frac) tau_start_frame = math.ceil(options.tau_start / time_interval) tau_end_frame = math.floor(options.tau_end / time_interval) tau_data_point_num = min(available_data_num - tau_start_frame, tau_end_frame - tau_start_frame) if tau_data_point_num < min_data_point: msg = (f"Only {int(tau_data_point_num)} Tau data points are found, " f"but a minimum of {min_data_point} are needed. Please " f"increase simulation time ({FLAG_MD_TIME}), increase Tau " f"end ({FLAG_TAU_END}), decrease data record interval " f"({option_flag}), skip less simulation data " f"({FLAG_DATA_START} / {FLAG_DATA_LAST_FRAC}), or decrease " f"Tau start ({FLAG_TAU_START}).") if option_flag == parserutils.FLAG_MD_ENEGRP_INT and enegrp_start: msg = msg[:-1] + ( f', or decrease enegrp_start ({FLAG_ENEGRP_START}).') parser.error(msg)
[docs]class Model(object): """ Wrapper around a structure. """
[docs] def __init__(self, filename): """ :param filename: file path pointing to a cms or mae file. :type filename: str """ self.filename = filename self.is_cms = fileutils.is_cms_file(self.filename) self.filetype = CMS if self.is_cms else MAE self._model = None
[docs] def load(self): """ Load a model. """ raise NotImplementedError( "This is the method to load a model from the file.")
[docs] def setProperty(self, key, value): """ Set the property to the model. :param key: property name :type key: str :param value: the value of the property :type value: str, float, int """ raise NotImplementedError("This is a method setting a property.")
@property def property(self): """ Return the model property. :return: a dictionary :rtype: dict """ return self._model.property
[docs] def getExtension(self): """ Return the extension of the filename. :return: the extension of the filename :rtype: str """ return fileutils.splitext(self.filename)[-1]
[docs] def getStruct(self): """ Return the 'structure.Structure' object. :return: structure of the model :rtype: 'structure.Structure' """ return self._model
[docs]class MaeModel(Model): """ Wrapper around Structure. """
[docs] def load(self): """ Load a model from the mae file. """ self._model = structure.Structure.read(self.filename)
[docs] def setProperty(self, key, value): """ Set the property to the mae model. :param key: property name :type key: str :param value: the value of the property :type value: str, float, int """ self._model.property[key] = value
[docs] def removeTrajProp(self): """ Remove the trajectory property. """ self._model.property.pop(msprops.TRAJECTORY_FILE_PROP, None)
[docs] def write(self, fname, wam_type=WAM_TYPE): """ Write the model into a file with the source_path and feature's WAM :param fname: filename to write out the model. :type fname: str :param wam_type: default WAM type :type wam_type: str """ jobutils.set_source_path(self._model) jobutils.write_mae_with_wam([self._model], fname, wam_type)
[docs]class CmsModel(Model): """ Wrapper around Cms. """
[docs] def load(self): """ load a model from a cms file. """ self._model = cms.Cms(self.filename)
[docs] def setProperty(self, key, value): """ Set the property to the cms model. :param key: property name :type key: str :param value: the value of the property :type value: str, float, int """ self._model.set_cts_property(key, value)
[docs] def removeTrajProp(self): """ Remove the trajectory property. """ self._model.remove_cts_property(msprops.TRAJECTORY_FILE_PROP)
[docs] def write(self, fname, wam_type=WAM_TYPE): """ Write the model into a file with the source_path and feature's WAM :param fname: filename to write out the model. :type fname: str :param wam_type: default WAM type :type wam_type: str """ jobutils.set_source_path(self._model) jobutils.write_cms_with_wam(self._model, fname, wam_type)
[docs] def setWam(self, wam_type=WAM_TYPE): """ Set the feature's WAM for the model. :param wam_type: default WAM type :type wam_type: str """ jobutils.set_structure_wam(self._model, wam_type)
[docs] def getStruct(self): """ Return the 'structure.Structure' object. :return: structure of the model :rtype: 'structure.Structure' """ return self._model.fsys_ct
[docs]def get_model(filename): """ Get the model from a cms or mae file. :param filename: file path pointing to a cms or mae file. :type filename: str :return: the model in the file. :rtype: 'CmsModel' or 'MaeModel' """ if fileutils.is_cms_file(filename): model = CmsModel(filename) else: model = MaeModel(filename) model.load() return model
[docs]class EquilibriumMdBase(object): """ This is a base class for equilibrium MD subclass, and should be not be instantiated directly. """
[docs] def __init__(self, input_cms, logger=False): """ :param input_cms: str :type input_cms: cms input file :param logger: `logging.Logger`, None or False :type logger: loggerobj.logger prints to logfilename; None prints to stdout; False mutes logging and raises errors """ self.input_cms = input_cms self.logger = logger # The following dictionary is saved in the cms model by savePropAndCms() self.prop2save = {} # 'r_matsci_xxx' as key and a float as value (self.msys_model, self.cms_model) = topo.read_cms(self.input_cms)
[docs] def log(self, msg, **kwargs): """ Add a message to the log file in driver mode, or ignore the logging in module mode. :type msg: str :param msg: The information message Additional keyword arguments are passed to the textlogger.log_msg function """ if self.logger is False: return kwargs['logger'] = self.logger textlogger.log_msg(msg, **kwargs)
[docs] def log_error(self, msg): """ Exit with the error printed in driver mode, or raise the error in module mode. :param msg str: The error message :raise: RuntimeError in module mode """ if self.logger is False: raise RuntimeError(msg) textlogger.log_error(msg, logger=self.logger)
[docs] def checkSetTimeInterval(self, time_intervals=None): """ Time interval must be the same for all trajectery. :type time_intervals: list or None :param time_intervals: time intervals between every two frames. """ if not time_intervals: time_intervals = [] pre_time = self.time_list[0] for cur_time in self.time_list[1:]: time_intervals.append(cur_time - pre_time) pre_time = cur_time time_intervals = numpy.matrix(time_intervals) self.time_interval = numpy.average(time_intervals) time_equal_crit = self.time_interval * 0.00001 time_intervals_diff = (time_intervals - self.time_interval) / time_equal_crit if time_intervals_diff.round().any(): self.log_error('Different time intervals are found. Aborting...')
[docs] def checkConstantVol(self, allow_npt=True): """ Calculate volume and check whether it changes. :type allow_npt: bool :param allow_npt: allow volume to change, if True """ volumes = numpy.matrix(self.vol_list) self.volume = numpy.average(volumes) if not allow_npt: volume_equal_crit = self.volume * 0.00001 volume_diff = (volumes - self.volume) / volume_equal_crit if volume_diff.round().any(): self.log_error( 'The volume of the system changes, but constant volume' 'is required for viscosity calculation. Aborting...')
[docs] def calDensity(self): """ Caluate the average system density. """ # unit kg/m^3 self.density = self.cms_model.total_weight * scipy_const.value( 'atomic mass constant') / (self.volume * scipy_const.angstrom**3)
[docs] def savePropAndCms(self, cms_out=None, wam_type=None): """ Save properties to cms model, delete trj information if needed, and write out cms file. :param str cms_out: name of the output cms. :type wam_type: int or None :param wam_type: One of the enums defined in workflow_action_menu.h if the results should have a Workflow Action Menu in Maestro """ jobutils.set_source_path(self.cms_model) for key, value in self.prop2save.items(): try: value = float('%.5g' % value) except TypeError: pass self.cms_model.set_cts_property(key, value) if not self.save_trj_data: for prop in [ msprops.ORIGINAL_CMS_PROP, msprops.TRAJECTORY_FILE_PROP ]: self.cms_model.remove_cts_property(prop) if cms_out is None: cms_out = self.outfile if wam_type: jobutils.write_cms_with_wam(self.cms_model, cms_out, wam_type) else: self.cms_model.write(cms_out)
[docs]class TrjBase(EquilibriumMdBase): """ This is a base class for equilibrium MD subclass that uses trajectory, and should be not be instantiated directly. """
[docs] def readTrj(self, make_whole=False): """ Read trj frames from trj folder. :type make_whole: bool :param make_whole: if True, bond across PBCs are fixed so that two bonded particles won't be on opposite sides of the system """ # MATSCI-5668: Create a low memory trajectory frame iterator traj.Source.CACHE_LIMIT_BYTES = 0 self.log('Reading trajectory...', pad=True) try: all_traj_frames = traj.read_traj(str(self.trj_folder)) except (IOError, TypeError) as msg: self.log_error("Trajectory reading failed, error was: %s" % str(msg)) if self.log: self.log(f"Total trajectory frame number: {len(all_traj_frames)}") if hasattr(self, 'trj_frame_num') and self.trj_frame_num: data_end = self.trj_frame_num + self.data_start self.traj_frames = all_traj_frames[self.data_start:data_end] else: # Last frame may have different time interval self.traj_frames = all_traj_frames[self.data_start:-1] if not self.traj_frames: # If only one trajectory frame exists, use it instead of skipping self.traj_frames = all_traj_frames[self.data_start:] if not self.traj_frames: self.log_error( "{0} trajectory frames are found, but after excluding first {1} frames, " "no trajectory frames are left for further analysis.".format( len(all_traj_frames), self.data_start)) self.frame_num = len(self.traj_frames) if make_whole: # Fix bonds across PBCs try: topo.make_whole(self.msys_model, self.traj_frames) except ValueError: # MATSCI-9443: Equilibrium method doesn't work with dynamic atom selection self.log_error( "Please provide a classical molecular dynamics trajectory." " (the hybrid GCMC/MD simulation is not supported)") self.log("Available trajectory frame number: %i " % self.frame_num)
[docs] def trajFrames(self, make_whole=False): """ Fix pbc bonds (if needed), yield the trajectory frame, and garbage-collect to release the occupied memory. :type make_whole: bool :param make_whole: if True, bond across PBCs are fixed so that two bonded particles won't be on opposite sides of the system :rtype: `schrodinger.application.desmond.packages.traj.frame` :return: a trajectory frame """ for traj_frame in self.traj_frames: if make_whole: # Fix bonds across PBCs topo.make_whole(self.msys_model, [traj_frame]) yield traj_frame # garbage-collect a used trajectory frame traj_frame._source.clear_cache() traj_frame._object = None
[docs] def checkTau(self, frame_num, min_tau_data_point): """ Check whether trj frame number is large enough. :param frame_num: int :type frame_num: number of total data point :param min_tau_data_point: int :type min_tau_data_point: minimum requested data """ # Find the trj_interval and calculate tau trj frame limit time = [x.time for x in itertools.islice(self.traj_frames, 2)] try: trj_interval = time[1] - time[0] except IndexError: self.log_error(f"Only {len(time)} trajectory frames are found.") self.log("Trajectory time interval: %.5f (ps)" % trj_interval) self.tau_frame_end = int(math.floor(self.tau_end / trj_interval)) self.tau_frame_start = int(math.ceil(self.tau_start / trj_interval)) trajectory_max_time = frame_num * trj_interval # Set tau_end to be <= max tau, and validate tau_start # Trj frame for backend, time (ns) for frontend if self.tau_frame_end > frame_num: self.log( f"-tau_end ({self.tau_end} ps) exceeds the available trajectory range. " f"Setting -tau_end to the largest possible value {trajectory_max_time} ps.", pad=True, pad_below=True) self.tau_frame_end = frame_num if self.tau_frame_start >= frame_num: self.log_error( f"-tau_start ({self.tau_start} ps) is larger than total trajectory " f"time length ({trajectory_max_time:5g} ps). Aborting...") elif self.tau_frame_start >= self.tau_frame_end: self.log_error( "frame frequency is too big or larger than the tau window. " "Aborting...") elif self.tau_frame_start + min_tau_data_point > self.tau_frame_end: self.log_error( "Only {0} trajectory frames available for Tau calculation, " "but the analysis needs at least {1} Tau data points." "Please extend the MD simulation, enlarge the Tau range, or " "increase the trajectory output frequency.".format( self.tau_frame_end - self.tau_frame_start, min_tau_data_point))
[docs] def checkVel(self): """ Check whether each frame contains velocity information. """ for traj_frame in self.traj_frames: if traj_frame.vel() is None: self.log_error( "Please make sure trajectory folder has " "velocity information in every trajectory frame.")
[docs]class MsdTrajBase(TrjBase): """ Trajectory base class for Mean squared displacement method. """ ATOM = 'atom' METHOD = 'MSD' X = 'x' Y = 'y' Z = 'z' TAU_PS = 'Tau(ps)' MSD_ANG2 = 'MSD(angstrom^2)' STDEV_ANG2 = 'Std Dev(angstrom^2)' Y_HEADERS = [MSD_ANG2, STDEV_ANG2] CSV_HEADER_3D = [TAU_PS] + [f'3D {x}' for x in Y_HEADERS] ANISO_DIMS = ['1D'] * 2 + ['2D'] * 2 CSV_HEADER_PLANE = [f'{x} {y}' for x, y in zip(ANISO_DIMS, Y_HEADERS * 2)] AXES = [X] * 2 + [Y] * 2 + [Z] * 2 CSV_HEADER_AXIS = [f'{x} {y}' for x, y in zip(AXES, Y_HEADERS * 3)] TAU_START_PROP = DIFFUSIVITY_TAU_START_PROP % METHOD TAU_END_PROP = DIFFUSIVITY_TAU_END_PROP % METHOD VECTOR_DELIM = ',' VECTOR_DELIM_WITH_SPACE = VECTOR_DELIM + ' ' NORMAL = 'Normal' DIFFUSION_FILE_PROP = 's_matsci_Diffusion_MSD_File' DIFFUSIVITY_VECTOR_PROP = '_'.join( [MATSCI_DIFFUSION_STR_BASE, METHOD, NORMAL]) DIFFUSIVITY_1D_PROP = DIFFUSIVITY_1D_PROP_BASE % METHOD DIFFUSIVITY_2D_PROP = DIFFUSIVITY_2D_PROP_BASE % METHOD DIFFUSIVITY_3D_PROP = DIFFUSIVITY_3D_PROP_BASE % METHOD DIFFUSIVITY_X_PROP = DIFFUSIVITY_X_PROP_BASE % METHOD DIFFUSIVITY_Y_PROP = DIFFUSIVITY_Y_PROP_BASE % METHOD DIFFUSIVITY_Z_PROP = DIFFUSIVITY_Z_PROP_BASE % METHOD STDEV = 'stdev' SD_EXT = '_'.join([METHOD, STDEV]) DIFFUSIVITY_1D_SD_PROP = DIFFUSIVITY_1D_PROP_BASE % SD_EXT DIFFUSIVITY_2D_SD_PROP = DIFFUSIVITY_2D_PROP_BASE % SD_EXT DIFFUSIVITY_3D_SD_PROP = DIFFUSIVITY_3D_PROP_BASE % SD_EXT DIFFUSIVITY_X_SD_PROP = DIFFUSIVITY_X_PROP_BASE % SD_EXT DIFFUSIVITY_Y_SD_PROP = DIFFUSIVITY_Y_PROP_BASE % SD_EXT DIFFUSIVITY_Z_SD_PROP = DIFFUSIVITY_Z_PROP_BASE % SD_EXT DIFFUSIVITY_1D_R2_PROP = DIFFUSIVITY_1D_R2_PROP_BASE % METHOD DIFFUSIVITY_2D_R2_PROP = DIFFUSIVITY_2D_R2_PROP_BASE % METHOD DIFFUSIVITY_3D_R2_PROP = DIFFUSIVITY_3D_R2_PROP_BASE % METHOD DIFFUSIVITY_X_R2_PROP = DIFFUSIVITY_X_R2_PROP_BASE % METHOD DIFFUSIVITY_Y_R2_PROP = DIFFUSIVITY_Y_R2_PROP_BASE % METHOD DIFFUSIVITY_Z_R2_PROP = DIFFUSIVITY_Z_R2_PROP_BASE % METHOD DIFFUSIVITY = collections.namedtuple('DIFFUSIVITY', ['diffusivity', 'std_dev', 'r2']) ISOTROPIC = 'isotropic' ANISOTROPIC = 'anisotropic' ISOTROPIC_3D = ISOTROPIC + ':3D' ANISOTROPIC_1D = ANISOTROPIC + ':1D' ANISOTROPIC_2D = ANISOTROPIC + ':2D' DIMENSION_MAP = {ISOTROPIC_3D: 3, ANISOTROPIC_1D: 1, ANISOTROPIC_2D: 2} DIFFUSION_PROPS_MAP = { 1: DIFFUSIVITY(diffusivity=DIFFUSIVITY_1D_PROP, std_dev=DIFFUSIVITY_1D_SD_PROP, r2=DIFFUSIVITY_1D_R2_PROP), 2: DIFFUSIVITY(diffusivity=DIFFUSIVITY_2D_PROP, std_dev=DIFFUSIVITY_2D_SD_PROP, r2=DIFFUSIVITY_2D_R2_PROP), 3: DIFFUSIVITY(diffusivity=DIFFUSIVITY_3D_PROP, std_dev=DIFFUSIVITY_3D_SD_PROP, r2=DIFFUSIVITY_3D_R2_PROP), X: DIFFUSIVITY(diffusivity=DIFFUSIVITY_X_PROP, std_dev=DIFFUSIVITY_X_SD_PROP, r2=DIFFUSIVITY_X_R2_PROP), Y: DIFFUSIVITY(diffusivity=DIFFUSIVITY_Y_PROP, std_dev=DIFFUSIVITY_Y_SD_PROP, r2=DIFFUSIVITY_Y_R2_PROP), Z: DIFFUSIVITY(diffusivity=DIFFUSIVITY_Z_PROP, std_dev=DIFFUSIVITY_Z_SD_PROP, r2=DIFFUSIVITY_Z_R2_PROP), } DIFFUSION_COL_MAP = { 1: (0, 3, 4), 2: (0, 5, 6), 3: (0, 1, 2), X: (0, 3, 4), Y: (0, 5, 6), Z: (0, 7, 8), }
[docs] def __init__(self, options, **kwargs): """ :type options: Named tuple :param options: The parsed command line options """ super().__init__(options.input_cms, **kwargs) self.options = options self.time_msd_std = None self.tau_frame_start = None self.tau_frame_end = None
[docs] @staticmethod def fitMsd(time_msd_std, start_idx, end_idx, dimension=3): """ Fit the mean squared displacement :param time_msd_std: first column is time in ps, second column is mean squared displacement in angstrom^2. :type time_msd_std: numpy.ndarray :param start_idx: index to select the starting frame :type start_idx: int :param end_idx: index to select the ending frame :type end_idx: int :param dimension: 1 stands for 1D; 2 stands for 2D; 3 stands for 3D :type dimension: int :return: diffusivity coefficient in m^2/s, diffusivity_std in m^2/s, R-squared in linear regression :rtype: float, float, float """ slope, intercept, r_value, p_value, std_err = linregress( time_msd_std[start_idx:end_idx, 0], time_msd_std[start_idx:end_idx, 1]) # https://en.wikipedia.org/wiki/Mean_squared_displacement # 1D scenario above, one obtains the MSD in that dimension as 2Dt # Here, we treat it as isotropic in three dimensions which is 6Dt diffusivity_value = slope * DIFFUSION_UNIT_CONVERT / dimension / 2 diffusivity_std = std_err * DIFFUSION_UNIT_CONVERT / dimension / 2 r_squared = r_value**2 return diffusivity_value, diffusivity_std, r_squared
[docs] @classmethod def vectorStr(cls, vector): """ Convert vector array to str format. :param vector: a vector :type vector: 1 x 3 array of floats :return: vector in str format :rtype: str """ return f"({cls.VECTOR_DELIM_WITH_SPACE.join([f'{x:.4g}' for x in vector])})"
[docs] def getMsdHeader(self): """ Get the MSD csv header. :return: column labels separated by comma. :rtype: str """ header = self.CSV_HEADER_3D.copy() if self.options.plane is not None: header += self.CSV_HEADER_PLANE if self.options.axis: header += self.CSV_HEADER_AXIS header = ','.join(header) return header
[docs] def setTauFrame(self, tau_start=None, tau_end=None): """ Set Tau frame start and end limits. :param tau_start float: the starting tau value in ps :param tau_end float: the ending tau value in ps """ def find_nearest(array, value): array = numpy.asarray(array) idx = (numpy.abs(array - value)).argmin() return idx if tau_start is None: tau_start = self.options.tau_start if tau_end is None: tau_end = self.options.tau_end self.tau_frame_start = find_nearest(self.time_msd_std[:, 0], tau_start) # the tau_frame_end is the excluded upper limit self.tau_frame_end = find_nearest(self.time_msd_std[:, 0], tau_end) + 1
[docs] def setIsotropicMSD(self): """ Set the isotropic 3d diffusion coefficients. """ volume_diffusivity, diffusivity_std, r_squared = self.fitMsd( self.time_msd_std[:, :2], self.tau_frame_start, self.tau_frame_end) self.prop2save[self.DIFFUSIVITY_3D_PROP] = volume_diffusivity self.prop2save[self.DIFFUSIVITY_3D_SD_PROP] = diffusivity_std self.prop2save[self.DIFFUSIVITY_3D_R2_PROP] = r_squared log_debug(f"MEM: setIsotropicMSD {jobutils.memory_usage_psutil()} MB.")
[docs] def setPlaneMSD(self): """ Set the anisotropic in-plane 2d diffusion coefficients. """ if self.options.plane is None: return self.prop2save[self.DIFFUSIVITY_VECTOR_PROP] = self.vectorStr( self.options.plane) tau = self.time_msd_std[:, 0].reshape(-1, 1) line_msd = self.time_msd_std[:, 3].reshape(-1, 1) plane_msd = self.time_msd_std[:, 5].reshape(-1, 1) line_diffusivity, line_diff_std, line_r_sq = self.fitMsd( numpy.concatenate((tau, line_msd), axis=1), self.tau_frame_start, self.tau_frame_end, dimension=1) self.prop2save[self.DIFFUSIVITY_1D_PROP] = line_diffusivity self.prop2save[self.DIFFUSIVITY_1D_SD_PROP] = line_diff_std self.prop2save[self.DIFFUSIVITY_1D_R2_PROP] = line_r_sq plane_diffusivity, plane_diff_std, plane_r_sq = self.fitMsd( numpy.concatenate((tau, plane_msd), axis=1), self.tau_frame_start, self.tau_frame_end, dimension=2) self.prop2save[self.DIFFUSIVITY_2D_PROP] = plane_diffusivity self.prop2save[self.DIFFUSIVITY_2D_SD_PROP] = plane_diff_std self.prop2save[self.DIFFUSIVITY_2D_R2_PROP] = plane_r_sq log_debug(f"MEM: setPlaneMSD {jobutils.memory_usage_psutil()} MB.")
[docs] def setAxisMSD(self): """ Set the anisotropic along-axis 1d diffusion coefficients. """ if not self.options.axis: return index_incre = 0 if self.options.plane is None else 4 x_col_index = 3 + index_incre y_col_index = 5 + index_incre z_col_index = 7 + index_incre tau = self.time_msd_std[:, 0].reshape(-1, 1) x_msd = self.time_msd_std[:, x_col_index].reshape(-1, 1) x_time_msd_std = numpy.concatenate((tau, x_msd), axis=1) x_diffusivity, x_diff_std, x_r_sq = self.fitMsd(x_time_msd_std, self.tau_frame_start, self.tau_frame_end, dimension=1) self.prop2save[self.DIFFUSIVITY_X_PROP] = x_diffusivity self.prop2save[self.DIFFUSIVITY_X_SD_PROP] = x_diff_std self.prop2save[self.DIFFUSIVITY_X_R2_PROP] = x_r_sq y_msd = self.time_msd_std[:, y_col_index].reshape(-1, 1) y_time_msd_std = numpy.concatenate((tau, y_msd), axis=1) y_diffusivity, y_diff_std, y_r_sq = self.fitMsd(y_time_msd_std, self.tau_frame_start, self.tau_frame_end, dimension=1) self.prop2save[self.DIFFUSIVITY_Y_PROP] = y_diffusivity self.prop2save[self.DIFFUSIVITY_Y_SD_PROP] = y_diff_std self.prop2save[self.DIFFUSIVITY_Y_R2_PROP] = y_r_sq z_msd = self.time_msd_std[:, z_col_index].reshape(-1, 1) z_time_msd_std = numpy.concatenate((tau, z_msd), axis=1) z_diffusivity, z_diff_std, z_r_sq = self.fitMsd(z_time_msd_std, self.tau_frame_start, self.tau_frame_end, dimension=1) self.prop2save[self.DIFFUSIVITY_Z_PROP] = z_diffusivity self.prop2save[self.DIFFUSIVITY_Z_SD_PROP] = z_diff_std self.prop2save[self.DIFFUSIVITY_Z_R2_PROP] = z_r_sq log_debug(f"MEM: setAxisMSD {jobutils.memory_usage_psutil()} MB.")
[docs]class MultiToOneConverter(MsdTrajBase): """ The class to gather multiple files and merge them into one. """
[docs] def __init__(self, msd_files, jobname, options, save=True): """ :param msd_files: the msd files (with path) for diffusion data. :type msd_files: list of str :param jobname: the jobname :type jobname: str :param options: commandline options :type options: SimpleNamespace or namedtuple :param save: If True, add output files to the backend :type save: bool """ super().__init__(options) self.msd_files = sorted(msd_files) self.jobname = jobname self.save = save self.outfile = self.jobname + DiffusivityMsdAnalysis.DIFFUSION_FILE_EXT jobutils.add_outfile_to_backend(self.outfile) self.time_msd_std = None self.data_stacked = None self.ms_diffusion = jobutils.get_option(options, FLAG_MS_DIFFUSION)
[docs] def run(self): """ Main function to load data, process data, and fit. """ self.loadData() self.calWriteMeanAndStd() self.combineTrace() self.combineOnsagerCsv()
[docs] def loadData(self): """ Read in all msd files. """ assert len(self.msd_files) > 0 data = [] for msd_file in self.msd_files: # Tau, mean squared displacements, standard deviation single_run_data = numpy.loadtxt(msd_file, delimiter=',') data.append(single_run_data) self.data_stacked = numpy.stack(data) self.time_msd_std = numpy.mean(self.data_stacked, axis=0)
[docs] def calWriteMeanAndStd(self): """ Calculate and write mean and std of the diffusion from multi runs. """ replica_num, tau_num, col_num = self.data_stacked.shape if replica_num > 1: # The std dev is over multi-replica instead of # within one replica over multiple frame pair of the same Tau. std_over_replica = numpy.std(self.data_stacked, axis=0) std_col_num = (col_num - 1) // 2 for index in range(std_col_num): col_idx = index * 2 + 1 self.time_msd_std[:, col_idx + 1] = std_over_replica[:, col_idx] header = self.getMsdHeader() numpy.savetxt(self.outfile, self.time_msd_std, fmt='%.10e', delimiter=",", header=header)
[docs] def combineTrace(self): """ Combine the trace files from MSD diffusion calculations. """ trace_files = self.getMSDRelatedFiles( ext=DiffusivityMsdAnalysis.TRACE_FILE_EXT) trace_files = [x for x in trace_files if os.path.exists(x)] if not trace_files: return data = [numpy.loadtxt(x, delimiter=',') for x in trace_files] data = numpy.concatenate(data, axis=1) trace_file = self.jobname + DiffusivityMsdAnalysis.TRACE_FILE_EXT numpy.savetxt(trace_file, data, delimiter=',', header=DiffusivityMsdAnalysis.TRACE_HEADER) if self.save: jobutils.add_outfile_to_backend(trace_file)
[docs] def combineOnsagerCsv(self): """ Combine the onsager series """ if not self.ms_diffusion: return onsager_csvs = self.getMSDRelatedFiles( ext=DiffusivityMsdAnalysis.ONSAGER_FILE_EXT) if not onsager_csvs: return if len(onsager_csvs) != 1: self.log_error("Combining Onsager CSV files are not supported") onsager_csv = self.jobname + DiffusivityMsdAnalysis.ONSAGER_FILE_EXT try: shutil.copy2(onsager_csvs[0], onsager_csv) except IOError as err: self.log_error(str(err)) if self.save: jobutils.add_outfile_to_backend(onsager_csv)
[docs] def getMSDRelatedFiles(self, ext): """ Get related files with given extension which are similar msd log file :param str ext: extension name :rtype: list :return: list of file names """ files = [ x.replace(DiffusivityMsdAnalysis.DIFFUSION_FILE_EXT, ext) for x in self.msd_files ] files = [x for x in files if os.path.exists(x)] return files
[docs]class TrajectoryFrame: """ Container of cms_model and trajectory frame with custom functions and storage """ ATOM = MsdTrajBase.ATOM # Position matrix has 3 columns (x, y, z) COL_NUM_POS = 3 cms_model = None msys_model = None
[docs] def __init__(self, frame, ids, com_type=ATOM, vel=False): """ Crease a TrajectoryFrame object, update gids, mass, and positions. :type frame: `traj.Frame` :param frame: One trajectory frame :type ids: list :param ids: atom id list [int1, int2...] or [(int1, int2...), (...)...] :type com_type: string :param com_type: mass center of atoms within each com_type group :type vel: bool :param vel: velocity in trajectry frame is used to calculate system translation and shift the system before unwrapping displacements """ self.ids = ids self.com_type = com_type self.vel = vel self.gids = [] self.masses = numpy.array([]) self.grp_masses = [] self.setGids() self.setMass() self.setGrpMass() self.setFrame(frame)
[docs] def setGids(self): """ Get global ids of beads (atom or mass center) to extra positions and velocities. """ if self.com_type == self.ATOM: # self.ids = [int1, int2...] self.gids = topo.aids2gids(self.cms_model, self.ids, include_pseudoatoms=False) return # self.ids = [(int1, int2...), (...)...] self.gids = [] for mol in self.ids: mds = topo.aids2gids(self.cms_model, mol, include_pseudoatoms=False) self.gids.append(mds)
[docs] def setMass(self): """ Set the mass of each bead (atom or mass center of atoms). """ masses = [] try: for atom_gid in self.gids: masses.append(self.msys_model.atoms[atom_gid].mass) except TypeError: # com_type!=ATOM and self.ids = [(int1, int2...), (...)...] # As atom_id is a list not int, msys_model.atoms[] raises TypeError for mol_gids in self.gids: grp_mass = [ self.msys_model.atoms[atom_gid].mass for atom_gid in mol_gids ] masses.append(grp_mass) # self.mass must be array instead of matrix, # because grp_mass may have different lengths. self.masses = numpy.array(masses)
[docs] def setGrpMass(self): """ Set the total mass of each grouped atoms. """ if self.com_type == self.ATOM: return self.grp_masses = [sum(x) for x in self.masses]
[docs] def setFrame(self, frame): """ Update trajectry, time, sel coordinates, box, and reduced positions. :type frame: `traj.Frame` objects :param frame: Trajectory """ self.frame = frame self.time = self.frame.time self.setXYZ() self.setBox() self.reducedUnitNumpyPos() # Not necessary, if translation of whole system eliminated # can be simplified if total momentum from desmond directly # self.vel is only for ref atom, and ref -com_type is always ATOM if self.vel: self.total_mass = self.masses.sum() self.momentum2Velocity()
[docs] def setXYZ(self): """ Set the xyz coordinates obtaioned from trajectory frame. If -com_type != ATOM, the the center of mass is calculated first. """ if self.com_type == self.ATOM: self.cur_pos = self.frame.pos()[self.gids] else: self.cur_pos = [] all_pos = numpy.matrix(self.frame.pos()) for idx in range(len(self.gids)): atom_gids = self.gids[idx] pos_with_mass = self.masses[idx] * all_pos[atom_gids] pos = (pos_with_mass / self.grp_masses[idx]).tolist()[0] self.cur_pos.append(pos) self.cur_pos = numpy.matrix(self.cur_pos) self.orig_pos = copy.deepcopy(self.cur_pos)
[docs] def setBox(self): """ Set the stored pbc box size. """ self.box = [ self.frame.box[0][0], self.frame.box[1][1], self.frame.box[2][2] ]
[docs] def reducedUnitNumpyPos(self): """ Convert the XYZ in real unit to reduced unit. """ for col in range(self.COL_NUM_POS): self.cur_pos[:, col] /= self.box[col]
[docs] def momentum2Velocity(self): """ Calculate the mean velocity of system using velocity information from trajectory frame. """ vel_mass = self.masses * (numpy.matrix(self.frame.vel())[self.gids]) self.reduced_momentum_vel = vel_mass / self.total_mass for col in range(self.COL_NUM_POS): self.reduced_momentum_vel /= self.box[col]
[docs]class DiffusivityMsdAnalysis(MsdTrajBase): """ Main class that performs coordinate unwrap, displacement calculation, and diffusivity fit. """ MIN_TAU_DATA_POINT_NUM = 10 COL_NUM_POS = TrajectoryFrame.COL_NUM_POS ATOM = MsdTrajBase.ATOM MOLECULE = 'molecule' MONOMER = 'monomer' SMARTS = 'SMARTS' DIFFUSION_FILE_EXT = '_msd.log' ONSAGER_FILE_EXT = '_onsager.csv' TRACE_FILE_EXT = '_trace.csv' TRACE_HEADER = 'X1, Y1, Z1, X2, Y2, Z2, ...' MAX_NUM_TRACE = 1000 DEFAULT_REF = 'all' # Particles are not allowed to move more than this distance between adjacent # trj frames MAX_REDUCED_DISPLACEMENT = 0.45
[docs] def __init__(self, options, jobname, command_dir=os.curdir, save=True, **kwargs): """ :param options: commandline options :type options: SimpleNamespace or namedtuple :param jobname: the jobname :type jobname: str :param command_dir: the command dir with respect to the launch dir :type command_dir: str :param save: If True, add output files to the backend :type save: bool :param log: function :type log: function to log information :param log_error: function :type log_error: log error information and exit calculation """ super().__init__(options, **kwargs) self.jobname = jobname self.command_dir = command_dir self.save = save self.asl = self.options.asl self.ref = self.options.ref self.com_type = self.options.com_type self.smarts = self.options.smarts self.trj_folder = self.options.trj_folder self.momentum = self.options.momentum self.vel = self.options.vel self.num_trace = self.options.num_trace self.data_start = self.options.data_start self.trj_frame_num = self.options.trj_frame_num self.tau_start = self.options.tau_start self.tau_end = self.options.tau_end self.save_trj_data = self.options.save_trj_data self.infile = self.options.input_cms self.outfile = self.jobname + CMS_OUT_EXT self.msd_file = self.jobname + self.DIFFUSION_FILE_EXT self.ms_diffusion = jobutils.get_option(options, FLAG_MS_DIFFUSION) if self.ms_diffusion: self.onsager_csv = self.jobname + self.ONSAGER_FILE_EXT if self.save: for filename in [self.outfile, self.msd_file]: filepath = os.path.join(self.command_dir, filename) jobutils.add_outfile_to_backend(filepath) jobutils.seed_random_number_generator(options) # set the cms_model for all TrajectoryFrame class TrajectoryFrame.cms_model = self.cms_model TrajectoryFrame.msys_model = self.msys_model self.displacements = [] self.time_intervals = [] nmol = len(self.cms_model.comp_ct) self.onsager_coeff = numpy.zeros(nmol * nmol) self.make_whole = False self.tau_num = None self.sample_num = collections.defaultdict(int) self.volume_squared_sum = collections.defaultdict(float) self.volume_double_squared_sum = collections.defaultdict(float) self.column_total = 3 if self.options.plane is not None: self.column_total += 4 self.line_squared_sum = collections.defaultdict(float) self.line_double_squared_sum = collections.defaultdict(float) self.plane_squared_sum = collections.defaultdict(float) self.plane_double_squared_sum = collections.defaultdict(float) if self.options.axis: self.column_total += 6 self.x_squared_sum = collections.defaultdict(float) self.x_double_squared_sum = collections.defaultdict(float) self.y_squared_sum = collections.defaultdict(float) self.y_double_squared_sum = collections.defaultdict(float) self.z_squared_sum = collections.defaultdict(float) self.z_double_squared_sum = collections.defaultdict(float)
[docs] def run(self): """ Calculate unwrapped displacement, check time interval, and calculate diffusivity. """ self.setSelectionAndLoadTrj() self.setSquaredDisplacementTimeserial() self.setMolDisplacement() self.exportTrace() self.collectData() self.setIsotropicMSD() self.setPlaneMSD() self.setAxisMSD() self.setTauRange() self.savePropAndCms() self.saveMsdTxt()
[docs] def setSelectionAndLoadTrj(self): """ Set the atom selection and load trajectory. """ self.setIds() self.setMakeWholeState() self.readTrj(make_whole=False) self.checkTau(self.frame_num, self.MIN_TAU_DATA_POINT_NUM) if self.vel: self.checkVel()
[docs] def setSquaredDisplacementTimeserial(self): """ Set the atom selection and load trajectory. """ self.setFinalDisplacement() self.checkSetTimeInterval(time_intervals=self.time_intervals) self.setSquaredDisplacement()
[docs] @staticmethod def isIterable(item): """ Whether the input is iterable. Currently, list, set, and tuple are treated as iterable. :param item: str, list, tuple, or other data type :return: Whether the input is iterable :rtype: bool """ return isinstance(item, (list, set, tuple))
[docs] @classmethod def flatten(cls, an_iterable_item): """ Robust function to flatten list recursively. :param an_iterable_item: The iterable item to be flatten. (an iterable that describes the whole or part of the workflow) :type an_iterable_item: list, tuple or set :return: The flatten items in a list ([uniterable value 1, uniterable value 2, uniterable value 3 ...]) :rtype: list of uniterable values (str or dict) """ return [an_iterable_item] if not cls.isIterable(an_iterable_item) else \ [x for y in an_iterable_item for x in cls.flatten(y)]
[docs] def setMakeWholeState(self): """ Set the make_whole state. If true, the bonds across the PBCs will be fixed before trajectory analysis. This happens when the mass center of a group covalently bonded atoms (e.g molecule) is needed. Only set state here as make_whole on per-frame reduces peak memory usage """ if self.com_type == self.ATOM: com_is_ok = True else: sel_ids_flat = self.flatten(self.sel_ids) selected_struct = self.cms_model.fsys_ct.extract(sel_ids_flat, copy_props=True) # The selected atoms for diffusion calculation are in an infinite network # The diffusion of infinite network is non-equilibrium technique without # much support and unwrapping of infinite network is not very well defined # Therefore, advance with defaults. (no special treatment) com_is_ok = xtal.is_infinite2(selected_struct) self.make_whole = not com_is_ok
[docs] def readTrj(self, make_whole=False): """ See parent methods for docs. make_whole removes the bond broken by PBC. """ super().readTrj(make_whole=make_whole) log_debug(f"MEM: readTrj {jobutils.memory_usage_psutil()} MB.")
[docs] def checkVel(self): """ See parent methods for docs. """ super().checkVel() log_debug(f"MEM: checkVel {jobutils.memory_usage_psutil()} MB.")
[docs] def checkSetTimeInterval(self, time_intervals=None): """ See parent methods for docs. """ super().checkSetTimeInterval(time_intervals=time_intervals) log_debug( f"MEM: checkSetTimeInterval {jobutils.memory_usage_psutil()} MB.")
[docs] def savePropAndCms(self): """ See parent methods for docs. """ super().savePropAndCms() log_debug(f"MEM: savePropAndCms {jobutils.memory_usage_psutil()} MB.")
[docs] def getIdx(self, asl, com_type=ATOM, smarts_pattern=None): """ Get atom indices from asl selection. :type asl: str :param asl: ASL search string. :type com_type: string :param com_type: mass center of atoms within each com_type group is calculated :type smarts_pattern: string :param smarts_pattern: SMARTS pattern :rtype: list :return: index of atoms that matche the asl string. [int1, int2...], if use_type == ATOM; [(int1, int2...), (...)...], if if use_com != ATOM """ if not asl: return [] try: atom_idxs = self.cms_model.select_atom(asl) except mm.MmException: self.log_error('%s is invalid' % asl) else: if not atom_idxs: self.log_error(f"{FLAG_ASL} {asl} selects no atoms") if com_type == self.SMARTS: smarts_atom_list = analyze.evaluate_smarts_by_molecule( self.cms_model, smarts_pattern, canvas=False, unique_sets=True, use_rdkit=True) else: smarts_atom_list = [] return self.groupIdx(self.cms_model, atom_idxs, com_type=com_type, smarts_atom_list=smarts_atom_list)
[docs] @classmethod def groupIdx(cls, struct, atom_ids, com_type=ATOM, smarts_atom_list=None): """ Group atoms according to com_type, or find the SMARTS matches within the atom selection. :type struct: `schrodinger.structure` :param struct: structure containing atoms :type atom_ids: list :param atom_ids: atom id list of atom selection :type com_type: string :param com_type: atom, molecule, monomer, or SMARTS as in COM_TYPE :type smarts_atom_list: list :param smarts_atom_list: [[a1, a2..],[b1, b2..]..], each sublist contains atom ids that match the SMARTS pattern. :rtype: list :return: [a1, a2, ..], if com_type=ATOM. [(a1, a2..), (c1, c2..)..], union of atom selection and SMARTS matches or each (..) is a subset of atom selection that belongs to one molecule or residue """ if com_type == cls.ATOM: return atom_ids if com_type == cls.SMARTS: if not smarts_atom_list: return [] shared_smarts = [] atom_ids_set = set(atom_ids) for one_smarts in smarts_atom_list: if set(one_smarts).issubset(atom_ids_set): shared_smarts.append(tuple(one_smarts)) return shared_smarts grouped_atom_dict = {} for atom_id in atom_ids: atom = struct.atom[atom_id] if com_type == cls.MONOMER: # If res is not defined, resnum equals to molecule_number by default mol_res_key = ( atom.molecule_number, atom.resnum, atom.inscode, ) elif com_type == cls.MOLECULE: mol_res_key = atom.molecule_number grouped_atom_dict.setdefault(mol_res_key, []).append(atom_id) grouped_atom_ids = [] for one_mol in grouped_atom_dict.values(): grouped_atom_ids.append(tuple(sorted(one_mol))) return grouped_atom_ids
[docs] def setIds(self): """ Evaluate asl string and save selected atom ids. """ warning_msgs = [] if self.momentum and not self.ref: self.ref = self.DEFAULT_REF warning_msgs.append( "-ref atoms are set to be all atoms in the trajectories " "(-ref = 'all'). Use -ref option to customize the reference " "atoms for momentum correction.") self.sel_ids = self.getIdx(self.asl, com_type=self.com_type, smarts_pattern=self.smarts) self.ref_ids = self.getIdx(self.ref, com_type=self.ATOM) self.sel_num = len(self.sel_ids) if self.com_type != self.ATOM: warning_msgs.append( "Mass center of atoms within each {0} is used for MSD".format( self.com_type)) msg = check_finite_molecules(self.cms_model, atom_ids=self.sel_ids) if msg: msg += ' The diffusion results of infinite matrix may not be accurate.' warning_msgs.append(msg) for msg in warning_msgs: self.log(msg, pad=True) log_debug(f"MEM: setIds {jobutils.memory_usage_psutil()} MB.")
[docs] def setFinalDisplacement(self): """ Choose functions to get the displacement matrix if self.momentum=True, do recentering using ref atoms, """ if self.momentum: self.setFinalDisplacementCenter() else: self.setFinalDisplacementNoCenter()
[docs] def setMolDisplacement(self): """ Compute the displacement of each molecule center of mass """ if not self.ms_diffusion: return # Compute sum of displacement of each species. nmol_total = numpy.array( [ct.mol_total for ct in self.cms_model.comp_ct]) nmol_cum_total = numpy.cumsum(nmol_total) # Total number of molecule mol_total = self.cms_model.fsys_ct.mol_total # See equation 18 of Krishna et al. Ind. Eng. Chem. Res., Vol. 44, # No. 17, 20056941 to compute onsager coefficient. # onsager_coeff_ij = 1/6N * Sum(r_com_ik(t+dt) - r_com_ik(t)) * Sum( # r_com_jl(t+dt) - r_com_jl(t)). Here N is total number of molecule. # r_com_ik is position of k molecule of species i. Similarly, # r_com_jl is position of l molecule of species k. # self.displacemets is a list of list. Each inner list is # displacement of molecules compared to previous frame. for disp in self.displacements: # Split based on the molecule type mol_displacement = numpy.split(disp, nmol_cum_total[:-1]) # Compute the sum over each molecule type. sum_mol_disp = [numpy.sum(var, axis=0) for var in mol_displacement] self.computeOnsagerCoefficient(sum_mol_disp) columns = [ f'coeff_{i}_{j}' for i in range(len(nmol_total)) for j in range(len(nmol_total)) ] self.onsager_coeff = numpy.cumsum(self.onsager_coeff, axis=0) # Divide by 6N. See equation 18 of Krishna et al. Ind. Eng. Chem. Res., # Vol. 44, No. 17, 20056941 self.onsager_coeff = self.onsager_coeff / (6 * mol_total) df = pandas.DataFrame(self.onsager_coeff, columns=columns, dtype=numpy.float) df[TIME] = df.index * self.time_interval df = df[[TIME] + columns] with open(self.onsager_csv, 'w') as onsager_fp: msg = ','.join(str(nmol) for nmol in list(nmol_total)) onsager_fp.write("# Number of molecules: " + msg + "\n") df.to_csv(self.onsager_csv, index=False, float_format='%.6f', mode='a') if self.save: jobutils.add_outfile_to_backend(self.onsager_csv)
[docs] def computeOnsagerCoefficient(self, mol_disp): """ Compute time integrated Onsager coefficient. :param list mol_disp: Displacement of each molecule in a list """ step_coeff = numpy.zeros(len(mol_disp) * len(mol_disp)) for idx1, disp1 in enumerate(mol_disp): for idx2, disp2 in enumerate(mol_disp): idx = len(mol_disp) * idx1 + idx2 step_coeff[idx] = numpy.dot(disp1, disp2.T).item() self.onsager_coeff = numpy.vstack((self.onsager_coeff, step_coeff))
[docs] def exportTrace(self): """ Export trace to file """ if self.num_trace == 0: return trace_file_path = self.jobname + self.TRACE_FILE_EXT num_centers = self.initial_pos.shape[0] num_trace = min(self.num_trace, num_centers) sample_idxes = random.sample(list(range(num_centers)), num_trace) trace_pos = self.initial_pos[sample_idxes].flatten() trace_matrix = trace_pos.copy() for disp in self.displacements: trace_pos += disp[sample_idxes].flatten() trace_matrix = numpy.vstack((trace_matrix, trace_pos)) numpy.savetxt(trace_file_path, trace_matrix, delimiter=',', header=self.TRACE_HEADER) if self.save: jobutils.add_outfile_to_backend(trace_file_path)
[docs] def setFinalDisplacementCenter(self): """ Calculate the displacement between adjacent trajectories, unwrap the coordinates, and correct the mass center. """ cur_sel_frame = None cur_ref_frame = None pre_sel_frame = None pre_ref_frame = None # ref_frame calculates mass center shift: # for ref frame, always use all atom instead of molecule (com_type=ATOM); # for ref frame, use vel to cancel constant translation, if vel=True. for frame in desmondutils.get_desmond_trj_frames( traj_frames=self.traj_frames): if self.make_whole: # Fix bonds across PBCs try: topo.make_whole(self.msys_model, [frame]) except ValueError: # MATSCI-9443: Equilibrium method doesn't work with dynamic atom selection self.log_error( "Please provide a classical molecular dynamics trajectory." " (the hybrid GCMC/MD simulation is not supported)") if pre_sel_frame: # Frames are reused by update with new trj: # no extra frame created and no mass matrix recalculation cur_ref_frame.setFrame(frame) cur_sel_frame.setFrame(frame) # use vel to cancel constant acceleration translation, if self.vel = True cons_acceleration_shift = self.getDisplacementFromConstTranslation( cur_ref_frame, pre_ref_frame) # get the unwrapped adjacent displacement after constant momentum shift ref_adjacent_displacement = self.unwrapReducedDisplacement( cur_ref_frame, pre_ref_frame, cons_acceleration_shift=cons_acceleration_shift) # get mass center shifted vector shift_centered_mass = self.getShiftVectorByRecenterMass( cur_ref_frame.masses, ref_adjacent_displacement) # really unwrap the displacement of sel sel_adjacent_displacement = self.unwrapReducedDisplacement( cur_sel_frame, pre_sel_frame, cons_acceleration_shift=cons_acceleration_shift) # shift the mass center sel_adjusted_displacement = self.getAdjustedDisplacement( sel_adjacent_displacement, shift_centered_mass, cur_sel_frame.box) self.displacements.append(sel_adjusted_displacement) self.time_intervals.append(cur_ref_frame.time - pre_ref_frame.time) # prepare for next loop: pre -> cur; cur -> frame to be reused pre_ref_frame, cur_ref_frame = cur_ref_frame, pre_ref_frame pre_sel_frame, cur_sel_frame = cur_sel_frame, pre_sel_frame else: pre_ref_frame = TrajectoryFrame(frame, self.ref_ids, vel=self.vel) cur_ref_frame = copy.deepcopy(pre_ref_frame) # for sel frame, never use vel (vel=False) pre_sel_frame = TrajectoryFrame(frame, self.sel_ids, com_type=self.com_type, vel=False) cur_sel_frame = copy.deepcopy(pre_sel_frame) self.initial_pos = cur_sel_frame.cur_pos.copy() log_debug( f"MEM: setFinalDisplacementCenter {jobutils.memory_usage_psutil()} MB." )
[docs] def setFinalDisplacementNoCenter(self): """ Calculate the displacement between adjacent trajectories, unwrap the coordinates directly (no mass center shift). """ cur_sel_frame = None pre_sel_frame = None for frame in desmondutils.get_desmond_trj_frames( traj_frames=self.traj_frames): if pre_sel_frame: cur_sel_frame.setFrame(frame) sel_adjacent_displacement = self.unwrapReducedDisplacement( cur_sel_frame, pre_sel_frame) sel_adjusted_displacement = self.realUnitCoordinate( sel_adjacent_displacement, cur_sel_frame.box) self.displacements.append(sel_adjusted_displacement) self.time_intervals.append(cur_sel_frame.time - pre_sel_frame.time) pre_sel_frame, cur_sel_frame = cur_sel_frame, pre_sel_frame else: pre_sel_frame = TrajectoryFrame(frame, self.sel_ids, self.com_type, False) cur_sel_frame = copy.deepcopy(pre_sel_frame) self.initial_pos = cur_sel_frame.cur_pos.copy()
[docs] def getDisplacementFromConstTranslation(self, cur_frame, pre_frame): """ Estimate the position shift between two frames by time x mean velocity. :type cur_frame: `traj.Frame` objects :param cur_frame: current trajectory frame :type pre_frame: `traj.Frame` objects :param pre_frame: previous trajectory frame :rtype: list or None :return: list of float (xyz coordinates), if vel=True """ if self.vel: cons_acceleration_shift = (cur_frame.reduced_momentum_vel + pre_frame.reduced_momentum_vel) / 2 * ( cur_frame.time - pre_frame.time) return cons_acceleration_shift.tolist()[0] return
[docs] def unwrapReducedDisplacement(self, cur_frame, pre_frame, cons_acceleration_shift=None): """ Calculate wrapped XYZ displacement, shift the mass center, and unwrap displacement. :type cur_frame: `traj.Frame` objects :param cur_frame: current trajectory frame :type pre_frame: `traj.Frame` objects :param pre_frame: previous trajectory frame :type cons_acceleration_shift: list of float :param cons_acceleration_shift: xyz shift of mass center due to ref atom collective translation assuming constant acceleration :rtype: numpy.matrix :return: unwrapped coordinates MXN matrix in reduced unit after const momentum shift """ reduced_unwrapped_displacement = cur_frame.cur_pos - pre_frame.cur_pos if cons_acceleration_shift: for col in range(self.COL_NUM_POS): reduced_unwrapped_displacement[:, col] -= cons_acceleration_shift[ col] reduced_unwrapped_displacement -= reduced_unwrapped_displacement.round() if numpy.amax(numpy.absolute(reduced_unwrapped_displacement) ) > self.MAX_REDUCED_DISPLACEMENT: abs_reduced_displacement = numpy.absolute( reduced_unwrapped_displacement) col = abs_reduced_displacement.argmax() % self.COL_NUM_POS max_displacement = cur_frame.box[ col] * abs_reduced_displacement.max() mass_center_id = abs_reduced_displacement[:, col].argmax() self.log_error( f"Between two adjacent trajectory frames ({pre_frame.time:.2f} ps, " f"{cur_frame.time:.2f} ps), mass center {mass_center_id} has too large " f"displacement ({max_displacement:.2f} Angstrom), " "but the largest allowable atom displacement is " f"{cur_frame.box[col] * self.MAX_REDUCED_DISPLACEMENT:.2f} Angstrom. \n" "Please output trajectory frames more frequently. \n" "Aborting...") else: return reduced_unwrapped_displacement
[docs] def getShiftVectorByRecenterMass(self, mass, adj_displacement): """ Get the XYZ shift vetor of the unwrapped mass center (reduced unit). :type mass: list :param mass: list of atom mass :type adj_displacement: numpy.matrix :param adj_displacement: unwrapped displacement matrix in reduced unit :rtype: list :return: list of float """ displacement_mass = mass * adj_displacement / numpy.sum(mass) shift_vector = numpy.squeeze(displacement_mass.tolist()) return shift_vector
[docs] def getAdjustedDisplacement(self, selected_adj_displacement, shift_centered_mass, box): """ Get the "correct" displacement in xyz coordinates (real unit) for adjacent trajectory. :type selected_adj_displacement: numpy.matrix :param selected_adj_displacement: wrapped adjacent trajectory displacement :type shift_centered_mass: list :param shift_centered_mass: vector to zero mass center of ref atoms :type box: list :param box: list of float, the periodic boundary dimensions :rtype: numpy.matrix :return: MXN matrix in real unit, wrapped adjacent trajectory displacement """ for col in range(self.COL_NUM_POS): selected_adj_displacement[:, col] -= shift_centered_mass[col] return self.realUnitCoordinate(selected_adj_displacement, box)
[docs] def realUnitCoordinate(self, selected_adj_displacement, box): """ Change reduced unit to real unit for displacement matrix. :type selected_adj_displacement: numpy.matrix :param selected_adj_displacement: displacement matrix in reduced unit :type box: list :param box: pbc box size :rtype: numpy.matrix :return: displacement matrix in real unit """ for col in range(self.COL_NUM_POS): selected_adj_displacement[:, col] *= box[col] return selected_adj_displacement
[docs] def setSquaredDisplacement(self): """ Loop over all adjacent displacement matrix to run MSD calculation """ self.log('Calculating MSD...') self.displacement_num = len(self.displacements) self.tau_num = self.displacement_num + 1 self.time_msd_std = numpy.zeros((self.tau_num, self.column_total)) log_debug( f"MEM: self.time_msd_std {jobutils.memory_usage_psutil()} MB.") for start_id in range(self.displacement_num): self.sdFromDisplacement(start_id) log_debug( f"MEM: sdFromDisplacement {jobutils.memory_usage_psutil()} MB.")
[docs] def sdFromDisplacement(self, start_id): """ Calculate the squared displacement from start_id trj frame to proper frame :type start_id: int :param start_id: calculation starts from the frame of start_id """ accu_displacement = numpy.zeros((self.sel_num, self.COL_NUM_POS)) for itr_id in range(start_id, self.displacement_num): index = itr_id - start_id self.sample_num[index] += 1 accu_displacement += self.displacements[itr_id] squared_sum = numpy.square(accu_displacement).sum() self.volume_squared_sum[index] += squared_sum double_squared_sum = numpy.square(squared_sum) self.volume_double_squared_sum[index] += double_squared_sum if self.options.plane is not None: # displacement projection on line and it is the scale factor unit_vector = numpy.array(self.options.plane) unit_vector /= numpy.linalg.norm(unit_vector) scale_factors = numpy.dot(accu_displacement, unit_vector) disp_line_comp = unit_vector * scale_factors.reshape(-1, 1) disp_plane_comp = accu_displacement - disp_line_comp # scale_factors is a list of floats instead of a list of 3D vectors # as the 3D vector a unit vector, summation on scale_factors and # the 3D vector * scale_factor is equivalent line_squared_sum = numpy.square(scale_factors).sum() plane_squared_sum = numpy.square(disp_plane_comp).sum() # MSD along a line and in the plane self.line_squared_sum[index] += line_squared_sum self.plane_squared_sum[index] += plane_squared_sum # This is for low-memory Std Dev calculation self.line_double_squared_sum[index] += numpy.square( line_squared_sum) self.plane_double_squared_sum[index] += numpy.square( plane_squared_sum) if self.options.axis: x_squared_sum = numpy.square(accu_displacement[:, 0]).sum() self.x_squared_sum[index] += x_squared_sum x_double_squared_sum = numpy.square(x_squared_sum) self.x_double_squared_sum[index] += x_double_squared_sum y_squared_sum = numpy.square(accu_displacement[:, 1]).sum() self.y_squared_sum[index] += y_squared_sum y_double_squared_sum = numpy.square(y_squared_sum) self.y_double_squared_sum[index] += y_double_squared_sum z_squared_sum = numpy.square(accu_displacement[:, 2]).sum() self.z_squared_sum[index] += z_squared_sum z_double_squared_sum = numpy.square(z_squared_sum) self.z_double_squared_sum[index] += z_double_squared_sum
[docs] def collectData(self): """ Collect the msd and std from displacement matrix. # Std is calculated from sqrt(E[X^2] - (E[X])^2) and X is the mean # squared displacement at one Tau (one time point pair) # https://en.wikipedia.org/wiki/Standard_deviation#Definition_of_population_values """ # key = 0 (itr_id-start_id=0) -> tau = 1 time_indexes = range(self.time_msd_std.shape[0]) indexes = time_indexes[:-1] sample_num = [self.sample_num[x] for x in indexes] self.time_msd_std[:, 0] = time_indexes self.time_msd_std[:, 0] *= self.time_interval self.time_msd_std[1:, 1] = list(self.volume_squared_sum.values()) self.time_msd_std[1:, 2] = list(self.volume_double_squared_sum.values()) self.time_msd_std[1:, 1] /= sample_num self.time_msd_std[1:, 2] /= sample_num self.time_msd_std[:, 2] -= numpy.square(self.time_msd_std[:, 1]) self.time_msd_std[:, 2] = numpy.sqrt(self.time_msd_std[:, 2]) if self.options.plane is not None: self.time_msd_std[1:, 3] = list(self.line_squared_sum.values()) self.time_msd_std[1:, 4] = list(self.line_double_squared_sum.values()) self.time_msd_std[1:, 3] /= sample_num self.time_msd_std[1:, 4] /= sample_num self.time_msd_std[:, 4] -= numpy.square(self.time_msd_std[:, 3]) self.time_msd_std[:, 4] = numpy.sqrt(self.time_msd_std[:, 4]) self.time_msd_std[1:, 5] = list(self.plane_squared_sum.values()) self.time_msd_std[1:, 6] = list(self.plane_double_squared_sum.values()) self.time_msd_std[1:, 5] /= sample_num self.time_msd_std[1:, 6] /= sample_num self.time_msd_std[:, 6] -= numpy.square(self.time_msd_std[:, 5]) self.time_msd_std[:, 6] = numpy.sqrt(self.time_msd_std[:, 6]) if self.options.axis: base_index = 3 if self.options.plane is None else 7 x_sidx = base_index x_dsidx = x_sidx + 1 self.time_msd_std[1:, x_sidx] = list(self.x_squared_sum.values()) self.time_msd_std[1:, x_dsidx] = list( self.x_double_squared_sum.values()) self.time_msd_std[1:, x_sidx] /= sample_num self.time_msd_std[1:, x_dsidx] /= sample_num self.time_msd_std[:, x_dsidx] -= numpy.square( self.time_msd_std[:, x_sidx]) self.time_msd_std[:, x_dsidx] = numpy.sqrt(self.time_msd_std[:, x_dsidx]) y_sidx = x_sidx + 2 y_dsidx = x_dsidx + 2 self.time_msd_std[1:, y_sidx] = list(self.y_squared_sum.values()) self.time_msd_std[1:, y_dsidx] = list( self.y_double_squared_sum.values()) self.time_msd_std[1:, y_sidx] /= sample_num self.time_msd_std[1:, y_dsidx] /= sample_num self.time_msd_std[:, y_dsidx] -= numpy.square( self.time_msd_std[:, y_sidx]) self.time_msd_std[:, y_dsidx] = numpy.sqrt(self.time_msd_std[:, y_dsidx]) z_sidx = y_sidx + 2 z_dsidx = y_dsidx + 2 self.time_msd_std[1:, z_sidx] = list(self.z_squared_sum.values()) self.time_msd_std[1:, z_dsidx] = list( self.z_double_squared_sum.values()) self.time_msd_std[1:, z_sidx] /= sample_num self.time_msd_std[1:, z_dsidx] /= sample_num self.time_msd_std[:, z_dsidx] -= numpy.square( self.time_msd_std[:, z_sidx]) self.time_msd_std[:, z_dsidx] = numpy.sqrt(self.time_msd_std[:, z_dsidx]) self.time_msd_std[:, 1:] /= self.sel_num
[docs] def saveMsdTxt(self): """ Save MSD matrix: [time(ps), MSD(Angstrom^2), STD(Angstrom^2)] """ header = self.getMsdHeader() numpy.savetxt(self.msd_file, self.time_msd_std, header=header, delimiter=',') log_debug(f"MEM: saveMsdTxt {jobutils.memory_usage_psutil()} MB.")
[docs] def setTauRange(self): """ Set Tau start and end limits. """ tau_start = self.tau_frame_start * self.time_interval / NANO2PICO tau_end = self.tau_frame_end * self.time_interval / NANO2PICO self.prop2save[self.TAU_START_PROP] = tau_start self.prop2save[self.TAU_END_PROP] = tau_end
[docs]class SingleMDJob(jobutils.RobustSubmissionJob): """ Job class to submit a molecular dynamics simulation. """
[docs] def __init__(self, jobname, md_opts, enegrp_opts, save_opts, command_dir=os.curdir, **kwargs): """ :type jobname: str :param jobname: The basename for the job :type md_opts: 'SimpleNamespace' :param md_opts: molecular dynamics options :type enegrp_opts: 'SimpleNamespace' :param enegrp_opts: enegrp options :type save_opts: 'SimpleNamespace' :param save_opts: files to save :type command_dir: str :param command_dir: the job launch dir """ self.jobname = jobname self.md_opts = md_opts self.enegrp_opts = enegrp_opts self.save_opts = save_opts self.command_dir = command_dir self.kwargs = kwargs # _command is set in setup method based on above options self._command = None self.md_job_info = None jobutils.RobustSubmissionJob.__init__(self, self._command, name=self.jobname, command_dir=command_dir)
[docs] def setup(self): """ Over write parent class method. """ if self.command_dir != os.curdir: cms_basename = os.path.basename(self.md_opts.input_cms) try: shutil.copy(os.path.join(os.pardir, cms_basename), os.curdir) except FileNotFoundError: # This shouldn't happen under standard normal condition where # input cms is in the job submission dir. This is to ease the # non-standard manually testing when input cms is else where if os.path.isabs(self.md_opts.input_cms): cms_path = self.md_opts.input_cms else: cms_path = os.path.join(os.pardir, self.md_opts.input_cms) shutil.copy(cms_path, os.curdir) self.md_opts.input_cms = cms_basename # addOutputFile() needs command_dir as the backend points to the parent # job start dir self.md_job_info = setup_md_job(self.jobname, self.md_opts, enegrp_opts=self.enegrp_opts, save_opts=self.save_opts, command_dir=self.command_dir, **self.kwargs) self._command = self.md_job_info.cmd
[docs]class MultiDriver(object): """ Class to fire off multiple subjobs. """
[docs] def __init__(self, options, jobname_base, logger=None, smart_jdj=False): """ :type options: Named tuple :param options: The parsed command line options :type jobname_base: str :param jobname_base: base name of this job :type logger: `logging.Logger` :param logger: The logger for this builder :type smart_jdj: bool :param smart_jdj: whether to use smart distribution of jobs """ self.options = options self.jobname_base = jobname_base self.logger = logger self.smart_jdj = smart_jdj self.jdj = None self.input_cms = self.options.input_cms self.backend = jobcontrol.get_backend() self.out_cms = self.jobname_base + '-out.cms' jobutils.add_outfile_to_backend(self.out_cms, set_structure_output=True)
[docs] def run(self): """ Run multiple calculation and perform ensemble average. """ if self.options.run_type == FULL_RUN: # Run MD in parallel self.setUpQue() self.initiateAndAddMdJobs() self.runSubjobs() # Record the output file from each subjob pass elif self.options.run_type == POST_RUN: # Read MD outputs from previous runs pass elif self.options.run_type == MULTI_RUN: # Read outfiles from previous calculations pass if self.options.run_type != FIT_ONLY: # FULL_RUN, POST_RUN, and MULTI_RUN need self.convertMultiToOne() else: self.loadDataFromAllInOneFile() # Final curve fitting pass
[docs] def convertMultiToOne(self): """ Convert data from files from multiple subjobs into one single file. """ raise NotImplementedError('This method combines all the final ' 'outfiles from subjobs into one.')
[docs] def loadDataFromAllInOneFile(self): """ Set the all-in-one file based on user input. """ raise NotImplementedError( 'This method loads the data for curve fitting.')
[docs] def setUpQue(self): """ Setup job queue. """ self.jdj = jobdj.JobDJ(verbosity='normal', max_failures=jobdj.NOLIMIT) if not self.smart_jdj: self.jdj.disableSmartDistribution()
[docs] def initiateAndAddMdJobs(self, SingleMDJob=SingleMDJob, has_enegrp=False, additional_save_info=None, **kwargs): """ Construct options, and initiate SingleMDJob jobs, and added them to jobdj. :param SingleMDJob: instantiate this class to an object that can be added to jobdj :type SingleMDJob: 'queue.JobControlJob' :param has_enegrp: if False, no enegrp file is generated. :type has_enegrp: bool :param additional_save_info: additional information on what files to be copied back. :type additional_save_info: 'types.SimpleNamespace' """ if jobutils.get_option(self.options, parserutils.SAVE_FLAG): # parserutils.SAVE_TRJ controls the saving states of the subjobs save_ocms = self.options.save_trj_data in [ parserutils.SAVE_CMS, parserutils.SAVE_TRJ ] save_trj = self.options.save_trj_data == parserutils.SAVE_TRJ else: # FLAG_MD_SAVE_TRJ controls the saving states of the master jobs, # and that of the subjob if there is only one subjob assert hasattr(self.options, FLAG_MD_SAVE_TRJ[1:]) save_ocms = True save_trj = self.options.md_save_trj md_opts = { INPUT_CMS: self.options.input_cms, TIME: self.options.md_time, TIMESTEP: self.options.md_timestep, ENSEMBLE: self.options.md_ensemble, TEMPERATURE: self.options.md_temp, PRESSURE: self.options.md_press, TRAJECTORY_INTERVAL: self.options.md_trj_int, RANDOM_SEED: self.options.seed, PROCS: 1, GPU: True } md_opts = SimpleNamespace(**md_opts) enegrp_opts = None if has_enegrp: enegrp_opts = { INTERVAL: self.options.md_enegrp_int, START: self.options.md_enegrp_start, ENERGY_GROUPS: ['pressure_tensor'] } enegrp_opts = SimpleNamespace(**enegrp_opts) save_opts = { OCMS: save_ocms, TRJ: save_trj, } if additional_save_info: save_opts.update(additional_save_info) save_opts = SimpleNamespace(**save_opts) if self.options.seed and self.options.md_sim_num > 1: jobutils.seed_random_number_generator(self.options, self.log) for md_run_idx in range(1, self.options.md_sim_num + 1): jobname_base = '_'.join([self.jobname_base, str(md_run_idx)]) new_md_opts = copy.deepcopy(md_opts) if self.options.md_sim_num > 1: random_seed = random.randint(parserutils.RANDOM_SEED_MIN, parserutils.RANDOM_SEED_MAX) new_md_opts.random_seed = random_seed command_dir = jobname_base job = SingleMDJob(jobname_base, new_md_opts, enegrp_opts, save_opts, command_dir=command_dir, **kwargs) self.jdj.addJob(job)
[docs] def log(self, msg, **kwargs): """ Add a message to the log file :type msg: str :param msg: The message to log Additional keyword arguments are passed to the textlogger.log_msg function """ kwargs['logger'] = self.logger textlogger.log_msg(msg, **kwargs)
[docs] def log_error(self, msg): """ Add a message to the log file and exit with an error code :type msg: str :param msg: The message to log """ textlogger.log_error(msg, logger=self.logger)
[docs] def runSubjobs(self): """ Run molecular dynamics jobs and Summarize the status of the runs. """ self.log( f'{self.jdj.total_added} independent molecular dynamics ' 'simulations are running...', pad_below=True) self.jdj.run() if not self.jdj.done_jobs: self.log_error('None of the molecular dynamics simulations are ' 'successful. Aborting...') fail_jobnames = [ job.name for job in self.jdj.failed_jobs if job.name.endswith('_md') ] if fail_jobnames: self.log(f"{self.jobname_base} calculations will not include the " f"failed MD runs: {', '.join(fail_jobnames)}.") else: self.log('All independent simulations are successful.', pad_below=True)