Source code for schrodinger.application.matsci.reaction_workflow_utils

"""
Utilities for reaction workflows.

Copyright Schrodinger, LLC. All rights reserved.
"""

import argparse
import copy
import csv
import enum
import glob
import itertools
import json
import math
import os
import re
import shutil
import warnings
from collections import OrderedDict, defaultdict, namedtuple
from types import SimpleNamespace

import networkx as nx
import numpy
import pandas
import yaml
from scipy import constants

from schrodinger import structure
from schrodinger import adapter
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.macromodel import utils as mmodutils
from schrodinger.application.matsci import anharmonic
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import etarotamers
from schrodinger.application.matsci import etatoggle
from schrodinger.application.matsci import \
    jaguar_multistage_workflow_utils as jmswfu
from schrodinger.application.matsci import jaguar_restart
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import \
    genetic_optimization as go
from schrodinger.application.matsci import swap_fragments_utils as sfu
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import rmsd
from schrodinger.structutils import rings
from schrodinger.structutils import rgroup_enumerate
from schrodinger.structutils import build
from schrodinger.math import mathutils
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
from schrodinger.utils import units
from schrodinger.utils.license import is_opls2_available

FLAG_N_CONFORMERS = '-n_conformers'
FLAG_PP_REL_ENERGY_THRESH = '-pp_rel_energy_thresh'
FLAG_RETURN_CSEARCH_FILES = '-return_csearch_files'
FLAG_SKIP_ETA_ROTAMERS = '-skip_eta_rotamers'

FLAG_JAGUAR = '-jaguar'
FLAG_JAGUAR_KEYWORDS = '-jaguar_keywords'
FLAG_TEMP_START = '-temp_start'
FLAG_TEMP_STEP = '-temp_step'
FLAG_TEMP_N = '-temp_n'
FLAG_PRESS_START = '-press_start'
FLAG_PRESS_STEP = '-press_step'
FLAG_PRESS_N = '-press_n'
FLAG_MAX_I_FREQ = anharmonic.FLAG_MAX_I_FREQ
FLAG_TPP = parserutils.FLAG_TPP

FLAG_DESCRIPTORS = '-descriptors'
FLAG_LIGFILTER = '-ligfilter'
FLAG_CANVAS = '-canvas'
FLAG_MOLDESCRIPTORS = '-moldescriptors'
FLAG_INCLUDE_VECTORIZED = '-include_vectorized'
FLAG_FINGERPRINTS = '-fingerprints'
FLAG_SAVEFILES = '-savefiles'
FLAG_STPROPS = '-stprops'

FLAG_INPUT_FILE = '-input_file'

DEFAULT_JOB_NAME = 'reaction_workflow'

DEFAULT_N_CONFORMERS = 5
# keys and values should be strings
DEFAULT_JAGUAR_KEYWORDS = OrderedDict([('dftname', 'B3LYP'), ('iuhf', '1'),
                                       ('basis', 'LACVP**')])
DEFAULT_TPP = 1
# OPLS_2005 does not allow ZOB to sp3 carbon while OPLS3 does
DEFAULT_FORCE_FIELD_NAME = mm.OPLS_NAME_F16  # OPLS3

RESERVED_JAGUAR_KEYS = set([
    'molchg', 'multip', 'igeopt', 'ifreq', 'itrvec', 'nhesref', 'ntemp',
    'tmpini', 'tmpstp', 'npress', 'press', 'press_step'
])

REACTION_WF_TAG = '_rxnwf'

REACTION_WF_STRUCTURE_KEY = sfu.REACTION_WF_STRUCTURE_KEY
CONFORMERS_GROUP_KEY = sfu.CONFORMERS_GROUP_KEY
SIBLING_GROUP_KEY = sfu.SIBLING_GROUP_KEY
PARENT_SIBLING_GROUPS_KEY = sfu.PARENT_SIBLING_GROUPS_KEY
CHARGE_KEY = msprops.CHARGE_PROP
MULTIPLICITY_KEY = msprops.MULTIPLICITY_PROP
KEEP_ATOM_KEY = sfu.KEEP_ATOM_PROP
SUPERPOSITION_ATOM_KEY = sfu.SUPERPOSITION_ATOM_PROP
RESTRAINED_ATOM_KEY = 'b_matsci_Reaction_Workflow_Restrained_Atom'
RESTRAINED_DISTANCES_KEY = sfu.RESTRAINED_DISTANCES_KEY
RESTRAINED_ANGLES_KEY = sfu.RESTRAINED_ANGLES_KEY
RESTRAINED_DIHEDRALS_KEY = sfu.RESTRAINED_DIHEDRALS_KEY
TRANSITION_STATE_STRUCTURE_KEY = sfu.TRANSITION_STATE_STRUCTURE_KEY

INDEX_SEPARATOR = sfu.INDEX_SEPARATOR
PARENT_SEPARATOR = ','
SEPARATOR = sfu.SEPARATOR

ENTRY_ID_KEY = msprops.ENTRY_ID_PROP

RATE_CONSTANT_STRING = 'Rate_Constant'
CONF_AVG_CTST_RATE_CONSTANT = 'Conf_Avg_CTST_Rate_Constant'

TempData = namedtuple('TempData', ['temp_start', 'temp_step', 'temp_n'])
PressData = namedtuple('PressData', ['press_start', 'press_step', 'press_n'])
ReactProdTS = namedtuple('ReactProdTS', ['ts_name', 'other_name'])
SameTypeGSTS = namedtuple('SameTypeGSTS', ['ts_name', 'other_name', 'is_gsgs'])
Pair = namedtuple('Pair', ['first', 'second'])

DF_COL_TS_SIBLING, DF_COL_OTHER_SIBLING = 'TS_Sibling', 'Other_Sibling'
DF_COL_EN_PROP = 'Energy_Property'
DfCustomRateColumns = namedtuple('DfCustomRateColumns', [
    DF_COL_TS_SIBLING, DF_COL_OTHER_SIBLING, 'Temperature_K', 'Pressure_atm',
    DF_COL_EN_PROP, 'Energy_Barrier_kcal_per_mol', 'TS_Partition_F',
    'Other_Partition_F', RATE_CONSTANT_STRING, 'Rate_Constant_Type',
    'Rate_Constant_Units'
])

DF_COL_SIBLING = 'Sibling'
DfCustomEqColumns = namedtuple('DfCustomEqColumns', [
    DF_COL_SIBLING, DF_COL_OTHER_SIBLING, 'Temperature_K', 'Pressure_atm',
    'Energy_Barrier_kcal_per_mol', DF_COL_EN_PROP, 'Keq'
])

HARTREE_UNITS = ['au']
KCAL_PER_MOL_UNITS = ['kcal/mol']
EV_UNITS = ['eV']
KJOULES_PER_MOL_UNITS = ['kJ/mol']
KNOWN_UNITS = (HARTREE_UNITS + KCAL_PER_MOL_UNITS + EV_UNITS +
               KJOULES_PER_MOL_UNITS)

# kcal/mol/K
IDEAL_GAS_CONSTANT = (constants.R / constants.calorie) / constants.kilo
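# A quick numerical check: (constants.R / constants.calorie) / constants.kilo
# evaluates to ~1.9872e-3 kcal/mol/K, the molar gas constant in the units
# used throughout this module.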

STAGE_SEPARATOR = '_stage_'

REL_REACTANTS_W_X_EXT = '_conf_avg_rel_reactants.csv'
REL_REACTANTS_WO_X_EXT = '_conf_avg_wo_x_rel_reactants.csv'
REL_REACTANTS_LOWEST_EXT = '_lowest_conf_rel_reactants.csv'
REL_PARENTS_W_X_EXT = '_conf_avg_rel_parents.json'
REL_PARENTS_WO_X_EXT = '_conf_avg_wo_x_rel_parents.json'
REL_PARENTS_LOWEST_EXT = '_lowest_conf_rel_parents.json'

REL_REACTANTS_EXTS = [
    REL_REACTANTS_W_X_EXT, REL_REACTANTS_WO_X_EXT, REL_REACTANTS_LOWEST_EXT
]
REL_PARENTS_EXTS = [
    REL_PARENTS_W_X_EXT, REL_PARENTS_WO_X_EXT, REL_PARENTS_LOWEST_EXT
]

REL_EXT_TO_KWARG_DICT = {
    REL_REACTANTS_W_X_EXT: 'edict_w_x',
    REL_REACTANTS_WO_X_EXT: 'edict_wo_x',
    REL_REACTANTS_LOWEST_EXT: 'edict_lowest',
    REL_PARENTS_W_X_EXT: 'edict_w_x',
    REL_PARENTS_WO_X_EXT: 'edict_wo_x',
    REL_PARENTS_LOWEST_EXT: 'edict_lowest'
}

SIBLING_GROUP_NAME = 'Sibling_Group_Name'
ANHARMONIC_TAG = '_anharmonic'

RATE_CONSTANTS_W_X_EXT = '_conf_avg_rate_constants.csv'
CUSTOM_RATE_CONSTS_EXT = '_custom_rate_constants.csv'
CUSTOM_KEQ_CONSTS_EXT = '_custom_keq.csv'

FFLD_ENERGY_GLOBAL_TAG = '_global'
MmodEnergyKeys = namedtuple('MmodEnergyKeys', ['absolute', 'relative'])

# kJ/mol
CSEARCH_REL_ENERGY_THRESH = 50.

# Ang.
CSEARCH_RMSD_THRESH = 0.1

JMSWF_LAUNCH_DIR = 'jaguar_multistage_workflow'
ANHARM_LAUNCH_DIR = 'anharmonic'
CTST_RATE_CONSTANTS_DIR = 'CTST_rate_constants'

# The "pp" prefix in pp_rel_energy_thresh indicates that this is the
# relative energy threshold used when postprocessing MacroModel results;
# it is distinct from rel_energy_thresh, used elsewhere in this code,
# which is the relative energy threshold used in the MacroModel csearch
PP_CSEARCH_REL_ENERGY_THRESH = None  # kJ/mol or None

DESCRIPTORS_LAUNCH_DIR_HEADER = 'descriptors_'

EnergyAnalysisProperty = namedtuple('EnergyAnalysisProperty', [
    'sibling_group', 'conformer_groups', 'representative_conformers',
    'temperature', 'pressure', 'energy_key', 'property_key', 'avg_property_key',
    'atom_idx', 'ensemble', 'include_x_terms', 'only_lowest_energy'
])

OUT_EXT = '-out'

DEFAULT_ENUM_RXNWF_JOB_NAME = 'enumerate_reaction_workflow'

# from_idx specifies the direction of the substitution relative
# to to_idx, to_idx specifies the substitution site, hash_idx
# is an enumeration index which maps the site to an R-group file,
# structure_idx indicates which structure in the input file this
# site applies to
Site = namedtuple('Site', ['from_idx', 'to_idx', 'hash_idx', 'structure_idx'])

Source = namedtuple('Source', ['rgroup_st', 'site_idxs'])

def get_msg(site, msg):
    return (f'Invalid atom indices ({site.from_idx}, '
            f'{site.to_idx}) for enumeration index {site.hash_idx} '
            f'for structure index {site.structure_idx}.  {msg}')

RGROUP_SEPARATOR = '_'

NOVEL = 'novel'
REFERENCE = 'reference'

DEFAULT_AUTO_RXNWF_JOB_NAME = 'automatic_reaction_workflow'

ENERGY_KEYS = 'ENERGIES'
ANHARMONIC_ENERGY_KEYS = 'ANHARMONIC_ENERGIES'
CUSTOM_ENERGY_KEYS = "CUSTOM_ENERGIES"
RATE_CONSTANT_KEY = "RATE_CONSTANT_KEY"

MASS_N_DECIMAL = 3

# hard-coded value decided in MATSCI-10586
KEQ_MAX = 10e100


def log(msg, **kwargs):
    """
    Add a message to the log file

    :param str msg: The message to log

    Additional keyword arguments are passed to the textlogger.log_msg
    function
    """
    textlogger.log_msg(msg, **kwargs)


def get_mmod_energy_key(ffld_name=None):
    """
    Returns the corresponding MacroModel energy key for the given
    forcefield. If no forcefield is given and the license is available, it
    defaults to S-OPLS. If no forcefield is given and the license is NOT
    available, it defaults to OPLS2005.

    :param str ffld_name: The forcefield to get the energy key for. If
        `None`, defaults to the latest forcefield available given your
        licenses.

    :return str energy_key: The MacroModel suffix `key` for the default
        forcefield. Units are in kJ/mol
    """
    keys = get_mmod_energy_keys(ffld_name)
    return keys.absolute


def get_mmod_rel_energy_key(ffld_name=None):
    """
    Returns the corresponding MacroModel relative energy key for the given
    forcefield. If no forcefield is given and the license is available, it
    defaults to S-OPLS. If no forcefield is given and the license is NOT
    available, it defaults to OPLS2005.

    :param str ffld_name: The forcefield to get the energy key for. If
        `None`, defaults to the latest forcefield available given your
        licenses.

    :return str energy_key: The MacroModel suffix `key` for the default
        forcefield.
    """
    keys = get_mmod_energy_keys(ffld_name)
    return keys.relative


def get_mmod_energy_keys(ffld_name=None):
    """
    Returns the corresponding MacroModel energy keys for the given
    forcefield. If no forcefield is given and the license is available, it
    defaults to S-OPLS. If no forcefield is given and the license is NOT
    available, it defaults to OPLS2005.

    :param str ffld_name: The forcefield to get the energy keys for. If
        `None`, defaults to the latest forcefield available given your
        licenses.

    :return namedtuple energy_keys: A named two-tuple where the first
        element (with a key of `absolute`) is the MacroModel suffix for the
        absolute energy (in kJ/mol), and the second element (with a key of
        `relative`) is the MacroModel suffix for the relative energy.
    """
    sopls_key = f'r_mmod_Potential_Energy-{mm.OPLS_NAME_F16}'
    sopls_rel_key = f'r_mmod_Relative_Potential_Energy-{mm.OPLS_NAME_F16}'
    s_opls_keys = MmodEnergyKeys(sopls_key, sopls_rel_key)
    # The value for `mm.OPLS_NAME_F16` matches the suffix for the energy keys,
    # but the value for `mm.OPLS_NAME_F14` does not match its corresponding
    # suffix. Here we hard-code the correct suffix (suffix is 'OPLS-2005'
    # while name is 'OPLS_2005').
    opls2005_key = 'r_mmod_Potential_Energy-OPLS-2005'
    opls2005_rel_key = 'r_mmod_Relative_Potential_Energy-OPLS-2005'
    opls2005_keys = MmodEnergyKeys(opls2005_key, opls2005_rel_key)
    if ffld_name == mm.OPLS_NAME_F16:
        return s_opls_keys
    elif ffld_name == mm.OPLS_NAME_F14:
        return opls2005_keys
    elif ffld_name is None:
        if is_opls2_available():
            return s_opls_keys
        else:
            return opls2005_keys
    else:
        raise ValueError(f'{ffld_name} is not a recognized forcefield name.')


@enum.unique
class DF_RATE_TYPE(enum.Enum):
    DEFAULT = 'default'
    LNQ = 'lnq'
    ANH_LNQ = 'anharmonic_lnq'


def get_reactions_data(sts):
    """
    Given a list of reaction workflow structures, return the list of
    reactions.

    :param list[structure.Structure] sts: List of structures

    :rtype: list[Pair]
    :return: List of pairs, each holding two ReactProdTS (first, second) or
        SameTypeGSTS (for the Int-Int or TS-TS case), which in turn hold
        the data for calculating two rate constants and two equilibrium
        constants
    """
    sibling_conformers_dict = bin_structures_by_property(
        sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
    ts_sibling_groups, intermediate_sibling_groups = [], []
    for sibling_group, conformers_dict in sibling_conformers_dict.items():
        rep_sts = [conformers[0] for conformers in conformers_dict.values()]
        if has_transition_state(rxnwf_sts=rep_sts):
            ts_sibling_groups.append(sibling_group)
        else:
            intermediate_sibling_groups.append(sibling_group)
    react_prod_ts = []
    for ts_name, other_name in itertools.product(
            ts_sibling_groups, intermediate_sibling_groups):
        react_prod_ts.append(ReactProdTS(ts_name, other_name))
    # Make pairs of same type of siblings, GS-GS, TS-TS (MATSCI-12024)
    for gs_name_a, gs_name_b in itertools.combinations(
            intermediate_sibling_groups, 2):
        react_prod_ts.append(SameTypeGSTS(gs_name_a, gs_name_b, True))
    for ts_name_a, ts_name_b in itertools.combinations(ts_sibling_groups, 2):
        react_prod_ts.append(SameTypeGSTS(ts_name_a, ts_name_b, False))
    if len(react_prod_ts) == 1:
        # If there is only one react_prod_ts, duplicate it in the pair
        return [Pair(react_prod_ts[0], react_prod_ts[0])]
    ret = []
    for rpts_a, rpts_b in itertools.combinations(react_prod_ts, 2):
        ret.append(Pair(rpts_a, rpts_b))
    # Note that the list of pairs can contain repeating siblings, both ts
    # and other, since all the combinations are generated
    return ret


def add_remove_props(sts, prop, add_props):
    """
    Set additional properties equal to the original property from the input
    structures; the original property will be deleted.

    :param list[structure.Structure] sts: List of structures to be modified
    :param str prop: The value of this property will be used, and the
        property will be deleted
    :param list[str] add_props: List of properties to be added to the
        structure
    """
    for struct in sts:
        val = struct.property.pop(prop)
        for add_prop in add_props:
            struct.property[add_prop] = val


def set_extra_energy_props(sts, extra_stages_file, press_data, temp_data):
    """
    Set temperature/pressure dependent extra properties in the input
    structures based on the original properties.

    :param list[structure.Structure] sts: List of structures to be modified
    :param str extra_stages_file: Extra stages file name
    :param PressData press_data: Pressure data
    :param TempData temp_data: Temperature data

    :rtype: set[str]
    :return: Set of extra energy properties
    """

    def get_prop(prop, press, temp):
        idx = (prop.index(STAGE_SEPARATOR)
               if STAGE_SEPARATOR in prop else len(prop))
        ret = prop[:idx]
        has_press = jaguarworkflows.get_pressure(prop) is not None
        has_temp = jaguarworkflows.get_temperature(prop) is not None
        if not all([has_press, has_temp]):
            ret += jaguarworkflows.get_temp_press_key_ext(temp, press)
        elif has_press:
            ret += '_%fK' % temp
        elif has_temp:
            ret += '_%satm' % jaguarworkflows.format_pressure(press)
        ret += prop[idx:]
        return ret

    pressures, temps = get_pressures_temps(press_data, temp_data)
    en_templates = set()
    extra_stages = jmswfu.read_stage_datafile(extra_stages_file)[1:]
    for extra_stage in extra_stages:
        if extra_stage.analyze_data:
            template = '%s%s%d' % (extra_stage.analyze_data[0].key,
                                   STAGE_SEPARATOR, extra_stage.index + 1)
            en_templates.add(template)
    extra_props = set()
    for template in en_templates:
        if template not in sts[0].property:
            assert not any(template in struct.property for struct in sts), (
                '%s property is missing in some structures' % template)
            # If the energy is missing from the structure properties it was
            # defined with wildcards. Collect all properties with press/temp
            # set (MATSCI-11210)
            starter, stage_idx = template.split(STAGE_SEPARATOR)
            props = get_present_props_from_sts(sts,
                                               starter,
                                               stage_idx=int(stage_idx))
            extra_props.update(props)
            continue
        # If the property exists, it is considered a template and needs to
        # be expanded with press/temp pairs
        press = jaguarworkflows.get_pressure(template)
        temp = jaguarworkflows.get_temperature(template)
        if press is not None and temp is not None:
            # Template is complete (press/temp set), use it
            extra_props.add(template)
            continue
        new_pressures = [press] if press is not None else pressures
        new_temps = [temp] if temp is not None else temps
        new_props = set(
            get_prop(template, press, temp)
            for temp, press in itertools.product(new_temps, new_pressures))
        add_remove_props(sts, template, new_props)
        extra_props.update(new_props)
    return extra_props


def get_pressures_temps(press_data, temp_data):
    """
    Get lists of pressures and temperatures from input data.

    :param PressData press_data: Pressure data
    :param TempData temp_data: Temperature data

    :rtype: list[float], list[float]
    :return: List of pressures, list of temperatures
    """
    pressures = [
        press_data.press_start + i * press_data.press_step
        for i in range(press_data.press_n)
    ]
    temps = [
        temp_data.temp_start + i * temp_data.temp_step
        for i in range(temp_data.temp_n)
    ]
    return pressures, temps


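# Illustrative usage sketch (not part of the module): how the PressData and
# TempData namedtuples expand into evenly spaced grids, mirroring
# get_pressures_temps above; the start/step/count values are made-up
# example inputs.
#
#     from collections import namedtuple
#
#     PressData = namedtuple('PressData',
#                            ['press_start', 'press_step', 'press_n'])
#     TempData = namedtuple('TempData',
#                           ['temp_start', 'temp_step', 'temp_n'])
#     press_data = PressData(press_start=1.0, press_step=0.5, press_n=3)
#     temp_data = TempData(temp_start=298.15, temp_step=25.0, temp_n=2)
#     pressures = [press_data.press_start + i * press_data.press_step
#                  for i in range(press_data.press_n)]
#     temps = [temp_data.temp_start + i * temp_data.temp_step
#              for i in range(temp_data.temp_n)]
#     print(pressures, temps)  # [1.0, 1.5, 2.0] [298.15, 323.15]

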
def set_gas_phase_zpe_props(sts, press_data, temp_data, stage_idx):
    """
    Set temperature/pressure dependent gas phase + ZPE properties in the
    input structures based on the original property.

    :param list[structure.Structure] sts: List of structures to be modified
    :param PressData press_data: Pressure data
    :param TempData temp_data: Temperature data
    :param int stage_idx: Stage index
    """
    pressures, temps = get_pressures_temps(press_data, temp_data)
    e_zpe_props = [
        '%s%s%d' % (jaguarworkflows.get_gas_phase_zpe_key(temp, press),
                    STAGE_SEPARATOR, stage_idx)
        for press, temp in itertools.product(pressures, temps)
    ]
    prop = '%s%s%d' % (jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP,
                       STAGE_SEPARATOR, stage_idx)
    add_remove_props(sts, prop, e_zpe_props)


def get_eprop_data(eprop):
    """
    Get the conversion factor to kcal/mol, the temperature in K, and the
    pressure in atm from an energy property.

    :param str eprop: Energy property

    :rtype: float, float, float
    :return: conversion factor to kcal/mol, temperature in K, pressure in
        atm
    """
    conv = ReactionWorkflowEnergyAnalysis.getKcalPerMolConversion(eprop)
    assert conv is not None  # Should not happen
    temp = jaguarworkflows.get_temperature(eprop)
    assert temp is not None  # Should not happen
    press = jaguarworkflows.get_pressure(eprop)
    assert press is not None  # Should not happen
    return conv, temp, press


def add_custom_eq_rows(name, other_name, avg_props, other_avg_props, eprop,
                       dframe):
    """
    Add data with Keq to the dataframe.

    :param str name: Sibling name
    :param str other_name: Other sibling name
    :param list[EnergyAnalysisProperty] avg_props: List of average
        properties of the sibling
    :param list[EnergyAnalysisProperty] other_avg_props: List of average
        properties of the other sibling
    :param str eprop: Energy property
    :param pandas.DataFrame dframe: Input dataframe

    :rtype: pandas.DataFrame
    :return: Updated dataframe
    """

    def keq_in_dframe(dframe, sibling, other_sibling, eprop):
        query = ('%s == "%s" and %s == "%s" and %s == "%s"' %
                 (DF_COL_SIBLING, sibling, DF_COL_OTHER_SIBLING,
                  other_sibling, DF_COL_EN_PROP, eprop))
        return bool(len(dframe.query(query)))

    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    conv, temp, press = get_eprop_data(eprop)
    denergy = (boltz.get_averaged_value(eprop, avg_props) -
               boltz.get_averaged_value(eprop, other_avg_props)) * conv
    # Forward and backward
    for factor, (name_a, name_b) in zip([1, -1], [(name, other_name),
                                                  (other_name, name)]):
        if keq_in_dframe(dframe, name_a, name_b, eprop):
            # Skip already present row
            continue
        if denergy < -100:
            # hard-coded value decided in MATSCI-10586
            keq = KEQ_MAX
        else:
            denergy *= factor
            keq = get_keq(denergy, temp)
            keq = min(keq, KEQ_MAX)
        df_row = DfCustomEqColumns(Sibling=name_a,
                                   Other_Sibling=name_b,
                                   Temperature_K=temp,
                                   Pressure_atm=press,
                                   Energy_Property=eprop,
                                   Energy_Barrier_kcal_per_mol=denergy,
                                   Keq=keq)
        dframe = dframe.append(df_row._asdict(), ignore_index=True)
    return dframe


def add_custom_rate_rows(reaction,
                         ts_avg_props,
                         other_avg_props,
                         eprop,
                         dframe,
                         is_anharm,
                         wigner_tunnel_corr=False):
    """
    Add data with custom rates to the dataframe.

    :param ReactProdTS reaction: Reactant/product - TS pair
    :param list[EnergyAnalysisProperty] ts_avg_props: List of average
        properties
    :param list[EnergyAnalysisProperty] other_avg_props: List of other
        average properties
    :param str eprop: Energy property
    :param pandas.DataFrame dframe: Input dataframe
    :param bool is_anharm: Whether it is an anharmonic calculation

    :type wigner_tunnel_corr: bool
    :param wigner_tunnel_corr: whether to include the Wigner tunneling
        correction when computing rate constant(s)

    :rtype: pandas.DataFrame
    :return: Updated dataframe
    """
    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    conv, temp, press = get_eprop_data(eprop)
    denergy = (boltz.get_averaged_value(eprop, ts_avg_props) -
               boltz.get_averaged_value(eprop, other_avg_props)) * conv
    rate = get_custom_rate(denergy, temp)
    df_row = DfCustomRateColumns(TS_Sibling=reaction.ts_name,
                                 Other_Sibling=reaction.other_name,
                                 Temperature_K=temp,
                                 Pressure_atm=press,
                                 Energy_Property=eprop,
                                 Energy_Barrier_kcal_per_mol=denergy,
                                 TS_Partition_F=None,
                                 Other_Partition_F=None,
                                 Rate_Constant=rate,
                                 Rate_Constant_Type=DF_RATE_TYPE.DEFAULT.value,
                                 Rate_Constant_Units='1/s')
    if not is_anharm:
        # the rate without Q is only computed for the harmonic case
        dframe = dframe.append(df_row._asdict(), ignore_index=True)
    ifreq = boltz.get_averaged_value(eprop, ts_avg_props,
                                     jaguarworkflows.LOWEST_FREQUENCY_PROP)
    ts_lnq = get_lnq(eprop, ts_avg_props, is_anharm)
    other_lnq = get_lnq(eprop, other_avg_props, is_anharm)
    dlnq = ts_lnq - other_lnq
    rate_type = DF_RATE_TYPE.ANH_LNQ if is_anharm else DF_RATE_TYPE.LNQ
    rate = get_custom_rate(denergy,
                           temp,
                           dlnq=dlnq,
                           ifreq=ifreq,
                           wigner_tunnel_corr=wigner_tunnel_corr)
    df_row = df_row._replace(TS_Partition_F=ts_lnq,
                             Other_Partition_F=other_lnq,
                             Rate_Constant_Type=rate_type.value,
                             Rate_Constant=rate)
    dframe = dframe.append(df_row._asdict(), ignore_index=True)
    return dframe


def get_custom_keq_rates(out_mae,
                         press_data,
                         temp_data,
                         extra_stages_file=None,
                         compute_rates=True,
                         wigner_tunnel_corr=False):
    """
    Given an output mae file, write custom equilibrium constants and
    optionally custom rates to CSV dataframes.

    :param str out_mae: Output file name
    :param PressData press_data: Pressure data
    :param TempData temp_data: Temperature data

    :type extra_stages_file: str or None
    :param extra_stages_file: Extra stages file name or None

    :param bool compute_rates: Whether to compute rates or only equilibrium
        constants

    :type wigner_tunnel_corr: bool
    :param wigner_tunnel_corr: whether to include the Wigner tunneling
        correction when computing rate constant(s)
    """

    def rate_in_dframe(dframe, rpts, eprop):
        query = ('%s == "%s" and %s == "%s" and %s == "%s"' %
                 (DF_COL_TS_SIBLING, rpts.ts_name, DF_COL_OTHER_SIBLING,
                  rpts.other_name, DF_COL_EN_PROP, eprop))
        return bool(len(dframe.query(query)))

    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    sts = list(structure.StructureReader(out_mae))
    stage_idx = get_rep_stage_idx(rxnwf_sts=sts)
    extra_eprops = set()
    if extra_stages_file:
        extra_eprops = set_extra_energy_props(sts, extra_stages_file,
                                              press_data, temp_data)
    set_gas_phase_zpe_props(sts, press_data, temp_data, stage_idx)
    eprops = set()
    for starter in (jaguarworkflows.TOTAL_FREE_ENERGY_STARTER,
                    jaguarworkflows.TOTAL_ENTHALPY_STARTER,
                    jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP,
                    jaguarworkflows.TOTAL_INTERNAL_ENERGY_STARTER):
        eprops.update(
            get_present_props(starter, sts[0].property, stage_idx=stage_idx))
    anh_eprops = set()
    for starter in (anharmonic.U_STARTER, anharmonic.H_STARTER,
                    anharmonic.G_STARTER):
        anh_eprops.update(
            get_present_props(starter, sts[0].property, stage_idx=stage_idx))
    all_eprops = eprops | anh_eprops | extra_eprops  # add sets
    # see MATSCI-12044 - for a frequency calculation on an atom the following
    # property is not defined by Jaguar in the restart Maestro file, yet for
    # proper handling of imaginary frequencies for transition states it needs
    # to be defined so arbitrarily define it to be zero
    for st in sts:
        if st.atom_total == 1:
            st.property[jaguarworkflows.LOWEST_FREQUENCY_PROP] = 0
    # do not dedup by geometry for now
    avg_props = boltz.get_averaged_properties(sts,
                                              all_eprops,
                                              allow_sibling_groups=True,
                                              dedup_geom_eps=0)
    dframe = pandas.DataFrame(columns=DfCustomRateColumns._fields)
    dframe_eq = pandas.DataFrame(columns=DfCustomEqColumns._fields)
    reactions = get_reactions_data(sts)
    data = dict()
    for reaction in reactions:
        for name in (reaction.first.ts_name, reaction.first.other_name,
                     reaction.second.ts_name, reaction.second.other_name):
            if name not in data:
                data[name] = [
                    ea_prop for ea_prop in avg_props
                    if ea_prop.sibling_group == name
                ]
        for eprop in all_eprops:
            is_anharm = eprop in anh_eprops
            if compute_rates:
                # Only add if sibling + other sibling + energy property is
                # not in the dframe. Reactions obtained from
                # get_reactions_data can contain repeating siblings, both
                # ts and other.
                if (isinstance(reaction.first, ReactProdTS) and
                        not rate_in_dframe(dframe, reaction.first, eprop)):
                    dframe = add_custom_rate_rows(
                        reaction.first,
                        data[reaction.first.ts_name],
                        data[reaction.first.other_name],
                        eprop,
                        dframe,
                        is_anharm,
                        wigner_tunnel_corr=wigner_tunnel_corr)
                if (isinstance(reaction.second, ReactProdTS) and
                        not rate_in_dframe(dframe, reaction.second, eprop)):
                    dframe = add_custom_rate_rows(
                        reaction.second,
                        data[reaction.second.ts_name],
                        data[reaction.second.other_name],
                        eprop,
                        dframe,
                        is_anharm,
                        wigner_tunnel_corr=wigner_tunnel_corr)
            if reaction.first == reaction.second:
                # This is the case where there is only one pair, see
                # get_reactions_data
                if reaction.first.ts_name != reaction.first.other_name:
                    dframe_eq = add_custom_eq_rows(
                        reaction.first.ts_name, reaction.first.other_name,
                        data[reaction.first.ts_name],
                        data[reaction.first.other_name], eprop, dframe_eq)
            else:
                if reaction.first.ts_name != reaction.second.ts_name:
                    dframe_eq = add_custom_eq_rows(
                        reaction.first.ts_name, reaction.second.ts_name,
                        data[reaction.first.ts_name],
                        data[reaction.second.ts_name], eprop, dframe_eq)
                if reaction.first.other_name != reaction.second.other_name:
                    dframe_eq = add_custom_eq_rows(
                        reaction.first.other_name,
                        reaction.second.other_name,
                        data[reaction.first.other_name],
                        data[reaction.second.other_name], eprop, dframe_eq)
    if len(dframe):
        # Only write a non-empty file
        out_fn = '%s%s' % (jobutils.get_jobname(DEFAULT_JOB_NAME),
                           CUSTOM_RATE_CONSTS_EXT)
        dframe.to_csv(out_fn)
        jobutils.add_outfile_to_backend(out_fn)
    if len(dframe_eq):
        # Only write a non-empty file
        out_eq_fn = '%s%s' % (jobutils.get_jobname(DEFAULT_JOB_NAME),
                              CUSTOM_KEQ_CONSTS_EXT)
        dframe_eq.to_csv(out_eq_fn)
        jobutils.add_outfile_to_backend(out_eq_fn)


def get_keq(energy, temp):
    """
    Get k_equilibrium given energy and temperature.

    :param float energy: energy in kcal/mol
    :param float temp: Temperature in Kelvin

    :rtype: float
    :return: k_equilibrium
    """
    return numpy.exp(-1 * energy / IDEAL_GAS_CONSTANT / temp)


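# Illustrative usage sketch (not part of the module): a self-contained
# numerical check of the expression used by get_keq above,
# Keq = exp(-dE / (R * T)) with R in kcal/mol/K; the dE and T values are
# made-up example inputs.
#
#     import numpy
#     from scipy import constants
#
#     R = (constants.R / constants.calorie) / constants.kilo  # kcal/mol/K
#     dE = 1.5  # kcal/mol, a hypothetical free energy difference
#     T = 298.15  # K
#     print(numpy.exp(-dE / (R * T)))  # ~0.08, higher-energy side disfavored

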
def get_custom_rate(energy, temp, dlnq=None, ifreq=None,
                    wigner_tunnel_corr=False):
    """
    Get the custom rate.

    :param float energy: Energy in kcal/mol (!)
    :param temp: Temperature in K

    :type dlnq: None or float
    :param dlnq: delta lnQ used to compute the rate with partition
        functions, unitless

    :type ifreq: None or float
    :param ifreq: Lowest negative frequency used to compute the rate with
        partition functions, in 1/cm

    :type wigner_tunnel_corr: bool
    :param wigner_tunnel_corr: whether to include the Wigner tunneling
        correction when computing rate constant(s)

    :rtype: float
    :return: Rate in 1/s
    """
    if energy < -100:
        # hard-coded value decided in MATSCI-10586
        return 10e100
    # k = kBh * T * exp(-1.0 * energy / kB1 / T)  # 1/s
    kBh = constants.value('Boltzmann constant in Hz/K')
    chkB = constants.value('second radiation constant')  # 0.01438776877 m*K
    kval = kBh * temp * get_keq(energy, temp)
    if dlnq is None:
        kval = min(kval, KEQ_MAX)
        return kval
    if wigner_tunnel_corr:
        assert ifreq is not None
        assert ifreq < 0, 'Found positive frequency: %f' % ifreq
        ifreq_m = ifreq / constants.centi  # from 1/cm to 1/m
        w_corr = (-1 * ifreq_m * chkB / temp)**2 / 24.0 + 1.0
    else:
        w_corr = 1
    # k = w * kBh * T * exp(D(lnQ)) * exp(-1.0 * energy / kB1 / T)  # 1/s
    kval = w_corr * numpy.exp(dlnq) * kval
    kval = min(kval, KEQ_MAX)
    return kval


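# Illustrative usage sketch (not part of the module): the harmonic
# transition state theory rate computed by the dlnq=None branch above,
# k = (kB/h) * T * exp(-dE / (R * T)) in 1/s; the barrier and temperature
# are made-up example inputs.
#
#     import numpy
#     from scipy import constants
#
#     R = (constants.R / constants.calorie) / constants.kilo  # kcal/mol/K
#     kBh = constants.value('Boltzmann constant in Hz/K')  # kB/h in Hz/K
#     dE = 15.0  # kcal/mol, a hypothetical barrier
#     T = 300.0  # K
#     print(kBh * T * numpy.exp(-dE / (R * T)))  # ~74 1/s

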
def get_present_props(en_starter, properties, stage_idx=None):
    """
    Get all properties with a certain starter and optionally a stage index.

    :param str en_starter: Property must start with this
    :param list[str] properties: List of properties

    :type stage_idx: None or int
    :param stage_idx: Stage index

    :rtype: list[str]
    :return: List of matching properties
    """
    postfix = ('' if stage_idx is None else '%s%d' %
               (STAGE_SEPARATOR, stage_idx))
    return [
        prop for prop in properties
        if (prop.startswith(en_starter) and prop.endswith(postfix))
    ]


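# Illustrative usage sketch (not part of the module): the starter/stage
# suffix filtering performed by get_present_props, using made-up property
# keys modeled on the '<starter>..._stage_<idx>' naming used in this module.
#
#     properties = [
#         'r_j_Total_Free_Energy_298.15K_1.0atm_stage_2',
#         'r_j_Total_Free_Energy_298.15K_1.0atm_stage_3',
#         'r_j_Total_Enthalpy_298.15K_1.0atm_stage_2',
#     ]
#     starter, postfix = 'r_j_Total_Free_Energy', '_stage_%d' % 2
#     print([p for p in properties
#            if p.startswith(starter) and p.endswith(postfix)])
#     # ['r_j_Total_Free_Energy_298.15K_1.0atm_stage_2']

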
def get_present_props_from_sts(sts, prefix, stage_idx=None):
    """
    Get properties that start with prefix and end with the stage index (if
    provided). Check if pressure/temperature is present. Ensure that all
    the structures have at least the same list of properties.

    :param list[structure.Structure] sts: List of structures
    :param str prefix: Property must start with this

    :type stage_idx: None or int
    :param stage_idx: Stage index

    :rtype: set[str]
    :return: Set of properties
    """
    props = set()
    for aprop in get_present_props(prefix, sts[0].property,
                                   stage_idx=stage_idx):
        press = jaguarworkflows.get_pressure(aprop)
        temp = jaguarworkflows.get_temperature(aprop)
        assert press is not None and temp is not None, (
            'Pressure or temperature is missing from %s' % aprop)
        props.add(aprop)
    # All structures must have these properties
    assert all(props.issubset(struct.property) for struct in sts), (
        'Some structures are missing properties: %s' % props)
    return props


def get_lnq(eprop, averaged_properties, is_anharm):
    """
    Get lnQ Boltzmann averaged over the energy property.

    :param str eprop: Energy property
    :param dict averaged_properties: Averaged Boltzmann properties
    :param bool is_anharm: Whether to use the anharmonic approximation

    :rtype: float
    :return: Averaged lnQ
    """
    temp = jaguarworkflows.get_temperature(eprop)
    assert temp is not None  # Should not happen
    press = jaguarworkflows.get_pressure(eprop)
    assert press is not None  # Should not happen
    if is_anharm:
        press = jaguarworkflows.format_pressure(press)
        lnq_prop = anharmonic.LNQ_KEY.format(temp=temp, press=press)
        # Stage is set in the anharmonic case, but not in the harmonic case
        lnq_prop += '%s%d' % (STAGE_SEPARATOR, get_stage_idx(eprop))
    else:
        lnq_prop = jaguarworkflows.get_lnq_key(temp, press)
    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    lnq = boltz.get_averaged_value(eprop, averaged_properties, lnq_prop)
    return lnq


def get_restrain_atom_idxs(st):
    """
    Return a list of indices of restrain atoms in the given structure.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :rtype: list
    :return: contains indices of restrain atoms
    """
    return sfu.get_idxs_marked_atoms(st, RESTRAINED_ATOM_KEY)


class InvalidInput(Exception):
    pass


def get_idx_groups(text):
    """
    Get index groups from the given string.

    :type text: str
    :param text: the string

    :raise: InvalidInput if there is a formatting issue

    :rtype: list
    :return: contains list of indices
    """
    try:
        return sfu.get_idx_groups(text)
    except sfu.SwapFragmentsException as err:
        raise InvalidInput(str(err))


def get_restrain_distance_idxs(st):
    """
    Return a list of lists of indices of restrain distances in the given
    structure.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :rtype: list
    :return: contains lists of restrain distances
    """
    return sfu._get_restrain_group_idxs(st, RESTRAINED_DISTANCES_KEY)


def get_restrain_angle_idxs(st):
    """
    Return a list of lists of indices of restrain angles in the given
    structure.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :rtype: list
    :return: contains lists of restrain angles
    """
    return sfu._get_restrain_group_idxs(st, RESTRAINED_ANGLES_KEY)


def get_restrain_dihedral_idxs(st):
    """
    Return a list of lists of indices of restrain dihedrals in the given
    structure.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :rtype: list
    :return: contains lists of restrain dihedrals
    """
    return sfu._get_restrain_group_idxs(st, RESTRAINED_DIHEDRALS_KEY)


def get_jaguar_keywords_list(jaguar_keywords_dict):
    """
    Return the Jaguar keywords list from the given dict.

    :type jaguar_keywords_dict: OrderedDict
    :param jaguar_keywords_dict: the Jaguar keywords dict

    :rtype: list
    :return: the Jaguar keywords list
    """
    return ['='.join(item) for item in jaguar_keywords_dict.items()]


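# Illustrative usage sketch (not part of the module): how an OrderedDict of
# Jaguar keywords round-trips to the '<key>=<value>' token list, mirroring
# get_jaguar_keywords_list applied to DEFAULT_JAGUAR_KEYWORDS above.
#
#     from collections import OrderedDict
#
#     keywords = OrderedDict([('dftname', 'B3LYP'), ('iuhf', '1'),
#                             ('basis', 'LACVP**')])
#     print(['='.join(item) for item in keywords.items()])
#     # ['dftname=B3LYP', 'iuhf=1', 'basis=LACVP**']

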
def type_cast_seed(seed):
    """
    Type cast the seed.

    :type seed: str or unicode
    :param seed: seed for random number generator

    :rtype: int
    :return: the seed
    """
    return parserutils.type_random_seed(
        seed,
        seed_min=go.CONF_SEARCH_SEED_RANGE[0],
        seed_max=go.CONF_SEARCH_SEED_RANGE[1])


def type_cast_jaguar_keywords(jaguar_keywords,
                              reserved_keys=RESERVED_JAGUAR_KEYS,
                              exception_type=argparse.ArgumentTypeError):
    """
    Type cast the Jaguar keywords.

    :type jaguar_keywords: str or unicode or list
    :param jaguar_keywords: the Jaguar keywords, a whitespace delimited
        string of '<key>=<value>' tokens or a list of such tokens

    :type reserved_keys: set
    :param reserved_keys: contains reserved Jaguar keys

    :type exception_type: type
    :param exception_type: the exception type to raise if invalid

    :rtype: OrderedDict
    :return: the Jaguar keywords OrderedDict
    """
    if not jaguar_keywords:
        msg = 'Jaguar keywords must be provided.'
        raise exception_type(msg)
    if isinstance(jaguar_keywords, list):
        jaguar_keywords = ' '.join(jaguar_keywords)
    jaguar_keywords = str(jaguar_keywords)
    try:
        msutils.keyword_string_to_dict(jaguar_keywords)
    except ValueError as err:
        msg = ('The following issue has been found with the '
               'specified Jaguar keywords: {err}. Such options '
               'must be specified using whitespace separated '
               '<key>=<value> pairs.').format(err=str(err))
        raise exception_type(msg)
    adict = OrderedDict(
        [token.split('=') for token in jaguar_keywords.split()])
    if reserved_keys.intersection(set(adict.keys())):
        msg = ('The following Jaguar keys are reserved for this workflow: '
               '{keys}.').format(keys=sorted(reserved_keys))
        raise exception_type(msg)
    return adict


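# Illustrative usage sketch (not part of the module): the core parsing and
# reserved-key check inside type_cast_jaguar_keywords, without the
# Schrodinger-specific validation; the keyword values are made-up examples.
#
#     from collections import OrderedDict
#
#     tokens = 'dftname=B3LYP iuhf=1 basis=LACVP**'
#     adict = OrderedDict(token.split('=') for token in tokens.split())
#     reserved = {'molchg', 'multip', 'igeopt'}
#     if reserved.intersection(adict):
#         raise ValueError('Reserved Jaguar keys were supplied.')
#     print(adict['basis'])  # LACVP**

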
def check_ff_assignment(sts, ffld_name=None):
    """
    Check the assignment of the given force field to the given structures.

    :type sts: list
    :param sts: contains schrodinger.structure.Structure

    :type ffld_name: str
    :param ffld_name: the force field name

    :raise ValueError: if invalid
    """
    if ffld_name is None:
        ffld_name = msutils.get_default_forcefield().name
    # in case incoming structures are in the centroid representation
    sts = [st.copy() for st in sts]
    for st in sts:
        etatoggle.toggle_structure(st, out_rep=parserutils.ETA)
    # currently the following does not support an mmlewis_apply
    # clean up layer (which can change the incoming atom types),
    # see minimize.Minimizer.setStructure for more details
    ffld_version = mm.opls_name_to_version(ffld_name)
    invalid_sts = desmondutils.find_forcefield_invalid_structures(
        sts, ffld_version)
    invalid_idxs = [str(sts.index(st) + 1) for st in invalid_sts]
    if invalid_idxs:
        idxs = ','.join(invalid_idxs)
        msg = ('The {name} force field could not be assigned to '
               'the following structures: {idxs}.').format(name=ffld_name,
                                                           idxs=idxs)
        raise ValueError(msg)


def get_molecular_weight(st, idxs=None, decimal=None):
    """
    Return the molecular weight (amu) taken over the given atom indices in
    the given structure.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :type idxs: list
    :param idxs: the atom indices

    :type decimal: None or int
    :param decimal: an optional number of decimal places to which to round
        the weight

    :rtype: float
    :return: the molecular weight (amu)
    """
    # both st.total_weight and atom.atomic_weight use implicit hydrogens
    # so do it by hand
    if not idxs:
        idxs = range(1, st.atom_total + 1)
    weight = 0
    for idx in idxs:
        weight += mm.mmelement_get_atomic_weight_by_atomic_number(
            st.atom[idx].atomic_number)
    if decimal is not None:
        return round(weight, decimal)
    return weight


def check_centroid_rep(st):
    """
    Check the centroid representation of the given structure.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :raise ValueError: if invalid
    """
    dummies = etatoggle.get_dummy_atoms(st)
    if not dummies:
        return
    dummy_idxs = [dummy.index for dummy in dummies]
    # if using the centroid representation then those dummy
    # atoms should be at the end of the atom list
    non_dummies = set(range(1, st.atom_total + 1)).difference(dummy_idxs)
    if len(non_dummies) < max(non_dummies):
        msg = ('Dummy atoms used for centroid representations must be at '
               'the end of the atom list.')
        raise ValueError(msg)
    # dummies should have no metadata set
    for key in (KEEP_ATOM_KEY, RESTRAINED_ATOM_KEY, SUPERPOSITION_ATOM_KEY):
        for atom in dummies:
            if atom.property.get(key):
                msg = ('Dummy atoms used for centroid representations can '
                       'not be used as keep, restrained, or superposition '
                       'atoms.')
                raise ValueError(msg)
    # dummies should not be used in restraints
    for key in (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
                RESTRAINED_DIHEDRALS_KEY):
        jdxs = set([
            idx for idxs in sfu._get_restrain_group_idxs(st, key)
            for idx in idxs
        ])
        if jdxs.intersection(dummy_idxs):
            msg = ('Restrained distances, angles, or dihedrals can not use '
                   'dummy atoms.')
            raise ValueError(msg)


def check_conformers(conformers, conformers_group_hash):
    """
    Check conformers. If the given structures are conformers then their
    atom numberings are all changed in place so that they are equivalent
    to that of the first of the given conformers.

    :type conformers: list
    :param conformers: contains schrodinger.structure.Structure of
        conformers

    :type conformers_group_hash: str
    :param conformers_group_hash: a group hash

    :raise ValueError: if invalid
    """
    smiles, masses = set(), set()
    for st in conformers:
        # see MATSCI-11764 - prevent stereochemistry for now
        smiles.add(analyze.generate_smiles(st, stereo=analyze.NO_STEREO))
        masses.add(get_molecular_weight(st, decimal=MASS_N_DECIMAL))
    if len(smiles) > 1 or len(masses) > 1:
        msg = ('At least a single structure in conformer group '
               '{group} does not belong because it is not a '
               'conformer.').format(group=conformers_group_hash)
        raise ValueError(msg)
    # see MATSCI-5538 - where it was found that MacroModel conformational
    # search requires input conformers to have identical atom ordering,
    # force all conformers to be numbered like the first conformer, this
    # is also needed for the following validation
    # see MATSCI-8580 - where it was found that a water molecule was
    # renumbered even though it appears that it didn't need to be,
    # use_symmetry=False prevents it
    first_conformer = conformers[0]
    for idx, conformer in enumerate(conformers[1:], 1):
        conformers[idx] = rmsd.renumber_conformer(first_conformer,
                                                  conformer,
                                                  use_symmetry=False,
                                                  in_place=True,
                                                  has_hydrogens=True)
    check_centroid_rep(first_conformer)
    # yapf: disable
    check_key_is_atom_key_pairs = [
        (REACTION_WF_STRUCTURE_KEY, False),
        (CONFORMERS_GROUP_KEY, False),
        (SIBLING_GROUP_KEY, False),
        (PARENT_SIBLING_GROUPS_KEY, False),
        (CHARGE_KEY, False),
        (MULTIPLICITY_KEY, False),
        (KEEP_ATOM_KEY, True),
        (RESTRAINED_ATOM_KEY, True),
        (RESTRAINED_DISTANCES_KEY, False),
        (RESTRAINED_ANGLES_KEY, False),
        (RESTRAINED_DIHEDRALS_KEY, False),
        (TRANSITION_STATE_STRUCTURE_KEY, False)
    ]
    # yapf: enable
    for key, is_atom_key in check_key_is_atom_key_pairs:
        try:
            _check_structures_for_property_equivalence(
                conformers, key=key, is_atom_key=is_atom_key)
        except ValueError as err:
            msg = ('The structures in conformer group {group} are '
                   'invalid. {err}').format(group=conformers_group_hash,
                                            err=str(err))
            raise ValueError(msg)
    # check superposition differently because of the way the data
    # is stored in order to honor ordering, sets of indices need
    # to be sorted because rmsd.renumber_conformers can return
    # inequivalent atom orderings, perhaps because it doesn't
    # consider what is considered here to actually be conformers
    # (see for example unit test
    # reaction_workflow_utils_test.FunctionTester.test_check_conformers
    # when run without the following sort)
    all_idxs = set()
    for st in conformers:
        idxs = []
        for atom in st.atom:
            idx = atom.property.get(SUPERPOSITION_ATOM_KEY)
            if idx:
                idxs.append(idx)
        all_idxs.add(tuple(sorted(idxs)))
    if len(all_idxs) > 1:
        msg = ('The structures in conformer group {group} are '
               'invalid. The given structures have inequivalent {key} '
               'properties.').format(group=conformers_group_hash,
                                     key=SUPERPOSITION_ATOM_KEY)
        raise ValueError(msg)


def check_reaction_wf_structures(rxn_sts,
                                 ffld_name=None,
                                 mass_conserved=False,
                                 keep_atoms_only=False):
    """
    Check the given reaction workflow structures.

    :type rxn_sts: str or list
    :param rxn_sts: the reaction workflow structures, a file name or list
        of schrodinger.structure.Structure

    :type ffld_name: str
    :param ffld_name: the force field name to use when optionally checking
        its assignment to the given structures

    :type mass_conserved: bool
    :param mass_conserved: check that mass is conserved (see also
        keep_atoms_only kwarg)

    :type keep_atoms_only: bool
    :param keep_atoms_only: specifies that only keep atoms be considered
        when checking if mass is conserved (see also mass_conserved kwarg)

    :raise ValueError: if invalid
    """
    if isinstance(rxn_sts, str):
        if not fileutils.is_maestro_file(rxn_sts):
            msg = ('The file {afile} is not a Maestro file.').format(
                afile=rxn_sts)
            raise ValueError(msg)
        if not fileutils.get_basename(rxn_sts).endswith(REACTION_WF_TAG):
            msg = ('The file {afile} is not a reaction workflow file '
                   'as its base name does not end in {tag}.').format(
                       afile=rxn_sts, tag=REACTION_WF_TAG)
            raise ValueError(msg)
        rxn_sts = [st for st in structure.StructureReader(rxn_sts)]
    titles = set()
    for idx, st in enumerate(rxn_sts, 1):
        if not st.property.get(REACTION_WF_STRUCTURE_KEY):
            msg = ('Structure number {idx} does not belong to a '
                   'reaction workflow.').format(idx=idx)
            raise ValueError(msg)
        if not st.property.get(CONFORMERS_GROUP_KEY):
            msg = ('Structure number {idx} does not belong to a '
                   'conformer group.').format(idx=idx)
            raise ValueError(msg)
        titles.add(st.title)
    if len(titles) < len(rxn_sts):
        msg = 'Structures must have unique titles.'
        raise ValueError(msg)
    sibling_conformers_dict = bin_structures_by_property(
        rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
    getters = [
        get_restrain_distance_idxs, get_restrain_angle_idxs,
        get_restrain_dihedral_idxs
    ]
    found_parents = True
    masses = set()
    for sibling_group, conformers_dict in sibling_conformers_dict.items():
        num_tss = 0
        parents = set()
        mass = 0
        for conformer_group, conformers in conformers_dict.items():
            # check conformers
            check_conformers(conformers, conformer_group)
            first_conformer = conformers[0]
            num_tss += first_conformer.property.get(
                TRANSITION_STATE_STRUCTURE_KEY, 0)
            parents.add(
                first_conformer.property.get(PARENT_SIBLING_GROUPS_KEY))
            # check restraint idxs
            keep_idxs = sfu.get_idxs_marked_atoms(first_conformer,
                                                  KEEP_ATOM_KEY)
            if keep_idxs:
                all_restrain_idxs = set()
                for getter in getters:
                    for restrain_idxs in getter(first_conformer):
                        all_restrain_idxs.update(restrain_idxs)
                if not all_restrain_idxs.issubset(keep_idxs):
                    msg = (
                        f'The structures in conformer group '
                        f'{conformer_group} have atoms involved in '
                        'distance, angle, and/or dihedral restraints that '
                        'are not among the found keep atoms.')
                    raise ValueError(msg)
            if keep_idxs and keep_atoms_only:
                mass += get_molecular_weight(first_conformer, idxs=keep_idxs)
            else:
                mass += get_molecular_weight(first_conformer)
        mass = round(mass, MASS_N_DECIMAL)
        masses.add(mass)
        # check transition states
        if num_tss > 1:
            msg = (
                f'The structures in sibling group {sibling_group} can not '
                'have more than a single transition state.')
            raise ValueError(msg)
        # check parent equivalency
        if len(parents) > 1:
            msg = (
                f'The structures in sibling group {sibling_group} do not '
                'have identical parents.')
            raise ValueError(msg)
        # check parent existence
        parents_tmp = list(parents)[0]
        if parents_tmp:
            for parent in parents_tmp.split(PARENT_SEPARATOR):
                if parent not in sibling_conformers_dict.keys():
                    msg = (
                        f'Sibling group {sibling_group} specifies a '
                        f'non-existent parent sibling group {parent}.')
                    raise ValueError(msg)
        # check reactant siblings
        if None in parents:
            if not found_parents:
                msg = (f'Structures in sibling group {sibling_group} '
                       'have no parents specified. All groups '
                       'other than reactant siblings must '
                       'have parent groups defined.')
                raise ValueError(msg)
            found_parents = False
    if found_parents:
        msg = 'No reactants were found.'
        raise ValueError(msg)
    if mass_conserved and len(masses) > 1:
        msg = 'Mass is not conserved along the reaction path.'
        raise ValueError(msg)
    if ffld_name:
        check_ff_assignment(rxn_sts, ffld_name=ffld_name)


def type_cast_reaction_wf_input(reaction_wf_input,
                                exception_type=argparse.ArgumentTypeError,
                                mass_conserved=False):
    """
    Type cast the reaction workflow input.

    :type reaction_wf_input: str or unicode or list
    :param reaction_wf_input: the reaction workflow input, a file name or
        list of schrodinger.structure.Structure

    :type exception_type: type
    :param exception_type: the exception type to raise if invalid

    :type mass_conserved: bool
    :param mass_conserved: check that mass is conserved

    :rtype: str or list
    :return: the reaction workflow input, a file name or list of
        schrodinger.structure.Structure
    """
    if not isinstance(reaction_wf_input, list):
        reaction_wf_input = str(reaction_wf_input)
        if not os.path.exists(reaction_wf_input):
            msg = ('The file {afile} does not exist.').format(
                afile=reaction_wf_input)
            raise exception_type(msg)
        if not fileutils.is_maestro_file(reaction_wf_input):
            msg = ('The file {afile} is not a Maestro file.').format(
                afile=reaction_wf_input)
            raise exception_type(msg)
        if not fileutils.get_basename(reaction_wf_input).endswith(
                REACTION_WF_TAG):
            msg = ('The file {afile} is not a reaction workflow file '
                   'as its base name does not end in {tag}.').format(
                       afile=reaction_wf_input, tag=REACTION_WF_TAG)
            raise exception_type(msg)
        reaction_wf_input_sts = [
            st for st in structure.StructureReader(reaction_wf_input)
        ]
    else:
        reaction_wf_input_sts = list(reaction_wf_input)
    if not reaction_wf_input_sts:
        msg = 'No structures have been provided.'
        raise exception_type(msg)
    try:
        check_reaction_wf_structures(reaction_wf_input_sts,
                                     ffld_name=None,
                                     mass_conserved=mass_conserved)
    except ValueError as err:
        msg = str(err) + (' A valid reaction workflow input file must '
                          'first be prepared.')
        raise exception_type(msg)
    return reaction_wf_input


def bin_structures_by_property(sts, key=CONFORMERS_GROUP_KEY, inner_key=None):
    """
    Return a dictionary of structures binned by a property with the given
    key. If inner_key is provided then return a dictionary of dictionaries
    of structures with the inner dictionaries keyed by inner_key and outer
    dictionaries keyed by key.

    :type sts: list
    :param sts: the structures

    :type key: str
    :param key: the key for the property by which to bin

    :type inner_key: str
    :param inner_key: additionally bin by this inner_key

    :rtype: dict or dict of dict
    :return: dictionary where keys are properties and values are lists of
        structures, or dictionary of dictionaries where the outer
        dictionary is keyed by key, the inner dictionary is keyed by
        inner_key, and values of the inner dictionary are lists of
        structures
    """
    # for backwards compatibility in the data structure if the inner
    # property exists but the outer property does not then for the
    # outer property use the inner property and update the structure
    adict = {}
    for st in sts:
        aprop = st.property.get(key)
        if inner_key:
            inner_aprop = st.property.get(inner_key)
            if aprop is None and inner_aprop:
                aprop = inner_aprop
                st.property[key] = aprop
            adict.setdefault(aprop, {}).setdefault(inner_aprop,
                                                   []).append(st)
        else:
            adict.setdefault(aprop, []).append(st)
    return adict


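# Illustrative usage sketch (not part of the module): the nested dictionary
# shape produced by bin_structures_by_property when inner_key is given,
# using stand-in objects with a dict-like .property attribute instead of
# real Structures; the key names are made-up examples.
#
#     from types import SimpleNamespace
#
#     sts = [
#         SimpleNamespace(property={'sibling': 'A', 'conf': 'A1'}),
#         SimpleNamespace(property={'sibling': 'A', 'conf': 'A1'}),
#         SimpleNamespace(property={'sibling': 'B', 'conf': 'B1'}),
#     ]
#     adict = {}
#     for st in sts:
#         outer, inner = st.property.get('sibling'), st.property.get('conf')
#         adict.setdefault(outer, {}).setdefault(inner, []).append(st)
#     print({k: {ik: len(v) for ik, v in d.items()}
#            for k, d in adict.items()})
#     # {'A': {'A1': 2}, 'B': {'B1': 1}}

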
def _check_structures_for_property_equivalence(sts,
                                               key=RESTRAINED_ATOM_KEY,
                                               is_atom_key=True):
    """
    Check that the given structures have equivalent property values for the
    given structure or atom key. If it is an atom key then the key must be
    a boolean key as this function only ensures that the groups of atoms
    for which the given property exists are equivalent as opposed to
    checking equivalence among the property values.

    :type sts: list
    :param sts: the structures

    :type key: str
    :param key: the key for the property for which to check equivalence

    :type is_atom_key: bool
    :param is_atom_key: True if the given key is an atom key in which case
        only property existence is checked as opposed to property values

    :raise ValueError: if invalid
    """
    aprops = set()
    for st in sts:
        if is_atom_key:
            aprop = tuple(sfu.get_idxs_marked_atoms(st, key))
            if not aprop:
                aprop = None
        else:
            aprop = st.property.get(key)
        aprops.add(aprop)
    if len(aprops) > 1:
        msg = ('The given structures have inequivalent {key} '
               'properties.').format(key=key)
        raise ValueError(msg)


def append_unique_conformers(sts,
                             unique_sts,
                             rmsd_thresh=CSEARCH_RMSD_THRESH,
                             n_conformers=None):
    """
    Append any unique conformers found in the given structures to the given
    unique structures.

    :type sts: list
    :param sts: schrodinger.structure.Structure candidate conformers

    :type unique_sts: list
    :param unique_sts: schrodinger.structure.Structure unique conformers

    :type rmsd_thresh: float
    :param rmsd_thresh: the maximum allowable RMSD (Ang.) between two
        structures before they can be considered different conformers

    :type n_conformers: int or None
    :param n_conformers: number of sought conformers or None if there isn't
        one
    """
    # heavy atoms only
    idxs = [atom.index for atom in sts[0].atom if atom.atomic_number != 1]
    # consider how many unique conformers are given
    if not unique_sts:
        unique_sts.append(sts[0])
        start_idx = 1
    else:
        start_idx = 0
    # early exit if there are sufficient conformers
    if n_conformers and len(unique_sts) == n_conformers:
        return
    for st in sts[start_idx:]:
        for unique_st in unique_sts:
            armsd = rmsd.superimpose(st,
                                     idxs,
                                     unique_st,
                                     idxs,
                                     use_symmetry=False,
                                     move_which=rmsd.CT)
            if armsd < rmsd_thresh:
                # non-unique conformer found
                break
        else:
            # unique conformer found
            unique_sts.append(st)
            # early exit if there are sufficient conformers
            if n_conformers and len(unique_sts) == n_conformers:
                return


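# Illustrative usage sketch (not part of the module): the dedup-with-early-
# exit pattern used by append_unique_conformers, shown with a scalar
# distance standing in for rmsd.superimpose; the thresholds and values are
# made-up examples.
#
#     def append_unique(values, unique_values, thresh=0.1, n_max=None):
#         for v in values:
#             if all(abs(v - u) >= thresh for u in unique_values):
#                 # "unique conformer" found
#                 unique_values.append(v)
#                 if n_max and len(unique_values) == n_max:
#                     return  # early exit once enough are collected
#
#     unique = []
#     append_unique([0.0, 0.05, 0.5, 0.55, 1.0], unique, thresh=0.1, n_max=3)
#     print(unique)  # [0.0, 0.5, 1.0]

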
def get_conformers(sts,
                   n_conformers,
                   pp_rel_energy_thresh=PP_CSEARCH_REL_ENERGY_THRESH,
                   rmsd_thresh=CSEARCH_RMSD_THRESH,
                   ffld_name=None):
    """
    Return either (1) at most the given number of conformers or (2) all
    conformers with relative energies less than the given value. If (2)
    then an attempt is made to return at least the given number of
    conformers even if that means having relative energies larger than the
    given value.

    :type sts: list
    :param sts: schrodinger.structure.Structure conformers

    :type n_conformers: int
    :param n_conformers: either the maximum number of conformers if
        pp_rel_energy_thresh is None or a target minimum number of
        conformers if pp_rel_energy_thresh is given

    :type pp_rel_energy_thresh: None or float
    :param pp_rel_energy_thresh: relative energy threshold in kJ/mol, if
        None then only the n_conformers lowest energy conformers are
        returned

    :type rmsd_thresh: float
    :param rmsd_thresh: the maximum allowable RMSD (Ang.) between two
        structures before they can be considered different conformers

    :type ffld_name: str
    :param ffld_name: the name of the force field to use for the search.
        If `None`, then defaults to the latest forcefield available.

    :rtype: list
    :return: schrodinger.structure.Structure conformers
    """
    energy_key, rel_energy_key = get_mmod_energy_keys(ffld_name)
    # first sort by total energy
    sorted_sts = sorted(sts, key=lambda x: x.property[energy_key])
    if pp_rel_energy_thresh is None:
        key = energy_key
    else:
        # second sort by relative energy
        key = rel_energy_key
        sorted_sts = sorted(sorted_sts, key=lambda x: x.property[key])
    # get unique (in the sense of RMSD) conformers
    unique_sts = []
    if pp_rel_energy_thresh is None:
        append_unique_conformers(sorted_sts,
                                 unique_sts,
                                 rmsd_thresh=rmsd_thresh,
                                 n_conformers=n_conformers)
    else:
        # collect conformers with the proper relative energies
        sts = []
        for idx, st in enumerate(sorted_sts):
            if st.property[key] > pp_rel_energy_thresh:
                break
            sts.append(st)
        # uniqueify them
        append_unique_conformers(sts,
                                 unique_sts,
                                 rmsd_thresh=rmsd_thresh,
                                 n_conformers=None)
        # if there is an insufficient number of conformers then try
        # to obtain at least n_conformers
        if n_conformers > len(unique_sts):
            append_unique_conformers(sorted_sts[idx:],
                                     unique_sts,
                                     rmsd_thresh=rmsd_thresh,
                                     n_conformers=n_conformers)
    return unique_sts


def get_int_tuples_from_str_property(st, key, separator=SEPARATOR):
    """
    Return a list of tuples of integers from the given string structure
    property.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :type key: str
    :param key: the property key

    :type separator: str
    :param separator: the tuple separator used for the given property

    :rtype: list
    :return: contains tuples of integers
    """
    aproperty = st.property.get(key)
    if not aproperty:
        return []
    else:
        return [eval(token) for token in aproperty.split(separator)]


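# Illustrative usage sketch (not part of the module): parsing a
# separator-joined string of integer tuples the way
# get_int_tuples_from_str_property does; the ';' separator below is a
# made-up stand-in for SEPARATOR, whose actual value comes from
# swap_fragments_utils.
#
#     text = '(1, 2);(3, 4, 5)'
#     print([eval(token) for token in text.split(';')])
#     # [(1, 2), (3, 4, 5)]

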
def update_index_properties(st, old_to_new):
    """
    Update the index properties of the given structure.

    :type st: `structure.Structure`
    :param st: the structure

    :type old_to_new: dict
    :param old_to_new: a map of old-to-new atom indices
    """
    keys = (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
            RESTRAINED_DIHEDRALS_KEY)
    for key in keys:
        idxs = _get_new_restrain_group_idxs(st, key, old_to_new)
        text = sfu.get_idx_groups_str(idxs)
        if text:
            st.property[key] = text


def get_core_idxs(st):
    """
    Return a set of atom indices for the core of the given structure.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :rtype: set
    :return: core atom indices
    """
    # keep atoms are not to be considered as core atoms
    atom_keys = [SUPERPOSITION_ATOM_KEY, RESTRAINED_ATOM_KEY]
    st_keys = [
        RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
        RESTRAINED_DIHEDRALS_KEY
    ]
    idxs = set()
    for atom in st.atom:
        for key in atom_keys:
            if atom.property.get(key):
                idxs.add(atom.index)
    for key in st_keys:
        for jdxs in sfu._get_restrain_group_idxs(st, key):
            idxs.update(jdxs)
    return idxs


def _get_new_restrain_group_idxs(st, key, old_to_new, offset=0):
    """
    Return new restrain group indices.

    :type st: schrodinger.structure.Structure
    :param st: the structure

    :type key: str
    :param key: determines the type of restrain coordinate

    :type old_to_new: dict
    :param old_to_new: the old-to-new atom index map

    :type offset: int
    :param offset: an offset added to all new restrain group indices,
        useful when assembling structures from parts of other structures

    :rtype: list
    :return: contains new restrain group indices
    """
    return sfu._get_new_restrain_group_idxs(st, key, old_to_new,
                                            offset=offset)


def representative_conformers(sibling_conformers_dict,
                              specific_sibling_group=None):
    """
    Generator over representative conformers.

    :type sibling_conformers_dict: dict
    :param sibling_conformers_dict: dictionary of dictionaries where the
        outer dictionary is keyed by sibling and inner dictionary is keyed
        by conformer and values of the inner dictionary are lists of
        structures

    :type specific_sibling_group: str or None
    :param specific_sibling_group: if not None then restrict conformers to
        be generated only over this sibling group

    :rtype: tuple
    :return: the sibling and conformer group names and the representative
        conformer or None if one doesn't exist
    """
    # all conformers in each list of structures are identical (having
    # identical properties, etc.) and so choose the first as the
    # representative
    for sibling_group, conformers_dict in sibling_conformers_dict.items():
        if specific_sibling_group and specific_sibling_group != sibling_group:
            continue
        for conformer_group, conformers in conformers_dict.items():
            if conformers:
                conformer = conformers[0]
            else:
                conformer = None
            yield tuple([sibling_group, conformer_group, conformer])


class RepresentativeConformersMixin(object):

    def representativeConformers(self,
                                 sibling_conformers_dict=None,
                                 specific_sibling_group=None):
        """
        Generator over representative conformers.

        :type sibling_conformers_dict: dict or None
        :param sibling_conformers_dict: dictionary of dictionaries where
            the outer dictionary is keyed by sibling and inner dictionary
            is keyed by conformer and values of the inner dictionary are
            lists of structures, if None then the class attr is used

        :type specific_sibling_group: str or None
        :param specific_sibling_group: if not None then restrict conformers
            to be generated only over this sibling group

        :rtype: tuple
        :return: the sibling and conformer group names and the
            representative conformer
        """
        if sibling_conformers_dict is None:
            sibling_conformers_dict = self.sibling_conformers_dict
        return representative_conformers(
            sibling_conformers_dict,
            specific_sibling_group=specific_sibling_group)


[docs]class ReactionWorkflowFile(RepresentativeConformersMixin): """ Manage a reaction workflow file. """
[docs] def __init__(self, rxn_sts): """ Create an instance. :type rxn_sts: str or list :param rxn_sts: the reaction workflow structures, a file name or list of schrodinger.structure.Structure """ if isinstance(rxn_sts, str): rxn_sts = [st for st in structure.StructureReader(rxn_sts)] self.sibling_conformers_dict = bin_structures_by_property( rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
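bin_structures_by_property is defined earlier in this module; as a rough standalone sketch (not the actual implementation), the two-level binning it performs might look like:

from collections import OrderedDict

def bin_by_two_keys_sketch(sts, key, inner_key):
    # Group structures first by st.property[key], then by st.property[inner_key];
    # inner values are lists of structures sharing both property values.
    binned = OrderedDict()
    for st in sts:
        inner_dict = binned.setdefault(st.property.get(key), OrderedDict())
        inner_dict.setdefault(st.property.get(inner_key), []).append(st)
    return binned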
[docs] def getReactantsSiblingGroupName(self): """ Return the reactants sibling group name. :rtype: str :return: the reactants sibling group name """ for sibling_group, conformer_group, conformer in self.representativeConformers( ): parents = conformer.property.get(PARENT_SIBLING_GROUPS_KEY) if not parents: return sibling_group
[docs] def getReactantsConformersDict(self): """ Return the reactants conformers dictionary. :rtype: dict :return: keys are conformer group names, values are lists of `structure.Structure` """ sibling_group = self.getReactantsSiblingGroupName() return self.sibling_conformers_dict[sibling_group]
[docs]class UniqueGeomMixin: """ Manage uniqueifying structures by geometry. """ def _setUniqueGeomDict(self, sts): """ Set the unique geometry dictionary. :type sts: list[`schrodinger.structure.Structure`] :param sts: the structures """ if not self.dedup_geom_eps: # this means deduplicating is not being performed return # see MATSCI-10591 - set up a cache for unique geometries to dedup # expensive calculations like MacroModel, Jaguar, etc., arbitrarily # use the first geometry # # see MATSCI-11251 - JMSWF uses cleaned structure titles to name # Jaguar subjob input files, while the title (and entry name) on the # structure in the input file is unchanged, the titles (and entry names) # of structures in the main JMSWF output Maestro file are cleaned, so # use cleaned titles here unique_geom_dict = bin_by_geometry(sts, eps=self.dedup_geom_eps) self.unique_geom_dict = { jobutils.clean_string(ref_st.title): [ref_st] + other_sts for ref_st, other_sts in unique_geom_dict.items() } def _getUniqueGeomSts(self, sts): """ Return representative structures with unique geometries. :type sts: list[`schrodinger.structure.Structure`] :param sts: the structures :rtype: list[`schrodinger.structure.Structure`] :return: the structures """ # arbitrarily use the first geometry return [sts[0] for sts in self.unique_geom_dict.values()] or sts def _duplicateUniqueGeomSts(self): """ Duplicate the structures chosen as unique geometries. :rtype: list[`schrodinger.structure.Structure`] :return: the structures """ # for conformers created from an original structure JMSWF # will append '-<idx>' new_rep_sts = [] for rep_st in self.rep_sts: new_rep_sts.append(rep_st) if not self.unique_geom_dict: continue orig_title, title_ext = get_orig_title(rep_st.title, self.unique_geom_dict) _, ename_ext = get_orig_title( rep_st.property[msprops.ENTRY_NAME_PROP], self.unique_geom_dict) sts = self.unique_geom_dict[orig_title] orig_st, other_sts = sts[0], sts[1:] for other_st in other_sts: _other_st = rep_st.copy() _other_st.property.update(other_st.property) _other_st.title = f'{jobutils.clean_string(other_st.title)}{title_ext}' _other_st.property[msprops.ENTRY_NAME_PROP] = \ f'{jobutils.clean_string(other_st.title)}{ename_ext}' new_rep_sts.append(_other_st) return new_rep_sts def _updateUniqueGeoms(self, sibling_conformers_dict, title_dict): """ Update the unique geometry dictionary and use it to update the given structure dictionary. 
:type sibling_conformers_dict: dict :param sibling_conformers_dict: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures :type title_dict: dict :param title_dict: dictionary mapping the structure titles of generated rotamers to the structure title of the structure from which they were generated :rtype: dict :return: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures """ # relative to all original structures the following list contains # those representatives that have unique geometries plus any of # their guess conformers new_sts = flatten_sibling_conformers(sibling_conformers_dict) # guess conformers have been generated for the representatives, now # generate the same for the structures being represented all_sts = [] for new_st in new_sts: all_sts.append(new_st) if not self.unique_geom_dict: continue # structure titles of the generated guess conformers for the # representative may be appended with some identifier, for # example 'Fe-complex', 'Fe-complex_252_324', etc. orig_title = title_dict[new_st.title] old_sts = self.unique_geom_dict[orig_title] new_xyz = new_st.getXYZ() # 0th structure is the original old structure for old_st in old_sts[1:]: tmp_st = old_st.copy() tmp_st.setXYZ(new_xyz) all_sts.append(tmp_st) self._setUniqueGeomDict(all_sts) sts = self._getUniqueGeomSts(all_sts) return bin_structures_by_property(sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
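bin_by_geometry is what _setUniqueGeomDict above relies on; a rough standalone sketch of the idea, assuming all structures share an atom ordering and that a plain per-atom RMSD within eps marks a duplicate (the real function may superimpose structures and use a different metric):

import numpy

def bin_by_geometry_sketch(sts, eps):
    # Map each reference structure to the other structures whose geometry
    # lies within eps Angstrom RMSD of it; first matching reference wins.
    bins = {}
    for st in sts:
        xyz = st.getXYZ()
        for ref_st, other_sts in bins.items():
            delta = xyz - ref_st.getXYZ()
            if numpy.sqrt((delta * delta).sum(axis=1).mean()) <= eps:
                other_sts.append(st)
                break
        else:
            bins[st] = []
    return bins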
[docs]class ReactionWorkflowEnergyAnalysis(ReactionWorkflowFile, UniqueGeomMixin): """ Manage a reaction workflow energy analysis. """
[docs] def __init__(self, rxn_sts, energy_keys, dedup_geom_eps=parserutils.DEFAULT_DEDUP_GEOM_EPS): """ Create an instance. :type rxn_sts: str or list :param rxn_sts: the reaction workflow structures, a file name or list of schrodinger.structure.Structure :type energy_keys: list :param energy_keys: structure property energy keys to consider, if it is temperature dependent then include the temperature (K) as a number followed by 'K' in the key and the corresponding energy must be in supported units (au, kcal/mol, eV, kJ/mol) and must be present in the key as '(<units>)' :type dedup_geom_eps: float :param dedup_geom_eps: reduce the number of calculations by deduplicating the input structures based on geometry, using this threshold in Ang., and only calculating the representatives, a value of zero means no deduplicating """ super().__init__(rxn_sts) self.energy_keys = energy_keys self.dedup_geom_eps = dedup_geom_eps # Cached units, see getUnitsData self._units_data = {} self.unique_geom_dict = {}
[docs] def getUnitsData(self, prop): """ Get conversion factor to kcal/mol, pressure and temperature from a property, saving the data into a dict. :param str prop: Property to use :rtype: float or None, float or None, float or None :return: Conversion to kcal/mol, pressure, temperature """ try: return self._units_data[prop] except KeyError: val = (self.getKcalPerMolConversion(prop), self.getPressure(prop), self.getTemperature(prop)) self._units_data[prop] = val return val
[docs] @staticmethod def getHeader(energy_key): """ Return a header for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: str :return: the header """ # handle prefix in r_<product>_<header> header = '_'.join(energy_key.split('_')[2:]) conversion = ReactionWorkflowEnergyAnalysis.getKcalPerMolConversion( energy_key) if conversion: _units = ReactionWorkflowEnergyAnalysis.getUnits(energy_key) if _units in header: header = header.replace(f'({_units})', '(kcal/mol)') else: if STAGE_SEPARATOR in header: head, tail = header.split(STAGE_SEPARATOR) header = head + '_(kcal/mol)' + STAGE_SEPARATOR + tail else: header += '_(kcal/mol)' return header
[docs] @staticmethod def getTemperature(energy_key): """ Return the temperature (K) for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: float, None :return: the temperature (K) if there is one """ return jaguarworkflows.get_temperature(energy_key)
[docs] @staticmethod def getPressure(energy_key): """ Return the pressure (atm) for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: float, None :return: the pressure (atm) if there is one """ return jaguarworkflows.get_pressure(energy_key)
[docs] @staticmethod def getUnits(energy_key): """ Return the units for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: str, None :return: the units if there is one """ # units like 'kcal/mol' in '(kcal/mol)' match = re.search(r'\((\D+)\)', energy_key) if match: return match.group(1) base_energy_key = energy_key.split(STAGE_SEPARATOR)[0] _units = jmswfu.JAGUAR_PROP_UNITS_DICT.get(base_energy_key) if _units: return _units[1:-1]
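For example, the regex above extracts a parenthesized, digit-free token from a property key (the key names here are hypothetical):

import re

match = re.search(r'\((\D+)\)', 'r_matsci_Free_Energy_(au)')
assert match.group(1) == 'au'
# keys with no parenthesized units, e.g. 'r_matsci_Energy_298.15K', give None
assert re.search(r'\((\D+)\)', 'r_matsci_Energy_298.15K') is None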
[docs] @staticmethod def getKcalPerMolConversion(energy_key): """ Return the kcal/mol conversion factor for the given energy key. :type energy_key: str :param energy_key: structure property energy key :rtype: float, None :return: the kcal/mol conversion factor if there is one """ _units = ReactionWorkflowEnergyAnalysis.getUnits(energy_key) if _units in HARTREE_UNITS: conversion = units.KCAL_PER_MOL_PER_HARTREE elif _units in KCAL_PER_MOL_UNITS: conversion = 1 elif _units in EV_UNITS: conversion = units.KCAL_PER_MOL_PER_EV elif _units in KJOULES_PER_MOL_UNITS: conversion = units.KCAL_PER_MOL_PER_EV conversion /= units.KJOULES_PER_MOL_PER_EV else: conversion = None return conversion
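The branches above amount to the standard relations below; the numbers are approximate and the exact factors live in schrodinger.utils.units:

# 1 hartree ~ 627.509 kcal/mol, 1 eV ~ 23.0605 kcal/mol,
# 1 kJ/mol = 1/4.184 kcal/mol (thermochemical calorie)
KCAL_PER_MOL_PER_HARTREE = 627.509
KCAL_PER_MOL_PER_EV = 23.0605
KJOULES_PER_MOL_PER_EV = 96.485
# the kJ/mol branch divides the eV factor by kJ/mol-per-eV:
assert abs(KCAL_PER_MOL_PER_EV / KJOULES_PER_MOL_PER_EV - 1 / 4.184) < 1e-4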
[docs] def getPropertyEnsemble(self, conformers_dict, energy_key, conversion, temp, do_boltzmann, include_x_terms=False, only_lowest_energy=False, property_key=None, atom_idx=None): """ Return an ensemble of properties for the given conformers dictionary of siblings and given energy key. :type conformers_dict: dict :param conformers_dict: keys are conformer group names, values are lists of `structure.Structure` :type energy_key: str :param energy_key: the relevant energy key :type conversion: float :param conversion: the energy conversion factor to kcal/mol :type temp: float :param temp: the temperature in K :type do_boltzmann: bool :param do_boltzmann: if True perform a Boltzmann average, otherwise an algebraic average :type include_x_terms: bool :param include_x_terms: whether to include cross terms in the conformational averaging :type only_lowest_energy: bool :param only_lowest_energy: use only the lowest energy conformer rather than averaging over conformers :type property_key: str :param property_key: the relevant property key, if not specified it is the same as the energy key :type atom_idx: int or None :param atom_idx: if an integer then the given property key is for an atomic property and this is the atom index, if None then the given property key is a structure property :raise ReactionWorkflowException: if there is an issue :rtype: list :return: ensemble of properties """ # being able to use both siblings and a property key depends # on whether the property key is for a size extensive property, # i.e. property(ABC) = property(A) + property(B) + property(C), # such is the case for energy but not for example bite angle, etc. if include_x_terms and only_lowest_energy: msg = ('Simultaneous use of include_x_terms and ' 'only_lowest_energy is not supported.') raise ReactionWorkflowException(msg) if atom_idx and not property_key: msg = ('Atomic properties require an input property key.') raise ReactionWorkflowException(msg) if atom_idx and len(conformers_dict) > 1: msg = ('Atomic properties for sibling groups are not supported.') raise ReactionWorkflowException(msg) property_key = property_key or energy_key # the following getter takes a list that contains a sublist of conformers # for each reactant, if including cross terms then return the iterator # for all possible collections of mixed conformers of reactants else return # the original list getter = lambda x: itertools.product(*x) if include_x_terms else x ensemble = [] for sample in getter(conformers_dict.values()): # see MATSCI-12187 - deduplicate conformational averaging by geometry self._setUniqueGeomDict(sample) sample = self._getUniqueGeomSts(sample) energies_properties = [] for st in sample: energy = st.property.get(energy_key) if atom_idx: if atom_idx <= st.atom_total: aproperty = st.atom[atom_idx].property.get(property_key) else: aproperty = None else: aproperty = st.property.get(property_key) pair = (energy, aproperty) if None in pair: return [] energies_properties.append(pair) min_energy, min_property = min(energies_properties, key=lambda x: x[0]) energies, properties = list(zip(*energies_properties)) if include_x_terms: ensemble.append(sum(properties)) elif only_lowest_energy: ensemble.append(min_property) elif do_boltzmann: # calling code needs to ensure that conversion brings energies # to kcal/mol and that if the energy is temperature dependent # that temperature in K should be used, wp is the Boltzmann # averaged property, i runs over conformers, p_i is the property # being Boltzmann averaged, beta is 1/(kT) in units of 
mol/kcal, # E_i and E_min are in kcal/mol, and Z is the partition function # # wp = (sum_i p_i * exp(-beta * (E_i - E_min))) / Z # Z = sum_i exp(-beta * (E_i - E_min)) # deltas = conversion * (numpy.array(energies) - min_energy) factors = numpy.exp(-1 * deltas / (IDEAL_GAS_CONSTANT * temp)) weighted_property = sum(properties * factors) partition_f = sum(factors) weighted_property /= partition_f ensemble.append(weighted_property) else: ensemble.append(numpy.mean(properties)) if not include_x_terms and ensemble: # see MATSCI-11763 - For sibling groups containing a mix of intermediates # and transition states treat the former as a spectator and use the # most negative frequency as the representative for the group, otherwise # assume the property is size-extensive and can be summed over the # siblings to arrive at the total property positive = [0 <= value for value in ensemble] if (property_key == jaguarworkflows.LOWEST_FREQUENCY_PROP and any(positive) and not all(positive)): afunc = min else: afunc = sum ensemble = [afunc(ensemble)] return ensemble
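A minimal numpy sketch of the Boltzmann weighting spelled out in the comment above, with hypothetical conformer data already converted to kcal/mol (R_KCAL stands in for the module-level IDEAL_GAS_CONSTANT in kcal/(mol K)):

import numpy

R_KCAL = 1.987204e-3  # gas constant, kcal/(mol K)
temp = 298.15
energies = numpy.array([0.0, 0.5, 1.2])    # hypothetical conformer energies, kcal/mol
properties = numpy.array([1.9, 2.1, 2.4])  # hypothetical per-conformer property

deltas = energies - energies.min()
factors = numpy.exp(-deltas / (R_KCAL * temp))
# wp = (sum_i p_i * exp(-beta * (E_i - E_min))) / Z with Z = sum_i exp(-beta * (E_i - E_min))
wp = (properties * factors).sum() / factors.sum()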
[docs] def getProperties(self, include_x_terms=False, only_lowest_energy=False, property_key=None, atomic=False, temps=None): """ Return the properties. :type include_x_terms: bool :param include_x_terms: whether to include cross terms :type only_lowest_energy: bool :param only_lowest_energy: use only the lowest energy conformer rather than averaging over conformers :type property_key: str :param property_key: the property key, if not specified the energy keys are used :type atomic: bool :param atomic: if True then the given property key is an atomic property, otherwise it is a structure property :type temps: list :param temps: temperatures in K, only used for temperature independent energy and property keys :raise ReactionWorkflowException: if there is an issue :rtype: list[EnergyAnalysisProperty] :return: the properties """ if include_x_terms and only_lowest_energy: msg = ('Simultaneous use of include_x_terms and ' 'only_lowest_energy is not supported.') raise ReactionWorkflowException(msg) if atomic and not property_key: msg = ('Atomic properties require an input property key.') raise ReactionWorkflowException(msg) temps = temps or [jmswfu.DEFAULT_TEMP_START] properties = [] for energy_key in self.energy_keys: conversion, energy_key_press, energy_key_temp = \ self.getUnitsData(energy_key) do_boltzmann = bool(conversion) conversion = conversion or 1 this_property_key = property_key or energy_key _, this_property_key_press, this_property_key_temp = \ self.getUnitsData(this_property_key) # if property temperature or pressure differs from that # of the energy used for averaging then skip it t_state = (energy_key_temp and this_property_key_temp and energy_key_temp != this_property_key_temp) p_state = (energy_key_press and this_property_key_press and energy_key_press != this_property_key_press) if t_state or p_state: continue # get the temperature used for averaging, comes from either # the given energy key, property key, or specified temperatures if energy_key_temp: these_temps = [energy_key_temp] elif this_property_key_temp: these_temps = [this_property_key_temp] else: these_temps = list(temps) press = this_property_key_press or energy_key_press or 1 for temp in these_temps: # get the key that will hold the average property if this_property_key_temp: avg_property_key = this_property_key else: ext = jaguarworkflows.get_temp_press_key_ext(temp, press) avg_property_key = f'{this_property_key}{ext}' for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): representative_conformers = tuple( min(conformers, key=lambda x: x.property[energy_key]) for conformers in conformers_dict.values()) # get atom indices if doing atomic properties if atomic: if len(conformers_dict) > 1: msg = ( 'Atomic properties for sibling groups are not ' 'supported.') raise ReactionWorkflowException(msg) atom_idxs = list( range(1, representative_conformers[0].atom_total + 1)) else: atom_idxs = [None] for atom_idx in atom_idxs: ensemble = tuple( self.getPropertyEnsemble( conformers_dict, energy_key, conversion, temp, do_boltzmann, include_x_terms=include_x_terms, only_lowest_energy=only_lowest_energy, property_key=this_property_key, atom_idx=atom_idx)) if not ensemble: continue # define property object aproperty = EnergyAnalysisProperty( sibling_group=sibling_group, conformer_groups=tuple(conformers_dict.keys()), representative_conformers=representative_conformers, temperature=temp, pressure=press, energy_key=energy_key, property_key=this_property_key, avg_property_key=avg_property_key, atom_idx=atom_idx, 
ensemble=ensemble, include_x_terms=include_x_terms, only_lowest_energy=only_lowest_energy) properties.append(aproperty) return properties
[docs] def getConfAvgRelEnergies(self, include_x_terms=True, only_lowest_energy=False): """ Return the conformationally averaged energies relative to that of the reactants. :type include_x_terms: bool :param include_x_terms: whether to include cross terms in the conformational averaging :type only_lowest_energy: bool :param only_lowest_energy: use only the lowest energy conformer rather than averaging over conformers :raise ReactionWorkflowException: if there is an issue :rtype: dict :return: keys are sibling group names, values are dicts with energy keys as keys and energy values as values """ if include_x_terms and only_lowest_energy: msg = ('Simultaneous use of include_x_terms and ' 'only_lowest_energy is not supported.') raise ReactionWorkflowException(msg) # with cross terms in the conformational averaging all possible # collections of conformers within a sibling group as well # between sibling groups are included in the averaging which # is done last, for example a high energy conformer of A is # allowed to react with a low energy conformer of B, etc. and that # relative to some collection of conformers from another sibling # group both (high, low), (low, high), etc. collections of (A, B) # are included, without cross terms the averaging is done first # over each molecule in the sibling group thereby reducing the # energy of the group down to a single number # canonical ensemble # relevant energies reported relative to reactants reactants_sibling_group = self.getReactantsSiblingGroupName() reactants_conformers_dict = self.getReactantsConformersDict() # for each energy key for both reactants and non-reactants # obtain a conformeric ensemble of energies summed over siblings # and collect energy differences for all pairs then perform a # Boltzmann average if the energy is both temperature dependent # and of known units and including cross terms, otherwise perform # an algebraic average edict = {} for energy_key in self.energy_keys: conversion = self.getKcalPerMolConversion(energy_key) temp = self.getTemperature(energy_key) do_boltzmann = conversion and temp conversion = conversion or 1 reactants_energies = self.getPropertyEnsemble( reactants_conformers_dict, energy_key, conversion, temp, do_boltzmann, include_x_terms=include_x_terms, only_lowest_energy=only_lowest_energy) if not reactants_energies: # this means that at least a single reactant doesn't # have the given energy_key defined, so skip the energy_key continue header = self.getHeader(energy_key) edict.setdefault(reactants_sibling_group, {})[header] = 0. 
for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): if sibling_group == reactants_sibling_group: continue energies = self.getPropertyEnsemble( conformers_dict, energy_key, conversion, temp, do_boltzmann, include_x_terms=include_x_terms, only_lowest_energy=only_lowest_energy) if not energies: # this means that at least a single non-reactant child doesn't # have the given energy_key defined, so first undefine it for # all sibling groups and then skip the energy_key by breaking # out of the non-reactant child loop for _edict in edict.values(): _edict.pop(header, None) break deltas = numpy.array([ conversion * (enr - er) for enr, er in itertools.product( energies, reactants_energies) ]) if do_boltzmann and include_x_terms: factors = numpy.exp(-1 * deltas / (IDEAL_GAS_CONSTANT * temp)) avg_delta = sum([ delta * factor for delta, factor in zip(deltas, factors) ]) partition_f = sum(factors) # see MATSCI-7153 - where the denominator can be problematic with warnings.catch_warnings(): warnings.filterwarnings('error') try: avg_delta /= partition_f except RuntimeWarning: msg = ( 'Large energy differences with respect to reactants ' 'have been detected. This is typically a sign of a severely ' 'imbalanced chemical reaction in the reaction workflow input ' 'file.') raise ReactionWorkflowException(msg) else: avg_delta = numpy.mean(deltas) edict.setdefault(sibling_group, {})[header] = avg_delta return edict
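A standalone sketch of the cross-term averaging described above, with hypothetical energies already in kcal/mol: every combination of one conformer per reactant is summed, paired against each non-reactant ensemble member, and the energy differences are Boltzmann averaged:

import itertools
import numpy

R_KCAL, temp = 1.987204e-3, 298.15
reactant_ensembles = [[0.0, 0.4], [0.0, 0.9]]  # conformer energies of reactants A, B
product_energies = [5.0, 5.6]                  # summed sibling-group conformer energies

reactants_energies = [sum(combo) for combo in itertools.product(*reactant_ensembles)]
deltas = numpy.array([enr - er for enr, er in
                      itertools.product(product_energies, reactants_energies)])
factors = numpy.exp(-deltas / (R_KCAL * temp))
avg_delta = (deltas * factors).sum() / factors.sum()  # kcal/mol, relative to reactants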
[docs] def getGraph(self): """ Return a NetworkX directed graph of the reaction workflow energy level diagram. :rtype: networkx.DiGraph :return: nodes are sibling group names and have energy dictionary kwargs, edges are directed (parent, child) pairs """ edict_w_x = self.getConfAvgRelEnergies(include_x_terms=True, only_lowest_energy=False) edict_wo_x = self.getConfAvgRelEnergies(include_x_terms=False, only_lowest_energy=False) edict_lowest = self.getConfAvgRelEnergies(include_x_terms=False, only_lowest_energy=True) graph = nx.DiGraph() for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): graph.add_node(sibling_group, edict_w_x=edict_w_x[sibling_group], edict_wo_x=edict_wo_x[sibling_group], edict_lowest=edict_lowest[sibling_group]) # parents are equivalent just grab an arbitrary first item parents = next(iter(conformers_dict.values()))[0].property.get( PARENT_SIBLING_GROUPS_KEY) if not parents: continue for parent_sibling_group in parents.split(PARENT_SEPARATOR): graph.add_edge(parent_sibling_group, sibling_group) return graph
[docs] @staticmethod def getOrderedNodeNames(graph): """ Return an ordered list of node names from the given graph. :type graph: networkx.DiGraph :param graph: nodes are sibling group names and have energy dictionary kwargs, edges are directed (parent, child) pairs :rtype: list :return: contains ordered node names """ # for example, all lines point right, up-right, or down-right # (just a non-physical example): # # I2 -- TS2 # / \ / \ # TS1 I3 P # / \ / # R I1 # # is ordered as R,TS1,I1,I2,I3,TS2,P # find the reactant node for node in graph.nodes(): if not list(graph.predecessors(node)): reactant_node = node break # start with the reactant node nodes = [reactant_node] current_nodes = [reactant_node] # collect nodes in order by adding for the current nodes # all successor nodes that have no predecessor nodes that # have not already been collected, currently successor nodes # are sorted by name rather than energy, avoid adding the # same node twice which can happen for loops in the graph, # the current nodes to search are updated until all nodes # have been collected while len(nodes) < graph.number_of_nodes(): tmp = [] for current_node in current_nodes: for successor_node in sorted(graph.successors(current_node)): if not set(graph.predecessors(successor_node)).difference( nodes): if successor_node not in nodes: nodes.append(successor_node) tmp.append(successor_node) current_nodes = list(tmp) return nodes
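The ordering in the docstring example can be checked directly; one reading of the diagram gives the edges below, and the method returns the documented order:

import networkx as nx

graph = nx.DiGraph()
graph.add_edges_from([('R', 'TS1'), ('TS1', 'I1'), ('TS1', 'I2'),
                      ('I2', 'I3'), ('I2', 'TS2'), ('I3', 'TS2'),
                      ('I1', 'P'), ('TS2', 'P')])
names = ReactionWorkflowEnergyAnalysis.getOrderedNodeNames(graph)
assert names == ['R', 'TS1', 'I1', 'I2', 'I3', 'TS2', 'P']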
[docs] def writeDataFiles(self): """ Write data files. """ # get ordered nodes graph = self.getGraph() node_names = self.getOrderedNodeNames(graph) base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) out_files = [] # handle the conformationally averaged energies relative to reactants for ext in REL_REACTANTS_EXTS: conf_avg_rel_reactants_file = base_name + ext kwarg = REL_EXT_TO_KWARG_DICT[ext] with csv_unicode.writer_open(conf_avg_rel_reactants_file) as afile: writer = csv.writer(afile) for idx, name in enumerate(node_names): edict = graph.nodes[name][kwarg] if not idx: headers = sorted(edict.keys()) writer.writerow([SIBLING_GROUP_NAME] + headers) values = [name] + [edict[header] for header in headers] writer.writerow(values) out_files.append(conf_avg_rel_reactants_file) # handle the conformationally averaged energies relative to parents if len(node_names) > 1: for ext in REL_PARENTS_EXTS: conf_avg_rel_parents_file = base_name + ext kwarg = REL_EXT_TO_KWARG_DICT[ext] final_edict = {} for child_name in node_names: child_edict = graph.nodes[child_name][kwarg] parents_edict = {} for parent_name in graph.predecessors(child_name): parent_edict = graph.nodes[parent_name][kwarg] parents_edict[parent_name] = dict([ (key, child_edict[key] - parent_edict[key]) for key in child_edict.keys() ]) if parents_edict: final_edict[child_name] = parents_edict with open(conf_avg_rel_parents_file, 'w') as afile: json.dump(final_edict, afile, sort_keys=True, indent=4, separators=(',', ':')) out_files.append(conf_avg_rel_parents_file) for out_file in out_files: jobutils.add_outfile_to_backend(out_file)
[docs]def get_stage_idx(astr, is_filename=False): """ Return the stage index from the given string, can be a filename. :type astr: str :param astr: the string :type is_filename: bool :param is_filename: Whether astr is filename or not :rtype: int :return: the stage index """ if is_filename: astr = fileutils.get_basename(astr) head, stage_hash = astr.split(STAGE_SEPARATOR) stage_idx = stage_hash.split('_')[0] return int(stage_idx)
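For instance, assuming STAGE_SEPARATOR is the '_stage_' token seen in structure titles like 'reactant_stage_1' later in this module:

get_stage_idx('transition_state-2_stage_4_analysis_2')    # -> 4
get_stage_idx('reactant_stage_1.out', is_filename=True)   # -> 1, extension stripped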
[docs]def check_TS_vetting(out_file): """ Return False if TS vetting failed in the given Jaguar out file. :type out_file: str :param out_file: Jaguar out file :rtype: bool :return: False if TS vetting failed """ with open(out_file, 'r') as afile: for line in afile: if line.startswith('Unable to find any valid transition vectors.'): return False return True
[docs]def get_sub_host_str(obj, sub_host_attr, n_procs_attr): """ Return the command line -HOST argument for using a subhost. :type obj: object :param obj: the object, possibly having the given attributes defined :type sub_host_attr: str :param sub_host_attr: the attribute for the subhost :type n_procs_attr: str :param n_procs_attr: the attribute for the number of processors :rtype: str :return: the command line -HOST argument """ try: host = f'{getattr(obj, sub_host_attr)}:{getattr(obj, n_procs_attr)}' except AttributeError: host = jobutils.AUTOHOST return host
[docs]class ConformationalSearchException(Exception): pass
[docs]class ConformationalSearchMixin(object): """ Manage a MacroModel conformational search. """
[docs] @staticmethod def genEtaRotamers(sibling_conformers_dict, only_rings=True): """ Generate eta-rotamers. :type sibling_conformers_dict: dict :param sibling_conformers_dict: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures :type only_rings: bool :param only_rings: if True then only allow rotation of eta-bound rings, if False then also allow rotation of ligands where the eta-bound motif is acyclic, for example ethene, etc. :rtype: dict, dict :return: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures, for eta-complexes incoming conformers have been replaced with all possible rotamers, another dictionary mapping the structure titles of generated rotamers to the structure title of the structure from which they were generated """ new_sibling_conformers_dict = {} title_dict = {} for sibling_group, conformers_dict in sibling_conformers_dict.items(): new_conformers_dict = {} for conformer_group, conformers in conformers_dict.items(): new_conformers = [] for conformer in conformers: try: rotamers = etarotamers.get_rotamers( conformer, only_rings=only_rings) except etarotamers.EtaRotamersException: # this means that the conformers are not eta-complexes, # no rotamers needed, use original conformers new_conformers = conformers for conformer in conformers: title_dict[conformer.title] = conformer.title break else: new_conformers.extend(rotamers) for rotamer in rotamers: title_dict[rotamer.title] = conformer.title new_conformers_dict[conformer_group] = new_conformers new_sibling_conformers_dict[sibling_group] = new_conformers_dict return new_sibling_conformers_dict, title_dict
[docs] def createConformers(self, sts): """ Create the conformers. :type sts: list :param sts: contains schrodinger.structure.Structure, the structures for which to create conformers, each unique type of structure should have a unique conformer group structure property keyed by CONFORMERS_GROUP_KEY, structures sharing the same CONFORMERS_GROUP_KEY should be conformers of the same structure and are used to seed the conformational search, an additional optional SIBLING_GROUP_KEY can be used to distinguish related groups of conformers, atoms marked with the property RESTRAINED_ATOM_KEY will be restrained :raise ConformationalSearchException: if there is an issue """ # in case incoming structures are in the centroid representation sts = [st.copy() for st in sts] for st in sts: etatoggle.toggle_structure(st, out_rep=parserutils.ETA) sibling_conformers_in_dict = bin_structures_by_property( sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) if not self.n_conformers: self.sibling_conformers_dict = sibling_conformers_in_dict return skip_eta_rotamers = getattr(self, 'skip_eta_rotamers', False) if skip_eta_rotamers: t_sibling_conformers_in_dict = sibling_conformers_in_dict else: t_sibling_conformers_in_dict, title_dict = self.genEtaRotamers( sibling_conformers_in_dict, only_rings=self.only_rings) # see MATSCI-11138 where we need to now overwrite the unique # geometry dictionary given that we have potentially generated # more input structures if isinstance(self, UniqueGeomMixin): t_sibling_conformers_in_dict = self._updateUniqueGeoms( t_sibling_conformers_in_dict, title_dict) host = get_sub_host_str(self, 'subhost', 'n_jmswf_subjobs') # assume equivalence of restrain indices among the structures in # each bin, in this case run_conformational_search adds jobs to the # given job_dj launch_dir = 'csearch' job_dj = jobutils.create_queue(host=host) for sibling_group, conformers_dict in t_sibling_conformers_in_dict.items( ): for conformer_group, conformers in conformers_dict.items(): first_conformer = conformers[0] restrain_idxs = get_restrain_atom_idxs(first_conformer) ffld_name = getattr(self, 'ffld_name', msutils.get_default_forcefield().name) run_conformational_search(conformers, n_conformers=self.n_conformers, restrain_idxs=restrain_idxs, seed=self.seed, launch_dir=launch_dir, base_name=conformer_group, ffld_name=ffld_name, job_dj=job_dj) # MacroModel information like errors, etc. 
is sent to stdout which # is the JobControl log file, so the following run was originally # silenced but then allowed so that the JobControl progress can # be seen job_dj.run() all_failed = not job_dj.isComplete() force_return_csearch_files = all_failed if not all_failed: pp_rel_energy_thresh = getattr(self, 'pp_rel_energy_thresh', None) failed_hashes = [] sibling_conformers_out_dict = {} for sibling_group, conformers_dict in t_sibling_conformers_in_dict.items( ): sibling_conformers_out_dict[sibling_group] = {} for conformer_group, conformers in conformers_dict.items(): out_mae_path = os.path.join( launch_dir, f'{conformer_group}{OUT_EXT}.mae') # see MATSCI-1974 for the reasoning behind the second check if os.path.exists( out_mae_path) and structure.count_structures( out_mae_path): postprocess_conformational_search( conformers[0], out_mae_path, ffld_name) conformers_out = [ st for st in structure.StructureReader(out_mae_path) ] conformers_out = get_conformers( conformers_out, self.n_conformers, pp_rel_energy_thresh=pp_rel_energy_thresh, ffld_name=ffld_name) else: failed_hashes.append(conformer_group) conformers_out = list( sibling_conformers_in_dict[sibling_group] [conformer_group]) sibling_conformers_out_dict[sibling_group][ conformer_group] = conformers_out if failed_hashes: force_return_csearch_files = True n_total_conformers = sum( len(conformers_dict) for conformers_dict in t_sibling_conformers_in_dict.values()) all_failed = len(failed_hashes) == n_total_conformers if not all_failed and self.logger: msg = ( 'MacroModel conformational search jobs for the ' 'following conformer groups have failed: {groups}. ' 'Proceeding with original conformers for these groups. ' 'All output files will be returned.').format( groups=','.join(failed_hashes)) self.logger.warning(msg) if not all_failed: self.sibling_conformers_dict = sibling_conformers_out_dict if self.return_csearch_files or force_return_csearch_files: jobutils.add_zipfile_to_backend(launch_dir) if all_failed: msg = ('MacroModel conformational search jobs have failed.') raise ConformationalSearchException(msg)
[docs]class JMSWFException(Exception): pass
[docs]class JMSWFMixin(RepresentativeConformersMixin): """ Manage a Jaguar multistage workflow. """ # various structure and atom properties dictate the behavior of this class def _overwriteConformerEIDS(self): """ Overwrite the Maestro entry ID properties for the conformers. """ # needed for Jaguar multistage workflow # see MATSCI-11669 - it is possible that input structures may have the # entry ID property undefined, handle that by grabbing the maximum value # that is defined in the input and incrementing from there max_eid = 0 for conformers_dict in self.sibling_conformers_dict.values(): for conformers in conformers_dict.values(): for conformer in conformers: eid = int(conformer.property.get(ENTRY_ID_KEY, 0)) max_eid = max(max_eid, eid) idx = 0 fall_back_eid = max_eid for conformers_dict in self.sibling_conformers_dict.values(): for conformers in conformers_dict.values(): fall_back_eid += 1 for conformer in conformers: idx += 1 eid = conformer.property.get(ENTRY_ID_KEY, str(fall_back_eid)) self.eids_dict.setdefault(eid, []).append(str(idx)) conformer.property[ENTRY_ID_KEY] = str(idx) def _getJMSWFGenResKeywords(self, st, include_temp_press_data=True): """ Return the general reserved Jaguar multistage workflow keywords for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type include_temp_press_data: bool :param include_temp_press_data: whether to include the temperature and pressure data :rtype: OrderedDict :return: the keywords """ adict = OrderedDict([('molchg', str(st.property.get(CHARGE_KEY, 0))), ('multip', str(st.property.get(MULTIPLICITY_KEY, 1)))]) temp_data = getattr(self, 'temp_data', None) press_data = getattr(self, 'press_data', None) if include_temp_press_data and temp_data and press_data: adict['tmpini'] = str(temp_data.temp_start) adict['tmpstp'] = str(temp_data.temp_step) adict['ntemp'] = str(temp_data.temp_n) adict['press'] = str(press_data.press_start) adict['press_step'] = str(press_data.press_step) adict['npress'] = str(press_data.press_n) # see MATSCI-10584 and linked cases, if the Jaguar input file has # an initial guess wavefunction with a given dftname and basis, for # example from a previous calculation, that are the same as the dftname # and basis for the new job then igonly=1 (skip SCF) is set automatically # if igonly was not set, if the new job asks for properties, which will be # the general case, then the job will fail due to missing matrices required # for the property evaluation, prevent this by always setting igonly=0 adict['igonly'] = '0' return adict def _getJMSWFOptKeywords(self, st, set_freq=True): """ Return the Jaguar multistage workflow optimization keywords for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type set_freq: bool :param set_freq: if True then set 'ifreq=1' :rtype: str :return: the string of keywords """ reserved_jaguar_keywords = self._getJMSWFGenResKeywords( st, include_temp_press_data=set_freq) reserved_jaguar_keywords['igeopt'] = '1' if set_freq: reserved_jaguar_keywords['ifreq'] = '1' jaguar_keywords = self.jaguar_keywords.copy() jaguar_keywords.update(reserved_jaguar_keywords) return ' '.join(get_jaguar_keywords_list(jaguar_keywords)) @staticmethod def _getJMSWFCoords(st, are_restraints=True): """ Return the Jaguar multistage workflow coords for the given structure. 
:type st: schrodinger.structure.Structure :param st: the structure :type are_restraints: bool :param are_restraints: True if the coords are restraint coords, as opposed to active coords :rtype: list :return: contains strings defining the coords """ atype_key_pairs = [(4, RESTRAINED_DISTANCES_KEY), (5, RESTRAINED_ANGLES_KEY), (6, RESTRAINED_DIHEDRALS_KEY)] fields = ['{atype}', '{idxs}'] if are_restraints: fields = [jmswfu.NONE] + fields aformat = jmswfu.DELIM.join(fields) coords = [] for atype, key in atype_key_pairs: params = get_int_tuples_from_str_property(st, key) for param in params: idxs = jmswfu.DELIM.join(str(idx) for idx in param) coord = aformat.format(atype=atype, idxs=idxs) coords.append(coord) return coords def _getJMSWFTSKeywords(self, st): """ Return the Jaguar multistage workflow transition state search keywords for the given structure. :type st: schrodinger.structure.Structure :param st: the structure :rtype: str :return: the string of keywords """ reserved_jaguar_keywords = self._getJMSWFGenResKeywords( st, include_temp_press_data=True) reserved_jaguar_keywords['igeopt'] = '2' reserved_jaguar_keywords['ifreq'] = '1' reserved_jaguar_keywords['itrvec'] = '-6' reserved_jaguar_keywords['nhesref'] = '-1' # Jaguar TS vetting options, intentionally use the default # for ts_vet_olap_cut, for ts_vet use the value +3 which considers # both the eigenvector overlap with active atoms and distances # among active atoms, despite the Jaguar documentation using +3 # as opposed to -3 does not cause a job failure so check # afterwards reserved_jaguar_keywords['ts_vet'] = '3' jaguar_keywords = self.jaguar_keywords.copy() jaguar_keywords.update(reserved_jaguar_keywords) return ' '.join(get_jaguar_keywords_list(jaguar_keywords)) def _getJMSWFStages(self): """ Return the Jaguar multistage workflow stages. 
:rtype: list :return: contains the jmswfu.StageData for the Jaguar multistage workflow """ has_ts = getattr(self, 'has_ts', False) # if there is a TS stage then prevent divergence in the stage # indices by breaking up the work needed for non-TS stages # into two stages aformat = jmswfu.DELIM.join(['{eid}', '{rest}']) opt_jaguar_keywords_lines = [] opt_restraints_lines = [] ts_jaguar_keywords_lines = [] ts_restraints_lines = [] ts_actives_lines = [] for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): for conformer_group, conformers in conformers_dict.items(): for conformer in conformers: eid = conformer.property[ENTRY_ID_KEY] all_opt_restraints = self._getJMSWFCoords( conformer, are_restraints=True) set_freq = not has_ts and not bool(all_opt_restraints) opt_jaguar_keywords = self._getJMSWFOptKeywords( conformer, set_freq=set_freq) opt_jaguar_keywords_lines.append( aformat.format(eid=eid, rest=opt_jaguar_keywords)) for opt_restraints in all_opt_restraints: opt_restraints_lines.append( aformat.format(eid=eid, rest=opt_restraints)) is_ts = conformer.property.get( TRANSITION_STATE_STRUCTURE_KEY) if has_ts or is_ts: if not is_ts: set_freq = not bool(all_opt_restraints) ts_jaguar_keywords = self._getJMSWFOptKeywords( conformer, set_freq=set_freq) for opt_restraints in all_opt_restraints: ts_restraints_lines.append( aformat.format(eid=eid, rest=opt_restraints)) else: ts_jaguar_keywords = self._getJMSWFTSKeywords( conformer) for ts_actives in self._getJMSWFCoords( conformer, are_restraints=False): ts_actives_lines.append( aformat.format(eid=eid, rest=ts_actives)) ts_jaguar_keywords_lines.append( aformat.format(eid=eid, rest=ts_jaguar_keywords)) # the final hessian from a restrained optimization may not be a good # guess for a transition state search so do not inherit it from the # parent in_txt_file = os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file) with open(in_txt_file, 'w') as afile: afile.write(jmswfu.NEW_STAGE + '\n') afile.write(jmswfu.KEYWORDS + '\n') for opt_jaguar_keywords_line in opt_jaguar_keywords_lines: afile.write(opt_jaguar_keywords_line + '\n') if opt_restraints_lines: afile.write(jmswfu.GEOM_CONSTRAINTS + '\n') for opt_restraints_line in opt_restraints_lines: afile.write(opt_restraints_line + '\n') if ts_jaguar_keywords_lines: afile.write(jmswfu.NEW_STAGE + '\n') afile.write(jmswfu.PARENT + '\n') afile.write( jmswfu.DELIM.join(['1', jmswfu.WAVEFUNCTION]) + '\n') afile.write(jmswfu.KEYWORDS + '\n') for ts_jaguar_keywords_line in ts_jaguar_keywords_lines: afile.write(ts_jaguar_keywords_line + '\n') if ts_restraints_lines: afile.write(jmswfu.GEOM_CONSTRAINTS + '\n') for ts_restraints_line in ts_restraints_lines: afile.write(ts_restraints_line + '\n') if ts_actives_lines: afile.write(jmswfu.ACTIVE_COORDINATES + '\n') for ts_actives_line in ts_actives_lines: afile.write(ts_actives_line + '\n') stages = jmswfu.read_stage_datafile(in_txt_file) fileutils.force_remove(in_txt_file) return stages def _updateJMSWFStages(self, stages): """ Update the Jaguar multistage workflow stages. 
:type stages: list :param stages: contains the jmswfu.StageData for the Jaguar multistage workflow :raise JMSWFException: if there is an issue :rtype: list :return: contains the jmswfu.StageData for the Jaguar multistage workflow """ unique_geom_dict = getattr(self, 'unique_geom_dict', {}) n_unique_geom = len(unique_geom_dict) # here we need to read in the extra stages file and modify # the returned jmswfu.StageData objects so that they can # be seamlessly appended to the given stages, the first # of these extra stages is skipped so as to allow analysis # to potentially be the first extra stage extra_stages = jmswfu.read_stage_datafile(self.extra_stages_file)[1:] n_prev_stages = len(stages) # will be either 1 or 2 existing_names = {x.info.name for x in stages} for index, extra_stage in enumerate(extra_stages, n_prev_stages + 1): # update the index of this stage extra_stage.index = index # We are merging stages from two different sources and the job # name depends on the stage name, so make sure all the extra # stages have names that are unique count = index while extra_stage.info.name in existing_names: extra_stage.info.name = jmswfu.GENERIC_STAGE_TAG + str(count) count += 1 existing_names.add(extra_stage.info.name) # update any parent indices for stage in extra_stage.parent_data: stage.stage += n_prev_stages - 1 # update the parenting indices for any analysis stages for stage in extra_stage.analyze_data: stage.parent_st_idx += n_prev_stages - 1 stage.parent_idxs = [ i + n_prev_stages - 1 for i in stage.parent_idxs ] # update entry IDs, the original entry IDs have been reset to start at # 1 and now include values for any generated conformers, see # ._overwriteConformerEIDS() and .eids_dict which for example # might contain something like eids_dict[17] = [4,5,6,7] if for the # original entry ID 17 there were 4 conformers generated which were # assigned entry IDs 4, 5, 6, and 7, all entry_data data needs # updated entry IDs for stage_subsection, eid_dict in extra_stage.entry_data.items(): # see MATSCI-11251 - if using geometry deduplication then for relevant # stages in the extra stages file the number of structure entries will # be greater than it should be due to the deduplication so update it if n_unique_geom: all_eid_pairs = list(eid_dict.items()) unique_eid_pairs = all_eid_pairs[:n_unique_geom] eid_dict = dict(unique_eid_pairs) # see MATSCI-6429 - since the main interface and all known use cases # force equivalent Jaguar keyword data for all structure entries in an # extra stage we will overwrite stale entry IDs by making this assumption # and choosing an arbitrary mapping of stale entry IDs to current entry # IDs uncommon = set(eid_dict.keys()).difference( self.eids_dict.keys()) if len(eid_dict) == len(self.eids_dict) and uncommon: eid_dict = dict( zip(self.eids_dict.keys(), eid_dict.values())) new_eid_dict = {} for eid, datas in eid_dict.items(): for new_eid in self.eids_dict.get(eid, []): new_datas = [] for data in datas: new_data = copy.deepcopy(data) new_data.eid = new_eid new_datas.append(new_data) new_eid_dict[new_eid] = new_datas if not new_eid_dict: msg = ( f'The extra stages file {self.extra_stages_file} contains ' 'at least a single stage that does not define stage data ' 'for any of the entries in the input file.') raise JMSWFException(msg) extra_stage.entry_data[stage_subsection] = new_eid_dict return stages + extra_stages def _setUpJMSWF(self): """ Set up the Jaguar multistage workflow. 
:rtype: list :return: contains the jmswfu.StageData for the Jaguar multistage workflow """ extra_stages_file = getattr(self, 'extra_stages_file', None) os.makedirs(self._jmswf_launch_dir, exist_ok=True) with structure.StructureWriter( os.path.join(self._jmswf_launch_dir, self._jmswf_in_mae_file)) as writer: for sibling_group, conformers_dict in self.sibling_conformers_dict.items( ): for conformer_group, conformers in conformers_dict.items(): for conformer in conformers: writer.append(conformer) # jmswfu manages the global smap file which if there aren't any # frequency jobs will be removed at the end base_name = os.path.join(self._jmswf_launch_dir, self._jmswf_base_name) + OUT_EXT jmswfu.create_smap(base_name, self._jmswf_out_mae_file) stages = self._getJMSWFStages() if extra_stages_file: stages = self._updateJMSWFStages(stages) jmswfu.write_stages_file( stages, os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file)) return stages
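A small standalone sketch of the entry-ID remapping performed in _updateJMSWFStages above, using hypothetical records: the original entry ID '17' was expanded to four generated conformers, so each stale entry-data record is deep-copied once per new ID:

import copy

class Data:  # hypothetical stand-in for a stage entry-data record
    def __init__(self, eid):
        self.eid = eid

eids_dict = {'17': ['4', '5', '6', '7']}  # original eid -> conformer eids
eid_dict = {'17': [Data('17')]}           # stale per-stage entry data

new_eid_dict = {}
for eid, datas in eid_dict.items():
    for new_eid in eids_dict.get(eid, []):
        new_datas = [copy.deepcopy(data) for data in datas]
        for data in new_datas:
            data.eid = new_eid
        new_eid_dict[new_eid] = new_datas
assert sorted(new_eid_dict) == ['4', '5', '6', '7']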
[docs] def runJMSWF(self): """ Run the Jaguar multistage workflow. :raise JMSWFException: if there is an issue """ stages = self._setUpJMSWF() options = argparse.Namespace(TPP=self.tpp, name=self._jmswf_base_name, input_file=self._jmswf_in_mae_file) host = get_sub_host_str(self, 'subhost', 'n_jmswf_subjobs') job_dj = jobutils.create_queue(options=options, host=host) # uses jobutils.RobustSubmissionJob by default with fileutils.chdir(self._jmswf_launch_dir): workflows = jmswfu.create_workflows(options, job_dj, stages, self._jmswf_out_smap_file, hierarchical=False, robust=True, tmp_logger=self.logger) with structure.StructureWriter(self._jmswf_out_mae_file) as writer: jaguarworkflows.run_workflows(job_dj, workflows, writer) backend = None jmswfu.finalize_smap(self._jmswf_out_smap_file, backend) # for restarts it is possible that nothing was done all_failed = job_dj.total_added and len( job_dj._failed) == job_dj.total_added # handle partial failures in self.setRepresentatives if self.return_jaguar_files or all_failed: jobutils.add_zipfile_to_backend(self._jmswf_launch_dir) if all_failed: msg = ('All Jaguar multistage workflow jobs have failed. ' 'All output files will be returned.') raise JMSWFException(msg)
[docs] def prepareJMSWFOutput(self): """ Prepare Jaguar multistage workflow output. """ exts = [f'{OUT_EXT}.maegz', f'{OUT_EXT}.smap' ] + jmswfu.SMAP_ELIGIBLE_EXTENSIONS for ext in exts: pattern = os.path.join(self._jmswf_launch_dir, '*' + ext) for apath in glob.glob(pattern): afile = os.path.split(apath)[1] shutil.copy(apath, afile) jobutils.add_outfile_to_backend(afile)
[docs] def checkJMSWFOutputs(self, out_files): """ Raises JMSWFException if any of the given Jaguar out files should be treated as a failure. :type out_files: list :param out_files: contains Jaguar output files :raises JMSWFException: If any of the given Jaguar out files should be treated as a failure """ # for transition state structures vetting is used however a bad # result doesn't cause a Jaguar failure, nor do multiple imaginary # frequencies for either optimizations or transition states for out_file in out_files: # Jaguar failures have already been handled in the calling code # so missing Jaguar *out files means that the stage is an analysis # stage out_path = os.path.join(self._jmswf_launch_dir, out_file) if not os.path.exists(out_path): continue # skip non-frequency jobs jag_out = JaguarOutput(out_path) normal_modes = anharmonic.get_normal_modes(jag_out) if not normal_modes: continue # check TS, when asking for vetting there are three outcomes: # (1) vetting performed and a vetted vector found # (2) vetting performed and a vetted vector not found # (3) vetting not performed (see MATSCI-5145) # # see MATSCI-10176 - jag_out.getStructure() does NOT include input # Maestro structure properties mae_path = os.path.join(self._jmswf_launch_dir, f'{jag_out.restart}.mae') st = structure.Structure.read(mae_path) if st.property.get(TRANSITION_STATE_STRUCTURE_KEY): if not check_TS_vetting(out_path): # the found transition state does not involve the requested # active atoms raise JMSWFException( "The found transition state does not involve the requested active atoms" ) normal_mode = jag_out.vetted_ts_vector if normal_mode and normal_mode.frequency != min( [x.frequency for idx, x in normal_modes]): # the normal mode involving the active atoms is not the lowest # frequency mode raise JMSWFException( "The normal mode involving the active atoms is not the lowest frequency mode" ) # check imaginary frequencies in_path = os.path.join(self._jmswf_launch_dir, f'{jag_out.restart}.in') if not os.path.exists(in_path): raise JMSWFException(f'The file {in_path} does not exist') jag_in = JaguarInput(input=in_path) try: anharmonic.check_imaginary_frequencies( jag_out, jag_in, max_i_freq=self.max_i_freq) except anharmonic.AnharmonicException as err: raise JMSWFException(str(err))
@staticmethod def _setProperties(st, key, stage_idx, value): """ Set properties on the given structure. :type st: schrodinger.structure.Structure :param st: the structure :type key: str :param key: the property key :type stage_idx: int :param stage_idx: the stage index for the property :type value: float :param value: the property value :rtype: set :return: the created property keys """ rep_key = key + STAGE_SEPARATOR + str(stage_idx) adict = {rep_key: value} st.property.update(adict) return set(adict.keys()) def _updateSiblingConformersDict(self): """ Update the sibling conformers dict. """ # currently the self.sibling_conformers_dict contains conformers with # redundant structure titles if the conformers came from MacroModel, while # the conformers collected in the representatives coming from the Jaguar # multistage workflow job have been uniqueified using a '-<index>' appended # to the title, here we need to update the self.sibling_conformers_dict to # have unique titles, additionally the current self.sibling_conformers_dict # contains conformers considered as failures in the calling code and so also # remove any bad conformers rep_titles = set(rep_st.title for rep_st in self.rep_sts) for sibling_group, conformers_dict in list( self.sibling_conformers_dict.items()): for conformer_group, conformers in list(conformers_dict.items()): prev_title = None good_conformers = [] for idx, conformer in enumerate(conformers, 1): title = conformer.title if title == prev_title and idx > 1: title += f'-{idx}' else: prev_title = title if title in rep_titles: conformer.title = title good_conformers.append(conformer) self.sibling_conformers_dict[sibling_group][ conformer_group] = good_conformers
[docs] def setRepresentatives(self): """ Associated with each structure is output data from potentially multiple Jaguar multistage workflow stages. Pick representative structures to carry the data for all stages. :raise JMSWFException: if there is an issue """ # bin stage structures by original structure title, # titles are like 'reactant_stage_1', 'transition_state-2_stage_4_analysis_2', # etc. title_stage_sts_dict = {} for st in structure.StructureReader(self._jmswf_out_mae_file): title, stage_hash = st.title.split(STAGE_SEPARATOR) title_stage_sts_dict.setdefault(title, []).append((st, stage_hash)) stages = jmswfu.read_stage_datafile( os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file)) rep_stage_idx = get_rep_stage_idx( rxnwf_file_path=self._jmswf_out_mae_file) warned = False copied = self.return_jaguar_files for title, pairs in title_stage_sts_dict.items(): skip_msg = False # the IndexError indicates that the required original workflow # Jaguar calculation has failed for this conformer, continue as there # may be other successful conformers try: rep_st = pairs[rep_stage_idx - 1][0].copy() except IndexError: skip_msg = ('Some Jaguar multistage workflow jobs have failed.') if not skip_msg: out_files = [f'{pair[0].title}.out' for pair in pairs] try: self.checkJMSWFOutputs(out_files) except JMSWFException as err: skip_msg = str(err) skip_msg += ( 'Postprocessing of at least a single Jaguar multistage workflow ' 'job has failed.') if skip_msg: if not warned: if self.logger: skip_msg += (' All output files will be returned.') self.logger.warning(skip_msg) warned = True if not copied: jobutils.add_zipfile_to_backend(self._jmswf_launch_dir) copied = True continue rep_st.title = title for st, stage_hash in pairs[rep_stage_idx - 1:]: stage_idx = int(stage_hash.split('_')[0]) stage = stages[stage_idx - 1] keys = stage.getPropertyKeys(st=st) # see MATSCI-10926 where we need to add a key to the keys from a # standard JMSWF if set([ jaguarworkflows.GAS_PHASE_ENERGY_PROP, jaguarworkflows.ZERO_POINT_ENERGY_PROP ]).issubset(keys): keys.append( jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP) for key in keys: rep_st.property.pop(key, None) if key == jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP: gpe = st.property.get( jaguarworkflows.GAS_PHASE_ENERGY_PROP) zpe = st.property.get( jaguarworkflows.ZERO_POINT_ENERGY_PROP) if gpe is not None and zpe is not None: zpe *= units.HARTREE_PER_KCAL_PER_MOL value = gpe + zpe else: value = None else: value = st.property.get(key) if value is not None: _rep_keys = self._setProperties(rep_st, key, stage_idx, value) self.rep_keys.update(_rep_keys) self.rep_sts.append(rep_st) if not self.rep_sts: msg = ( 'Postprocessing of all Jaguar multistage workflow jobs has failed.' ) raise JMSWFException(msg) self._updateSiblingConformersDict() # validate the output workflow file sibling_conformers_dict = bin_structures_by_property( self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) for sibling_group, conformer_group, conformer in self.representativeConformers( ): conformers_dict = sibling_conformers_dict.get(sibling_group) if not conformers_dict: msg = ( 'All Jaguar multistage workflow calculations for sibling ' f'group {sibling_group} have failed.') raise JMSWFException(msg) if not conformers_dict.get(conformer_group): msg = ( 'All Jaguar multistage workflow calculations for conformer ' f'group {conformer_group} have failed.') raise JMSWFException(msg)
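The gas-phase-plus-ZPE combination in setRepresentatives mixes units; a hedged sketch of the arithmetic, assuming the gas phase energy is stored in hartree and the zero point energy in kcal/mol (which is what the HARTREE_PER_KCAL_PER_MOL factor implies):

HARTREE_PER_KCAL_PER_MOL = 1 / 627.509  # approximate value of the units constant

gpe = -230.712  # hypothetical gas phase energy, hartree
zpe = 62.3      # hypothetical zero point energy, kcal/mol
gas_phase_zpe = gpe + zpe * HARTREE_PER_KCAL_PER_MOL  # combined value, hartree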
[docs] def finalizeJMSWFOutput(self): """ Finalize the Jaguar multistage workflow output. :rtype: str :return: the Jaguar multistage workflow output file """ jmswf_tag = getattr(self, 'jmswf_tag', '') # create a valid non-redundant output file from the # Jaguar multistage workflow output file base_name = jobutils.get_jobname( DEFAULT_JOB_NAME) + f'{OUT_EXT}{jmswf_tag}' out_mae_file = base_name + '.mae' structure.write_cts(self.rep_sts, out_mae_file) # the difference between the jmswf and rxnwf output Maestro files is that # the latter, rather than providing a structure for each stage, chooses a # single structure to be the representative structure in the rxnwf output, # that representative comes from the last stage of the rxnwf excluding any # stages from a potential extra stages file, this representative carries # the properties of all stages, as a result of this if both a rxnwf and # extra stage specify keywords to create files with smap extensions, like # *vib, *vis, etc., only a single file can be linked, since it is unlikely # that this will arise as extra stages are typically for energy # post-processing choose to link those from the rxnwf stages, all will # still be available in the jmswf output files which are copied back to the # launch host unique_geom_dict = getattr(self, 'unique_geom_dict', {}) rep_stage_idx = get_rep_stage_idx(rxnwf_sts=self.rep_sts) rep_sts_dict = { rep_st.title: idx for idx, rep_st in enumerate(self.rep_sts, 1) } # since the Jaguar calculations were only performed on the representative # structure geometries the relevant files do not exist for the structures # that the representatives represent, so for these structures point # the files of the representatives smap_dict = {} for title, idx in rep_sts_dict.items(): title = get_unique_geom_title(title, unique_geom_dict) for ext in jmswfu.SMAP_ELIGIBLE_EXTENSIONS: pattern = f'{title}{STAGE_SEPARATOR}{rep_stage_idx}*{ext}' for afile in glob.glob(pattern): smap_dict.setdefault(afile, []).append(idx) if smap_dict: out_smap_file = jmswfu.create_smap(base_name, out_mae_file, smap_dict=smap_dict) jobutils.add_outfile_to_backend(out_smap_file) jobutils.add_outfile_to_backend(out_mae_file, set_structure_output=True) return out_mae_file
[docs] def getFreqStageIdxs(self): """ Return the stage indices of frequency stages. :rtype: tuple[int] :return: the stage indices """ freq_stage_idxs = set() pattern = os.path.join(self._jmswf_launch_dir, '*.out') for out_path in glob.glob(pattern): jag_out = JaguarOutput(out_path) if anharmonic.get_normal_modes(jag_out): out_file = os.path.split(out_path)[1] stage_idx = get_stage_idx(out_file, is_filename=True) freq_stage_idxs.add(stage_idx) return tuple(freq_stage_idxs)
[docs]class DescriptorsMixin(object): """ Manage running descriptors. """ DEFAULT_JOB_NAME = 'automatic_reaction_workflow'
[docs] def runDescriptors(self, files): """ Run descriptors. :type files: list :param files: the files on which to run descriptors :rtype: list :return: The output structure file paths. The returned list will be the same length as the input files list. """ # handle restarts job_name = jobutils.get_jobname(self.DEFAULT_JOB_NAME) zip_files = jaguar_restart.get_restart_zip_files( base_name_patterns=[f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}*']) jaguar_restart.prepare_restart_dirs(zip_files) host = get_sub_host_str(self, 'jmswf_host', 'n_rxnwf_subjobs') # keep Jaguar &gen section to a minimum for now keys = ['dftname', 'basis'] jaguar_keywords = {key: self.jaguar_keywords[key] for key in keys} jaguar_keywords = get_jaguar_keywords_list(jaguar_keywords) jaguar_keywords = ' '.join(jaguar_keywords) options = argparse.Namespace(TPP=self.tpp) job_dj = jobutils.create_queue(options=options, host=host) base_cmd = ['complex_descriptors.py'] base_cmd += [FLAG_JAGUAR] base_cmd += [FLAG_JAGUAR_KEYWORDS, jaguar_keywords] base_cmd += [FLAG_TPP, str(self.tpp)] base_cmd += [FLAG_LIGFILTER] base_cmd += [FLAG_CANVAS] base_cmd += [FLAG_MOLDESCRIPTORS] base_cmd += [FLAG_INCLUDE_VECTORIZED] base_cmd += [FLAG_FINGERPRINTS] if self.return_descriptor_files: base_cmd += [FLAG_SAVEFILES] if host != jobutils.AUTOHOST: base_cmd += ['-HOST', self.jmswf_host] out_rep = getattr(self, 'out_rep', None) if out_rep: base_cmd += [parserutils.FLAG_OUT_REP, out_rep] # collect jobs, the complex descriptors driver does more than # just running Jaguar jobs so currently restart each job even # if all Jaguar *out files are available, for such cases the Jaguar # jobs are not restarted launch_dirs = [] for afile in files: job_name = fileutils.get_basename(afile) launch_dir = job_name = f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}' cmd = list(base_cmd) cmd += ['-JOBNAME', job_name] cmd += [FLAG_INPUT_FILE, afile] cmd += [FLAG_STPROPS, f'{job_name}.mae'] os.makedirs(launch_dir, exist_ok=True) shutil.copy(afile, os.path.join(launch_dir, afile)) job = jobutils.RobustSubmissionJob(cmd, command_dir=launch_dir) job_dj.addJob(job) launch_dirs.append(launch_dir) if self.logger: self.logger.info('Running descriptors on all files.') self.logger.info('') job_dj.run() for launch_dir in launch_dirs: zip_file = launch_dir + '.zip' zip_directory(launch_dir, fileobj=zip_file) jobutils.add_outfile_to_backend(zip_file) file_paths = [] for launch_dir in launch_dirs: file_path = os.path.join(launch_dir, f'{launch_dir}.mae') if not os.path.exists(file_path): msg = (f'Descriptor output structure file {file_path} ' 'can not be found.') raise ReactionWorkflowException(msg) file_paths.append(file_path) return file_paths
[docs]def has_transition_state(rxnwf_file_path=None, rxnwf_sts=None): """ Return True if the reaction workflow features a transition state, False otherwise. :type rxnwf_file_path: str or None :param rxnwf_file_path: the path to a reaction workflow file :type rxnwf_sts: list[`schrodinger.structure.Structure`] or None :param rxnwf_sts: reaction workflow structures :rtype: bool :return: True if the reaction workflow features a transition state, False otherwise """ assert bool(rxnwf_file_path) ^ bool(rxnwf_sts) if rxnwf_file_path: rxnwf_sts = list(structure.StructureReader(rxnwf_file_path)) for rxnwf_st in rxnwf_sts: if rxnwf_st.property.get(TRANSITION_STATE_STRUCTURE_KEY): return True return False
[docs]def get_rep_stage_idx(rxnwf_file_path=None, rxnwf_sts=None): """ Return the representative stage index. :type rxnwf_file_path: str or None :param rxnwf_file_path: the path to a reaction workflow file :type rxnwf_sts: list[`schrodinger.structure.Structure`] or None :param rxnwf_sts: reaction workflow structures :rtype: int :return: the representative stage index """ assert bool(rxnwf_file_path) ^ bool(rxnwf_sts) has_ts = has_transition_state(rxnwf_file_path=rxnwf_file_path, rxnwf_sts=rxnwf_sts) if has_ts: rep_stage_idx = 2 else: rep_stage_idx = 1 return rep_stage_idx
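Both of these helpers accept exactly one of the two keyword arguments, enforced by the XOR assertion. A minimal usage sketch, with a hypothetical file name::

    from schrodinger.application.matsci import reaction_workflow_utils as rxnwfu

    # 'my_rxnwf_out.maegz' is a hypothetical output file name
    rep_idx = rxnwfu.get_rep_stage_idx(rxnwf_file_path='my_rxnwf_out.maegz')
    # rep_idx is 2 when any structure carries the transition state
    # property, otherwise 1; passing both keyword arguments (or neither)
    # trips the assertion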
[docs]class ReactionWorkflowException(Exception): pass
[docs]class ReactionWorkflow(ConformationalSearchMixin, JMSWFMixin, UniqueGeomMixin): """ Manage a reaction workflow. """
[docs] def __init__(self, reaction_wf_input_sts, dedup_geom_eps=parserutils.DEFAULT_DEDUP_GEOM_EPS, n_conformers=DEFAULT_N_CONFORMERS, skip_eta_rotamers=False, pp_rel_energy_thresh=PP_CSEARCH_REL_ENERGY_THRESH, seed=parserutils.RANDOM_SEED_DEFAULT, only_rings=True, return_csearch_files=False, jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS, temp_data=None, press_data=None, return_jaguar_files=False, anharm=False, return_anharm_files=False, anharm_max_freq=anharmonic.DEFAULT_MAX_FREQ, anharm_factor_data=None, rate_constants=False, return_rate_constant_files=False, custom_rate_constants=False, wigner_tunnel_corr=False, extra_stages_file=None, max_i_freq=anharmonic.DEFAULT_MAX_I_FREQ, out_rep=None, n_jmswf_subjobs=1, subhost=None, tpp=DEFAULT_TPP, ffld_name=None, logger=None): """ Create an instance. :type reaction_wf_input_sts: list :param reaction_wf_input_sts: reaction workflow input structures :type dedup_geom_eps: float :param dedup_geom_eps: reduce the number of calculations by deduplicating the input structures based on geometry, using this threshold in Ang., and only calculating the representatives, a value of zero means no deduplicating :type n_conformers: int :param n_conformers: number of conformers to search for :type skip_eta_rotamers: bool :param skip_eta_rotamers: skip eta rotamers generation if true :type pp_rel_energy_thresh: None or float :param pp_rel_energy_thresh: relative energy threshold in kJ/mol, if None then only the n_conformers lowest energy conformers are returned :type seed: int :param seed: seed for random number generator :type only_rings: bool :param only_rings: if True then only allow rotation of eta-bound rings, if False then also allow rotation of ligands where the eta-bound motif is acyclic, for example ethene, etc. :type return_csearch_files: bool :param return_csearch_files: whether to return all output files from the conformational search subjobs :type jaguar_keywords: OrderedDict :param jaguar_keywords: Jaguar keywords :type temp_data: TempData :param temp_data: the temperature data for thermochemical properties :type press_data: PressData :param press_data: the pressure data for thermochemical properties :type return_jaguar_files: bool :param return_jaguar_files: whether to return all output files from the Jaguar subjobs :type anharm: bool :param anharm: whether to run the anharmonic workflow :type return_anharm_files: bool :param return_anharm_files: whether to return all output files from the anharmonic workflow :type anharm_max_freq: float :param anharm_max_freq: anharmonic potentials are created for normal modes with harmonic frequencies less than this value in wavenumbers (cm^-1) :type anharm_factor_data: anharmonic.SeqData or None :param anharm_factor_data: unitless factor data for factors that multiply a normal mode displacement, if None then the defaults are used, the number of points is in the positive direction only, excluding zero and the negative direction, for example using a value of 4 in turn means 2 * 4 + 1 = 9 points total :type rate_constants: bool :param rate_constants: whether to report rate constant(s) for the rate determining step of the reaction using canonical transition state theory :type return_rate_constant_files: bool :param return_rate_constant_files: whether to return all output files from the rate constant subjobs :param bool custom_rate_constants: whether to compute custom rate constants :type wigner_tunnel_corr: bool :param wigner_tunnel_corr: whether to include the Wigner tunneling correction when computing rate constant(s) 
:type extra_stages_file: str :param extra_stages_file: the name of a file containing extra stages for a Jaguar Multistage Workflow subjob that will be performed on all output structures from the reaction workflow, the first of these extra stages is always skipped so as to allow analysis to potentially be the first extra stage :type max_i_freq: float :param max_i_freq: tolerate small imaginary frequencies less than this value in wavenumbers (cm^-1) :param out_rep: if a string then must be either module constant parserutils.CENTROID or parserutils.ETA, if None then do nothing :type n_jmswf_subjobs: int :param n_jmswf_subjobs: the maximum number of simultaneous Jaguar multistage workflow subjobs :type subhost: str :param subhost: the host to use for subjobs :type tpp: int :param tpp: the number of threads to use for Jaguar subjobs, i.e. -TPP (threads-per-process) :type ffld_name: str :param ffld_name: the name of the force field to use (e.g., OPLS2005 or S-OPLS) :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ if ffld_name is None: ffld_name = msutils.get_default_forcefield().name self.reaction_wf_input_sts = reaction_wf_input_sts self.dedup_geom_eps = dedup_geom_eps self.n_conformers = n_conformers self.skip_eta_rotamers = skip_eta_rotamers self.pp_rel_energy_thresh = pp_rel_energy_thresh self.seed = seed self.only_rings = only_rings self.return_csearch_files = return_csearch_files self.jaguar_keywords = jaguar_keywords self.temp_data = temp_data or TempData( temp_start=jmswfu.DEFAULT_TEMP_START, temp_step=jmswfu.DEFAULT_TEMP_STEP, temp_n=jmswfu.DEFAULT_TEMP_N) self.press_data = press_data or PressData( press_start=jmswfu.DEFAULT_PRESS_START, press_step=jmswfu.DEFAULT_PRESS_STEP, press_n=jmswfu.DEFAULT_PRESS_N) self.return_jaguar_files = return_jaguar_files self.anharm = anharm self.return_anharm_files = return_anharm_files self.anharm_max_freq = anharm_max_freq self.anharm_factor_data = anharm_factor_data or anharmonic.SeqData( start=anharmonic.DEFAULT_F_START, step=anharmonic.DEFAULT_F_STEP, n_points=anharmonic.DEFAULT_F_N_POINTS) self.rate_constants = rate_constants self.return_rate_constant_files = return_rate_constant_files self.custom_rate_constants = custom_rate_constants self.wigner_tunnel_corr = wigner_tunnel_corr self.extra_stages_file = extra_stages_file self.max_i_freq = max_i_freq self.out_rep = out_rep self.n_jmswf_subjobs = n_jmswf_subjobs self.subhost = subhost self.tpp = tpp self.ffld_name = ffld_name self.logger = logger self.unique_geom_dict = {} self.sibling_conformers_dict = None self.eids_dict = {} self._jmswf_launch_dir = JMSWF_LAUNCH_DIR self._jmswf_base_name = 'jaguar_multistage_workflow' self._jmswf_in_mae_file = self._jmswf_base_name + '.maegz' self._jmswf_in_txt_file = self._jmswf_base_name + '.txt' self._jmswf_out_mae_file = f'{self._jmswf_base_name}{OUT_EXT}.maegz' self._jmswf_out_smap_file = f'{self._jmswf_base_name}{OUT_EXT}.smap' self.has_ts = False self.rdss = None self._anharm_dir = ANHARM_LAUNCH_DIR self._ctst_dir = CTST_RATE_CONSTANTS_DIR self.rep_keys = set() self.rep_sts = [] self.jmswf_tag = REACTION_WF_TAG
[docs] def validateRateConstants(self): """ Validate rate constants. :raise ReactionWorkflowException: if there is an issue """ if not self.rate_constants: return sibling_conformers_dict = bin_structures_by_property( self.reaction_wf_input_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) eligible_for_rate_constants = False # In order to be eligible we need a sibling group containing 1 TS whose parent # sibling group contains 0 TSs for sibling_group, conformers_dict in sibling_conformers_dict.items(): # ensure a single TS in the sibling group is_ts = [] for conformers in conformers_dict.values(): rep_conf = conformers[0] # all same here is_ts.append( bool(rep_conf.property.get(TRANSITION_STATE_STRUCTURE_KEY))) if is_ts.count(True) != 1: continue # ensure the sibling group containing a single TS has # a parent sibling group with no TSs rep_conf = list(conformers_dict.values())[0][0] # all same here parent_sibling_group_key = rep_conf.property.get( PARENT_SIBLING_GROUPS_KEY) if (not parent_sibling_group_key or parent_sibling_group_key not in sibling_conformers_dict): continue parent_conformers_dict = sibling_conformers_dict[ parent_sibling_group_key] is_not_ts = [] for conformers in parent_conformers_dict.values(): rep_conf = conformers[0] # all same here is_not_ts.append(not bool( rep_conf.property.get(TRANSITION_STATE_STRUCTURE_KEY))) if all(is_not_ts): eligible_for_rate_constants = True break if not eligible_for_rate_constants: msg = ('Rate constant calculations require child transition state ' 'structures.') raise ReactionWorkflowException(msg)
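Once conformers are binned, the eligibility rule above reduces to boolean bookkeeping: find a sibling group with exactly one transition state whose parent group has none. A schematic version over hand-built dicts (group names are hypothetical and the structure property is collapsed to a flag)::

    sibling_groups = {
        'reactants': {'is_ts': [False, False], 'parent': None},
        'ts_step': {'is_ts': [True, False], 'parent': 'reactants'},
    }

    def eligible(groups):
        for data in groups.values():
            if data['is_ts'].count(True) != 1:
                continue  # need exactly one transition state
            parent = data['parent']
            if parent in groups and not any(groups[parent]['is_ts']):
                return True  # parent sibling group has no transition states
        return False

    assert eligible(sibling_groups)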
[docs] def validateAnharmonic(self): """ Validate anharmonic. :raise ReactionWorkflowException: if there is an issue """ sibling_conformers_dict = bin_structures_by_property( self.reaction_wf_input_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) if not self.has_ts and self.anharm: getters = [ get_restrain_distance_idxs, get_restrain_angle_idxs, get_restrain_dihedral_idxs ] only_restrained_opts = True for sibling_group, conformer_group, first_conformer in self.representativeConformers( sibling_conformers_dict=sibling_conformers_dict): for getter in getters: if getter(first_conformer): break else: only_restrained_opts = False if not only_restrained_opts: break if only_restrained_opts: msg = ( 'The anharmonic workflow requires frequencies but ' 'all Jaguar multistage jobs are restrained optimizations.') raise ReactionWorkflowException(msg)
[docs] def validateSubhost(self): """ Validate subhost. :raise ReactionWorkflowException: if there is an issue """ host = 'localhost' if jobcontrol.get_backend(): host_list = jobcontrol.get_backend_host_list() if host_list: host = host_list[0][0] if host != 'localhost' and self.subhost == 'localhost': msg = ('Jobs requesting a remote driver host and local ' 'subjob host are not supported.') raise ReactionWorkflowException(msg) if not self.subhost: self.subhost = host
[docs] def validate(self): """ Validate. :raise ReactionWorkflowException: if there is an issue """ self.validateRateConstants() self.validateAnharmonic() self.validateSubhost()
def _setUpAnharmonic(self): """ Set up anharmonic workflow calculations. :raise ReactionWorkflowException: if there is an issue """ # determine which JMSWF stages calculate frequencies, currently allow # for the possibility of an extra stage calculating frequencies (so # consider not only the first one or two reaction workflow stages), # for each stage collect all Jaguar *.out and *.01.in files in_files_by_stage = {} for stage_idx in self.getFreqStageIdxs(): pattern = os.path.join(self._jmswf_launch_dir, f'*{STAGE_SEPARATOR}{stage_idx}.out') for out_path in glob.glob(pattern): jag_out = JaguarOutput(out_path) # see MATSCI-12044 - anharmonic calculations on atomic systems will # fall through, ensure that here so that the properties defined for all # structures are consistent st = jag_out.getStructure() if st.atom_total > 1 and not anharmonic.get_normal_modes( jag_out): continue out_files = [os.path.split(out_path)[1]] try: self.checkJMSWFOutputs(out_files) except JMSWFException: continue rin_path = os.path.splitext(out_path)[0] + '.01.in' if not os.path.exists(rin_path): msg = (f'The Jaguar restart input file {rin_path} can ' 'not be found.') raise ReactionWorkflowException(msg) out_file = os.path.split(out_path)[1] rin_file = os.path.split(rin_path)[1] in_files_by_stage.setdefault(stage_idx, []).append( (out_file, rin_file)) # ensure that for each of these stages each conformer in the reaction # workflow has an anharmonic input deck for idx, all_files in in_files_by_stage.items(): if len(all_files) != len(self.rep_sts): msg = ( 'Some Jaguar output files from Jaguar Multistage Workflow ' f'stage {idx} are missing frequency data.') raise ReactionWorkflowException(msg) # prepare anharmonic subjobs in a separate directory os.makedirs(self._anharm_dir, exist_ok=True) for all_files in in_files_by_stage.values(): for files in all_files: for afile in files: apath = os.path.join(self._jmswf_launch_dir, afile) bpath = os.path.join(self._anharm_dir, afile) shutil.copy(apath, bpath) @staticmethod def _updateAnharmonicJaguarKwargs(rin_file_path): """ Update anharmonic Jaguar keyword arguments. :type rin_file_path: str :param rin_file_path: the restart Jaguar input file path (`*.01.in`) whose &gen keywords arguments will be updated """ # see MATSCI-11151 - where *vis, etc. property files are # not needed for rxnwf rin_jag = JaguarInput(input=rin_file_path) rin_jag.deleteKeys(['iplotden', 'iplotesp', 'iorb1a', 'iorb2a', 'nbo']) rin_jag.saveAs(rin_file_path)
[docs] def runAnharmonic(self): """ Run the anharmonic workflow. """ self._setUpAnharmonic() # allow multiple simultaneous anharmonic subjobs options = argparse.Namespace(TPP=self.tpp) host = f'{self.subhost}:{self.n_jmswf_subjobs}' job_dj = jobutils.create_queue(options=options, host=host) # allow multiple simultaneous Jaguar single point subjobs for each # anharmonic subjob, continue processing even if there are zero # normal modes to be treated anharmonically cmd = ['anharmonic_workflow_driver.py'] cmd += [anharmonic.FLAG_MAX_FREQ, str(self.anharm_max_freq)] cmd += [ anharmonic.FLAG_F_DATA, str(self.anharm_factor_data.start), str(self.anharm_factor_data.step), str(self.anharm_factor_data.n_points) ] cmd += [parserutils.FLAG_ROBUST] cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)] cmd += [anharmonic.FLAG_PROCESS_NO_ANHARMONICITIES] cmd += [FLAG_TPP, str(self.tpp)] cmd += ['-HOST', host] # collect jobs, restart *.out files are marked with ANHARMONIC_TAG # and should be skipped here, in contrast to JMSWF which only runs # Jaguar jobs the anharmonic driver does more than just running # Jaguar jobs so currently restart each job even if all Jaguar *out # files are available, for such cases the Jaguar jobs are not restarted pattern = os.path.join(self._anharm_dir, '*.out') for out_path in glob.glob(pattern): if ANHARMONIC_TAG in out_path: continue out_file = os.path.split(out_path)[1] base_name, ext = os.path.splitext(out_file) rin_file = base_name + '.01.in' rin_file_path = os.path.join(self._anharm_dir, rin_file) self._updateAnharmonicJaguarKwargs(rin_file_path) job_name = base_name + ANHARMONIC_TAG _cmd = list(cmd) _cmd += [anharmonic.FLAG_JAG_OUT_FILE, out_file] _cmd += [anharmonic.FLAG_JAG_RIN_FILE, rin_file] _cmd += ['-JOBNAME', job_name] jc_job = jobutils.RobustSubmissionJob(_cmd, command_dir=self._anharm_dir, name=job_name) job_dj.addJob(jc_job) # run jobs job_dj.run()
[docs] def processAnharmonic(self): """ Process the anharmonic workflow. :raise ReactionWorkflowException: if there is an issue """ # check output, for now require success for each conformer pattern = os.path.join(self._anharm_dir, f'*{anharmonic.OUT_EXT}') mae_paths = glob.glob(pattern) stage_idxs = { get_stage_idx(mae_path, is_filename=True) for mae_path in mae_paths } force_return_anharm_files = len(mae_paths) != len(stage_idxs) * len( self.rep_sts) # copy output if self.return_anharm_files or force_return_anharm_files: jobutils.add_zipfile_to_backend(self._anharm_dir) if force_return_anharm_files: msg = ('At least a single anharmonic workflow subjob has failed.') raise ReactionWorkflowException(msg) # update representative structures with anharmonic properties for relevant # stages, update keys, and store energy keys for rep_st in self.rep_sts: for stage_idx in stage_idxs: mae_path = os.path.join(self._anharm_dir, f'{rep_st.title}{STAGE_SEPARATOR}{stage_idx}{ANHARMONIC_TAG}' \ f'{anharmonic.OUT_EXT}') st = structure.Structure.read(mae_path) for key, value in st.property.items(): if key.startswith(anharmonic.KEY_STARTER): _rep_keys = self._setProperties(rep_st, key, stage_idx, value) if 'Total' in key: self.rep_keys.update(_rep_keys)
def _setRateDeterminingSteps(self): """ Set a list of rate determining steps. :raise ReactionWorkflowException: if there is an issue """ rxnwfea = ReactionWorkflowEnergyAnalysis # use conformationally averaged energies relative to parents # including cross terms to determine the rate determining step # of the reaction base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) rel_parents_file = base_name + REL_PARENTS_W_X_EXT with open(rel_parents_file, 'r') as afile: rel_parents_dict = json.load(afile) # bin (child - parent) energy differences by relevant stages to # determine the rate determining step for each stage, the energy # to use is U_0 = E_SCF + E_ZPE which is temperature independent # and thus previously algebraically averaged over conformers and # is now linearly summable, currently harmonic ZPE is used even # if modeling anharmonicities scf_energy_h = rxnwfea.getHeader(jaguarworkflows.GAS_PHASE_ENERGY_PROP) zpe_energy_h = rxnwfea.getHeader(jaguarworkflows.ZERO_POINT_ENERGY_PROP) stage_child_parent_energies_dict = {} for child_name, parent_dict in rel_parents_dict.items(): for parent_name, edict in parent_dict.items(): stage_energies_dict = {} for h, energy in edict.items(): if h.startswith(scf_energy_h) or h.startswith(zpe_energy_h): stage_idx = get_stage_idx(h) stage_energies_dict.setdefault(stage_idx, []).append(energy) for stage_idx, energies in stage_energies_dict.items(): if len(energies) == 2: stage_child_parent_energies_dict.setdefault( stage_idx, []).append( (child_name, parent_name, sum(energies))) if not stage_child_parent_energies_dict: msg = ('Rate constant calculations require at least a single stage ' 'in which Jaguar frequencies are calculated.') raise ReactionWorkflowException(msg) # collect rate determining steps by sorting by energy and finding the # highest energy transition state rdss = [] for stage_idx, values in stage_child_parent_energies_dict.items(): rds = None for child_name, parent_name, energy in sorted(values, key=lambda x: x[2], reverse=True): for sibling_group, conformer_group, rep_child_conf in self.representativeConformers( specific_sibling_group=child_name): if rep_child_conf.property.get( TRANSITION_STATE_STRUCTURE_KEY): rds = SimpleNamespace(stage_idx=stage_idx, child_name=child_name, parent_name=parent_name) break if rds: rdss.append(rds) break if not rdss: msg = ('Rate constant calculations require at least a single child ' 'structure to be marked as a transition state structure.') raise ReactionWorkflowException(msg) self.rdss = rdss def _getJaguarOutputPathsChildAndParent(self, rds): """ Return paths to the Jaguar output files for child and parent conformers for the given rate determining step. 
:type rds: SimpleNamespace :param rds: the rate determining step :rtype: list, list :return: the child and parent conformer paths, the latter of which is a list of lists of paths for each parent structure """ # among the child sibling group collect spectator masses and # for the found TS collect conformer titles child_spectator_masses = [] child_ts_conformer_titles = [] for conformer_group, conformers in self.sibling_conformers_dict[ rds.child_name].items(): first_conformer = conformers[0] if first_conformer.property.get(TRANSITION_STATE_STRUCTURE_KEY): for conformer in conformers: child_ts_conformer_titles.append(conformer.title) else: child_spectator_masses.append( get_molecular_weight(first_conformer, decimal=MASS_N_DECIMAL)) # isolate spectators and determine which parent siblings # made the TS binned_parent_conformer_titles = {} for conformer_group, conformers in self.sibling_conformers_dict[ rds.parent_name].items(): first_conformer = conformers[0] parent_mass = get_molecular_weight(first_conformer, decimal=MASS_N_DECIMAL) if parent_mass in child_spectator_masses: child_spectator_masses.remove(parent_mass) else: for conformer in conformers: binned_parent_conformer_titles.setdefault( conformer_group, []).append(conformer.title) binned_parent_conformer_titles = \ list(binned_parent_conformer_titles.values()) get_paths = lambda x: [ os.path.join( self._jmswf_launch_dir, get_unique_geom_title(title, self.unique_geom_dict) + STAGE_SEPARATOR + str(rds.stage_idx) + '.out') for title in x ] child_paths = get_paths(child_ts_conformer_titles) binned_parent_paths = [ get_paths(titles) for titles in binned_parent_conformer_titles ] return child_paths, binned_parent_paths def _setUpCTST(self): """ Set up canonical transition state theory calculations. """ self._setRateDeterminingSteps() # the Wigner tunneling correction is computed from the imaginary # frequency of the transition state and ultimately multiplies its # partition function to alter the rate constant wigner_tunnel_corr = 'yes' if self.wigner_tunnel_corr else 'no' temps = ' '.join([ str(self.temp_data.temp_start + i * self.temp_data.temp_step) for i in range(0, self.temp_data.temp_n) ]) # collect all required CTST jobs get_conf_title = lambda x: fileutils.get_basename(x).split( STAGE_SEPARATOR)[0] for rds in self.rdss: child_paths, binned_parent_paths = self._getJaguarOutputPathsChildAndParent( rds) rds.jobs = [] for child_path in child_paths: child_title = get_conf_title(child_path) for parent_paths in itertools.product(*binned_parent_paths): parent_title = '_'.join( get_conf_title(parent_path) for parent_path in parent_paths) job_name = (f'tst{STAGE_SEPARATOR}{rds.stage_idx}_' f'{parent_title}_{child_title}') cmd = [os.path.join(os.environ['SCHRODINGER'], 'run')] cmd += ['tst_rate_startup.py'] cmd += ['-wigner_tunnel_corr', wigner_tunnel_corr] cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)] cmd += ['-temperature', temps] cmd += ['-COMPLEX_OUTPUT', child_path] cmd += ['-COMPLEX_TRV', 'trv'] if self.anharm: base_name = fileutils.get_basename(child_path) base_name = get_unique_geom_title( base_name, self.unique_geom_dict) mae_path = os.path.join( self._anharm_dir, f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}') cmd += ['-COMPLEX_ANHARM_OUTPUT_MAE', mae_path] for idx, parent_path in enumerate(parent_paths): cmd += [f'-REAGENT_OUTPUT_{idx}', parent_path] cmd += [f'-REAGENT_TRV_{idx}', 'trv'] if self.anharm: base_name = fileutils.get_basename(parent_path) base_name = get_unique_geom_title( base_name, self.unique_geom_dict) mae_path = 
os.path.join( self._anharm_dir, f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}' ) cmd += [ f'-REAGENT_ANHARM_OUTPUT_MAE_{idx}', mae_path ] cmd += ['-DEBUG'] # enable verbose logging cmd += [job_name] rds.jobs.append(cmd)
[docs] def runCTST(self): """ Run canonical transition state theory calculations to determine rate constant(s) for the rate determining step of the reaction. """ self._setUpCTST() # see MATSCI-6611 - originally jobs were launched simultaneously on # self.subhost using a JobDJ filled with SubprocessJob types but it # was found that starting the first of such jobs was hanging # indefinitely on Windows and so it was switched to use subprocess # (so running on the original -HOST) in serial for rds in self.rdss: rds.stdouts = [] for job in rds.jobs: try: stdout = subprocess.check_output(job, stderr=subprocess.STDOUT, universal_newlines=True) except subprocess.CalledProcessError: stdout = '' rds.stdouts.append(stdout)
[docs] def prepareCTSTOutput(self): """ Prepare CTST output. :raise ReactionWorkflowException: if there is an issue """ # process raw output data failed_stage_idx = None for rds in self.rdss: temp_data_dict = {} for stdout in rds.stdouts: for line in stdout.splitlines(): if line.startswith('{'): data = yaml.safe_load(line.strip()) temperature = float(data['Temperature'][0]) energy = float(data['Energy Barrier'][0]) r_partition_f = float( data['Reagents Partition Function'][0]) ts_partition_f = float( data['Complex Partition Function'][0]) mantissa, order_of_magnitude = data[ 'TST Rate Constant'][0].split(' x ') rate_constant = float(mantissa) * eval( order_of_magnitude.replace('^', '**')) rate_constant_units = data['TST Rate Constant'][1] data = (energy, r_partition_f, ts_partition_f, rate_constant, rate_constant_units) temp_data_dict.setdefault(temperature, []).append(data) if not temp_data_dict: failed_stage_idx = rds.stage_idx break for temp, data in temp_data_dict.items(): energies, r_partition_fs, ts_partition_fs, rate_constants, \ rate_constant_units = [numpy.array(i) for i in zip(*data)] # energies are temperature independent so perform an algebraic # average avg_energy = numpy.mean(energies) # Boltzmann average the temperature dependent partition functions # and rate constants factors = numpy.exp(-1 * energies / (IDEAL_GAS_CONSTANT * temp)) partition_f = sum(factors) avg_r_partition_f = numpy.dot(r_partition_fs, factors) / partition_f avg_ts_partition_f = numpy.dot(ts_partition_fs, factors) / partition_f avg_rate_constant = numpy.dot(rate_constants, factors) / partition_f temp_data_dict[temp] = (avg_energy, avg_r_partition_f, avg_ts_partition_f, avg_rate_constant, rate_constant_units[0]) rds.temp_data_dict = temp_data_dict # process raw output files if failed_stage_idx is not None or self.return_rate_constant_files: os.makedirs(self._ctst_dir, exist_ok=True) for rds in self.rdss: for job, stdout in zip(rds.jobs, rds.stdouts): log_file = os.path.join(self._ctst_dir, job[-1] + '.log') with open(log_file, 'w') as afile: afile.write(stdout) jobutils.add_zipfile_to_backend(self._ctst_dir) if failed_stage_idx: msg = ( 'All rate constant calculations for stage ' f'{failed_stage_idx} have failed for all child and parent ' 'conformers.') raise ReactionWorkflowException(msg) # process summary output file base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) rate_constants_file = base_name + RATE_CONSTANTS_W_X_EXT with csv_unicode.writer_open(rate_constants_file) as afile: writer = csv.writer(afile) writer.writerow([ 'Stage_Index', 'Parent_Sibling_Group_Name', 'Child_Sibling_Group_Name', 'Temperature/K', 'Conf_Avg_SCF+ZPE_Energy_Barrier/kcal/mol', 'Conf_Avg_Reactants_Partition_F', 'Conf_Avg_TS_Partition_F', CONF_AVG_CTST_RATE_CONSTANT, 'CTST_Rate_Constant_Units' ]) for rds in self.rdss: for temp, data in rds.temp_data_dict.items(): writer.writerow( [rds.stage_idx, rds.parent_name, rds.child_name, temp] + list(data)) jobutils.add_outfile_to_backend(rate_constants_file)
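The conformer averaging above weights each (child, parent) combination by a Boltzmann factor built from its temperature independent SCF+ZPE barrier. A stand-alone numeric sketch of that average, assuming kcal/mol energies and a gas constant in kcal/(mol K) (the module's IDEAL_GAS_CONSTANT is defined elsewhere; the barrier and rate values below are hypothetical)::

    import numpy

    R_KCAL = 1.987204e-3  # assumed gas constant, kcal/(mol*K)
    temp = 298.15  # K

    # hypothetical per-combination barriers and rate constants
    energies = numpy.array([10.0, 10.5, 12.0])  # kcal/mol
    rate_constants = numpy.array([3.2e2, 1.1e2, 5.0e1])

    factors = numpy.exp(-energies / (R_KCAL * temp))
    partition_f = factors.sum()

    avg_energy = energies.mean()  # temperature independent: plain mean
    avg_rate_constant = numpy.dot(rate_constants, factors) / partition_f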
[docs] def exportEnergyDiagrams(self): """ Export the energy diagrams if the parents file exists """ base_name = jobutils.get_jobname(DEFAULT_JOB_NAME) json_path = base_name + REL_PARENTS_WO_X_EXT if os.path.exists(json_path): csv_path = base_name + REL_REACTANTS_WO_X_EXT plotter = EnergyDiagramPlotter(json_path, csv_path, logger=self.logger) plotter.run()
[docs] def run(self): """ Run the reaction workflow. :raise ReactionWorkflowException: if there is an issue """ zip_files = jaguar_restart.get_restart_zip_files( base_names=[JMSWF_LAUNCH_DIR, ANHARM_LAUNCH_DIR]) jaguar_restart.prepare_restart_dirs(zip_files) self.has_ts = has_transition_state(rxnwf_sts=self.reaction_wf_input_sts) self.validate() self._setUniqueGeomDict(self.reaction_wf_input_sts) sts = self._getUniqueGeomSts(self.reaction_wf_input_sts) try: self.createConformers(sts) except ConformationalSearchException as err: raise ReactionWorkflowException(str(err)) try: self._overwriteConformerEIDS() self.runJMSWF() self.prepareJMSWFOutput() self.setRepresentatives() except JMSWFException as err: raise ReactionWorkflowException(str(err)) if self.anharm: self.runAnharmonic() self.processAnharmonic() if self.out_rep: for st in self.rep_sts: etatoggle.toggle_structure(st, out_rep=self.out_rep) self.rep_sts = self._duplicateUniqueGeomSts() self.sibling_conformers_dict = bin_structures_by_property( self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) out_mae_file = self.finalizeJMSWFOutput() ReactionWorkflowEnergyAnalysis( out_mae_file, self.rep_keys, dedup_geom_eps=self.dedup_geom_eps).writeDataFiles() self.exportEnergyDiagrams() compute_custom_rates = (self.rate_constants and self.custom_rate_constants) # Defined in MATSCI-10586 try: get_custom_keq_rates(out_mae_file, self.press_data, self.temp_data, extra_stages_file=self.extra_stages_file, compute_rates=compute_custom_rates, wigner_tunnel_corr=self.wigner_tunnel_corr) except ValueError as err: raise ReactionWorkflowException(str(err)) if self.rate_constants and not self.custom_rate_constants: self.runCTST() self.prepareCTSTOutput() log('All finished', timestamp=True, pad=True, logger=self.logger)
[docs]class Sites: """ Manage enumeration sites. """
[docs] @staticmethod def getSites(sites_data, n_structures=1): """ Return a list of Site from the given sites data. :type sites_data: list :param sites_data: contains for each site a list of data [from_idx, to_idx, hash_idx] with an optional fourth item structure_idx :type n_structures: int :param n_structures: the number of structures, used if the given sites lack the optional fourth item :raise InvalidInput: if there is an issue :rtype: list :return: contains `Site` """ sites = [] for site_data in sites_data: n_idxs = len(site_data) # 3 or 4 indices must be given (see below for cases) if not (n_idxs == 3 or n_idxs == 4): msg = ('Sites must be defined using either 3 or 4 indices.') raise InvalidInput(msg) elif n_idxs == 3: # backwards compatibility, no structure specific indexing given, # all structures have the same indexing i, j, k = site_data for l in range(1, n_structures + 1): # noqa: E741, bond atom sites.append( Site(from_idx=i, to_idx=j, hash_idx=k, structure_idx=l)) elif n_idxs == 4: # structure specific indexing given i, j, k, l = site_data # noqa: E741, bond atom sites.append( Site(from_idx=i, to_idx=j, hash_idx=k, structure_idx=l)) return sites
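The two accepted row shapes differ only in whether the structure index is explicit. A small sketch with a stand-in namedtuple (the module's Site type is defined elsewhere; its field names here match the keyword arguments used above)::

    from collections import namedtuple

    # stand-in for the module's Site type
    Site = namedtuple('Site', 'from_idx to_idx hash_idx structure_idx')

    n_structures = 2
    # a 3-index row is replicated across all structures ...
    row = [4, 9, 1]
    expanded = [
        Site(from_idx=row[0], to_idx=row[1], hash_idx=row[2], structure_idx=l)
        for l in range(1, n_structures + 1)
    ]
    assert [site.structure_idx for site in expanded] == [1, 2]
    # ... while a 4-index row, e.g. [4, 9, 1, 2], yields a single Site
    # bound to structure 2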
[docs] @staticmethod def validateSitesFormat(sites): """ Validate the given sites format. :type sites: list :param sites: contains `Site` :raise InvalidInput: if there is an issue """ all_site_idxs = set() for site in sites: site_idxs = tuple( sorted([site.from_idx, site.to_idx]) + [site.structure_idx]) if site_idxs[0] == site_idxs[1]: continue all_site_idxs.add(site_idxs) if len(all_site_idxs) < len(sites): msg = ('All sites must specify unique pairs of atom indices.') raise InvalidInput(msg)
[docs] @staticmethod def delete_substitution_site_bonds(st, sites): """ Delete bonds in the given structure that occur after the given substitution sites and return extracted core information. :type st: `structure.Structure` :param st: the structure :type sites: list :param sites: contains `Site` :rtype: `structure.Structure`, dict :return: the extracted core and old-to-new atom index map """ root_remove_idxs = set() for site in sites: from_atom = st.atom[site.from_idx] to_atom = st.atom[site.to_idx] bonded_atoms = [ atom for atom in to_atom.bonded_atoms if atom.index != from_atom.index ] for atom in bonded_atoms: st.deleteBond(to_atom, atom) root_remove_idxs.add(atom.index) remove_idxs = set() for root_remove_idx in root_remove_idxs: mol = st.atom[root_remove_idx].getMolecule() remove_idxs.update(mol.getAtomIndices()) core_st = st.copy() old_to_new = core_st.deleteAtoms(remove_idxs, renumber_map=True) for idx in list(old_to_new.keys()): if old_to_new[idx] is None: old_to_new.pop(idx) return core_st, old_to_new
[docs] @staticmethod def validateSitesData(sites, st): """ Validate the given sites data. :type sites: list :param sites: contains `Site` :type st: `structure.Structure` :param st: the structure :raise InvalidInput: if there is an issue """ st = st.copy() # validate sites ring_bonds = rings.find_ring_bonds(st) for site in sites: idxs = (site.from_idx, site.to_idx) if max(idxs) > st.atom_total: msg = get_msg(site, 'At least a single index does not exist.') raise InvalidInput(msg) bond = st.getBond(*idxs) if not bond: msg = get_msg(site, 'They are not bound.') elif bond.order != 1: msg = get_msg(site, 'It is not a single bond.') elif set(idxs) in ring_bonds: msg = get_msg(site, 'The bond is in a ring.') elif msutils.is_dummy_atom(st.atom[site.to_idx]): msg = get_msg(site, 'Can not enumerate on a centroid atom.') else: msg = None if msg: raise InvalidInput(msg) mol_total = st.mol_total # delete bonds core_st, old_to_new = Sites.delete_substitution_site_bonds(st, sites) # validate molecule molecule_idxs = set( st.atom[site.to_idx].getMolecule().number for site in sites) if len(molecule_idxs) > mol_total: msg = ( 'At least a single site is invalid. Sites should all be ' '("from", "to") index pairs where the "from" index is kept and the ' '"to" index replaced.') raise InvalidInput(msg) # validate core core_idxs = get_core_idxs(st) to_idx_is_core = any(site.to_idx in core_idxs for site in sites) if to_idx_is_core or not core_idxs.issubset(old_to_new.keys()): msg = ('At least a single site is invalid. The core of the ' 'molecule can not be modified.') raise InvalidInput(msg)
[docs] @staticmethod def getBinnedSites(sites): """ Get the sites binned firstly by structure_idx and secondly by hash_idx. :type sites: list :param sites: contains `Site` :rtype: dict :return: keys are structure_idx, values are dicts whose keys are hash_idx and values are lists of `Site` """ binned_sites = {} for site in sites: adict = binned_sites.setdefault(site.structure_idx, {}) adict.setdefault(site.hash_idx, []).append(site) return binned_sites
[docs]def get_RGE_sources(rgroup_sts, binned_sites, old_to_new): """ Return a list of Source that is prepared for enumeration using the R-Group Enumeration module. :type rgroup_sts: list of `structure.Structure` :param rgroup_sts: the R-group structures :type binned_sites: list of lists of `Site` :param binned_sites: the binned sites :type old_to_new: dict :param old_to_new: a map of old-to-new atom indices :rtype: list :return: contains `Source` """ sources = [] for rgroup_st, sites in zip(rgroup_sts, binned_sites): site_idxs = [old_to_new[site.to_idx] for site in sites] sources.append(Source(rgroup_st=rgroup_st, site_idxs=site_idxs)) return sources
[docs]def substitute(st, rgroups_dict, sites_dict): """ Return a copy of the given structure that has been substituted with the given R-groups at the given sites. :type st: `structure.Structure` :param st: the structure on which the substitution is performed :type rgroups_dict: dict :param rgroups_dict: keys are integer hashes relating to sites, values are `structure.Structure` :type sites_dict: dict :param sites_dict: keys are integer hashes relating to R-groups, values are lists of `Site` :raise InvalidInput: if there is an issue :rtype: `structure.Structure` :return: the substituted structure """ # the rgroup_enumerate module automatically substitutes # so that the leaving group is the smaller group and # because we can't be certain of the input structure # and substitution sites we are going to first prepare # the structure by trimming after the substitution sites all_sites = [site for sites in sites_dict.values() for site in sites] st, old_to_tmp = Sites.delete_substitution_site_bonds(st, all_sites) # collect substitution sources rgroup_sts, binned_sites = [], [] for hash_idx in sorted(sites_dict): rgroup_sts.append(rgroups_dict[hash_idx]) binned_sites.append(sites_dict[hash_idx]) sources = get_RGE_sources(rgroup_sts, binned_sites, old_to_tmp) sources = [[[source.rgroup_st]] + source.site_idxs for source in sources] # do the substitution try: obj = rgroup_enumerate.RgroupEnumerator(st, sources, optimize_sidechains=True, enumerate_cistrans=False, yield_renum_maps=True) out_st, tmp_to_new = next(iter(obj)) except rgroup_enumerate.RgroupError as err: msg = ('The R-group enumeration has failed for structure ' f'{st.title}. ') + str(err) raise InvalidInput(msg) amap = dict([(k, tmp_to_new[v]) for k, v in old_to_tmp.items()]) update_index_properties(out_st, amap) tokens = out_st.title.replace(' ', '').split(',') rgroup_title = RGROUP_SEPARATOR.join(tokens[1:]) out_st.title = tokens[0] + '_' + rgroup_title return out_st
[docs]class EnumerateReactionWorkflow: """ Manage enumeration of a reaction workflow. """
[docs] def __init__(self, rxnwf_file, rgroup_files, sites, force_hetero_substitution=False, out_rep=None, base_name=DEFAULT_ENUM_RXNWF_JOB_NAME, logger=None): """ Create an instance. :type rxnwf_file: str :param rxnwf_file: the reaction workflow file :type rgroup_files: dict :param rgroup_files: keys are hash_idx (see sites), values are file names :type sites: list :param sites: contains `Site` :type force_hetero_substitution: bool :param force_hetero_substitution: if True then for hetero-enumeration do not additionally include homo-enumeration results :param out_rep: if a string then must be either module constant parserutils.CENTROID or parserutils.ETA, if None then do nothing :type base_name: str :param base_name: the base name to use in naming the enumerated output files :type logger: logging.Logger :param logger: the logger """ self.rxnwf_file = rxnwf_file self.rgroup_files = rgroup_files self.binned_sites = Sites.getBinnedSites(sites) self.force_hetero_substitution = force_hetero_substitution self.out_rep = out_rep self.base_name = base_name self.logger = logger self.binned_rgroup_sts = {} self.has_keep_atoms = False self.out_structure_files = [] self.has_structure_specific_sites = True
[docs] def validate(self): """ Validate. :raise InvalidInput: if there is an issue """ # check reaction workflow input file try: check_reaction_wf_structures(self.rxnwf_file, ffld_name=None) except ValueError as err: msg = str(err) + (' A valid reaction workflow file must ' 'first be prepared.') raise InvalidInput(msg) # check consistency between hash indices coming from both r-groups and sites hash_idxs = set(hash_idx for adict in self.binned_sites.values() for hash_idx in adict.keys()) rgroup_files_tmp = self.rgroup_files.copy() for hash_idx in hash_idxs: rgroup_file = rgroup_files_tmp.pop(hash_idx, None) if not rgroup_file: msg = (f'The site enumeration index {hash_idx} does not ' 'have a corresponding R-group file defined.') raise InvalidInput(msg) if rgroup_files_tmp: hash_idxs = ','.join(map(str, sorted(rgroup_files_tmp.keys()))) msg = ('The following enumeration indices given for R-group ' f'files are not used by any sites: {hash_idxs}.') raise InvalidInput(msg) # check r-group files self.binned_rgroup_sts = {} for hash_idx, rgroup_file in self.rgroup_files.items(): self.binned_rgroup_sts.setdefault(rgroup_file, []).append(hash_idx) self.binned_rgroup_sts = { tuple(v): k for k, v in self.binned_rgroup_sts.items() } nfiles = 1 for hash_idxs, rgroup_file in self.binned_rgroup_sts.items(): n_needed = len(hash_idxs) n_available = structure.count_structures(rgroup_file) if n_available < n_needed: msg = ( f'There are only {n_available} R-groups present in the ' f'file {rgroup_file} but {n_needed} different structures ' 'are needed.') raise InvalidInput(msg) if self.force_hetero_substitution: nfiles *= int( math.factorial(n_available) / math.factorial(n_available - n_needed)) else: nfiles *= n_available**n_needed # check sites format sites = [ site for adict in self.binned_sites.values() for sites in adict.values() for site in sites ] Sites.validateSitesFormat(sites) # check sites structure index max_structure_idx = max(self.binned_sites.keys()) n_structures = structure.count_structures(self.rxnwf_file) if max_structure_idx > n_structures: msg = ( 'At least a single site specifies a structure index of ' f'{max_structure_idx} but there are only {n_structures} in {self.rxnwf_file}.' ) raise InvalidInput(msg) # check sites for idx, st in enumerate(structure.StructureReader(self.rxnwf_file), 1): sites_dict = self.binned_sites.get(idx) if not sites_dict: continue sites = [site for sites in sites_dict.values() for site in sites] Sites.validateSitesData(sites, st) if sfu.get_idxs_marked_atoms(st, KEEP_ATOM_KEY): self.has_keep_atoms = True if self.logger: self.logger.info(f'A total of {nfiles} reaction workflow input ' 'files will be created.') self.logger.info('') if nfiles > 500 and self.logger: self.logger.warning('Enumerating more than 500 files can be time ' 'consuming.')
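The running nfiles estimate above multiplies in, per R-group file, either the number of ordered draws without repeats (hetero substitution) or of independent choices. A quick numeric check using only the standard library::

    import math

    n_available, n_needed = 4, 2

    # hetero substitution: ordered selections of distinct R-groups
    hetero = math.factorial(n_available) // math.factorial(
        n_available - n_needed)
    # default: each site picks independently, repeats allowed
    homo = n_available**n_needed

    assert (hetero, homo) == (12, 16)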
[docs] def setRGroupStructures(self): """ Set the R-group structures. """ idx = 0 for hash_idxs in list(self.binned_rgroup_sts.keys()): rgroup_file = self.binned_rgroup_sts[hash_idxs] sts = [] for jdx, st in enumerate(structure.StructureReader(rgroup_file), 1): if not st.title: st.title = 'R' + str(idx + jdx) build.add_hydrogens(st) # if keep atoms are present then set all atoms in the R-group # as keep atoms as it doesn't make sense to enumerate and not # keep the enumerated result if self.has_keep_atoms: for atom in st.atom: atom.property[KEEP_ATOM_KEY] = 1 sts.append(st) self.binned_rgroup_sts[hash_idxs] = sts idx += len(sts)
[docs] def setHasStructureSpecificSites(self): """ Set has structure specific sites. """ all_sites_list = [] for sites_dict in self.binned_sites.values(): sites_list = [] for hash_idx in sorted(sites_dict): sites = [(site.from_idx, site.to_idx) for site in sites_dict[hash_idx]] sites_list.append((hash_idx, tuple(sites))) all_sites_list.append(tuple(sites_list)) n_structures = structure.count_structures(self.rxnwf_file) self.has_structure_specific_sites = not (len(all_sites_list) == n_structures and len(set(all_sites_list)) == 1)
[docs] def getStructures(self): """ Generates structure dictionaries where keys are enumeration indices and values are structures. :rtype: dict :return: keys are enumeration indices, values are `structure.Structure` """ if self.force_hetero_substitution: getter = lambda x, y: itertools.permutations(x, r=y) else: getter = lambda x, y: itertools.product(x, repeat=y) getters, all_hash_idxs = [], [] for hash_idxs, sts in self.binned_rgroup_sts.items(): getters.append(getter(sts, len(hash_idxs))) all_hash_idxs.extend(hash_idxs) for tuples in itertools.product(*getters): yield dict( zip(all_hash_idxs, tuple(st for atuple in tuples for st in atuple)))
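The two getter lambdas above are the generation-side counterpart of the file-count estimate: permutations for hetero substitution, product otherwise. Their difference in isolation::

    import itertools

    rgroups = ['R1', 'R2', 'R3']

    # hetero substitution: distinct R-groups per site
    assert list(itertools.permutations(rgroups, 2))[:2] == [
        ('R1', 'R2'), ('R1', 'R3')]
    # default: independent choices, so repeats like ('R1', 'R1') appear
    assert list(itertools.product(rgroups, repeat=2))[:2] == [
        ('R1', 'R1'), ('R1', 'R2')]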
[docs] def doEnumeration(self): """ Do the enumeration. """ rgroup_titles_count_dict = {} for rgroup_sts_dict in self.getStructures(): out_sts, rgroup_titles = [], [] for idx, st in enumerate(structure.StructureReader(self.rxnwf_file), 1): # handle no enumeration case sites_dict = self.binned_sites.get(idx) if not sites_dict: out_sts.append(st) continue # in order to ensure that any dummy centroid atoms remain # at the end of the atom list temporarily convert to eta changed = etatoggle.toggle_structure(st, out_rep=parserutils.ETA) out_st = substitute(st, rgroup_sts_dict, sites_dict) if changed: etatoggle.toggle_structure(out_st, out_rep=parserutils.CENTROID) out_sts.append(out_st) rgroup_titles.append(out_st.title.replace(st.title + '_', '')) if self.has_structure_specific_sites: rgroups = [] for rgroup_title in rgroup_titles: for rgroup in rgroup_title.split(RGROUP_SEPARATOR): if rgroup not in rgroups: rgroups.append(rgroup) rgroup_titles = RGROUP_SEPARATOR.join(rgroups) else: rgroup_titles = rgroup_titles[0] # they are all the same rgroup_titles_count_dict.setdefault(rgroup_titles, []).append(1) count = len(rgroup_titles_count_dict[rgroup_titles]) if count == 1: idx_str = '' else: idx_str = f'_{count}' if self.out_rep: for st in out_sts: etatoggle.toggle_structure(st, out_rep=self.out_rep) file_name = f'{self.base_name}_{rgroup_titles}{idx_str}{REACTION_WF_TAG}.mae' structure.write_cts(out_sts, file_name) self.out_structure_files.append(file_name)
[docs] def run(self): """ Run it. """ self.validate() self.setRGroupStructures() self.setHasStructureSpecificSites() self.doEnumeration() job_be = jobcontrol.get_backend() if job_be or self.logger: for afile in self.out_structure_files: if job_be: job_be.addOutputFile(afile) if self.logger: msg = (f'Created enumerated file {afile}.') self.logger.info(msg)
[docs]def replace_rxnwf_file_suffix(rxnwf_file, suffix): """ Replace the suffix in the given reaction workflow file with a new suffix. :type rxnwf_file: str :param rxnwf_file: the reaction workflow file :type suffix: str :param suffix: the new suffix :rtype: str :return: the new string """ return rxnwf_file.replace(f'{REACTION_WF_TAG}.mae', suffix)
[docs]class EnumerateSwapMixin: """ Manage enumeration and swapping. """
[docs] def runEnumerateRXNWF(self, tag): """ Run enumerate reaction workflow. :type tag: str :param tag: either the REFERENCE or NOVEL module constant :raise ReactionWorkflowException: if there is an issue :rtype: set :return: the names of the enumerated reaction workflow files """ assert tag in [REFERENCE, NOVEL] if tag == REFERENCE: sites = self.reference_sites rxnwf_file = self.reference_rxnwf_file # do not allow this type of hetero substitution because # it should not be needed force_hetero_substitution = False elif tag == NOVEL: sites = self.novel_sites rxnwf_file = self.novel_rxnwf_file force_hetero_substitution = self.force_hetero_substitution rgroup_files = {} for site in sites: rgroup_file = self.rgroup_files.get(site.hash_idx) if rgroup_file: rgroup_files[site.hash_idx] = rgroup_file if not (rgroup_files and sites): return set([rxnwf_file]) if self.logger: self.logger.info( f'R-group enumerating the {tag} reaction workflow file.') self.logger.info('') out_rep = getattr(self, 'out_rep', None) base_name = replace_rxnwf_file_suffix(rxnwf_file, '') obj = EnumerateReactionWorkflow( rxnwf_file, rgroup_files, sites, force_hetero_substitution=force_hetero_substitution, out_rep=out_rep, base_name=base_name, logger=self.logger) try: obj.run() except InvalidInput as err: raise ReactionWorkflowException(str(err)) if self.logger: self.logger.info('') return set(obj.out_structure_files)
[docs] def runSwapFragments(self, enumerated_novel_files, reference_rxnwf_file): """ Run swap fragments. :type enumerated_novel_files: set :param enumerated_novel_files: the names of the enumerated novel files :type reference_rxnwf_file: str :param reference_rxnwf_file: the reference reaction workflow file :raise ReactionWorkflowException: if there is an issue :rtype: set :return: the names of the enumerated reaction workflow files """ if self.logger: self.logger.info( f'Swapping fragments in {reference_rxnwf_file} with each ' 'of the available novel fragments.') self.logger.info('') enumerated_rxnwf_files = set() for enumerated_novel_file in enumerated_novel_files: novel_st = structure.Structure.read(enumerated_novel_file) novel_keep_idxs = sfu.get_keep_idxs(novel_st) novel_superposition_idxs = sfu.get_superposition_idxs(novel_st) enumerated_rxnwf_file = replace_rxnwf_file_suffix( enumerated_novel_file, '_' + reference_rxnwf_file) enumerated_rxnwf_files.add(enumerated_rxnwf_file) group_name = replace_rxnwf_file_suffix(enumerated_rxnwf_file, '') with structure.StructureWriter(enumerated_rxnwf_file) as writer: for reference_st in structure.StructureReader( reference_rxnwf_file): reference_keep_idxs = sfu.get_keep_idxs(reference_st) reference_superposition_idxs = sfu.get_superposition_idxs( reference_st) # only superpose relevant structures and copy the others (for # example spectators) if reference_keep_idxs and reference_superposition_idxs: title = novel_st.title + '_' + reference_st.title # in order to ensure that any dummy centroid atoms remain # at the end of the atom list temporarily convert to eta novel_changed = etatoggle.toggle_structure( novel_st, out_rep=parserutils.ETA) reference_changed = etatoggle.toggle_structure( reference_st, out_rep=parserutils.ETA) try: st = sfu.get_assembled_structure( novel_st, reference_st, novel_superposition_idxs, reference_superposition_idxs, novel_keep_idxs, reference_keep_idxs, title=title, require_identical_bonds=False) except sfu.SwapFragmentsException as err: raise ReactionWorkflowException(str(err)) if novel_changed or reference_changed: etatoggle.toggle_structure( st, out_rep=parserutils.CENTROID) # for the purposes of running the reaction workflow in the next # step of this workflow force the eid to be consistent with the # reference as it will be used in the underlying Jaguar multistage # workflow st.property[ msprops.ENTRY_ID_PROP] = reference_st.property[ msprops.ENTRY_ID_PROP] else: st = reference_st # update Maestro grouping hierarchy groups = msutils.get_project_group_hierarchy( st=reference_st) groups[0] = group_name msutils.set_project_group_hierarchy(st, groups) # handle output representation out_rep = getattr(self, 'out_rep', None) if out_rep: etatoggle.toggle_structure(st, out_rep=out_rep) writer.append(st) return enumerated_rxnwf_files
[docs] def getRXNWFInputFiles(self): """ Return all reaction workflow input files. :rtype: set :return: all reaction workflow input files """ # obtain all reaction workflow files, potentially enumerating reference # and/or novel enumerated_reference_files = self.runEnumerateRXNWF(REFERENCE) enumerated_rxnwf_files = set() if self.novel_rxnwf_file: enumerated_novel_files = self.runEnumerateRXNWF(NOVEL) for enumerated_reference_file in enumerated_reference_files: enumerated_rxnwf_files.update( self.runSwapFragments(enumerated_novel_files, enumerated_reference_file)) else: enumerated_rxnwf_files.update(enumerated_reference_files) # tag reaction workflow files with the job name for this job job_name = jobutils.get_jobname(DEFAULT_AUTO_RXNWF_JOB_NAME) final_enumerated_rxnwf_files = set() for enumerated_rxnwf_file in enumerated_rxnwf_files: final_enumerated_rxnwf_file = f'{job_name}_{enumerated_rxnwf_file}' shutil.copy(enumerated_rxnwf_file, final_enumerated_rxnwf_file) jobutils.add_outfile_to_backend(final_enumerated_rxnwf_file, wait=False) final_enumerated_rxnwf_files.add(final_enumerated_rxnwf_file) enumerated_rxnwf_files = final_enumerated_rxnwf_files # log the files if self.logger: self.logger.info( 'The following reaction workflow files are available.') self.logger.info('') for enumerated_rxnwf_file in enumerated_rxnwf_files: self.logger.info(enumerated_rxnwf_file) self.logger.info('') return enumerated_rxnwf_files
[docs]class EnumerateSwap(EnumerateSwapMixin): """ Manage enumeration and swapping. """
[docs] def __init__(self, reference_rxnwf_file, novel_rxnwf_file=None, rgroup_files=None, reference_sites=None, novel_sites=None, force_hetero_substitution=False, logger=None): """ Create an instance. :type reference_rxnwf_file: str :param reference_rxnwf_file: the reaction workflow file containing the reference structures :type novel_rxnwf_file: str :param novel_rxnwf_file: the reaction workflow file containing the single novel structure :type rgroup_files: dict :param rgroup_files: keys are hash_idx (see sites), values are file names :type reference_sites: list :param reference_sites: contains `Site` for the reference structures :type novel_sites: list :param novel_sites: contains `Site` for the novel structure :type force_hetero_substitution: bool :param force_hetero_substitution: if True then for hetero-enumeration do not additionally include homo-enumeration results :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ self.reference_rxnwf_file = reference_rxnwf_file self.novel_rxnwf_file = novel_rxnwf_file self.rgroup_files = rgroup_files self.reference_sites = reference_sites self.novel_sites = novel_sites self.force_hetero_substitution = force_hetero_substitution self.logger = logger
[docs] def run(self): """ Run it. :rtype: set :return: all reaction workflow input files """ return self.getRXNWFInputFiles()
[docs]def enumerate_swap(reference_rxnwf_file, novel_rxnwf_file=None, rgroup_files=None, reference_sites=None, novel_sites=None, force_hetero_substitution=False): """ Return all reaction workflow files created by R-group enumerating the given reference and novel structures and swapping reference fragments for novel fragments. :type reference_rxnwf_file: str :param reference_rxnwf_file: the reaction workflow file containing the reference structures :type novel_rxnwf_file: str :param novel_rxnwf_file: the reaction workflow file containing the single novel structure :type rgroup_files: dict :param rgroup_files: keys are hash_idx (see sites), values are file names :type reference_sites: list :param reference_sites: contains `Site` for the reference structures :type novel_sites: list :param novel_sites: contains `Site` for the novel structure :type force_hetero_substitution: bool :param force_hetero_substitution: if True then for hetero-enumeration do not additionally include homo-enumeration results :rtype: set :return: all reaction workflow input files """ eswap = EnumerateSwap(reference_rxnwf_file, novel_rxnwf_file=novel_rxnwf_file, rgroup_files=rgroup_files, reference_sites=reference_sites, novel_sites=novel_sites, force_hetero_substitution=force_hetero_substitution) return eswap.run()
[docs]def get_sites(sites): """ Return the reference and novel sites. :type sites: list :param sites: contains for each site a list of data [from_idx, to_idx, hash_idx] with an optional fourth item structure_idx and an optional fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL module constants :raise ReactionWorkflowException: if there is an issue :rtype: list, list :return: the reference and novel sites, each containing `rxnwfu.Site` """ reference_sites, novel_sites = [], [] for site in sites: items = [] for item in site: try: items.append(int(item)) except ValueError: items.append(item) if len(items) == 5: if items[-1] == REFERENCE: reference_sites.append(items[:-1]) elif items[-1] == NOVEL: if items[-2] != 1: msg = ( f'Sites designated as "{NOVEL}" must specify a single ' 'structure index with a value of 1.') raise ReactionWorkflowException(msg) novel_sites.append(items[:-2]) else: msg = (f'Sites can only be designated as "{REFERENCE}" or ' f'"{NOVEL}".') raise ReactionWorkflowException(msg) elif len(items) == 4: if items[-1] != 1: msg = (f'Sites designated as "{NOVEL}" must specify a single ' 'structure index with a value of 1.') raise ReactionWorkflowException(msg) novel_sites.append(items[:-1]) elif len(items) == 3: novel_sites.append(items) else: msg = ( 'Sites must specify three indices with an optional fourth index ' f'plus an optional fifth item, either "{REFERENCE}" or ' f'"{NOVEL}".') raise ReactionWorkflowException(msg) reference_sites = Sites.getSites(reference_sites) novel_sites = Sites.getSites(novel_sites) return reference_sites, novel_sites
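Taken together with enumerate_swap above, a usage sketch (the file names, atom indices, and rgroup_files mapping are hypothetical)::

    from schrodinger.application.matsci import reaction_workflow_utils as rxnwfu

    raw_sites = [
        [5, 6, 1, 2, rxnwfu.REFERENCE],  # reference site on structure 2
        [5, 6, 1, 1, rxnwfu.NOVEL],      # novel sites must use structure 1
        [7, 8, 2],                       # bare 3-index rows default to novel
    ]
    reference_sites, novel_sites = rxnwfu.get_sites(raw_sites)

    out_files = rxnwfu.enumerate_swap(
        'reference_rxnwf.mae',
        novel_rxnwf_file='novel_rxnwf.mae',
        rgroup_files={1: 'rgroup_lib.mae', 2: 'rgroup_lib.mae'},
        reference_sites=reference_sites,
        novel_sites=novel_sites)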
[docs]def validate_auto_reaction_workflow_files(reaction_wf_input, novel_rxnwf_file=None, mass_conserved=False, sites=None): """ Validate the reaction workflow input file, the novel reaction workflow file, and any sites. :type reaction_wf_input: str :param reaction_wf_input: reference reaction workflow file name :type novel_rxnwf_file: str :param novel_rxnwf_file: the reaction workflow file containing the single novel structure :type mass_conserved: bool :param mass_conserved: True if mass is conserved else False :type sites: list :param sites: contains for each site a list of data [from_idx, to_idx, hash_idx] with an optional fourth item structure_idx and an optional fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL module constants :raise ReactionWorkflowException: if there is an issue """ # check reference rxnwf file type_cast_reaction_wf_input(reaction_wf_input, exception_type=ReactionWorkflowException, mass_conserved=mass_conserved) # check novel rxnwf file if novel_rxnwf_file and structure.count_structures(novel_rxnwf_file) > 1: msg = ( 'The novel reaction workflow file must contain a single structure.') raise ReactionWorkflowException(msg) # Validate sites if sites: get_sites(sites)
[docs]def validate_custom_rate_with_extra_stages(options): """ Validate that, if custom rate constants are requested and an extra stages file is present, the custom property has one of the known energy units. :param SimpleNamespace options: the options namespace :raise ReactionWorkflowException: if the custom property has no known unit """ if not all([options.custom_rate_constants, options.extra_stages_file]): return known_units_str = ', '.join('(%s)' % unit for unit in KNOWN_UNITS) msg = ('Property from analysis stage has no known energy units: %%s. ' 'Known energy units are: %s.' % known_units_str) extra_stages = jmswfu.read_stage_datafile(options.extra_stages_file)[1:] for extra_stage in extra_stages: if extra_stage.analyze_data: en_starter = extra_stage.analyze_data[0].key if not ReactionWorkflowEnergyAnalysis.getKcalPerMolConversion( en_starter): raise ReactionWorkflowException(msg % en_starter)
[docs]def has_keep_and_superpose_atoms(input_st): """ Return True if the given structure has at least a single keep atom and a single superpose atom. :type input_st: `structure.Structure` :param input_st: the structure :rtype: bool :return: True if the structure has at least a single keep atom and a single superpose atom """ # if there are superpose atoms the requirement that there be at # least three of them is enforced in the Reaction Workflow Input GUI # which prepares the input here has_keep = has_superpose = False for atom in input_st.atom: if atom.property.get(KEEP_ATOM_KEY): has_keep = True if atom.property.get(SUPERPOSITION_ATOM_KEY): has_superpose = True if has_keep and has_superpose: return True return False
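# Minimal sketch (hypothetical file name) of marking atoms so that
# has_keep_and_superpose_atoms returns True; KEEP_ATOM_KEY and
# SUPERPOSITION_ATOM_KEY are boolean atom property keys defined elsewhere
# in this module.
#
# st = structure.Structure.read('reference.mae')
# st.atom[1].property[KEEP_ATOM_KEY] = True
# for idx in (2, 3, 4):  # the input GUI enforces at least three superpose atoms
#     st.atom[idx].property[SUPERPOSITION_ATOM_KEY] = True
# assert has_keep_and_superpose_atoms(st)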
[docs]def validate_auto_reaction_workflow_options( reaction_wf_input, novel_rxnwf_file=None, n_conformers=DEFAULT_N_CONFORMERS, jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS): """ Validate auto reaction workflow options. :type reaction_wf_input: str :param reaction_wf_input: reference reaction workflow file name :type novel_rxnwf_file: str :param novel_rxnwf_file: the reaction workflow file containing the single novel structure :type n_conformers: int :param n_conformers: number of conformers to search for :type jaguar_keywords: str :param jaguar_keywords: whitespace separated Jaguar <key>=<value> tokens :raise ReactionWorkflowException: if there is an issue """ try: sts = list(structure.StructureReader(reaction_wf_input)) except IOError as err: raise ReactionWorkflowException(str(err)) if novel_rxnwf_file: # check references for st in sts: if has_keep_and_superpose_atoms(st): break else: msg = ( 'At least a single reference structure must contain atoms marked with ' f'the "{KEEP_ATOM_KEY}" and "{SUPERPOSITION_ATOM_KEY}" ' 'properties which specify how to swap fragments with the novel ' 'structure.') raise ReactionWorkflowException(msg) # check novel if structure.count_structures(novel_rxnwf_file) > 1: msg = ( 'The novel reaction workflow file must contain a single structure.' ) raise ReactionWorkflowException(msg) novel_st = structure.Structure.read(novel_rxnwf_file) if not has_keep_and_superpose_atoms(novel_st): msg = ( 'The novel structure must contain atoms marked with the ' f'"{KEEP_ATOM_KEY}" and "{SUPERPOSITION_ATOM_KEY}" ' 'properties which specify how to swap fragments with the reference ' 'structures.') raise ReactionWorkflowException(msg) sts.append(novel_st) # check FF if n_conformers: try: check_ff_assignment(sts, ffld_name=DEFAULT_FORCE_FIELD_NAME) except ValueError as err: raise ReactionWorkflowException(str(err)) # check Jaguar kwargs type_cast_jaguar_keywords(jaguar_keywords, exception_type=ReactionWorkflowException)
[docs]def validate_all_auto_reaction_workflow_options( reaction_wf_input, novel_rxnwf_file=None, mass_conserved=False, sites=None, n_conformers=DEFAULT_N_CONFORMERS, jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS): """ Validate all auto reaction workflow options or raise a ReactionWorkflowException. :type reaction_wf_input: str :param reaction_wf_input: reference reaction workflow file name :type novel_rxnwf_file: str :param novel_rxnwf_file: the reaction workflow file containing the single novel structure :type mass_conserved: bool :param mass_conserved: True if mass is conserved else False :type sites: list :param sites: contains for each site a list of data [from_idx, to_idx, hash_idx] with an optional fourth item structure_idx and an optional fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL module constants :type n_conformers: int :param n_conformers: number of conformers to search for :type jaguar_keywords: str :param jaguar_keywords: whitespace separated Jaguar <key>=<value> tokens :raise ReactionWorkflowException: if there is an issue """ validate_auto_reaction_workflow_files(reaction_wf_input=reaction_wf_input, novel_rxnwf_file=novel_rxnwf_file, mass_conserved=mass_conserved, sites=sites) validate_auto_reaction_workflow_options(reaction_wf_input=reaction_wf_input, novel_rxnwf_file=novel_rxnwf_file, n_conformers=n_conformers, jaguar_keywords=jaguar_keywords)
[docs]def get_sibling_ownership_information(file_path=None, sts=None): """ Get the parents and children information of sibling groups from a structure file or a list of structures :param str file_path: The path to the structure file to read :param iterable sts: The structures to get information from, if file_path is not provided :raise ReactionWorkflowException: if there is an issue with the inputs :rtype: dict, dict :return: first dict maps each sibling group to its parents, second dict maps each sibling group to its children """ assert bool(file_path) ^ bool(sts) if file_path: sts = list(structure.StructureReader(file_path)) raw_parents = {} for struct in sts: sibling = struct.property.get(SIBLING_GROUP_KEY) if sibling is None: continue parents = struct.property.get(PARENT_SIBLING_GROUPS_KEY) if sibling in raw_parents: if raw_parents[sibling] != parents: error = (f'The "{sibling}" sibling group has inconsistent' ' parents across different structures.') raise ReactionWorkflowException(error) continue raw_parents[sibling] = parents children_dict = {sibling: [] for sibling in raw_parents} parents_dict = {sibling: [] for sibling in raw_parents} for sibling, parents in raw_parents.items(): if parents: for parent in parents.split(PARENT_SEPARATOR): parents_dict[sibling].append(parent) children_dict[parent].append(sibling) return parents_dict, children_dict
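# Sketch of the ownership derivation (hypothetical group names): given three
# structures whose SIBLING_GROUP_KEY values are 'A', 'B' and 'C', and whose
# PARENT_SIBLING_GROUPS_KEY values are None, 'A' and 'B' respectively, the
# ownership graph is A -> B -> C and:
#
# parents_dict, children_dict = get_sibling_ownership_information(sts=sts)
# # parents_dict == {'A': [], 'B': ['A'], 'C': ['B']}
# # children_dict == {'A': ['B'], 'B': ['C'], 'C': []}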
[docs]def get_smiles(st): """ Return the smiles of the given metal complex. :type st: `schrodinger.structure.Structure` :param st: the metal complex :raise: (`rdkit_adapter.InconsistentStructureError`, `rdkit_adapter.UnsupportedStructureError`) :rtype: str :return: smiles """ # smiles generation requires centroid representation _st = st.copy() etatoggle.toggle_structure(_st, out_rep=parserutils.CENTROID) return adapter.to_smiles(_st, adapter.StereoChemistry_Copy, adapter.Canonicalize_Enable, adapter.Sanitize_Enable)
[docs]def bin_by_geometry(sts, eps=parserutils.DEFAULT_DEDUP_GEOM_EPS): """ Return a dictionary of structures binned by common geometry. :type sts: list[`schrodinger.structure.Structure`] :param sts: the structures :type eps: float :param eps: the RMSD precision in Angstrom that controls the size of the clusters, see sklearn.cluster.DBSCAN documentation for more details :rtype: dict :return: keys are representative structures from each geometry bin, values are lists of other structures in the same geometry bin """ # while a SMILES string encodes atomic charges it encodes neither total # charge nor total multiplicity (nor atomic multiplicity), so generalize # the hash with both hash_dict = {} for st in sts: smiles = get_smiles(st) charge = st.property.get(CHARGE_KEY, 0) multip = st.property.get(MULTIPLICITY_KEY, 1) ahash = f'{smiles}_charge_{charge}_multip_{multip}' hash_dict.setdefault(ahash, []).append(st) st_dict = {} for conformers in hash_dict.values(): ref_st = conformers[0] # choose first conformer as the reference idxs = list(range(1, ref_st.atom_total + 1)) key = lambda x: rmsd.superimpose( ref_st, idxs, x, idxs, use_symmetry=False, move_which=rmsd.CT) groups = mathutils.dbscan_cluster(conformers, eps, key=key) for group in groups.values(): ref_st = group[0] # choose first geometry as the reference st_dict[ref_st] = group[1:] return st_dict
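# Usage sketch (hypothetical structures and eps value): species-level binning
# by SMILES/charge/multiplicity followed by RMSD clustering within each bin.
#
# st_dict = bin_by_geometry(sts, eps=0.25)
# for rep_st, duplicate_sts in st_dict.items():
#     print(rep_st.title, len(duplicate_sts))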
[docs]def get_orig_title(title, unique_geom_dict): """ Return the original title of the given title. :type title: str :param title: the title :type unique_geom_dict: dict :param unique_geom_dict: keys are original titles, values are structures in the same geometry bin, with the representative structure at index 0 :rtype: str, str :return: the original title and the extension """ # titles come in like the following # # 'my_reactant' (structure title directly from user input for the job) # # 'my_reactant_stage_<x>' where x > 0, if the stage is an analysis stage # then there will be a trailing '_analysis_<y>' where y > 0 is the parent # stage whose output structure is used to store the analysis property, # (structure title directly from JMSWF output) # # 'my_reactant-<x>_stage_<y>' where x > 1 is the conformer number, otherwise # same as above (structure title directly from JMSWF output or structure # JMSWF output file base name) # # 'my_reactant-<x>' where x > 1 is the conformer number, (structure title # after postprocessing JMSWF) # # 'my_reactant-<x>_stage_<y>_anharmonic' where x > 1 is the conformer number, # otherwise same as above (structure anharmonic output file base name) # # where 'my_reactant' has been jobutils.clean_string'd # see MATSCI-11362 - we need to distinguish whether titles like 'my_reactant-2' # were original titles or those generated by the conformational search which # uses an '-<idx>' convention orig_titles = set() for orig_sts in unique_geom_dict.values(): orig_titles.update( [jobutils.clean_string(orig_st.title) for orig_st in orig_sts]) if title in orig_titles: return title, '' if STAGE_SEPARATOR in title: conf_title, stage_hash = title.split(STAGE_SEPARATOR) stage_hash = f'{STAGE_SEPARATOR}{stage_hash}' else: conf_title, stage_hash = title, '' match = re.search(r'(-\d+)$', conf_title) if match: ext = match.group(1) orig_title = conf_title.rpartition(ext)[0] else: ext = '' orig_title = conf_title return orig_title, f'{ext}{stage_hash}'
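# Examples of the title parsing above, assuming STAGE_SEPARATOR == '_stage_'
# and hypothetical titles:
#
#   'my_reactant-2_stage_3' -> ('my_reactant', '-2_stage_3')
#   'my_reactant-2'         -> ('my_reactant', '-2'), unless 'my_reactant-2'
#                              is itself an original title, in which case
#                              ('my_reactant-2', '')
#   'my_reactant'           -> ('my_reactant', '')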
[docs]def get_unique_geom_title(title, unique_geom_dict): """ Return the title of the unique geometry representative for the given title. :type title: str :param title: the title :type unique_geom_dict: dict :param unique_geom_dict: keys are original titles, values are structures in the same geometry bin, with the representative structure at index 0 :rtype: str :return: the title of the representative """ orig_title, ext = get_orig_title(title, unique_geom_dict) if not unique_geom_dict: return title elif unique_geom_dict.get(orig_title): return title else: for rep_title, orig_sts in unique_geom_dict.items(): orig_st, orig_other_sts = orig_sts[0], orig_sts[1:] for orig_other_st in orig_other_sts: if orig_other_st.title == orig_title: return f'{rep_title}{ext}'
[docs]def get_energy_db_term(rxnwf_file_path=None, rxnwf_sts=None, tstart=jmswfu.DEFAULT_TEMP_START, tstep=jmswfu.DEFAULT_TEMP_STEP, ntemp=jmswfu.DEFAULT_TEMP_N, pstart=jmswfu.DEFAULT_PRESS_START, pstep=jmswfu.DEFAULT_PRESS_STEP, npress=jmswfu.DEFAULT_PRESS_N, anharm=False, extra_stages_file=False, **kwargs): """ Get a dict containing all the energy terms generated by the reaction workflow :type rxnwf_file_path: str or None :param rxnwf_file_path: the path to a reaction workflow file :type rxnwf_sts: list[`schrodinger.structure.Structure`] or None :param rxnwf_sts: reaction workflow structures :type tstart: float :param tstart: the starting temperature (K) :type tstep: float :param tstep: the step size of the temperature (K) :type ntemp: int :param ntemp: the number of temperature points :type pstart: float :param pstart: the starting pressure (atm) :type pstep: float :param pstep: the step size of the pressure (atm) :type npress: int :param npress: the number of pressure points :type anharm: bool :param anharm: True if anharmonic calculation is performed else False :type extra_stages_file: str :param extra_stages_file: name of the extra stages file, or False if there is none :rtype: dict :return: dictionary of energy keys. """ msg = ('Exactly one of an input file or input structures must be ' 'specified.') assert bool(rxnwf_file_path) ^ bool(rxnwf_sts), msg rate_constants = kwargs.pop('rate_constants', False) custom_rate_constants = kwargs.pop('custom_rate_constants', False) if rxnwf_file_path: rxnwf_sts = list(structure.StructureReader(rxnwf_file_path)) keywords = { 'ifreq': 1, 'tmpini': tstart, 'tmpstp': tstep, 'ntemp': ntemp, 'press': pstart, 'press_step': pstep, 'npress': npress } not_include = (jaguarworkflows.HOMO_ENERGY_PROP, jaguarworkflows.ALPHA_HOMO_ENERGY_PROP, jaguarworkflows.BETA_HOMO_ENERGY_PROP, jaguarworkflows.LUMO_ENERGY_PROP, jaguarworkflows.ALPHA_LUMO_ENERGY_PROP, jaguarworkflows.BETA_LUMO_ENERGY_PROP, jaguarworkflows.ENTROPY_STARTER) # Collect all key info from Jaguar keys = jmswfu.get_property_keys_from_keywords(keywords) keys.append(jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP) # Create dict to collect info energy_term_dict = {} energy_term_dict.setdefault(ENERGY_KEYS, {}) energy_term_dict.setdefault(ANHARMONIC_ENERGY_KEYS, {}) energy_term_dict.setdefault(CUSTOM_ENERGY_KEYS, {}) # Collect energy keys stage_idx = get_rep_stage_idx(rxnwf_sts=rxnwf_sts) energy_terms = [ key + STAGE_SEPARATOR + str(stage_idx) for key in keys if not key.startswith(not_include) ] for term in energy_terms: key = ReactionWorkflowEnergyAnalysis.getHeader(term) energy_term_dict[ENERGY_KEYS][key] = term # Collect anharmonic keys if anharm: not_include_anharmonic_terms = ( jaguarworkflows.ZERO_POINT_ENERGY_PROP, jaguarworkflows.GAS_PHASE_ENERGY_PROP, jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP) filtered_terms = [ key for key in energy_terms if not key.startswith(not_include_anharmonic_terms) ] for term in filtered_terms: anharmonic_energy_term = anharmonic.KEY_STARTER + "_".join( term.split("_")[2:]) key = ReactionWorkflowEnergyAnalysis.getHeader( anharmonic_energy_term) energy_term_dict[ANHARMONIC_ENERGY_KEYS][ key] = anharmonic_energy_term # Collect custom keys if extra_stages_file: press_data = PressData(press_start=pstart, press_step=pstep, press_n=npress) temp_data = TempData(temp_start=tstart, temp_step=tstep, temp_n=ntemp) pressures, temps = get_pressures_temps(press_data, temp_data) extra_stages = jmswfu.read_stage_datafile(extra_stages_file) for extra_stage in extra_stages:
if not extra_stage.analyze_data: continue for analyze_data in extra_stage.analyze_data: # if using wildcards collect keys over the entire pressure and # temperature matrix; doing this for a single line is sufficient, # hence the break if jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT in analyze_data.parent_key: keys = [] for press, temp in itertools.product(pressures, temps): ext = jaguarworkflows.get_temp_press_key_ext( temp, press) keys.append(f'{analyze_data.key}{ext}') break else: keys = [extra_stage.analyze_data[0].key] for key in keys: term = f'{key}{STAGE_SEPARATOR}{stage_idx + extra_stage.index - 1}' key = ReactionWorkflowEnergyAnalysis.getHeader(term) energy_term_dict[CUSTOM_ENERGY_KEYS][key] = term if rate_constants: if custom_rate_constants: value = RATE_CONSTANT_STRING else: value = CONF_AVG_CTST_RATE_CONSTANT else: value = None energy_term_dict[RATE_CONSTANT_KEY] = value return energy_term_dict
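# Usage sketch (hypothetical file name): collect the property keys written by
# a finished workflow, including the anharmonic variants.
#
# terms = get_energy_db_term(rxnwf_file_path='rxnwf_out.mae', anharm=True)
# for header, prop_key in terms[ENERGY_KEYS].items():
#     print(header, prop_key)
# print(terms[RATE_CONSTANT_KEY])  # None unless rate_constants=True is passed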
[docs]class SiblingNode: """ Contains parent-child information for a sibling group as well as features for creating an energy diagram """ LINE_LENGTH = 1 X_GAP = LINE_LENGTH / 2
[docs] def __init__(self, name, axes): """ :param str name: The name of the sibling group :param `matplotlib.axes.Axes` axes: The plot's axes object """ self.name = name self.axes = axes self.generation = None self.csv_idx = None self.color = None self.parents = [] self.children = []
[docs] def addToPlot(self, y_val, energy, text_offset): """ Add this sibling group to the plot :param float y_val: The y-value of the sibling on the plot :param float energy: The energy to show on the label :param float text_offset: The offset for showing the label above horizontal lines """ self.y_val = y_val x_mid = (self.X_GAP + self.LINE_LENGTH) * self.generation x_1 = x_mid - self.LINE_LENGTH / 2 self.x_2 = x_mid + self.LINE_LENGTH / 2 color = self.color or 'k' self.axes.plot((x_1, self.x_2), (y_val, y_val), linewidth=3, color=color) label_text = self.name + ': {0:.2f}'.format(energy) self.label = self.axes.text(x_mid, y_val + text_offset, label_text, ha='center', size='small') color = self.color or '0.75' for parent in self.parents: self.axes.plot((x_1, parent.x_2), (y_val, parent.y_val), color=color)
[docs]class EnergyDiagramPlotter: """ Class for exporting energy level diagrams """ TOTAL_FREE_ENERGY = 'Total_Free_Energy' TEXT_OFFSET_DIVISOR = 50 VERTICAL_SPACING_DIVISOR = 23 PDF_FILE_ENDING = '_e_diagrams.pdf' PNG_FILE_ENDING = '_std_e_diagram.png'
[docs] def __init__(self, json_path, csv_path, logger=None): """ :param str json_path: Path to _conf_avg_wo_x_rel_parents.json file :param str csv_path: Path to _conf_avg_wo_x_rel_reactants.csv file :param logging.Logger logger: output logger or None if there isn't one """ # We need the csv in addition to the json because the csv contains # energies relative to reactants while the json contains the needed # parenting information self.json_path = json_path self.csv_path = csv_path self.logger = logger
[docs] def run(self): """ Export the energy level diagrams """ # Use the Agg rather than the QtAgg backend (MATSCI-9819) to avoid # requiring graphics libraries when running on a remote compute host import matplotlib from matplotlib import figure from matplotlib.backends import backend_pdf from matplotlib.backends.backend_agg import FigureCanvasAgg self.color_map = matplotlib.cm.get_cmap('gist_rainbow') fig = figure.Figure(figsize=(10, 7.5), dpi=200) self.canvas = FigureCanvasAgg(fig) self.axes = fig.add_subplot(111) fig.subplots_adjust(left=0.05, right=0.95) # Need to read the sibling names as str otherwise 000 will change to 0 # and it won't match the json file anymore (MATSCI-11196) self.ene_df = pandas.read_csv(self.csv_path, dtype={SIBLING_GROUP_NAME: str}) self.findTotalEnergyProp() self.parseSiblings() job_name = jobutils.get_jobname(DEFAULT_JOB_NAME) pdf_path = job_name + self.PDF_FILE_ENDING e_png_path = job_name + self.PNG_FILE_ENDING with backend_pdf.PdfPages(pdf_path) as pdf_pages: for prop in self.ene_df.columns[1:]: self.plotDiagram(prop) pdf_pages.savefig(fig) if prop == self.tot_ene_prop: self.canvas.print_figure(e_png_path) jobutils.add_outfile_to_backend(pdf_path) jobutils.add_outfile_to_backend(e_png_path)
[docs] def findTotalEnergyProp(self): """ Find the Total Free Energy property with temperature and pressure closest to 298.15 K and 1.0 atm """ def get_closest_key(the_dict, value): keys = list(the_dict.keys()) array = numpy.array(keys) idx = (numpy.abs(array - value)).argmin() return keys[idx] props = self.ene_df.columns[1:] tot_ene_props = [p for p in props if self.TOTAL_FREE_ENERGY in p] temp_press_dict = defaultdict(dict) for prop in tot_ene_props: temp = jaguarworkflows.get_temperature(prop) press = jaguarworkflows.get_pressure(prop) temp_press_dict[temp][press] = prop target_temp = get_closest_key(temp_press_dict, 298.15) target_press = get_closest_key(next(iter(temp_press_dict.values())), 1.0) self.tot_ene_prop = temp_press_dict[target_temp][target_press]
[docs] def getSibling(self, name): """ Get an existing sibling or create a new one and return it :param str name: The name of the sibling :rtype: `SiblingNode` :return: A SiblingNode object """ if name in self.siblings: return self.siblings[name] else: sibling = SiblingNode(name, self.axes) self.siblings[name] = sibling return sibling
[docs] def parseSiblings(self): """ Read the siblings and parent child info from json file and find the generation of each sibling """ with open(self.json_path) as json_fh: json_data = json.load(json_fh) self.siblings = {} for sibling_name, parents_data in json_data.items(): child = self.getSibling(sibling_name) for parent_name in parents_data: parent = self.getSibling(parent_name) parent.children.append(child) child.parents.append(parent) # Find indexes of each sibling in the csv file for idx, name in enumerate(self.ene_df[SIBLING_GROUP_NAME]): try: self.siblings[name].csv_idx = idx except KeyError: error = (f'Cannot find "{name}" sibling group in ' f'{os.path.basename(self.csv_path)}.') raise ReactionWorkflowException(error) self.findGenerations() self.findSameGenerationIndices() self.setColors()
[docs] def setColors(self): """ Assign a color to each linear path, i.e. each chain of nodes in which every node has at most one parent and one child, excluding the reactant """ node_paths = [] for first_gen in self.generations[1]: path = [] node = first_gen while True: if len(node.parents) > 1 or len(node.children) > 1: path.clear() break if len(node.children) == 0: # last node. Wrap up. path.append(node) break path.append(node) node = node.children[0] if path: node_paths.append(path) num_paths = len(node_paths) for idx, path in enumerate(node_paths): color = self.color_map(idx / num_paths) for node in path: node.color = color
[docs] def findGenerations(self): """ Determine the generations of the siblings Each child's generation is the largest generation of its parents, plus 1 which means that if we have the following ownerships: 1 -> 2 -> 3 -> 4 1 -> 5 -> 4 "4" will be a 4th generation and "5" will be a 2nd generation, so the plot will look like this: 1 -- 2 -- 3 -- 4 1 -- 5 ------- 4 as opposed to 1 ------- 5 -- 4 """ def set_generation(sibling, generation): if sibling.generation is not None: if generation <= sibling.generation: # Nothing to do return sibling.generation = generation for child in sibling.children: set_generation(child, generation + 1) source = [x for x in self.siblings.values() if not x.parents][0] set_generation(source, 0) self.generations = defaultdict(list) for sibling in self.siblings.values(): self.generations[sibling.generation].append(sibling)
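# Self-contained sketch of the generation rule above (hypothetical graph,
# matching the docstring example): a node's generation is the maximum of its
# parents' generations plus one.
#
# children = {'1': ['2', '5'], '2': ['3'], '3': ['4'], '5': ['4'], '4': []}
# generation = {}
# def set_gen(node, gen):
#     if generation.get(node, -1) >= gen:
#         return  # only ever raise a generation, as in findGenerations
#     generation[node] = gen
#     for child in children[node]:
#         set_gen(child, gen + 1)
# set_gen('1', 0)
# # generation == {'1': 0, '2': 1, '3': 2, '5': 1, '4': 3}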
[docs] def findSameGenerationIndices(self): """ Find groups of csv indices that belong to the same generation """ self.same_gen_indices = [] for idx in range(1, max(self.generations) + 1): self.same_gen_indices.append( [sibling.csv_idx for sibling in self.generations[idx]])
[docs] def plotDiagram(self, prop): """ Plot the energy diagram for the passed property """ self.axes.clear() self.axes.axis('off') self.axes.autoscale(axis="y") self.canvas.figure.suptitle(prop) disclaimer = ('Lines and labels are moved vertically to avoid overlaps.' ' The Y-axis does not have a uniform scale.') self.axes.text(0.5, -0.05, disclaimer, size=6, ha="center", transform=self.axes.transAxes) y_vals = self.getAdjustedYValues(prop) text_offset = self.ene_df[prop].values.ptp() / self.TEXT_OFFSET_DIVISOR for idx in range(max(self.generations) + 1): for sibling in self.generations[idx]: csv_index = sibling.csv_idx sibling.addToPlot(y_val=y_vals[csv_index], energy=self.ene_df[prop][csv_index], text_offset=text_offset) self.canvas.draw() self.labels = [sibling.label for sibling in self.siblings.values()] self.adjustLabels()
[docs] def adjustLabels(self): """ Adjust the size of labels. If a label's width is large enough to overlap with a neighbour's label, reduce the font and redraw """ adjust = False max_width = SiblingNode.LINE_LENGTH + SiblingNode.X_GAP for label in self.labels: extents = label.get_window_extent() width = extents.transformed(self.axes.transData.inverted()).width if width > max_width: adjust = True break if adjust: for label in self.labels: label.set_size(label.get_size() - 0.5) self.canvas.draw() self.adjustLabels()
[docs] def getAdjustedYValues(self, prop): """ Adjust the y values so that there is enough gap between all consecutive values in the same generation so there is no overlap. :param str prop: The property to adjust values for :rtype: numpy.array :return: The adjusted Y values as a numpy array """ divisor = self.VERTICAL_SPACING_DIVISOR values = self.ene_df[prop].values.copy() base_range = values.ptp() # The required gap is a fixed portion of the plot size. required_diff = base_range / divisor last_num_raised = 0 while True: num_raised = 0 idx_to_deficit = {} for indices in self.same_gen_indices: sorted_indices = sorted(indices, key=lambda x: values[x]) generation_values = values[sorted_indices] generation_diffs = numpy.diff(generation_values) for diff_idx, diff in enumerate(generation_diffs): if diff < required_diff: # Not enough room. All values above need to be raised num_raised += 1 array_idx = sorted_indices[diff_idx + 1] idx_to_deficit[array_idx] = required_diff - diff # Raising the values will increase the value range and shrink all # spacings because the plot height remains constant, so we will need # more space for labels to not overlap. Now we calculate the # extra spacing (x) required. Adding extra_spacing to deficits will # fix the problem if no new labels overlap # new_range = base_range + sum(deficits) + num_raised * x # new_required_diff = x + required_diff = new_height / divisor # divisor * x + divisor * required_diff = # base_range + sum(deficits) + num_raised * x # (divisor-num_raised) * x = base_range + sum(deficits) - # divisor * required_diff denom = divisor - num_raised if denom <= 0: # Too many colliding labels. Will need to reduce the spacing. divisor += 1 continue numerator = (base_range + sum(idx_to_deficit.values()) - divisor * required_diff) extra_spacing = numerator / denom if last_num_raised == num_raised: if abs(extra_spacing) > 1e-5: if self.logger: warning = ( 'Error during adjustment of energy diagram' f' positions for "{prop}". Using original values.') self.logger.warning(warning) return values # Adjusting the sizes causes no new labels to overlap. Use the # current deficits break last_num_raised = num_raised # Raise the requirement and see if any other labels overlap now required_diff += extra_spacing for idx, deficit in idx_to_deficit.items(): val = values[idx] values[values >= val] += deficit return values
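# Worked example of the spacing formula above (hypothetical numbers):
# base_range = 100 and divisor = 23 give required_diff = 100 / 23 ≈ 4.35.
# If one pair of same-generation labels sits only 1.0 apart, then
# num_raised = 1 and its deficit is ≈ 3.35, so
#     extra_spacing = (100 + 3.35 - 23 * 4.35) / (23 - 1) ≈ 0.15
# and the gap requirement grows to ≈ 4.50 before the next pass.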
[docs]def flatten_sibling_conformers(sibling_conformers_dict): """ Return a list of structures from the given sibling conformers dictionary. :type sibling_conformers_dict: dict :param sibling_conformers_dict: dictionary of dictionaries where the outer dictionary is keyed by sibling and inner dictionary is keyed by conformer and values of the inner dictionary are lists of structures :rtype: list[`schrodinger.structure.Structure`] :return: the structures """ sts = [] for conformers_dict in sibling_conformers_dict.values(): for conformers in conformers_dict.values(): sts.extend(conformers) return sts
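# Usage sketch (hypothetical structures): the nesting order is
# sibling group -> conformer -> list of structures.
#
# sibling_conformers_dict = {'sib_1': {'conf_a': [st1, st2]},
#                            'sib_2': {'conf_a': [st3]}}
# sts = flatten_sibling_conformers(sibling_conformers_dict)
# # sts == [st1, st2, st3]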