Source code for schrodinger.application.matsci.desmondutils

"""
Utilities for working with Desmond.

Copyright Schrodinger, LLC. All rights reserved.
"""

import argparse
import collections
import contextlib
import copy
import io
import itertools
import math
import os
import os.path
import sys
import warnings
from past.utils import old_div

import numpy

# Do not remove stage import: it is not used in this module, but it is referred
# to in other modules.
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import ffiostructure
from schrodinger.application.desmond import license
from schrodinger.application.desmond import platforms
from schrodinger.application.desmond.constants import COORD_PROPERTIES
from schrodinger.application.desmond.constants import CT_TYPE
from schrodinger.application.desmond.constants import IS_INFINITE
from schrodinger.application.desmond.constants import NUM_COMPONENT
from schrodinger.application.desmond.constants import USE_CUSTOM_OPLSDIR
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import cgforcefield as cgff
from schrodinger.application.matsci import coarsegrain
from schrodinger.application.matsci import codeutils
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msconst
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import textlogger as tlog
from schrodinger.application.matsci.nano import xtal
from schrodinger.forcefield import common
from schrodinger.infra import mm
from schrodinger.infra import mmcheck
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import build
from schrodinger.structutils import rmsd
from schrodinger.structutils import transform
from schrodinger.test import ioredirect
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils
from schrodinger.utils import sea

topo = codeutils.get_safe_package('desmond.topo')
traj = codeutils.get_safe_package('desmond.traj')
analysis = codeutils.get_safe_package('desmond.analysis')

# Force field text names
OPLS2005 = mm.OPLS_NAME_F14
OPLS3 = mm.OPLS_NAME_F16

CHORUS_PROP_PREFIX = 'r_chorus_box_'
CHORUS_CUBIC_PROPS = [CHORUS_PROP_PREFIX + x for x in ['ax', 'by', 'cz']]
CHORUS_NON_CUBIC_PROPS = [
    CHORUS_PROP_PREFIX + x for x in ['ay', 'az', 'bx', 'bz', 'cx', 'cy']
]
CHORUS_BOX_ALL_PROPS = CHORUS_CUBIC_PROPS + CHORUS_NON_CUBIC_PROPS

PROP_TRJ = cms.Cms.PROP_TRJ

A_KEY = 'r_pdb_PDB_CRYST1_a'
B_KEY = 'r_pdb_PDB_CRYST1_b'
C_KEY = 'r_pdb_PDB_CRYST1_c'
ALPHA_KEY = 'r_pdb_PDB_CRYST1_alpha'
BETA_KEY = 'r_pdb_PDB_CRYST1_beta'
GAMMA_KEY = 'r_pdb_PDB_CRYST1_gamma'
CHARGE_ORDER_PROP = 'i_matsci_polymer_original_charge_index'

PDB_CRYSTAL_PROPS = [A_KEY, B_KEY, C_KEY, ALPHA_KEY, BETA_KEY, GAMMA_KEY]

# These properties will cause the Desmond System Builder to fail (MATSCI-1144)
FATAL_ATOM_PROPERTIES = [
    'i_ffio_grp_ligand', 'i_ffio_grp_energy', 'i_psp_parent', 'f_psp_parent',
    's_psp_parent'
]

# Text for setting family data in the .msj header
MSJ_FAMILY_HEADER = """
   set_family = {%s   }
   """

# Text for .msj file header settings that will apply to all stages. Insert
# actual settings in place of the %s format.
DESMOND_FAMILY_HEADER = """
      desmond = {%s
                       }
      """

PERIODIC_FIX_HEADER = """
         # Remove periodic fix if the system is infinite
         maeff_output.periodicfix = false
         trajectory.periodicfix = false
"""

# See MATSCI-4541. Currently only atomistic systems are supported (DESMOND-7254)
PERIODIC_FIX_HEADER_ATOM = """
         # Prevent drift in infinite periodic system (atomistic systems only)
         backend = {
             mdsim = {
                 plugin = {
                     remove_com_motion = {
                         first = 0.0
                         interval = 0.01
                         type = "remove_com_motion"
                     }
                 }
             }
         }
"""

CRYSTAL_RESTRAINT = ("{\n" + "new = [\n" +
                     "{atoms = ['atom.b_matsci_crystal_atom' ]\n" +
                     "force_constants = [2 2 2 ]\n" + "name = posre_harm\n" +
                     "}\n" + "]\n" + "}\n")
RETAIN_RESTRAINT = "{\n" + "existing = retain\n" + "}\n"
IGNORE_RESTRAINT = "{\n" + "existing = ignore\n" + "}\n"

GCMC_ERROR = ('Grand-canonical Monte Carlo trajectories cannot be '
              'used with this workflow.')

# Pressure tensor label in energy group file
TIME_ENGRP = 'time'
RAW_POTENTIAL_ENGRP = 'en'
POTENTIAL_ENERGY_ENGRP = 'E_p'
KINETIC_ENERGY_ENGRP = 'E_k'
EXTENDED_ENERGY_ENGRP = 'E_x'
PRESSURE_ENGRP = 'P'
VOLUME_ENEGRP = 'V'
DISP_CORR_ENGRP = 'Dispersion_Correction'
ENERGY_CORR_ENGRP = 'Self_Energy_Correction'
CHARGE_CORR_ENGRP = 'Net_Charge_Correction'
GBL_FORCE_ENGRP = 'Global_Force_Sum'
DRIFT_VEL_ENGRP = 'Kinetic'
PAIR_ELEC_ENGRP = 'pair_elec'
PAIR_VDW_ENGRP = 'pair_vdw'
BOND_ENERGY_ENGRP = 'stretch'
ANGLE_ENERGY_ENGRP = 'angle'
DIHED_ENERGY_ENGRP = 'dihedral'
FAR_EXCLUSION_ENGRP = 'far_exclusion'
NON_BONDED_ELEC_ENGRP = 'nonbonded_elec'
NON_BONDED_VDW_ENGRP = 'nonbonded_vdw'
FAR_TERMS_ENGRP = 'far_terms'
TOTAL_ENERGY_ENGRP = 'Total'
VIRIAL_ENGRP = 'Virial'
KINETIC_TENSOR_ENGRP = 'K.E.tensor'
PRESS_TENSOR_ENGRP = 'Pressure_Tensor'
SIM_BOX_ENGRP = 'Simulation_Box'
ALL_ENEGRP_PROPS = tuple(
    (RAW_POTENTIAL_ENGRP, POTENTIAL_ENERGY_ENGRP, KINETIC_ENERGY_ENGRP,
     EXTENDED_ENERGY_ENGRP, PRESSURE_ENGRP, VOLUME_ENEGRP, DISP_CORR_ENGRP,
     ENERGY_CORR_ENGRP, CHARGE_CORR_ENGRP, GBL_FORCE_ENGRP, DRIFT_VEL_ENGRP,
     PAIR_ELEC_ENGRP, PAIR_VDW_ENGRP, BOND_ENERGY_ENGRP, ANGLE_ENERGY_ENGRP,
     DIHED_ENERGY_ENGRP, FAR_EXCLUSION_ENGRP, NON_BONDED_ELEC_ENGRP,
     NON_BONDED_VDW_ENGRP, FAR_TERMS_ENGRP, TOTAL_ENERGY_ENGRP, VIRIAL_ENGRP,
     KINETIC_TENSOR_ENGRP, PRESS_TENSOR_ENGRP, SIM_BOX_ENGRP))
ENEGRP_VALUE = collections.namedtuple('ENEGRP_VALUE', ['time', 'value'])

PTENSOR = 'ptensor'
PTENSOR_EXT = f'.{PTENSOR}.gz'
DESMOND_TRUE = 'true'

SYSTEM_BUILDER_DATA_PATH = os.path.join(fileutils.get_mmshare_data_dir(),
                                        'system_builder')


[docs]def exit_if_incompatible_with_desmond(logger=None): """ Desmond only runs on certain platforms. Exit if the platform being used is not compatible. If a logger is supplied, it will print an error to the logger. :param logging.Logger logger: The logging object to print an error to if the platform is incompatible. If not supplied, nothing will print. :raise SystemExit: If the current platform is not compatible with Desmond """ if sys.platform in platforms.INCOMPATIBLE_PLATFORMS: if logger: logger.error(platforms.PLATFORM_WARNING) raise SystemExit(1)
[docs]def make_chorus_orthorhombic(struct): """ Zero out all the non-diagonal Chorus box properties. The main use of this function is to remove numerical imprecision in the off-diagonal elements that causes Desmond to consider the box triclinic. The function returns the largest off-diagonal element pre-zeroing so the calling routine can decide what to do (warn user, etc) if the off-diagonals looked large enough to be intentionally non-zero. :type struct: `schrodinger.structure.Structure` :param struct: The structure whose Chorus box properties should be modified :rtype: float :return: The absolute value of the largest off-diagonal element before zeroing """ maxval = max([abs(struct.property[x]) for x in CHORUS_NON_CUBIC_PROPS]) for prop in CHORUS_NON_CUBIC_PROPS: struct.property[prop] = 0.0 return maxval
[docs]def has_orthorhombic_pbc(struct, tolerance=0.0): """ Detect if the given structure has a PBC that is orthorhombic (all angles are 90 degrees). :type struct: `schrodinger.structure.Structure` :param struct: The structure to check :type tolerance: float :param tolerance: The tolerance for the 90 degree check, in degrees :rtype: bool :return: True if a PBC exists and it is orthorhombic """ if has_chorus_box_props(struct): params = xtal.get_params_from_chorus(xtal.get_chorus_properties(struct)) return all(abs(x - 90.0) <= tolerance for x in params[3:]) return False
[docs]def might_be_system(structs): """ Check if we're using a Desmond system or system is infinite. :param iterable structs: Each item should be a `structure.Structure` object :rtype: bool or str :return: False if none of the structures are infinite or Desmond systems. An error message is returned if at least one such structure is detected """ msg = ('One or more of the components is infinite or appears to be a ' 'complete Desmond system.') for struct in structs: if has_chorus_box_props(struct): return msg elif has_space_group_pbc_props(struct): # MATSCI-2561 xtal.sync_pbc2(struct) if xtal.is_infinite2(struct): return msg return False
[docs]def determine_box_size(struct): """ Determine the size of a cubic box that encloses the entire structure :type struct: `schrodinger.structure.Structure` :param struct: The structure to use in determining the box size. :rtype: float :return: The largest span in the X, Y or Z direction """ if not struct: return 0.0 return max(struct.getXYZ().ptp(0))
[docs]def add_cubic_chorus_box_props(struct): """ Add the Chorus box properties to the structure for a cubic box :type struct: `schrodinger.structure.Structure` :param struct: The structure to add the properties to """ cube_side = determine_box_size(struct) for prop in CHORUS_CUBIC_PROPS: struct.property[prop] = cube_side for prop in CHORUS_NON_CUBIC_PROPS: struct.property[prop] = 0.0
[docs]def store_chorus_box_props(struct, ax, ay=0.0, az=0.0, bx=0.0, by=None, bz=0.0, cx=0.0, cy=0.0, cz=None): """ Add properties to the structure that define the periodic boundary condition in the way Desmond wants it defined. :type struct: `schrodinger.structure.Structure` :param struct: The structure to add the properties to :type ax: float :param ax: The value of the ax box property :type ay: float :param ay: The value of the ay box property. Defaults to 0. :type az: float :param az: The value of the az box property. Defaults to 0. :type bx: float :param bx: The value of the bx box property. Defaults to 0. :type by: float :param by: The value of the by box property. If not given, this value is set the same as ax. :type bz: float :param bz: The value of the bz box property. Defaults to 0. :type cx: float :param cx: The value of the cx box property. Defaults to 0. :type cy: float :param cy: The value of the cy box property. Defaults to 0. :type cz: float :param cz: The value of the cz box property. If not given, this value is set the same as ax. """ if by is None: by = ax if cz is None: cz = ax # Float on the lines below is useful because sometimes these properties come # from numpy arrays and are numpy double struct.property[CHORUS_PROP_PREFIX + 'ax'] = float(ax) struct.property[CHORUS_PROP_PREFIX + 'ay'] = float(ay) struct.property[CHORUS_PROP_PREFIX + 'az'] = float(az) struct.property[CHORUS_PROP_PREFIX + 'bx'] = float(bx) struct.property[CHORUS_PROP_PREFIX + 'by'] = float(by) struct.property[CHORUS_PROP_PREFIX + 'bz'] = float(bz) struct.property[CHORUS_PROP_PREFIX + 'cx'] = float(cx) struct.property[CHORUS_PROP_PREFIX + 'cy'] = float(cy) struct.property[CHORUS_PROP_PREFIX + 'cz'] = float(cz)
[docs]def has_chorus_box_props(struct): """ Check if the structure has all the Chorus box properties :type struct: `schrodinger.structure.Structure` :param struct: The structure to check for properties :rtype: bool :return: Whether the structure has all the chorus box properties """ cbox_props = set(CHORUS_BOX_ALL_PROPS) all_props = set(struct.property) return cbox_props <= all_props
[docs]def has_space_group_pbc_props(struct): """ Check if the structure has all the space group PBC properties :type struct: `schrodinger.structure.Structure` :param struct: The structure to check for properties :rtype: bool :return: Whether the structure has all the space group pbc properties """ pbc_props = set(PDB_CRYSTAL_PROPS) all_props = set(struct.property) return pbc_props <= all_props
[docs]def is_opls3_model(model): """ Check if the given Cms model was made with OPLS3 :type model: `schrodinger.application.desmond.cms.Cms` :param model: The cms model to check :rtype: bool :return: True if OPLS3 was detected, False if not """ return model.get_ff() == OPLS3
[docs]def is_opls2005_model(model): """ Check if the given Cms model was made with OPLS_2005 :type model: `schrodinger.application.desmond.cms.Cms` :param model: The cms model to check :rtype: bool :return: True if OPLS_2005 was detected, False if not """ return model.get_ff() == OPLS2005
[docs]def get_model_ff_name(model): """ If possible detect what force field the model was created with :type model: `schrodinger.application.desmond.cms.Cms` :param model: The cms model to check :rtype: str :return: The name of the detected forcefield or and empty string if no known force field was detected. FF names are the module constants OPLS3 or OPLS2005 """ if is_opls3_model(model): return OPLS3 elif is_opls2005_model(model): return OPLS2005 return ""
[docs]@codeutils.deprecate(to_remove_in='22-2') def are_isomers_conn(struct1, struct2): """ Compare struct1 with struct2 to see if their 'connectivity' is the same. Returns True only if ct1 and ct2 have same: 1) #atoms; 2) types; 3) bonds; and 4) bond orders. :type struct1: `schrodinger.structure.Structure` :param struct1: Structure to use in comparison :type struct2: `schrodinger.structure.Structure` :param struct2: Another structure to use in comparison :rtype: bool :return: True if structures are equivalent, False otherwise """ # Deprecated, use mmct_ct_compare_connect_py, to be removed in 22-3. # Original implementation called C function. This is slower compared to # mmct_ct_compare_connect_py, because it requires actual structures, Python # implementation operates on atom indices #return (mm.mmct_ct_compare_connect(struct1.handle, # struct2.handle) == mm.MMCT_SAME) indices_a = list(range(1, struct1.atom_total + 1)) indices_b = list(range(1, struct2.atom_total + 1)) return mmct_ct_compare_connect_py(struct1, indices_a, indices_b, struct2=struct2)
[docs]def mmct_ct_compare_connect_py(struct, indices_a, indices_b, struct2=None): """ This is a port of mmlibc/mmct/mmct_ct_compare_connect that takes atom indices instead of structure handlers (extracting structure can be expensive) Compare atom indices (a) from struct with atom indices (b) from struct 2 if set or struct to see if their 'connectivity' is the same. :param structure.Structure struct: Input structure :param list(int) indices_a: Atom indices :param list(int) indices_b: Other atom indices :type struct2: structure.Structure or None :param struct2: Other input structure if not None, otherwise struct will be used :rtype: bool :return: Whether atoms defined with indices_a and indices_b have the same: 1) #atoms; 2) types; 3) bonds; and 4) overall bond orders """ if len(indices_a) != len(indices_b): return False maps = [] structs = (struct, struct2 if struct2 else struct) for indices, struct in zip((indices_a, indices_b), structs): # Map atom index in the structure with atom index if indices would # represent a separate structure maps.append({ struct.atom[atom_idx].index: idx for idx, atom_idx in enumerate(indices) }) for idxa, idxb in zip(indices_a, indices_b): atoma = structs[0].atom[idxa] atomb = structs[1].atom[idxb] # priv_mmct_atom_compare_type if (atoma.atomic_number != atomb.atomic_number or atoma.atom_type != atomb.atom_type): return False if atoma.bond_total != atomb.bond_total: return False for batoma, batomb in zip(atoma.bonded_atoms, atomb.bonded_atoms): if maps[0][batoma.index] != maps[1][batomb.index]: return False for bonda, bondb in zip(atoma.bond, atomb.bond): if bonda.type != bondb.type: return False return True
[docs]def copy_atom_properties(struct1, struct2, atom_props): """ :type struct1: `schrodinger.structure.Structure` :param struct1: The structure from which atom properties are copied :type struct2: `schrodinger.structure.Structure` :param struct2: The structure to which atom properties are copied :type atom_props: list of strings :param atom_props: Atom properties that are copied """ if len(struct1.atom) != len(struct2.atom): raise ValueError("Structures have different number of atoms") for struct1_atom, struct2_atom in zip(struct1.atom, struct2.atom): for prop in atom_props: struct1_atom_prop = struct1_atom.property.get(prop) if struct1_atom_prop is not None: struct2_atom.property[prop] = struct1_atom_prop else: # Remove the prop, if not present in struct1 struct2_atom.property.pop(prop, None)
[docs]def get_npseudos_per_molecule(comp_ct): """ Get number of pseudo atoms per molecule. :param FFIOSTructure comp_ct: CMS component :rtype: int :return: Number of pseudo atoms in the first molecule of the component """ assert comp_ct.property[NUM_COMPONENT] == 1 npseudos = len(comp_ct.ffio.pseudo) # Compute number of pseudos per molecule assert npseudos % comp_ct.mol_total == 0 npseudos = npseudos // comp_ct.mol_total return npseudos
[docs]def get_molecules_pseudos(comp_ct, molecules): """ Given component and molecules indices return the corresponding pseudo indices. :param FFIOSTructure comp_ct: CMS component :param list molecules: List of molecules indices :rtype: set[int] :return: Set of pseudo atoms indices """ assert all(mol <= comp_ct.mol_total for mol in molecules) mol_npseudos = get_npseudos_per_molecule(comp_ct) indices = set() if not mol_npseudos: return indices for mol in molecules: indices.update([(mol - 1) * mol_npseudos + idx for idx in range(1, mol_npseudos + 1)]) return indices
[docs]def extend_comp(comp_ct, struct): """ Given a component (comp_ct) with possible some pseudo atoms, extend it with struct (must be the same isomers, NUM_COMPONENT == 1 is enforced). :param FFIOSTructure comp_ct: CMS component :param int npseudos: Number of pseudo atoms in the first molecule of the component :param structure.Structure struct: Structure to extend the component with """ # Note 1. Nothing is done with virtuals (that appear when custom charges are # used) # Note 2. This function is "unit" tested using STU 31714 assert comp_ct.property[NUM_COMPONENT] == 1 npseudos = get_npseudos_per_molecule(comp_ct) mol_ct = comp_ct.molecule[1].extractStructure() for mol in struct.molecule: mol_st = mol.extractStructure() assert are_isomers_conn(mol_ct, mol_st) comp_ct.extend(struct) if not npseudos: # No pseudo atoms, nothing else to do return for mol in struct.molecule: mol_st = mol.extractStructure() atoms = list(range(1, mol_st.atom_total + 1)) tmatrix = rmsd.get_super_transformation_matrix(mol_st, atoms, mol_ct, atoms) for idx in range(1, npseudos + 1): pseudo = comp_ct.ffio.pseudo[idx] coords = [ pseudo.property.get(coord, 0) for coord in COORD_PROPERTIES ] # Get most optimal pseudo atom coordinates coords = transform.transform_atom_coordinates(coords, tmatrix) new_pseudo = comp_ct.ffio.addPseudo() for val, prop in zip(coords, COORD_PROPERTIES): new_pseudo.property[prop] = val
[docs]def is_traj_gcmc(trj_path, check_all_frames=False): """ Whether trajectory is generated during Grand-canonical Monte Carlo simulation :param str trj_path: The path to input trajectory :param bool check_all_frames: Whether to check all the frames of a trajectory. In case of false it will only check the first frame :rtype: bool :return: Whether passed trajectory is generated during Grand-canonical Monte Carlo simulation """ frames = traj.read_traj(trj_path, return_iter=True) if not check_all_frames: frames = [next(iter(frames))] check_gcmc = any([x.nactive != x.natoms for x in frames]) return check_gcmc
[docs]class SystemBuilder(object): SOLVATION_RADIUS_KEY = 'solvation_exclusion_radius'
[docs] def __init__(self, struct, basename, forcefield=OPLS2005, rezero_system=True, logger=None, copy_output=True, fatal_atom_properties=FATAL_ATOM_PROPERTIES, check_infinite=True, split_components=False, water_fftype=msconst.SPC, set_incorporation=True, wam_type=None, per_structure_wam=False, build_geometry_msj=None, redistribute_hydrogen_mass=False): """ :type struct: `schrodinger.structure.Structure` :param struct: The structure to run the system builder on. :type basename: str :param basename: The base name of the output file :type forcefield: str :param forcefield: The name of the force field to use. The default is 'OPLS_2005'. Note that any custom directory needed for OPLS3 is taken care of via the runtime environment as long as the script was launched via launcher with the oplsdir argument. :type rezero_system: bool :param rezero_system: Whether to move the center of mass to the origin :type logger: `logging.Logger` :param logger: The logger to use to record non-error messages. If None, the messages are printed. :type copy_output: bool :param copy_output: specifies to copy output files back to the launch directory by adding them to the job control backend :type fatal_atom_properties: tuple of strings :param fatal_atom_properties: Properties to remove from structure :type check_infinite: bool :param check_infinite: request assign is_infinite during FF assignment :type split_components: bool :param split_components: Whether to split system in components in the system build :type water_fftype: str :param water_fftype: the force field type for water solute. Must be a key in the WATER_FFTYPES dictionary. :type set_incorporation: bool :param set_incorporation: Set the cms as the incorporated structure when set_incorporation=True and copy_output=True :type wam_type: int or None :param wam_type: One of the enums defined in workflow_action_menu.h if the results should have a Workflow Action Menu in Maestro :type per_structure_wam: bool :param per_structure_wam: Whether wam is to be added per structure :type build_geoemtry_msj: str or None :param build_geoemtry_msj: build_geometry stage msj file :type redistribute_hydrogen_mass: bool :param redistribute_hydrogen_mass: whether to perform hydrogen mass repartition """ assert water_fftype in msconst.WATER_FFTYPES, ( f'Unknown water type: ' f'{water_fftype}. Known types are: ' f'{", ".join(msconst.WATER_FFTYPES)}.') assert not msutils.is_coarse_grain(struct), ('Structure "%s" is ' 'coarse-grained.' % struct.title) # Copy the input structure, since its properties are modified self.struct = struct.copy() self.basename = basename self.forcefield = forcefield self.rezero_system = rezero_system self.logger = logger self.copy_output = copy_output self.fatal_atom_properties = fatal_atom_properties self.check_infinite = check_infinite self.split_components = split_components self.water_fftype = water_fftype self.set_incorporation = set_incorporation self.wam_type = wam_type self.desmond_basename = self.basename + '_system' self.in_mae = self.desmond_basename + '.mae' self.msj_fname = self.desmond_basename + '.msj' self.out_cms = self.desmond_basename + '-out.cms' self.multisim_log = self.desmond_basename + '_multisim.log' self.per_structure_wam = per_structure_wam self.build_geometry_msj = build_geometry_msj self.redistribute_hydrogen_mass = redistribute_hydrogen_mass self.backend = jobcontrol.get_backend() self.component_mol_indices = None self.has_water_ct = False # Set in self.writeMae
[docs] def run(self): """ Prepare and run the desmond system builder :raise FileNotFoundError: If the certain files cannot be found. """ self.cleanAtomProperties() self.cleanStructProperties() self.setChorusBoxProps() self.setForceFieldDir() self.centerStruct() self.splitComponents() self.writeMae() self.writeMsj() self.runBuildGeometry() self.runForceFieldAssignment() # Log file must be added to backend before processing cms (MATSCI-9651) self.addFilesToBackend() self.postProcessCms()
[docs] @tlog.timeit_logger_debug def cleanAtomProperties(self): """ Clean atom properties that cause the system builder to fail """ for pname in self.fatal_atom_properties: self.struct.deletePropertyFromAllAtoms(pname)
[docs] @tlog.timeit_logger_debug def setChorusBoxProps(self): """ If the chorus box properties are not set to define the periodic boundary condition, a cubic PBC will be imposed based on the largest span of the X, Y or Z coordinates. """ if not has_chorus_box_props(self.struct): add_cubic_chorus_box_props(self.struct) xtal.sync_pbc2(self.struct)
[docs] @tlog.timeit_logger_debug def cleanStructProperties(self): """ Romove or re-assign the structure-level properties that causes issues. """ # Mark the structure as solute for now. This ensures that the system builder # will keep water molecules if water is one of the components. MATSCI-815 self.struct.property[CT_TYPE] = CT_TYPE.VAL.SOLUTE # Remove is_infinite property, if present self.struct.property.pop(IS_INFINITE, None) # Remove NUM_COMPONENT property from structure properties, it causes # ffbuilder to fail self.struct.property.pop(NUM_COMPONENT, None)
[docs] @tlog.timeit_logger_debug def setForceFieldDir(self): """ Set custom OPLS directory, if any """ opls_dir = os.getenv('OPLS_DIR') if not opls_dir: return if mm.is_valid_opls_directory(opls_dir): self.struct.property[USE_CUSTOM_OPLSDIR] = True # Note - we don't say what directory is being used because that # will be determined for real at simulation time (MATSCI-8027) # # For testing we can confirm what directory gets used by looking for # the directory that gets loaded in the Desmond multisim file # 'Loading custom OPLS file:' tlog.log(self.logger, 'Using custom OPLS directory') else: tlog.log(self.logger, 'Could not set custom OPLSDIR')
[docs] @tlog.timeit_logger_debug def centerStruct(self): """ Translate the whole structure so that the centroid is at the origin. """ if not self.rezero_system: return transform.translate_center_to_origin(self.struct, [0., 0., 0.])
[docs] @tlog.timeit_logger_debug def splitComponents(self): """ Each list in Component cts contains isomers. """ if not self.split_components: # Place all the mol indices in first component and return self.component_mol_indices = [ list(range(1, self.struct.mol_total + 1)) ] self._mol2aidxs = { mol.number: tuple(mol.getAtomIndices()) for mol in self.struct.molecule } return # Retype in case atom types are stale due to changes in bond order # (MATSCI-6728) self.struct.retype() if self.struct.mol_total == 1: # Special case: entire structure - one molecule (MATSCI-11446) self.component_mol_indices = [[1]] self._mol2aidxs = {1: tuple(range(1, self.struct.atom_total + 1))} return self.component_mol_indices = [] self._mol2aidxs = {} for idx, mol in enumerate(self.struct.molecule): if idx % 1000 == 0: tlog.log_debug(self.logger, 'Splitting %d-th molecule.' % idx) aidxs = mol.getAtomIndices() self._mol2aidxs[mol.number] = tuple(aidxs) for comp_mol_indinces in self.component_mol_indices: ct_aidxs = self._mol2aidxs[comp_mol_indinces[0]] # It is required for desmond that atoms are in the same order # in molecules, are_isomers_conn checks for that if mmct_ct_compare_connect_py(self.struct, ct_aidxs, aidxs): comp_mol_indinces.append(mol.number) break else: self.component_mol_indices.append([mol.number])
[docs] @tlog.timeit_logger_debug def writeMae(self): """ Write out the mae file for Desmond forceField assignment input. :raise FileNotFoundError: if input files for desmond assign forcefield stage cannot be generated. """ if os.path.exists(self.in_mae): fileutils.force_remove(self.in_mae) TMP_PROP = 'i_matsci_orig_atom_idx_tmp' with structure.StructureWriter(self.in_mae) as writer: for idx, comp_mol_indinces in enumerate(self.component_mol_indices): indices = [] for mol_idx in comp_mol_indinces: indices.extend(self._mol2aidxs[mol_idx]) for idx in indices: self.struct.atom[idx].property[TMP_PROP] = idx struct = self.struct.extract(indices, copy_props=True) # Reorder according the original indices remap = { atom.property[TMP_PROP]: atom.index for atom in struct.atom } remap = [remap[idx] for idx in indices] struct = build.reorder_atoms(struct, remap) if self.split_components: # Each structure (component) has only one type of molecule # This greatly speeds up ffbuilder struct.property[NUM_COMPONENT] = 1 # Simple check if it is possible a water component # Water ff assignment needs a component of pure water, # and thus must be used with split_components # water_fftype is an all-atom water model if len(self._mol2aidxs[mol_idx]) == 3: self.prepareWaterModel(struct) struct.deletePropertyFromAllAtoms(TMP_PROP) writer.append(struct) if not os.path.exists(self.in_mae): raise FileNotFoundError("Unable to find input file %s" % self.in_mae)
[docs] def writeMsj(self): """ Write out the msj file for Desmond forceField assignment input. """ if os.path.exists(self.msj_fname): fileutils.force_remove(self.msj_fname) forcefield_opts = { 'forcefield': self.forcefield, 'assign_is_infinite': self.check_infinite, # hard-code this, no known reasons to turn this off 'fail_on_lewis_failure': True, 'hydrogen_mass_repartition': self.redistribute_hydrogen_mass } if self.split_components: # water ff assignment needs a component of pure water, and thus # must be used with split_components forcefield_opts['water'] = self.water_fftype stringer = SysbuildMSJStringer(**forcefield_opts) create_msj([stringer], filename=self.msj_fname) if not os.path.exists(self.msj_fname): raise FileNotFoundError("Unable to find command input file: %s" % self.msj_fname)
[docs] @staticmethod def validateBuildGeometryMsj(msj_fn=None, msj_sea=None): """ Validate MSJ file or Map to have two stages, task and build_geometry. :rtype: (bool, sea.Map) or (bool, str) :return: Whether MSJ is valid. If it is, return Map of the MSJ, otherwise error message """ assert bool(msj_fn) ^ bool(msj_sea), ('msj_fn or msg_sea must be ' 'present.') if msj_fn: try: msj_sea = cmj.msj2sea(msj_fn, None) except RuntimeError as err: return False, str(err) if len(msj_sea.stage) < 2: return False, 'MSJ must have at least two stages.' for idx, (number, name) in enumerate( zip(('First', 'Second'), ('task', 'build_geometry'))): if msj_sea.stage[idx].__NAME__ != name: return False, '%s stage in MSJ must be "%s".' % (number, name) return True, msj_sea
def _removeOverlappingSolvent(self, comp_ct, radius): """ Remove solvent overlapping with other components. :param list[structure.Structure] comp_ct: List of structures :param float radius: Radius check of overlaps (in A) """ assert comp_ct[0].property[CT_TYPE] == CT_TYPE.VAL.FULL_SYSTEM, ( 'First component must be full system.') assert comp_ct[-1].property[CT_TYPE] == CT_TYPE.VAL.SOLVENT, ( 'Last ' 'component must be solvent.') assert 1 == len([ ct for ct in comp_ct if ct.property[CT_TYPE] == CT_TYPE.VAL.SOLVENT ]), ('Only one ' 'solvent component is supported.') full_ct, solvent_ct = comp_ct[0], comp_ct[-1] solvent_first_idx = sum(ct.atom_total for ct in comp_ct[1:-1]) + 1 asl_str = ('fillmol ((a.n %d-%d) AND within %f (a.n 1-%d))' % (solvent_first_idx, full_ct.atom_total, radius, solvent_first_idx - 1)) indices = analyze.evaluate_asl(comp_ct[0], asl_str) solvent_ct_indices = [idx - (solvent_first_idx - 1) for idx in indices] solvent_ct.deleteAtoms(solvent_ct_indices) if solvent_ct.atom_total == 0: # Remove empty solvent component (last in the comp_ct) tlog.log_debug(self.logger, 'Removed entire solvent component') comp_ct.pop() tlog.log_debug( self.logger, 'Removed %d solvent atoms using exclusion radius %.2f A' % (len(solvent_ct_indices), radius))
[docs] def prepareBuildGeometryInput(self, in_mae): """ Prepare mae input for the structure builder :param str in_mae: Input mae (not modified) :return str: Mae file with prepared input for build_geometry """ out_mae = self.desmond_basename + '_build_geometry-in.mae' if not self.has_water_ct: # Nothing to do if water is not present, simply copy and return fileutils.force_copy2(in_mae, out_mae) return out_mae with structure.StructureWriter(out_mae) as writer: with structure.StructureReader(in_mae) as reader: for struct in reader: if struct.property[CT_TYPE] == CT_TYPE.VAL.SOLVENT: # Solvent means that water is present, remove this ct # for now. tlog.log( self.logger, 'Build geometry input contains water solvent, this ' 'is not currently supported, removing it.') continue assert struct.property[CT_TYPE] == CT_TYPE.VAL.SOLUTE, ( 'Only solute type is supported here.') writer.append(struct) assert writer.written_count > 0, 'Build geometry got empty input.' return out_mae
[docs] def setupBuildGeometryMSJ(self, msj_fn): """ Write msj with build_geometry stage. :param str msj_fn: MSJ filename to use :rtype: bool or float :return: False on error. 0 if no solvation was requested, otherwise solvation radius """ assert self.build_geometry_msj, 'build_geometry not requested' def write_msj(msj_fn, stages): with open(msj_fn, 'w') as msj_fh: for stage in stages: msj_fh.write('%s {\n' % stage.__NAME__) msj_fh.write(stage.__str__(" ")) msj_fh.write('}\n\n') status, cfg_sea = self.validateBuildGeometryMsj( msj_fn=self.build_geometry_msj) if not status: tlog.log(self.logger, '%s Will skip build geometry step.' % cfg_sea) return False # build_geometry validated to be the second stage stage = cfg_sea.stage[1] # Allowed keys for now, hard code the rest ALLOWED_KEYS = { 'add_counterion', 'neutralize_system', 'salt', 'solvent', 'solvate_system' } if ('solvate_system' in stage and stage.solvate_system.val and 'solvent' in stage): # Get radius from the stage if present before its deletion # Hard-code to default 2.0 A for now, set minimum to 0.5 radius = max(0.5, (stage[self.SOLVATION_RADIUS_KEY].val if self.SOLVATION_RADIUS_KEY in stage else 2.0)) if (stage.solvent.val == 'FILE'): tlog.log( self.logger, 'solvent = FILE is not supported. Will ' 'skip build geometry step.') return False else: radius = 0 # Solvation wasn't requested for key in set(stage.keys()).difference(ALLOWED_KEYS): del stage[key] # Hard code values for now stage['box'] = 'read' stage['override_forcefield'] = self.forcefield stage['rezero_system'] = False stage['compress'] = "" stage['dir'] = "." stage['jobname'] = "$MASTERJOBNAME" stage['distil_solute'] = False stage[constants.BUILD_GEOMETRY.REMOVE_OVERLAPPED_SOLVENT] = False # Write only first two stages: task and build_geometry write_msj(msj_fn, cfg_sea.stage[:2]) return radius
[docs] def runBuildGeometry(self): """ Run build_geometry with multisim if requested. self.in_mae may be updated here. """ if not self.build_geometry_msj: return log = lambda msg: tlog.log(self.logger, '%s Will skip build geometry step.' % msg) jobname = '%s_build_geometry' % self.desmond_basename msj_fn = '%s.msj' % jobname out_mae = '%s-out.mae' % jobname log_fn = '%s-in.log' % jobname # Weird name with -in csb_fn = '%s-in_sb.csb' % jobname if self.backend and self.copy_output: for file_fn in (msj_fn, out_mae, log_fn, csb_fn): jobutils.add_outfile_to_backend(file_fn) exclude_solvent_radius = self.setupBuildGeometryMSJ(msj_fn) if exclude_solvent_radius is False: # Error was already logged return in_mae = self.prepareBuildGeometryInput(self.in_mae) cmd = get_multisim_command(in_mae, out_mae, msj_name=msj_fn, job_name=jobname, add_license=False) job_obj = jobcontrol.launch_job(cmd) job_obj.wait() if not os.path.isfile(out_mae): log('Could not locate build_geometry output file: %s.' % out_mae) return try: sts = list(msutils.structure_reader(out_mae, do_raise=True)) except Exception as err: log('Could not read output structures from build_geometry. %s' % str(err)) return if not (structure.count_structures(self.in_mae) <= len(sts)): # build_geometry should at least return same number of structures log('Wrong output from build_geometry output.') return if exclude_solvent_radius: self._removeOverlappingSolvent(sts, exclude_solvent_radius) # Update self.in_mae, pretty important (!) self.in_mae = out_mae with structure.StructureWriter(self.in_mae) as writer: # Skip full_system added by build_geometry writer.extend([ st for st in sts if st.property[CT_TYPE] != CT_TYPE.VAL.FULL_SYSTEM ]) tlog.log(self.logger, 'Done building geometry.')
[docs] @tlog.timeit_logger_debug def runForceFieldAssignment(self): """ Run the desmond assign forcefield stage. """ cmd = get_multisim_command(self.in_mae, self.out_cms, msj_name=self.msj_fname, job_name=self.desmond_basename, add_license=False) job_obj = jobcontrol.launch_job(cmd) job_obj.wait()
[docs] def addFilesToBackend(self): """ Add intermediate and final files to backend. """ if not self.backend or not self.copy_output: return if self.set_incorporation: self.backend.setStructureOutputFile(self.out_cms) # Always copy back the log file on failure (PANEL-7922) for filename in [ self.out_cms, self.msj_fname, self.in_mae, self.multisim_log ]: self.backend.addOutputFile(filename) for extension in ['_1-out.tgz', '-multisim_checkpoint']: self.backend.addOutputFile(self.desmond_basename + extension)
[docs] @tlog.timeit_logger_debug def postProcessCms(self): """ Post process of desmond cms output. :raise FileNotFoundError: If the cms cannot be found. """ if not os.path.exists(self.out_cms): raise FileNotFoundError( f'Unable to find final Desmond system file, {self.out_cms}. ' 'The Desmond multisim utility did not finish successfully. ' f'Check the {self.multisim_log} file for more information.') BUILD_GEOM_CT_TYPES = { CT_TYPE.VAL.ION, CT_TYPE.VAL.POSITIVE_SALT, CT_TYPE.VAL.NEGATIVE_SALT } result = cms.Cms(file=self.out_cms) # Mark the structure as solvent to avoid Desmond attempts to keep all # atoms together MATSCI-778 for struct in result.comp_ct: ct_type = struct.property[CT_TYPE] if ct_type in BUILD_GEOM_CT_TYPES: # Skip components coming from build_geometry continue struct.property[CT_TYPE] = CT_TYPE.VAL.SOLVENT # Set the partial charges to custom charges for astruct in itertools.chain([result._raw_fsys_ct], result.comp_ct): for atom in astruct.atom: custom_q = atom.property.get('r_ffio_custom_charge') if custom_q is not None: atom.partial_charge = custom_q # MATSCI-6088: Entry name of result is always "Full system" result.set_cts_property('s_m_title', self.struct.title) if self.wam_type: if self.per_structure_wam: jobutils.set_structure_wam(result, self.wam_type) result.write(self.out_cms) else: jobutils.write_cms_with_wam(result, self.out_cms, self.wam_type) else: result.write(self.out_cms)
[docs] @tlog.timeit_logger_debug def prepareWaterModel(self, struct): """ Prepare water component. :type struct: `schrodinger.structure.Structure` :param struct: The structure whose atom properties to be modified. Must be one component (all molecules of the same type) """ mol_st = struct.molecule[1].extractStructure(copy_props=True) # Check if first molecule has exactly 3 atoms (H2O) if mol_st.atom_total != 3: return from schrodinger.application.desmond.stage.prepare import forcefield # get_water_ct_indices requires possible water components to be solvent mol_st.property[CT_TYPE] = CT_TYPE.VAL.SOLVENT try: indices = forcefield.AssignForcefield.get_water_ct_indices([mol_st]) except ValueError: # Should never happen since components are split already raise AssertionError('Water component is not pure water.') if indices: # It is water component struct.property[CT_TYPE] = CT_TYPE.VAL.SOLVENT self.has_water_ct = True
[docs]def run_system_builder(struct, base, *args, **kwargs): """ Run the Desmond multisim system builder. See SystemBuilder for argument documentation. :type location_type: str :param location_type: the location type, either parserutils.INSTALLED_CG_FF_LOCATION_TYPE or parserutils.LOCAL_CG_FF_LOCATION_TYPE :rtype: str or None :return: The name of the resulting .cms file, or None if the file was not produced successfully """ location_type = kwargs.pop('location_type', parserutils.LOCAL_CG_FF_LOCATION_TYPE) logger = kwargs.get('logger', None) if not msutils.is_coarse_grain(struct): system_builder = SystemBuilder(struct, base, *args, **kwargs) try: system_builder.run() except (FileNotFoundError, jobcontrol.JobLaunchFailure) as err: tlog.log(logger, str(err)) return return system_builder.out_cms # do CG # get force field info forcefield = kwargs.get('forcefield') if not forcefield: tlog.log(logger, 'For CG input a force field name is required.') return try: cgffld_path = cgff.get_force_field_file_path( forcefield, location_type=location_type, local_type_includes_cwd=True, check_existence=True) except ValueError as err: tlog.log(logger, str(err)) return # write cms file sysfile = f'{base}_system-out.cms' pbc_position = struct.property.get(xtal.PBC_POSITION_KEY, xtal.CENTER_PBC_POSITION) try: cgff.create_cg_system(struct, cgffld_path, filename=sysfile, pbc_position=pbc_position) except cgff.MissingParameterError as err: msg = f'Exiting due to critical force field error: {str(err)}' tlog.log(logger, msg) return wam_type = kwargs.get('wam_type') if wam_type: jobutils.add_wam_to_cms(sysfile, wam_type) jobutils.add_outfile_to_backend(sysfile) return sysfile
[docs]def find_forcefield_invalid_structures(structs, ffield_num=None, handle=None): """ Check structures to see if the specified forcefield is valid for them :type structs: list :param structs: A list of `schrodinger.structure.Structure` objects :type ffield_num: int or None :param ffield_num: An integer specifying which forcefield to use, if None then handle must be given :type handle: int or None :param handle: A forcefield handle, if None then ffield_num must be given :rtype: list :return: A list containing all the structures that fail the force field test. The list is empty if all structures are valid for the force field """ assert bool(ffield_num) ^ (handle is not None) if handle is not None: conman = lambda: contextlib.nullcontext(handle) else: # Turn off CM1A charges for this check - they do not need to be # calculated just to check FF typing issues and they are very expensive # MATSCI-2286 conman = lambda: common.opls_force_field(version=ffield_num, no_cm1a_bcc=True) with conman() as handle: invalid = [] for struct in structs: try: # Silence FF typing terminal messages with ioredirect.IOSilence(): with common.assign_force_field(handle, struct): pass except mmcheck.MmException: invalid.append(struct) return invalid
[docs]def validate_ff_name_and_get_number(name): """ Check that the given name is a valid force field name and if so, return the mmffld number associated with that name :type name: str :param name: The force field name to check :rtype: (str or None, int) :return: The first item is an error message if an error occurred. The second item is the mmffld integer for the given forcefield, or 0 if an error occurred. """ error = None try: name, number = parserutils.type_forcefield_and_get_number(name) except argparse.ArgumentTypeError as msg: number = 0 error = str(msg) return error, number
[docs]def read_density_from_eaf(density_eaf, fraction=0.2, logger=None): """ Reads in a .eaf file generated by an ms_analysis stage of a Desmond MD run and returns density data. :type density_eaf: str :param density_eaf: EAF file containing density information :type fraction: float :param fraction: The last fraction of the frames to use in the density averaging :type logger: `logging.Logger` :param logger: The logger to use while logging errors. If not supplied, messages will be printed. :rtype: (None, None), or (float, float) :return: (None, None) if the density cannot be read from the file, or tuple of the calculated density and standard deviation. """ if not os.path.exists(density_eaf): return (None, None) # Read the density from the file try: with open(density_eaf, 'r') as fh: eaf_data = sea.Map(fh.read()) except RuntimeError as err: if logger: msg = ("Specified .eaf file does not appear to " "be formatted correctly. Error while processing " "was: {0}".format(str(err))) tlog.log(logger, msg) return (None, None) try: density_data = eaf_data.Keywords[0].Bulk.DensityResult except AttributeError as err: if logger: msg = ("Specified .eaf file does not appear to have the " "expected density data. Error while parsing file " "was: {0}").format(str(err)) tlog.log(logger, msg) return (None, None) # Average the density over the last 20% of frames and compute the # standard deviation total_frames = len(density_data) density_list = [] for density_value in density_data: density_list.append(density_value.val) checked_frames = int(math.ceil(total_frames * fraction)) density = numpy.mean(density_list[-checked_frames:]) stdev = numpy.std(density_list[-checked_frames:]) return (density, stdev)
[docs]def get_task_stage(extra_task_text='', check_cg=None, check_infinite=None, msj_string_header='', remove_com_motion=None): """ Get desmond task stage. :type extra_task_text: str :param extra_task_text: Additional text to place in the task stage. Ignored if task_stage=False. :type check_cg: `schrodinger.structure.Structure` :param check_cg: Check the given structure to see if it is coarse-grained. If it is, add the typical headers for the appropriate coarse-grained structure type. Note that the interplay between extra_task_test and the headers used for coarse-grained structures is uncertain and left to the caller to determine if the results are acceptable if both are used. Ignored if task_stage=False. :type check_infinite: `schrodinger.structure.Structure` :param check_infinite: Check the given structure to see if it is infinite. If it is, add the typical headers for the infinite systems. Ignored if task_stage=False. :type msj_string_header: when extra_task_text is empty, use this string as the header of the output msj :param msj_string_header: str :type remove_com_motion: bool or None :param bool remove_com_motion: None - default behavior, add remove_com_motion only to infinite atomic systems, if True add plugin to msj regardless. If False don't add remove_com_plugin :rtype: str :return: Task stage """ header = coarsegrain.get_coarse_grain_msj_header(check_cg) remove_com_motion_added = False if check_infinite: is_cg = msutils.is_coarse_grain(check_cg) is_infinite = bool(check_infinite.property.get(IS_INFINITE, False)) if is_infinite: header += PERIODIC_FIX_HEADER if remove_com_motion is None and is_infinite and not is_cg: # This is default behavior remove_com_motion_added = True header += PERIODIC_FIX_HEADER_ATOM if remove_com_motion and not remove_com_motion_added: header += PERIODIC_FIX_HEADER_ATOM if header: dheader = DESMOND_FAMILY_HEADER % header extra_task_text += MSJ_FAMILY_HEADER % dheader return 'task { task = "desmond:auto" %s }\n\n' % extra_task_text
[docs]def validate_box_size(model, temp=300.0, simple=True): """ Validate model or structure box (lattice) size. :type model: cms.Cms or structure.Structure :param model: Desmond model or structure. For structure chorus properties must be set :param float temp: Temperature in K :param bool simple: If true, run an empirical test based on the lattice constants :return bool: True if cell is valid for Desmond, False otherwise """ # Note that simple=False might fail on Windows (MATSCI-9023) if simple: # Empirically suggested by Dong, can change if user changes desmond cutoff MINIMUM_LENGTH = 22. # Ang if isinstance(model, cms.Cms): box = list(model.box) else: box = list(xtal.get_chorus_properties(model)) params = xtal.get_params_from_chorus(box) return all(x >= MINIMUM_LENGTH for x in params[:3]) else: # This imports entire desmond API (MATSCI-10367) from schrodinger.application.desmond.stage.utils import check_box_size msj = 'temperature = %f' % temp return check_box_size(model, msj)
[docs]class MSJStringer(object): """ A base class for setting up the information and generating a string describing that information for a stage in a Desmond MSJ file. """ # Common keys in the .msj file DOT = '_dot_' MAX_STEPS = 'max_steps' TIME = 'time' TEMP = 'temperature' SEED = 'randomize_velocity.seed' PRESSURE = 'pressure' ENSEMBLE = 'ensemble' ENSEMBLE_CLASS = 'ensemble.class' ENSEMBLE_METHOD = 'ensemble.method' TIMESTEP = 'timestep' JOBNAME = 'jobname' CHECKPOINT = 'checkpt' DIR = 'dir' COMPRESS = 'compress' INTERVAL = 'trajectory%sinterval' % DOT ENE_INTERVAL = 'eneseq%sinterval' % DOT ANALYSIS_TYPE = 'analysis_type' OTHERS = 'othertag' TRJ_WRITE_VEL = 'trajectory%swrite_velocity' % DOT RANDOMIZE_VEL = 'randomize_velocity%sfirst' % DOT ISOTROPY = f'backend{DOT}integrator{DOT}pressure{DOT}isotropy' # Very common key/value pairs # Checkpoint is being turned off as default. See case MATSCI-12438. To # turn it on please set 'checkpt' and 'checkpt.write_last_step'. MSJ_BASE = {JOBNAME: '"$MASTERJOBNAME"', CHECKPOINT: 'no'} MSJ_LAST = {DIR: '"."', COMPRESS: '""'} # Ensures that data in stages is in a logical, consistent order LINE_ORDER = [ MAX_STEPS, ANALYSIS_TYPE, TIME, TIMESTEP, ENSEMBLE, ENSEMBLE_CLASS, ENSEMBLE_METHOD, TEMP, PRESSURE, INTERVAL, SEED, OTHERS, JOBNAME, DIR, COMPRESS, CHECKPOINT ] KNOWN_LINES = set(LINE_ORDER) # Stage types and the default settings for each MINIMIZE = 'minimize' SIMULATE = 'simulate' ANALYSIS = 'matsci_analysis' SYSBUILD = 'assign_forcefield' AVE_CELL = 'average_cell' EN_ANALYSIS = 'analysis' CONCATENATE = 'concatenate' SETTINGS = { MINIMIZE: config.DEFAULT_SETTING.MINIMIZE, SIMULATE: config.DEFAULT_SETTING.MD, ANALYSIS: None, SYSBUILD: None, AVE_CELL: None, EN_ANALYSIS: None, CONCATENATE: None } PADDING = ' ' INDENT = ' '
[docs] def __init__(self, stype, last=False, use_base=True, **kwargs): """ Create a MSJStringer object :type stype: str :param stype: The type of stage this is. Should be one of the class constants MINIMIZE, SIMULATE, SYSBUILD, ANALYSIS, or AVE_CELL :type last: bool :param last: Whether the msj lines typically associated with the last stage (dir, compress) should be included :type use_base: bool :param use_base: Whether the base msj lines (jobname, checkpoint) that are common to almost all stages should be included. All other keyword arguments are turned into lines in the .msj file. Keys that have a '.' in them should have that . replaced with the class constant string DOT (_dot_). For instance to include a line setting trajectory.write_velocity to true, use the keyword argument trajectory_dot_write_velocity='true' """ self.stype = stype if use_base: self.data = self.MSJ_BASE.copy() else: self.data = collections.OrderedDict() self.data.update(kwargs) if last: self.data.update(self.MSJ_LAST) settings = self.SETTINGS[self.stype] if settings: self.key = config.canonicalize(copy.deepcopy(settings)) else: self.key = None
[docs] def createString(self, concatenate=False): """ Create and return the string that represents this stage in the .msj file :param bool concatenate: If True enable concatenate mode. In this mode, stage type is not printed :rtype: str :return: The msj string representing this stage """ # We're going to destroy the dictionary as we go, so make a copy dcopy = self.data.copy() # First line is the stage type and title title = self.getTitle() title_str = f' title = "{title}"' if title else '' if concatenate: string = '{%s\n' % title_str else: string = '%s {%s\n' % (self.stype, title_str) def add_line(string, key, value): # Add a key = value line to string key = key.replace(self.DOT, '.') newline = self.formLine(key, value) string += newline return string for line in self.LINE_ORDER: if line == self.OTHERS: # This is the signal to add all the custom lines in a block # Temporary fix by sorting, see MATSCI-5156 for key in sorted(dcopy): if key not in self.KNOWN_LINES: value = dcopy.pop(key) string = add_line(string, key, value) else: # This is a known line value = dcopy.pop(line, None) if value is not None: string = add_line(string, line, value) # Last line gets a closing bracket string = string[:-1] + ' }\n' return string
[docs] def formLine(self, key, value): """ Create the line that should go in the .msj file for this key. Subclasses should use this function to create custom strings for specific key values :type key: str :param key: Typically, the string before the key = value pair on an msj line. :type value: str :param value: Typically, the string after the key = value pair on an msj line. :rtype: str :return: The line (or lines) to put in the .msj file for this key/value pair """ return '%s%s = %s\n' % (self.PADDING, key, str(value))
[docs] def getTitle(self): """ This function returns a title for the stage describing the main parameters. The default implementation has no title. :rtype: str :return: The title for this stage """ return ""
[docs]class MinimizeMSJStringer(MSJStringer): """ An MSJStringer class for Minimizer stages """
[docs] def __init__(self, iterations=None, last=False, **kwargs): """ Create a MinimizeMSJStringer object :type last: bool :param last: Whether the msj lines typically associated with the last stage (dir, compress) should be included :type iterations: int :param iterations: The number of iterations. If not provided the Desmond default will be used. All other keyword arguments are turned into lines in the .msj file. See parent class for additional information. """ if iterations is not None: kwargs[self.MAX_STEPS] = iterations MSJStringer.__init__(self, self.MINIMIZE, last=last, **kwargs)
[docs] def getTitle(self): """ Return a title for the stage describing the main parameters. :rtype: str :return: The title for this stage """ iters = self.data.get(self.MAX_STEPS, self.key.max_steps.val) return 'Minimize Iter: %s' % iters
[docs]class AveCellMSJStringer(MSJStringer): """ An MSJStringer class for post average cell stage. """ PERCENT_TO_AVG = 'percent_to_avg'
[docs] def __init__(self, **kwargs): super().__init__(self.AVE_CELL, **kwargs) self.data.pop(self.CHECKPOINT, None)
[docs] def getTitle(self): """ This function returns a title for the stage describing the main parameters. The default implementation has no title. :rtype: str :return: The title for this stage """ return "Average cell, last %s%%" % self.data[self.PERCENT_TO_AVG]
[docs]class AnalysisMSJStringer(MSJStringer): """ An MSJStringer class for the analysis stage. """
[docs] def __init__(self, **kwargs): super().__init__(self.EN_ANALYSIS, **kwargs) self.data.pop(self.CHECKPOINT, None)
[docs]class MDMSJStringer(MSJStringer): """ An MSJStringer class for Molecular Dynamics stages """
[docs] def __init__(self, time=None, temp=None, pressure=None, random_seed=None, ensemble='NPT', method=None, timestep=None, last=False, **kwargs): """ Create a MDMSJStringer object Any keyword that does not have a value supplied will use the default Desmond value for MD simulations. :type time: float :param time: Time in picoseconds :type temp: float :param temp: Temperature in Kelvin :type pressure: float :param pressure: The pressure in bar :type random_seed: int :param random_seed: The seed for the random number generator :type ensemble: str :param ensemble: Valid Desmond ensemble such as NPT, NVT :type method: str :param method: Valid Desmond ensemble method such as NH, Berendson, Langevin :type timestep: int or float or str or list :param timestep: The timestep in picoseconds. If a string, will be treated as a float if it converts to float without error, otherwise it is used directly so must be in the proper [ x y z ] format used in .msj and .cfg files. If float or int, a string of the following format will be generated [ timestep timestep timestep * 3. ] If a list, the items will be converted to a string list of [ x y z ] :type last: bool :param last: Whether the msj lines typically associated with the last stage (dir, compress) should be included All other keyword arguments are turned into lines in the .msj file. See parent class for additional information. """ if time is not None: kwargs[self.TIME] = time if temp is not None: kwargs[self.TEMP] = temp if pressure is not None: kwargs[self.PRESSURE] = pressure if random_seed is not None: kwargs[self.SEED] = random_seed if method is not None: # Note - if method is specified, the class must also be specified # in the .msj file with ensemble.class kwargs[self.ENSEMBLE_CLASS] = ensemble kwargs[self.ENSEMBLE_METHOD] = method else: # If method is not specified, the class must be specified via # ensemble (not ensemble.class) kwargs[self.ENSEMBLE] = ensemble if timestep is not None: if isinstance(timestep, str): # Check for a string of the form '1.7' try: timestep = float(timestep) except ValueError: # Per the timestep docs, if a provided timestep string # doesn't convert to a float, we will use it directly. pass if isinstance(timestep, (int, float)): timestep = [timestep, timestep, timestep * 3.0] if isinstance(timestep, list): # The round in the function below avoids values like # 3.3000000000000003 timestep = '[ %s ]' % ' '.join( [str(round(x, 8)) for x in timestep]) kwargs[self.TIMESTEP] = timestep super().__init__(self.SIMULATE, last=last, **kwargs)
[docs] def getTitle(self): """ Return a title for the stage describing the main parameters. :rtype: str :return: The title for this stage """ time = float(self.data.get(self.TIME, self.key.time.val)) / 1000 ensemble_val = self.data.get(self.ENSEMBLE, self.key.ensemble.class_.val) ensemble = self.data.get(self.ENSEMBLE_CLASS, ensemble_val) pressure = float(self.data.get(self.PRESSURE, self.key.pressure.val[0])) title = ('Molecular Dynamics %.4f ns/%s/%.2f bar' % (time, ensemble, pressure)) return title
[docs]class MSAnalysisMSJStringer(MSJStringer): """ An MSJStringer class for MS MD Analysis stages """
[docs] def __init__(self, analysis_type=None, **kwargs): """ Create a MSAnalysisMSJStringer object Any keyword that does not have a value supplied will use the default Desmond value for MD simulations. :type analysis_type: str :param analysis_type: A string in the form of [ property property ] that lists the properties to be computed in the analysis. All other keyword arguments are turned into lines in the .msj file. See parent class for additional information. """ if analysis_type: kwargs[self.ANALYSIS_TYPE] = analysis_type MSJStringer.__init__(self, self.ANALYSIS, last=True, use_base=False, **kwargs)
[docs] def getTitle(self): """ Return a title for the stage describing the main parameters. :rtype: str :return: The title for this stage """ return 'Analysis of bulk properties for MS systems'
[docs]class BrownieMSJStringer(MDMSJStringer): """ An MSJStringer class for Brownie stages """
[docs] def __init__(self, time=100., temp=10., ensemble=msconst.ENSEMBLE_NVT, timestep='[ 0.001 0.001 0.003 ]', delta_max=0.1, btau=None, ttau=None, **kwargs): """ Create a BrownieMSJStringer object Desmond config does not have a specific set of defaults for Brownie stages, so the defaults are supplied here for all keywords. :param float delta_max: The value of delta_max :param float btau: The value of the barostat tau :param float ttau: The value of the thermostat tau See parent class for additional information. """ if delta_max is not None: kwargs['ensemble_dot_brownie_dot_delta_max'] = delta_max if btau is not None: kwargs['ensemble_dot_barostat_dot_tau'] = btau if ttau is not None: kwargs['ensemble_dot_thermostat_dot_tau'] = ttau super().__init__(time=time, temp=temp, ensemble=ensemble, method='Brownie', timestep=timestep, **kwargs)
[docs] def getTitle(self): """ Return a title for the stage describing the main parameters. :rtype: str :return: The title for this stage """ time = old_div(float(self.data[self.TIME]), 1000) ensemble = self.data[self.ENSEMBLE_CLASS] return f'Brownian Dynamics {ensemble}, {time:.4f} ns'
[docs]class SysbuildMSJStringer(MSJStringer): """ An MSJStringer class for building systems """
[docs] def __init__(self, forcefield=OPLS2005, compress='""', **kwargs): """ Create a SysbuildMSJStringer object :type forcefield: str :param forcefield: The forcefield to use All other keyword arguments are turned into lines in the .msj file. See parent class for additional information. """ kwargs['forcefield'] = forcefield kwargs['compress'] = compress MSJStringer.__init__(self, self.SYSBUILD, use_base=False, **kwargs)
[docs]class ConcatMSJStringer(MSJStringer): """ An MSJStringer class for the concatenate stage. """ STAGES = 'stages'
[docs] def __init__(self, **kwargs): super().__init__(self.CONCATENATE, **kwargs) self.data.pop(self.CHECKPOINT, None)
[docs] def formLine(self, key, value): """ Create the line that should go in the .msj file for the 'stages' key. Parent function creates strings for all other keys. :type key: str :param key: Typically, the string before the key = value pair on an msj line. :type value: str :param value: Typically, the string after the key = value pair on an msj line. :rtype: str :return: The line (or lines) to put in the .msj file for this key/value pair """ if key != self.STAGES: return super().formLine(key, value) # See DESMOND-9929 how to correctly build concatenate stage ret = 'simulate = [\n' for stage in value: # Only simulate stages can be concatenated assert stage.stype == stage.SIMULATE ret += stage.createString(concatenate=True) ret += ']\n' return ret
[docs]def get_materials_relaxation_stringers( temp=300.0, compress_last=True, pressure_isotropy=constants.IsotropyPolicy.ANISOTROPIC): """ Get a list of Stringer objects that reproduce the Materials Relaxation protocol :param float temp: The temperature for the last of the three stages :param bool compress_last: If true, compress the last stage - the trajectory will not be available except as part of the tgz file. If False, the last stage will not be compressed and the trajectory will be available. :param str pressure_isotropy: Barostat coupling between x, y, and z. :rtype: list :return: Each item of the list is a stage in the Materials Relaxation protocol """ brownie1 = BrownieMSJStringer(ensemble='NVT', temp=10.0, time=20) brownie2 = BrownieMSJStringer( ensemble='NPT', temp=100., time=20, btau=0.5, ttau=0.5, delta_max=0.5, timestep=None, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, backend_dot_integrator_dot_pressure_dot_tension_ref='0') relax_md = MDMSJStringer( ensemble='NPT', method='MTK', temp=temp, time=100, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, backend_dot_integrator_dot_pressure_dot_tension_ref='0', last=not compress_last) return [brownie1, brownie2, relax_md]
[docs]def get_semicrystalline_relaxation1_stringers( temp=300.0, compress_last=True, pressure_isotropy=constants.IsotropyPolicy.ANISOTROPIC): """ Get a list of Stringer objects that reproduce first Semi-Crystalline protocol :param float temp: The temperature for the last of the three stages :param bool compress_last: If true, compress the last stage - the trajectory will not be available except as part of the tgz file. If False, the last stage will not be compressed and the trajectory will be available. :param str pressure_isotropy: Barostat coupling between x, y, and z. :rtype: list :return: Each item of the list is a stage in the first Semi-Crysatlline relaxation protocol """ brownie1 = BrownieMSJStringer(ensemble='NVT', temp=10.0, time=100, restraints=CRYSTAL_RESTRAINT) relax_md1 = MDMSJStringer( time=200, temp=50, timestep=0.001, restraints=RETAIN_RESTRAINT, ensemble_dot_class='NPAT', ensemble_dot_method='MTK', backend_dot_integrator_dot_pressure_dot_isotropy='constant_area', backend_dot_integrator_dot_pressure_dot_tension_ref='0', ) relax_md2 = MDMSJStringer( time=200, temp=50, timestep=0.001, restraints=IGNORE_RESTRAINT, ensemble='NVT', ) relax_md3 = MDMSJStringer( time=50.0, timestep=0.0001, temp=50, pressure=1.01325, restraints=IGNORE_RESTRAINT, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, randomize_velocity_dot_first=0.0, randomize_velocity_dot_seed=2007, randomize_velocity_dot_interval='inf', randomize_velocity_dot_temperature=50, ensemble='NPT', method='MTK', ensemble_dot_barostat_dot_tau=2, ensemble_dot_thermostat_dot_tau=1) relax_md4 = MDMSJStringer( time=5000.0, timestep=0.001, annealing=True, ensemble='NPT', method='MTK', ensemble_dot_barostat_dot_tau=2, ensemble_dot_thermostat_dot_tau=1, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, temperature="[[50.0 0 ][%s 5000 ]]" % temp, last=not compress_last) return [brownie1, relax_md1, relax_md2, relax_md3, relax_md4]
[docs]def get_semicrystalline_relaxation2_stringers( temp=300.0, compress_last=True, pressure_isotropy=constants.IsotropyPolicy.ANISOTROPIC): """ Get a list of Stringer objects that reproduce second Semi-Crystalline Relaxation protocol :param float temp: The temperature for the last of the three stages :param bool compress_last: If true, compress the last stage - the trajectory will not be available except as part of the tgz file. If False, the last stage will not be compressed and the trajectory will be available. :param str pressure_isotropy: Barostat coupling between x, y, and z. :rtype: list :return: Each item of the list is a stage in the second Semi-Crysatlline relaxation protocol """ brownie1 = BrownieMSJStringer(ensemble='NVT', temp=10.0, time=100, restraints=CRYSTAL_RESTRAINT) relax_md1 = MDMSJStringer( time=200, temp=50, timestep=0.001, ensemble='NVT', restraints=RETAIN_RESTRAINT, ) relax_md2 = MDMSJStringer( time=200, temp=50, timestep=0.001, ensemble='NVT', restraints=IGNORE_RESTRAINT, ) relax_md3 = MDMSJStringer( time=50.0, timestep=0.0001, temp=50, pressure=1.01325, restraints=IGNORE_RESTRAINT, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, randomize_velocity_dot_first=0.0, randomize_velocity_dot_interval='inf', randomize_velocity_dot_seed=2007, randomize_velocity_dot_temperature=50, ensemble='NPT', method='MTK', ensemble_dot_barostat_dot_tau=2, ensemble_dot_thermostat_dot_tau=1) relax_md4 = MDMSJStringer( time=5000.0, timestep=0.001, annealing=True, ensemble='NPT', method='MTK', restraints=IGNORE_RESTRAINT, ensemble_dot_barostat_dot_tau=2, ensemble_dot_thermostat_dot_tau=1, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, temperature="[[50.0 0 ][%s 5000 ]]" % temp, last=not compress_last) return [brownie1, relax_md1, relax_md2, relax_md3, relax_md4]
[docs]def get_compressive_relaxation_stringers( temp=300.0, compress_last=True, pressure_isotropy=constants.IsotropyPolicy.ANISOTROPIC): """ Get a list of Stringer objects that reproduce the compressive relaxation protocol :param float temp: The temperature for the last of the three stages :param bool compress_last: If true, compress the last stage - the trajectory will not be available except as part of the tgz file. If False, the last stage will not be compressed and the trajectory will be available. :param str pressure_isotropy: Barostat coupling between x, y, and z. :rtype: list :return: Each item of the list is a stage in the compressive Relaxation protocol """ brownie1 = BrownieMSJStringer(ensemble='NVT', temp=10.0, time=100, annealing='false') relax_md1 = MDMSJStringer(ensemble='NVT', time=24, timestep=0.001, trajectory_dot_interval=4.8) relax_md2 = MDMSJStringer(ensemble='NVT', temp=700, time=240, timestep=0.001, trajectory_dot_interval=4.8) relax_md3 = MDMSJStringer( temp=300, time=24, timestep=0.001, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, backend_dot_integrator_dot_pressure_dot_tension_ref='0', trajectory_dot_interval=4.8) relax_md4 = MDMSJStringer( temp=300, time=240, timestep=0.002, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, backend_dot_integrator_dot_pressure_dot_tension_ref='0', trajectory_dot_interval=24) relax_md5 = MDMSJStringer( temp=300, timestep=0.002, pressure=1013.25, time=10000, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, backend_dot_integrator_dot_pressure_dot_tension_ref='0', trajectory_dot_interval=48, trajectory_dot_write_velocity=True) relax_md6 = MDMSJStringer( temp=temp, timestep=0.002, time=10000, backend_dot_integrator_dot_pressure_dot_isotropy=pressure_isotropy, backend_dot_integrator_dot_pressure_dot_tension_ref='0', eneseq_dot_interval=100, trajectory_dot_interval=100, last=not compress_last) return [ brownie1, relax_md1, relax_md2, relax_md3, relax_md4, relax_md5, relax_md6 ]
[docs]def create_msj(stringers, filename=None, task_stage=True, extra_task_text="", check_cg=None, check_infinite=None, msj_string_header="", remove_com_motion=None): """ Write a Desmond .msj file based on the supplied stringers :type filename: str :param filename: If given, write the msj string to this path :type stringers: list of `MSJStringer` objects :param stringers: Each item of the list is the MSJStringer for a single stage. The stages will be written in list order :type task_stage: bool :param task_stage: Whether the traditional desmond auto task stage should be written before the stages in stringers :type extra_task_text: str :param extra_task_text: Additional text to place in the task stage. Ignored if task_stage=False. :type check_cg: `schrodinger.structure.Structure` :param check_cg: Check the given structure to see if it is coarse-grained. If it is, add the typical headers for the appropriate coarse-grained structure type. Note that the interplay between extra_task_test and the headers used for coarse-grained structures is uncertain and left to the caller to determine if the results are acceptable if both are used. Ignored if task_stage=False. :type check_infinite: `schrodinger.structure.Structure` :param check_infinite: Check the given structure to see if it is infinite. If it is, add the typical headers for the infinite systems. Ignored if task_stage=False. :type msj_string_header: when extra_task_text is empty, use this string as the header of the output msj :param msj_string_header: str :type remove_com_motion: bool or None :param bool remove_com_motion: None - default behavior, add remove_com_motion only to infinite atomic systems, if True add plugin to msj regardless. If False don't add remove_com_plugin :rtype: str :return: The string that was written to the file """ msj_string = "" if extra_task_text else msj_string_header if task_stage: msj_string += get_task_stage(extra_task_text=extra_task_text, check_cg=check_cg, check_infinite=check_infinite, msj_string_header=msj_string_header, remove_com_motion=remove_com_motion) for stringer in stringers: msj_string += stringer.createString() + '\n' if filename: with open(filename, 'w') as ofile: ofile.write(msj_string) return msj_string
[docs]def get_multisim_command(input_name, output_name, msj_name=None, job_name=None, for_shell=False, hosts=None, add_license=True, license_host=None, **kwargs): """ Return a command to run the Desmond multisim utility :type input_name: str :param input_name: The name of the input file :type output_name: str :param output_name: The name to use for the output CMS file :type msj_name: str :param msj_name: The name of the .msj command file :type job_name: str :param job_name: The name to use for the job :type gpu: bool :param gpu: Does nothing, left in for backwards compatibility. All calculations run on GPU :type procs: int :param procs: Does nothing, left in for backwards compatibility. All calculations run on GPU :type for_shell: bool :param for_shell: True if this will be used directly in a shell, such as when a panel writes a .sh file or used via subprocess. False if the command will be issued via normal Schrodinger job launching. If True, $SCHRORDINGER will be prepended to the command. :type hosts: str :param hosts: the hosts to run this job in the hostname:nprocs format :type add_license: bool :param add_license: Whether Desmond GPU license should be added to the command. Should be False when running force field assignment. :type license_host: None or str :param license_host: Host to be used to generate license. It will NOT appear in the returned cmd. To keep the host pass it in `hosts`. This option cannot be used together with `hosts` :rtype: list :return: The command line command to use """ # to allow deprecated keyword arguments assert set(kwargs.keys()).issubset(set(['gpu', 'procs', None])) assert not (hosts and license_host), ('"hosts" and "license_host" ' 'arguments cannot be both set.') # Note - this function intentionally uses '/' instead of os.path.join # because '/' works on all platforms - thus the command is ready to use on # platforms other than where it was written. mexec = '$SCHRODINGER/' if for_shell else '' mexec += 'utilities/multisim' cmd = [mexec, '-o', output_name, '-mode', 'umbrella', input_name] if msj_name: cmd += ['-m', msj_name] if job_name: cmd += ['-JOBNAME', job_name] if hosts: cmd += [cmdline.FLAG_HOST, hosts] if license_host: cmd += [cmdline.FLAG_HOST, license_host] if add_license: cmd = license.add_md_lic(cmd) if license_host: # Remove host flag and it's value host_idx = cmd.index(cmdline.FLAG_HOST) del cmd[host_idx:host_idx + 2] return cmd
def _get_default_timestep(index): """ Get the default value for the timestep at the specified index :param index: Index of the default timestep to return. Should be 0, 1 or 2 :type index: int :rtype: float :return: The default value of the far timestep in femtoseconds """ default_md_params = config.get_default_setting({}) return round(1000 * (default_md_params.timestep[index].val), 1)
[docs]def get_default_near_timestep(): """ Return the default near timestep :return: Default near timestep in femtoseconds :rtype: float """ return _get_default_timestep(0)
[docs]def get_default_far_timestep(): """ Return the default far timestep :return: Default far timestep in femtoseconds :rtype: float """ return _get_default_timestep(2)
[docs]def prepare_cms_properties_for_writing(model): """ Deletes several properties that should be removed before writing a CMS. Notably, this function removes the trajectory property. :param cms.Cms model: System to update """ props_to_remove = [ constants.CT_INDEX, cgff.FFIO_CT_TYPE_KEY, PROP_TRJ, cms.Cms.PROP_CMS ] for prop in props_to_remove: model.remove_cts_property(prop)
[docs]def delete_molecules_from_comps(model, comp_data): """ Delete molecules from specific components of a model system :param cms.Cms model: Desmond model/system to update :param dict comp_data: keys - components ids, values list of molecules ids :return set: Set of empty (removed) component names """ empty_components = set() for comp_id, mol_indices in sorted(comp_data.items(), reverse=True): assert 0 <= comp_id < len(model.comp_ct) comp_ct = model.comp_ct[comp_id] # We have chosen to not yet support components with multiple species # (MATSCI-10849) assert comp_ct.property.get(constants.NUM_COMPONENT) < 2 atoms_to_delete = set() for mol_id in mol_indices: assert 1 <= mol_id <= comp_ct.mol_total atoms_to_delete.update(comp_ct.molecule[mol_id].getAtomIndices()) # Delete pseudo atoms_to_delete if present pseudos = get_molecules_pseudos(comp_ct, mol_indices) if pseudos: comp_ct.ffio.deletePseudos(pseudos) comp_ct.deleteAtoms(atoms_to_delete) if comp_ct.atom_total == 0: empty_components.add(comp_id) # Delete empty component(s), if any for comp_id in sorted(empty_components, reverse=True): del model.comp_ct[comp_id] model.remove_cts_property(model.PROP_TRJ) model.synchronize_fsys_ct() return empty_components
[docs]def delete_molecules_from_model(model, mol_indices, cg_ff_path=None): """ Delete molecules from a Desmond system :param cms.Cms model: Model to update :param list(int) mol_indices: A list of integers containing the indices of the molecules that should be removed. These indices should correspond with the `model.molecule` indices. :param str cg_ff_path: The path to the force field file for the coarse-grain system. Used to reassign the forcefield for coarse-grained systems after delete molecules. :rtype: schrodinger.application.desmond.cms.Cms :return: A copy of the input model with the molecules removed :raises TypeError: When a coarse-grain model is given, but no `cg_ff_path` argument is given. """ if msutils.is_coarse_grain(model): if cg_ff_path is None: msg = msutils.dedent(f""" A coarse-grained system was supplied ({model.title}), but no "cg_ff_path" argument was given. """) raise TypeError(msg) new_model = delete_molecules_from_cg_model(model=model, mol_indices=mol_indices, ff_path=cg_ff_path) else: if cg_ff_path: msg = msutils.dedent(f""" "{model.title}" is not coarse-grained, but a "cg_ff_path" argument was supplied. The "cg_ff_path" argument will be ignored. """) warnings.warn(msg, SyntaxWarning) new_model = model.copy() delete_molecules_from_aa_model(new_model, mol_indices) return new_model
[docs]def delete_molecules_from_cg_model(model, mol_indices, ff_path): """ Delete molecules from a coarse-grained Desmond system :param cms.Cms model: Model to update :param list(int) mol_indices: A list of integers containing the indices of the molecules that should be removed. These indices should correspond with the `model.molecule` indices. :param str ff_path: The path to the force field file :rtype: schrodinger.application.desmond.cms.Cms :return: A copy of the input model with the molecules removed """ # The forcefield creation step makes a copy of the input model, so we need # to return a new instance of the model. If we are returning a new # instance, then the user is probably going to expect that their original # model is unchanged. We copy it here to make sure that happens. model = model.copy() atom_indices = [ atom_idx for mol_idx in mol_indices for atom_idx in model.molecule[mol_idx].getAtomIndices() ] # `Cms` objects are mutable data structures/mirrors for their `Cms.comp_ct` # child objects. This means that we can modify `model`, but the changes # will not propagate correctly. Thus we need to modify the elements within # `model.comp_ct` and then synchronize the mirror. Note that we only delete # atoms from the first component, because CG structures currently have only # one component. model.comp_ct[0].deleteAtoms(atom_indices) model.synchronize_fsys_ct() # Update the forcefield if model.atom_total: return cgff.create_cg_system(struct=model, ff_path=ff_path) else: # If we've deleted everything, then `cgff.create_cg_system` will fail. # So we instead delete all sites from the FFIO. ffio = model.comp_ct[0].ffio ffio.deleteSites(range(1, len(ffio.site) + 1)) return model
[docs]def delete_molecules_from_aa_model(model, mol_indices): """ Delete molecules from an all-atom system :param cms.Cms model: Model to update :param list(int) mol_indices: A list of integers containing the indices of the molecules that should be removed. These indices should correspond with the `model.molecule` indices. :rtype: set :return: Set of empty (removed) components names """ mol_indices_set = set(mol_indices) comp_data = collections.defaultdict(list) mol_counter = 0 for comp_idx, comp_ct in enumerate(model.comp_ct): for mol_idx, molecule in enumerate(comp_ct.molecule, start=1): mol_counter += 1 if mol_counter in mol_indices_set: comp_data[comp_idx].append(mol_idx) empty_components = delete_molecules_from_comps(model, comp_data) return empty_components
[docs]def parse_ene_file(filename=None, fileobj=None, prop=None, gcmc=False): """ Parse data from a Desmond .ene file. All the data from the file can be returned in a dictionary, or just the data from a single property can be returned in a list. :type filename: str :param filename: The name of the .ene file to read :type fileobj: file or io.BufferedReader :param fileobj: The open file object for the .ene file. Only one of filename or fileobj should be supplied. If both are given, filename will take precedence. io.BufferedReader objects come from tarfile.extractfile, for instance. Note - in Python 3, open file objects are io.TextIOWrapper :type prop: str :param prop: If given, should match one of the properties in the ene file header (such as 'V'). The return value will be a list of values for this property only :type gcmc: bool :param gcmc: Whether to parse GMCM or a standard MD file :rtype: dict, dict, or list, list or dict or list :return: If prop is not specified, the return value is a dict if gcmc is False, otherwise two dicts. Keys are property names from the ene header (without units, such as 'V') and values are lists of floating point values for that property in the order obtained from the .ene file. If prop is specified, the return value is a list if gcmc is False, otherwise two lists of those property values in the order found in the .ene file. :raise ValueError: If a line of unknown format is found while reading the file, or if neither filename or fileobj is specified. :raise TypeError: If prop is supplied but does not match a property in the header line """ if not filename and not fileobj: raise ValueError('Either filename or fileobj must be specified') close_file = False if filename: fileobj = open(filename, 'r') close_file = True elif isinstance(fileobj, io.BufferedReader): fileobj = io.TextIOWrapper(fileobj) elif not isinstance(fileobj, io.TextIOWrapper): raise TypeError('fileobj must be an open file instance') data = {} data_gcmc = {} is_gcmc_line = False for line in fileobj: line = line.strip() if not line: # Blank line, ignore continue elif line.startswith('#'): # A comment line may be the header line or may be nothing if data: # Already found the header line continue else: line = line[1:].strip() if line.startswith('0:'): # This is the header line tokens = line.split() headers = [x.split(':')[1] for x in tokens if ':' in x] data = {x: [] for x in headers} if gcmc: data_gcmc = {x: [] for x in headers} num_headers = len(headers) if prop and prop not in headers: if close_file: fileobj.close() raise TypeError( 'The requested property, %s, was not found in the ' 'property headers:\n%s' % (prop, str(headers))) else: continue elif not data: # Some line before the header line, ignore it continue else: # This is a data line tokens = line.split() if len(tokens) != num_headers: if close_file: fileobj.close() raise ValueError('There are %d headers but found line with %d ' 'values.\n%s' % (num_headers, len(tokens), line)) for header, value in zip(headers, tokens): if not prop or header == prop: if is_gcmc_line: data_gcmc[header].append(float(value)) is_gcmc_line = False else: if header == 'N_S': # This has value of '-' for the standard MD line value = 0.0 data[header].append(float(value)) if gcmc: is_gcmc_line = True if close_file: fileobj.close() if not prop: if gcmc: return data, data_gcmc else: return data else: if gcmc: return data[prop], data_gcmc[prop] else: return data[prop]
[docs]def get_density_from_ene(ene_file, struct, fraction=0.20): """ Parse the volume from the ene file and compute the density from it. The density is averaged over the last fraction of frames. :type ene_file: str :param ene_file: The name of the file to read in. :type struct: `schrodinger.structure.Structure` :param struct: The structure to compute the density for :type fraction: float :param fraction: The fraction of frames to average the density over. For instance, use 0.20 to compute the density over the final 20% of frames. Use the special value 0 to just use the final density. With fraction=0, the returned standard deviation is 0. :rtype: (float, float) :return: The mean density and standard deviation of the density. :raises IOError: If unable to read the volumes from the ene file (from get_prop_from_ene method) """ r_vol_mean, r_vol_stdev = get_prop_from_ene(ene_file, 'V', fraction=fraction, reciprocal=True) # 1.6605 is the product of (AMU to grams) and (A^3 to cm^3) density_mean = 1.6605 * struct.total_weight * r_vol_mean density_stdev = 1.6605 * struct.total_weight * r_vol_stdev return density_mean, density_stdev
[docs]def get_prop_from_ene(ene_file, prop, fraction=0.20, reciprocal=False): """ Parse the ene file and compute mean and std for a specific property. The property is averaged over the last fraction of frames. :type ene_file: str :param ene_file: The name of the file to read in. :type prop: str :param prop: The name of the property to read in :type fraction: float :param fraction: The fraction of frames to average the density over. For instance, use 0.20 to compute the prop over the final 20% of frames. Use the special value 0 to just use the final density. With fraction=0, the returned standard deviation is 0. :type reciprocal: bool :param reciprocal: If True, the mean and std will be done on the reciprocal of the property values. :rtype: (float, float) :return: The mean property and standard deviation of the property. (or the reciprocal ones if reciprocal=True) :raises IOError: If unable to read the volumes from the ene file :raises ValueError: If data have zeros, and the reciprocal of them are requested. """ try: prop_values = parse_ene_file(filename=ene_file, prop=prop) except (ValueError, TypeError, IOError) as msg: raise IOError(f'Unable to read {prop} from the ene file: %s\n' 'Error was: %s' % (ene_file, str(msg))) if not prop_values: raise IOError(f'No {prop} data found in the ene file') if prop_values[0] == 0 and prop == 'T': # The temperature of the first step can be zero prop_values = prop_values[1:] checked_frames = max(int(math.ceil(len(prop_values) * fraction)), 1) sel_prop_values = prop_values[-checked_frames:] if reciprocal: if not all(sel_prop_values): # If [0.] it returns array([inf]) # RuntimeWarning: divide by zero encountered in reciprocal # The std of inf gives RuntimeWarning: invalid value # encountered in subtract x = asanyarray(arr - arrmean) raise ValueError( f'{prop} data have zeros, and the reciprocal of them ' 'are requested.') sel_prop_values = numpy.reciprocal(sel_prop_values) prop_mean = numpy.mean(sel_prop_values) prop_stdev = numpy.std(sel_prop_values) return prop_mean, prop_stdev
[docs]def update_pdb_pbc(icms_obj=None, icms_file=None, ocms_file=None): """ Update the PDB PBC properties given a cms-type input. :type icms_obj: `cms.Cms` or None :param icms_obj: input cms.Cms object or None if icms_file is given instead :type icms_file: str or None :param icms_file: input .cms file name or None if icms_obj is given instead :type ocms_file: str or None :param ocms_file: if provided the updated cms.Cms will be written to this file, if None then no file will be written :raise RuntimeError: if there is a problem with the input :rtype: cms.Cms :return: the updated cms.Cms """ if (icms_obj is None and icms_file is None) or \ (icms_obj is not None and icms_file is not None): msg = ('Must provide one or the other as input: cms.Cms ' 'or .cms file name.') raise RuntimeError(msg) if icms_file is not None: icms_obj = cms.Cms(icms_file) # assume the following sts all have the same chorus PBC properties sts = [icms_obj._raw_fsys_ct, icms_obj.fsys_ct] + icms_obj.comp_ct for st in sts: xtal.sync_pbc(st, in_place=True) if ocms_file is not None: icms_obj.write(ocms_file) return icms_obj
[docs]def get_desmond_trj_frames(cms_path=None, traj_dir=None, traj_frames=None): """ Yield a desmond trajectory frame by frame. The memory taken by the previous frame is released before yielding the next one. :param cms_path: path to the CMS file :type cms_path: `str` or None :param traj_dir: path to trajectory dir :type traj_dir: `str` or None :param traj_frames: trajectory to yield :type traj_frames: An iterable of trajectory frames :raise RuntimeError: if there is a problem with the input :rtype: iterator(`schrodinger.application.desmond.packages.traj.frame`) :return: an iterator for a trajectory frame """ if not any([cms_path, traj_dir, traj_frames]): msg = ('Must provide one as input: cms path ' ', traj dir or a trajectory frame iterator.') raise RuntimeError(msg) if not traj_frames: if not traj_dir: traj_dir = topo.find_traj_path_from_cms_path(cms_path) traj_frames = traj.read_traj(traj_dir) for frame in traj_frames: yield frame # garbage-collect a used trajectory frame if frame._source: frame._source.clear_cache() frame._object = None
[docs]def get_desmond_frames_in_range(traj_dir, trj_min, trj_max, trj_step=None): """ Get desmond frames from trajectory within the selected range (top and bottom included). :type traj_dir: str :param traj_dir: Path to the trajectory directory. :type trj_min: int or None :param trj_min: Index of frame in trajectory to start with. If None it will start from the begining of the trajectory, :type trj_max: int or None :param trj_max: Index of frame in trajectory to end at. If None it will end at the ending of the trajectory :type trj_step: int or None :param trj_step: The step for the trajectory :rtype: list :return: list of trajectory frames selected :raise ValueError: Will raise if no frames are selected. """ frames = traj.read_traj(traj_dir) start = trj_min or 0 # end can be zero, for single frame. frames start from 0 hence subtract 1 end = len(frames) - 1 if trj_max is None else min(trj_max, len(frames) - 1) if start > end: raise ValueError('Minimum trajectory frame cannot be larger than the ' f'maximum, {end}.') step = trj_step or 1 # Always add the last step frames = frames[start:end:step] + [frames[end]] return frames
[docs]def get_desmond_frames_from_options(options): """ Convenience function to get desmond frames in the range specified by the user from commandline options :param parserutils.SubscriptableNameSpace options: Parsed commandline options :return list: List of trajectory frames """ return get_desmond_frames_in_range(options[parserutils.FLAG_TRJ_PATH], options[parserutils.FLAG_TRJ_MIN], options[parserutils.FLAG_TRJ_MAX], options[parserutils.FLAG_TRJ_STEP])
[docs]def get_cutoff(length, percent): """ Get cutoff given length and percent. :param int length: Length :param float percent: Percent for the cutoff :return int: Cutoff (minimum 1) """ assert length >= 0 assert 0 <= percent <= 100 # At least one data point should be used return int(int(length) * percent / 100.0) or 1
[docs]def parse_enegrp_file(enegrp_fn, props=ALL_ENEGRP_PROPS): """ Parse data from a Desmond `*enegrp.dat` file. All the data from the file can be returned in a dictionary where key is the property name and values are dict, key is the time and values are the property value at that time. :param str enegrp_fn: name of enegrp file to read :props list props: list of properties to parse. Check ALL_ENEGRP_PROPS for the list of supported properties. :rtype: dict :return: dictionary where key is the property name and values are namedtuple. The namedtuple has two fields, time and value. The value is the value for the property at that timestep. Value at the timestep can be a float or a list depending on the property. """ # TODO : Test if AtomGroup properties are implemented properly enegrp_data = collections.defaultdict(list) with open(enegrp_fn) as enegrp_fp: # Properties are in 2 formats. # First inline time, with format time=value prop1=value prop2=value # Second is a line for the property, like # prop3 (time_value) prop_value # prop4 (time_value) prop_value_1 prop_value_2 for idx, line in enumerate(enegrp_fp): # Spliting line to list line_split = line.strip().split() if TIME_ENGRP in line: # Change to format [['time', '0.000000'], # ['en', '1.87284114e+03'], 'E_p', '-3.23865615e+03'], ...] # for each element of the list first element is property name # and second is its value prop_split = [x.split('=') for x in line_split] for index, (prop, value) in enumerate(prop_split): if index == 0: # Get time value for the properties time_val = float(value) else: # Add property to dictionary if property is required if prop in props: enegrp_data[prop].append( ENEGRP_VALUE(time=time_val, value=float(value))) else: # ['Pressure_Tensor', '(1.200000)', '-1308.5', '212.09' ... # 1st element is property name and 2nd is the time. Rest of # the element are values prop = line_split[0] if prop in props: time_str = line_split[1] # MATSCI-5317 where the space between bracket and is # occupied by negative sign # ['Virial', '(0.000000)-8.7375e+05', '-6711.3', '6850.2',.. # this occurs due data type issue in desmond code for time # string end_bracket_idx = time_str.index(')') if not len(time_str) - 1 == end_bracket_idx: split_time_str = time_str.split(')') time_str = split_time_str[0] line_split.insert(2, split_time_str[1]) time_val = float(time_str.replace('(', '').replace(')', '')) if len(line_split) == 3: # In case the value of property is a single value like # for Dispersion_Correction, Self_Energy_Correction values = float(line_split[2]) else: # In case there are multple values like Pressure_Tensor, # nonbonded_vdw values = list(map(float, line_split[2:])) enegrp_data[prop].append( ENEGRP_VALUE(time=time_val, value=values)) return enegrp_data
[docs]def get_ptensor_from_enegrp(enegrp_fn, percent_to_avg, log=None): """ Extract and calculate average pressure from an energy group file. :type enegrp_fn: str :param enegrp_fn: Energy group filename :type percent_to_avg: float :param percent_to_avg: Percent to use when averaging pressure tensor :type log: method :param log: Log method :rtype: numpy.array() :return: Averaged pressure tensor """ # Send to /dev/null if log is None log = log if log else lambda x: None msg_desmond = 'The Desmond simulation did not finish successfully.' msg_null_pt = 'The pressure tensor for this step will be set to null.' null_ptensor = numpy.zeros(9) try: grp_values = parse_enegrp_file(enegrp_fn, props=[PRESS_TENSOR_ENGRP]) ptensor_grp = [x.value for x in grp_values[PRESS_TENSOR_ENGRP]] except IOError: log('%s Could not read file: %s. %s' % (msg_desmond, enegrp_fn, msg_null_pt)) return null_ptensor # At least one data point should be used cutoff = get_cutoff(len(ptensor_grp), percent_to_avg) ptensor = numpy.array(ptensor_grp[-cutoff:]) return ptensor.mean(axis=0)
[docs]def get_lattice_param_from_enegrp(enegrp_fn, percent_to_avg): """ Extract and calculate average lattice parameter from an energy group file. :type enegrp_fn: str :param enegrp_fn: Energy group filename :type percent_to_avg: float :param percent_to_avg: Percent to use when averaging lattice parameter :raise IOError: fail in reading enegrp file :rtype: numpy.ndarray :return: Averaged lattice parameters matrix """ try: grp_values = parse_enegrp_file(enegrp_fn, props=[SIM_BOX_ENGRP]) except IOError as err: raise IOError(f"Cannot load {enegrp_fn}. ({str(err)})") from None latt_param_grp = [x.value for x in grp_values[SIM_BOX_ENGRP]] # At least one data point should be used cutoff = get_cutoff(len(latt_param_grp), percent_to_avg) latt_param = numpy.array(latt_param_grp[-cutoff:]) return latt_param.mean(axis=0)
[docs]def get_ptensor_from_trj(trj, percent_to_avg, log=None): """ Extract and calculate average pressure from trajectory. Trajectory must be in the DTR format trajectory.write_velocity should be True. :type trj: str or iterable :param trj: Trajectory directory or iterable trajectory :param float percent_to_avg: Percent to use when averaging pressure tensor :type log: method :param log: Log method :rtype: numpy.array :return: Averaged pressure tensor """ # Send to /dev/null if log is None log = log if log else lambda x: None null_ptensor = numpy.zeros(9) if isinstance(trj, str): try: trj_iter = traj.read_traj(trj, return_iter=True) except (IOError, RuntimeError): log('Could not open trajectory: %s.' % trj) return null_ptensor else: trj_iter = trj ptensor = numpy.array( [fr._frame().pressure_tensor.flatten() for fr in trj_iter]) # At least one data point should be used cutoff = get_cutoff(len(ptensor), percent_to_avg) ptensor_part = ptensor[-cutoff:] return ptensor_part.mean(axis=0)
[docs]def get_ptensor_from_plugin(ptensor_fn, percent_to_avg, print_volume=False, log=None): """ Extract and calculate average pressure from pressure tensor plugin. :param str ptensor_fn: Output file of ptensor plugin :param float percent_to_avg: Percent to use when averaging pressure tensor :type log: method or None :param log: Log method. If None, print will be used :rtype: numpy.array :return: Averaged pressure tensor """ # Send to /dev/null if log is None log = log if log else lambda x: None start = 1 # First column is time end = -1 if print_volume else None # Last column is volume if print_volume is True null_ptensor = numpy.zeros(9) try: ptensor = numpy.loadtxt(ptensor_fn, ndmin=2)[:, start:end] except IOError: log('Could not read pressure tensor from %s.' % ptensor_fn) return null_ptensor # At least one data point should be used cutoff = get_cutoff(len(ptensor), percent_to_avg) ptensor_part = ptensor[-cutoff:] return ptensor_part.mean(axis=0)
[docs]def deform_cms_model(model, box, remap=True): """ Deform the CMS box using values from new_box, and atom coordinates in the model are scaled proportionally to fit the new box if remap=True. :type model: `cms.Cms` :param model: model to modify :type remap: bool :param remap: whether to strain the atom coordinates in the cell :type box: list :param box: deform to this box (9 values) """ vecs = numpy.array(box).reshape((3, 3)) for ct in model.get_cts(): xtal.strain_cell(ct, vecs, remap=remap) model.synchronize_fsys_ct()
[docs]def is_infinite(model): """ Return a boolean indicating whether the given desmond model has components that are infinitely bonded, meaning that it has bonds that cross the periodic boundary which can not be eliminated by means of molecule contraction. :type model: `cms.Cms` :param model: model to check :rtype: bool :return: True if infinite, False otherwise """ for ct in model.comp_ct: if ct.property.get(NUM_COMPONENT, 0) == 1: # All molecules are the same in this component, check only first one atom_indices = ct.molecule[1].getAtomIndices() else: atom_indices = None if xtal.is_infinite2(ct, atom_indices=atom_indices): return True return False
[docs]def set_physical_properties(model): """ Set cell formula, volume and density to model. :type: cms.Cms :param: Model to set """ vecs = numpy.array(model.box).reshape((3, 3)) formula, volume, density = xtal.get_physical_properties(model, vecs=vecs) model.set_cts_property(xtal.Crystal.UNIT_CELL_FORMULA_KEY, formula) model.set_cts_property(xtal.Crystal.UNIT_CELL_VOLUME_KEY, volume) model.set_cts_property(xtal.Crystal.UNIT_CELL_DENSITY_KEY, density)
[docs]@contextlib.contextmanager def cms_writer(cms_fn_in, cms_fn_out=None): """ Context manager to modify desmond CMS model. with desmondutils.cms_writer(cms_in) as model: model.set_cts_property('r_matsci_real_property', 11.2) :param str cms_fn_in: Input CMS filename :type cms_fn_out: str or None :param cms_fn_in: Output CMS filename. If None, cms_fn_in will be used :yield: cms.Cms """ model = cms.Cms(cms_fn_in) yield model model.write(cms_fn_out or cms_fn_in)
def _get_lipid_ff_hash(struct): """ Return a lipid forcefield hash for the given structure. :type struct: `schrodinger.structure.Structure` :param struct: the structure :rtype: tuple :return: the lipid ff hash """ # the lipid forcefield assignment using templates in # mmshare/data/system_builder/lipid_charge.mae only # need consistent atom ordering and PDB naming return tuple(atom.pdbname for atom in struct.atom) def _get_lipid_ff_st_dict(): """ Return the lipid forcefield structure dictionary. :rtype: dict :return: the lipid forcefield structure dictionary, keys are structure titles, values are stucture.Structure """ # this file has single lipid molecules, one for each type lc_file_path = os.path.join(SYSTEM_BUILDER_DATA_PATH, 'lipid_charge.mae') st_dict = {} for struct in structure.StructureReader(lc_file_path): st_dict[struct.title] = struct return st_dict def _get_lipid_ff_hash_dict(): """ Return the lipid forcefield hash dictionary. :rtype: dict :return: the lipid forcefield hash dictionary, keys are tuples, values are lipid structure titles """ st_dict = _get_lipid_ff_st_dict() hash_dict = {} for title, struct in st_dict.items(): hash_dict[_get_lipid_ff_hash(struct)] = title return hash_dict def _get_lipid_ffio_st(title): """ Return the lipid ffiostructure.FFIOStructure for the given title. :type title: str :param title: the title of the lipid :raise ValueError: if there is no lipid template file :rtype: ffiostructure.FFIOStructure :return: the ffiostructure for the sought lipid """ # these files have two structures (1) entire lipid membrane model # and (2) waters lipid_file_path = os.path.join(SYSTEM_BUILDER_DATA_PATH, f'{title}.mae.gz') if not os.path.exists(lipid_file_path): raise ValueError( f'The lipid template file {lipid_file_path} can not be found.') lipid_st = structure.Structure.read(lipid_file_path) ffio_st = ffiostructure.FFIOStructure(lipid_st.handle) return ffio_st
[docs]def assign_lipid_ff(model): """ Assign the lipid forcefield to the given model. :type model: cms.Cms :param model: the model containing the lipid :raise ValueError: if there is no known lipid present """ hash_dict = _get_lipid_ff_hash_dict() idx_to_ffio_st = {} for idx, comp_ct in enumerate(model.comp_ct): rep_mol = comp_ct.molecule[1] rep_st = rep_mol.extractStructure() ahash = _get_lipid_ff_hash(rep_st) title = hash_dict.get(ahash) if not title: continue idx_to_ffio_st[idx] = _get_lipid_ffio_st(title) if not idx_to_ffio_st: raise ValueError('No known template lipid is present.') for idx, ffio_st in idx_to_ffio_st.items(): # do this manually rather than using the ffiostructure.FFIOStructure # API, it appears that the API doesn't work quite right if the instantiation # is done using ffiostructure.FFIOStructure directly, but rather only works if # the instantiation is done via a cms.Cms and accessed as .comp_ct[x] which is # of type ffiostructure.FFIOStructure ffhandle = mm.mmffio_ff_mmct_get(ffio_st.handle) ffio = ffiostructure._FFIO(ffhandle) model.comp_ct[idx].ffhandle = ffhandle model.comp_ct[idx].ffio = ffio
[docs]def combine_trajectories(trj_path, trj_dirs, trj_int, backend): """ Combine multiple trajectories into a single one :param str trj_path: The path to the final trj directory :param list trj_dirs: List of trajectory directory paths :param float trj_int: Trajectory recording interval in ps :type backend: `schrodinger.job._Backend` or None :param backend: Backend handle, or None if not running under job control :rtype: None or str :return: The path to the combined trj file, or None if the operation failed """ # MATSCI-5668: Create a low memory trajectory frame iterator traj.Source.CACHE_LIMIT_BYTES = 0 trajs = [] for trj in trj_dirs: # Add try except block for missing directory or directory with bad # format due to unexpected failure. try: trajs.append(traj.read_traj(trj, return_iter=True)) except (OSError, RuntimeError): continue result = traj.concat(0., trj_int, *trajs) traj.write_traj(result, trj_path) if backend: backend.addOutputFile(trj_path) return trj_path
[docs]def use_monomer_charges(struct): """ Add charges to the structure based on the charges computed by monomer. To compute monomer charges we create a fragment for each residue that includes the residue and all attached residues or atoms. Whether attached residues or attached atoms are used depends on the size of the created fragment. The partial_charge property of atoms in the structure will be modified with the computed charges and also the property 'r_ffio_custom_charge' will be added to each atom in the structure to indicate that these charges should be used by the force field :type struct: `schrodinger.structure.Structure` :param struct: The structure to compute charges for. """ with common.opls_force_field() as handle: # Make sure we can track atom indexes after we extract into a new structure for atom in struct.atom: atom.property[CHARGE_ORDER_PROP] = atom.index # Compute the charges for each residue, and transfer them to the complete # structure for res in struct.residue: # Create the fragment including attached atoms/residues fragment_residues, attached_atoms = _gather_q_residues(res) fragment = _create_q_fragment(struct, fragment_residues, attached_atoms, res) # Compute the OPLS3 charges - note that OPLS3 computes charges # independent of the geometry with common.assign_force_field(handle, fragment): pass # Transfer the computed charges for this residue to the main structure for fatom in fragment.atom: if fatom.resnum == res.resnum: orig_index = fatom.property.get(CHARGE_ORDER_PROP) if orig_index: orig_atom = struct.atom[orig_index] orig_atom.partial_charge = fatom.partial_charge balance_charge(struct) # Remove the index tracking property msutils.remove_atom_property(struct, CHARGE_ORDER_PROP) # Mark the charge for use in the force field partial_to_custom_charges(struct)
def _gather_q_residues(res): """ Find all atoms and residues attached to res :type res: `schrodinger.structure._Residue` :param res: The residue to find neighbors for :rtype: (set, set) :return: A set of all residues attached to res (including res itself) and a set of all atoms attached to res but not part of res """ fragment_residues = {res} attached_atoms = set() this_res = res.resnum this_chain = res.chain for atom in res.atom: for batom in atom.bonded_atoms: # Residues can have the same numbers if they are part of a # different chain (due to branching or cascading) if batom.resnum != this_res or batom.chain != this_chain: bres = batom.getResidue() fragment_residues.add(bres) # Some residues are only single atoms - like an N atom # that cascades three branches. In this case, add the next # residue out as well. bheavies = sum(1 for x in bres.atom if x.element != 'H') if bheavies == 1: for natom in batom.bonded_atoms: if (natom.resnum != batom.resnum or natom.chain != batom.chain): fragment_residues.add(natom.getResidue()) attached_atoms.add(batom) return fragment_residues, attached_atoms def _create_q_fragment(struct, fragment_residues, attached_atoms, res): """ Create a fragment that includes this residue and all monomers that attach to it. If that fragment is too large for OPLS3, create a fragment that is just the this residue and the atoms that attach to it :type struct: `schrodinger.structure.Structure` :param struct: The structure containing fragment_residues :type fragment_residues: iterable :param fragment_residues: A collection of _Residue objects for the residues to include in the fragment :type attached_atoms: iterable :param attached_atoms: A collection of _StructureAtom objects for the atoms in the neighboring residues that attach to res :type res: `schrodinger.structure._Residue` :param res: The central residue of the fragment :rtype: `schrodinger.structure.Structure` :return: The fragment that was formed """ indexes = [] truncated = False for fres in fragment_residues: indexes.extend(fres.getAtomIndices()) # Note: 250 and 100 are hardcoded OPLS3 parameters if len(indexes) > 250 or sum( 1 for x in indexes if struct.atom[x].element != 'H') > 100: # The entire fragment will be too large, create a truncated fragment # with the just the neighboring attached atoms indexes = set(res.getAtomIndices()) for batom in attached_atoms: indexes.add(batom.index) # We'll also add some neighboring atoms to the attached atoms if # they seem likely to strongly affect the charges for natom in batom.bonded_atoms: if not (batom.element == 'C' and natom.element == 'C'): indexes.add(natom.index) truncated = True fragment = struct.extract(indexes) _add_h_to_fragment_sites(fragment, truncated, res.resnum) return fragment def _add_h_to_fragment_sites(fragment, truncated, this_res): """ Add hydrogens to all sites that had bonds broken to form this fragment :type fragment: `schrodinger.structure.Structure` :param fragment: The structure to modify :type truncated: bool :param truncated: Whether this is a fragment with neighboring monomers truncated to just the first one or two atoms :type this_res: int :param this_res: The residue number for the central residue """ fragsites = [] for atom in fragment.atom: if truncated: if atom.resnum != this_res: fragsites.append(atom.index) else: for prop in amorphous.ATOM_PROP_MARKER: if atom.property.get(prop): fragsites.append(atom.index) build.add_hydrogens(fragment, atom_list=fragsites)
[docs]def balance_charge(struct): """ Ensure that the total of the partial charges on the molecule equals the total of the formal charges. :type struct: `schrodinger.structure.Structure` :param struct: The structure to modify """ pqsum = sum(x.partial_charge for x in struct.atom) fqsum = sum(x.formal_charge for x in struct.atom) delta = pqsum - fqsum del_per_atom = -delta / struct.atom_total for atom in struct.atom: atom.partial_charge += del_per_atom
[docs]def partial_to_custom_charges(struct): """ Set the current partial charges as custom FF charges :type struct: `schrodinger.structure.Structure` :param struct: The structure to modify """ for atom in struct.atom: atom.property[constants.FF_CUSTOM_CHARGE] = atom.partial_charge
[docs]class AnalysisDriverMixin: """ Class containing methods that are shared by most trajectory analysis drivers. """
[docs] def setDefaultAnalysisParams(self, opts, static_allowed=False): """ Set default class variables that all trajectory analysis drivers use. :param `parserutils.SubscriptableNameSpace` opts: options for running the driver :param bool static_allowed: whether static frames are allowed """ self.jobname = jobutils.get_jobname(opts.cms_file) # Load the structure self.msys_model, self.cms_model = topo.read_cms(opts.cms_file) if opts.trj is None and static_allowed: self.trj = None self.num_frames = None else: # Fix trajectory path opts.trj = fileutils.get_existing_filepath(opts.trj) # Load the trajectory frames self.trj = get_desmond_frames_in_range(opts.trj, opts.trj_min, opts.trj_max, opts.trj_step) self.num_frames = len(self.trj) # If the structure has single frame then change the model to that frame # and do not track trajectory if self.num_frames == 1 and static_allowed: self.num_frames = None frame = self.trj[0] topo.update_ct(self.cms_model, self.cms_model, frame) self.trj = None
[docs] def trjFrames(self): """ Yield the trajectory frame by frame. The memory taken by the previous frame is released before yielding the next one. :rtype: iterator(`schrodinger.application.desmond.packages.traj.frame`) :return: an iterator for trajectory frames """ # Warning: Should not be used when the trajectory is centered! Otherwise # it will go back to original trajectory for the second iteration return get_desmond_trj_frames(traj_frames=self.trj)
[docs] def getOutfileName(self, extension, add_to_backend=True): """ Get an outfile name by appending the extension to the job name :param bool add_to_backend: Whether the outfile should be added to backend :return str: The outfile name """ fname = self.jobname + extension if add_to_backend: jobutils.add_outfile_to_backend(fname) return fname
[docs] def writeOutputCmsAndData(self, wam_type, per_structure_wam=False): """ Write the modified cms_model and set it as output structure. If a data file is generated (data_filename) if will be added to the backend. :param int wam_type: One of the enums defined in workflow_action_menu.h :param bool per_structure_wam: Whether the WAM should be set on the cms model instead of file-level """ # Prepare cms_model to write jobutils.set_source_path(self.cms_model) self.cms_model.remove_cts_property(PROP_TRJ) # Write the cms structure output_file = self.jobname + '-out.cms' if per_structure_wam: jobutils.set_structure_wam(self.cms_model, wam_type) self.cms_model.write(output_file) else: jobutils.write_cms_with_wam(self.cms_model, output_file, wam_type=wam_type) jobutils.add_outfile_to_backend(output_file, set_structure_output=True) # Write the data file if it exists if hasattr(self, 'data_filename'): jobutils.add_outfile_to_backend(self.data_filename)
[docs] def getFirstCellPositions(self, frame): """ Get all the positions from the frame and then translate to first unit cell :param 'schrodinger.application.desmond.packages.traj.Frame' frame: current frame :return 'numpy.ndarray': numpy array containing translated positional coordinates """ # Get all the positions of atoms in the frame pbc = analysis.Pbc(frame.box) pos = frame.pos() # Wrap the trajectory positions to first box. pos[:] = pbc.calcMinimumImage(numpy.zeros([1, 3]), pos) return pos
[docs] def exportStructuresForDebugging(self): """ Export the full system structure at each frame for debugging """ cms_model = self.cms_model.copy() path = self.getOutfileName('_debug.maegz') for idx, frame in enumerate(self.trj): topo.update_ct(cms_model.fsys_ct, cms_model, frame) cms_model.fsys_ct.title = f'Frame {idx}' cms_model.fsys_ct.append(path)
[docs]def validate_ff_and_water_model(ffld, model): """ Verify that the chosen water model is compatible with the chosen forcefield :param str ffld: The name of the forcefield :param str model: The name of the water model. From water combobox use currentData method. :rtype: True or (False, str) :return: The return type is compatible with AF2 validators. Either True if everything is OK, or False with an error message if not. """ if ffld == OPLS2005: if model not in msconst.VALID_WATER_FFTYPES_OPLS2005: valid = ', '.join(msconst.VALID_WATER_FFTYPES_OPLS2005) return (False, f'Water model {model} cannot be used with {ffld}.' f' Valid models are: {valid}.') return True
[docs]def struct_has_ffio_ct_type(st): """ Return True if the given structure has the 's_ffio_ct_type' property defined. :type st: `schrodinger.structure.Structure` :param st: structure to check :rtype: bool :return: True if defined """ return st.property.get(CT_TYPE, False)
[docs]def get_cms_and_trj_path_from_st(struct): """ Get trajectory and cms path for the passed structure in maestro using structure property :param struct `schrodinger.structure.Structure`: Structure to get associated cms and trajectory of :param source_path str: Path to source cms and trj folder :return str,str: Path to the cms file trajectory frames if both are found. """ cms_file = struct.property.get(mm.M2IO_DATA_ORIGINAL_CMS_FILE) trj_dir = struct.property.get(mm.M2IO_DATA_TRAJECTORY_FILE) return cms_file, trj_dir
[docs]def make_pos_restraints(model, asl, force_const=500.): """ Create kwargs that can be used in MD MSJ stringers to restrain atoms :param cms.Cms model: the Desmond system that you want to restrain :param str asl: the atom-specification-language string used to identify the atoms in the system that should be restrained :param float force_const: the force constant of the restraints that you want imposed on the specified atoms, in units of kcal/mol/Ang^2 :rtype: dict(str=str) :return: key-word arguments that can be passed to `MDMSJStringer` to restrain the atoms """ indices = analyze.evaluate_asl(model.fsys_ct, asl) if not indices: msg = (f'No atoms found in the "{model.title}" system that match the ' f'"{asl}" search string.') raise ValueError(msg) # Explicitly write atom indices (MATSCI-7520) indices_str = ' '.join(str(idx) for idx in indices) restraints = msutils.dedent(f""" [{{ name = posre_harm atoms = ["atom.num {indices_str}"] force_constants = [{force_const} {force_const} {force_const}] }}] """, rm_breaks=False) restraints_kwargs = { 'restraints_dot_existing': constants.EXISTING_RESTRAINT.IGNORE, 'restraints_dot_new': restraints } return restraints_kwargs