# Source code for schrodinger.application.matsci.jaguar_multistage_workflow_utils

"""
Utilities for Jaguar multistage workflow.

Copyright Schrodinger, LLC. All rights reserved.
"""

import os
import shlex
import shutil
import sys
import time
from collections import defaultdict
from contextlib import nullcontext

import numpy

from schrodinger import structure
from schrodinger.application.desmond import constants as dconst
from schrodinger.application.jaguar import input as jinput
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import msprops
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils

# Section-header tags recognized in a multistage workflow stage file
NEW_STAGE = 'NEW_STAGE'
CUSTOM = 'CUSTOM'
COMMAND_LINE = 'WORKFLOW'
SIMULATION = 'SIMULATION'
INFO = 'INFO'
PARENT = 'PARENT'
KEYWORDS = 'KEYWORDS'
ATOM_BASIS = 'ATOM_BASIS'
CHARGE_CONSTRAINTS = 'CHARGE_CONSTRAINTS'
GEOM_CONSTRAINTS = 'GEOM_CONSTRAINTS'
ACTIVE_COORDINATES = 'ACTIVE_COORDINATES'
EXTRA_SECTIONS = 'EXTRA_SECTIONS'
OVERRIDE_KEYWORDS = 'OVERRIDE_KEYWORDS'
ANALYZE = 'ANALYZE'
# The full set of data tags that may follow a NEW_STAGE header
DATATAGS = set([
    PARENT, KEYWORDS, ATOM_BASIS, CHARGE_CONSTRAINTS, GEOM_CONSTRAINTS,
    ACTIVE_COORDINATES, EXTRA_SECTIONS, OVERRIDE_KEYWORDS, ANALYZE, INFO,
    COMMAND_LINE, SIMULATION, CUSTOM
])
# Field delimiter used on stage file data lines
DELIM = '\t'
TRUE = 'true'

# Fallback stage naming when the user supplies none
GENERIC_STAGE_TAG = 'stage_'
NO_STAGE_NAME = '_NO_STAGE_NAME_'

# Options that may appear on a PARENT data line
WAVEFUNCTION = 'WAVEFUNCTION'
HESSIAN = 'HESSIAN'
ANALYSIS = 'ANALYSIS'

# Names of the reduction functions available for analysis; NONE picks the
# first value of the sequence rather than reducing it
NONE = 'None'
MINF = 'Min'
MAXF = 'Max'
AVGF = 'Avg'
STDF = 'Std'
FUNCTION_DICT = {
    NONE: lambda x: x[0],
    MINF: min,
    MAXF: max,
    AVGF: numpy.mean,
    STDF: numpy.std
}
SMAP_ELIGIBLE_EXTENSIONS = ['.vis', '.vib', '_vib.spm']

# Module-level logger, created lazily by set_up_logger
logger = None

WORKFLOW_STAGE_KEY = 'i_matsci_Workflow_Stage'
PARENT_ST_IDX_KEY = 'i_matsci_Parent_Structure_Stage_Index'

# Defaults for temperature/pressure scans
DEFAULT_TEMP_START = 298.15  # K
DEFAULT_TEMP_STEP = 10.  # K
DEFAULT_TEMP_N = 1
DEFAULT_PRESS_START = 1.  # atm
DEFAULT_PRESS_STEP = 1.  # atm
DEFAULT_PRESS_N = 1

# reserved for Jaguar structure properties that lack units in
# the property key, '(au)' (Hartree) is used in other Jaguar
# property keys and is used to be consistent
JAGUAR_PROP_UNITS_DICT = {
    jaguarworkflows.GAS_PHASE_ENERGY_PROP: '(au)',
    jaguarworkflows.HOMO_ENERGY_PROP: '(au)',
    jaguarworkflows.LUMO_ENERGY_PROP: '(au)',
    jaguarworkflows.ALPHA_HOMO_ENERGY_PROP: '(au)',
    jaguarworkflows.ALPHA_LUMO_ENERGY_PROP: '(au)',
    jaguarworkflows.BETA_HOMO_ENERGY_PROP: '(au)',
    jaguarworkflows.BETA_LUMO_ENERGY_PROP: '(au)',
    jaguarworkflows.LOWEST_EXCITATION_PROP: '(eV)',
    jaguarworkflows.LOWEST_SINGLET_EXCITATION_PROP: '(eV)',
    jaguarworkflows.LOWEST_TRIPLET_EXCITATION_PROP: '(eV)',
    jaguarworkflows.SOLUTION_ENERGY_PROP: '(au)',
    jaguarworkflows.GAS_PHASE_GROUND_ENERGY_PROP: '(au)',
    jaguarworkflows.GAS_EXCITED_ENERGY_PROP: '(au)',
    jaguarworkflows.SOLUTION_GROUND_ENERGY_PROP: '(au)',
    jaguarworkflows.SOL_EXCITED_ENERGY_PROP: '(au)'
}


def set_up_logger(related_filename):
    """
    Set up the logger used in this module.

    :type related_filename: str
    :param related_filename: the base name of this file name will be used as
        the base name of the log file name if not running under job control,
        otherwise the job name will be used
    """
    global logger
    # create_logger also returns the log file name, which is not needed here
    logger, _ = textlogger.create_logger(related_filename=related_filename)
def parse_yes_no_setting(setting, keyword):
    """
    Translate an English word into a boolean if possible

    :param str setting: The text (true/false/yes/no/on/off etc) to convert

    :param str keyword: The associated keyword to show in error messages

    :rtype: bool
    :return: True if the setting translates to Truthy, False if it translates
        to Falsey

    :raise InvalidStageFileError: If setting can't be translated
    """
    try:
        return msutils.setting_to_bool(setting)
    except ValueError:
        # Re-raise as the stage-file-specific error with context
        raise InvalidStageFileError(
            f'Cannot interpret the value of {keyword}={setting} as a '
            'yes/no condition')
class InvalidStageFileError(Exception):
    """
    Class for any exception that occurs when reading in a settings file
    """
class MissingDataError(Exception):
    """
    Raised when an expected structure property for Analysis is not found
    """
class ParameterLine:
    """
    Base class for lines that take keyword=value parameters
    """

    # The header line for this info in the stage file
    TAG = 'BASE'
    # All keys this class recognizes on the parameter line
    ALL_KEYS = set()

    def __init__(self, line, index):
        """
        Create a ParameterLine object

        :param str line: The line to parse from the input file

        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        keywords = self.getKeywords(line)
        self.validateKeywords(keywords, index)

    def getKeywords(self, line):
        """
        Parse the line into a set of keyword-value pairs

        :param str line: The line to parse from the input file

        :rtype: dict
        :return: Keys are lowercase keywords, values are values

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        line = line.strip()
        try:
            raw_keywords = msutils.keyword_string_to_dict(line)
        except ValueError as msg:
            raise InvalidStageFileError(msg)
        # Keywords are case-insensitive; normalize keys to lowercase
        return {x.lower(): y for x, y in raw_keywords.items()}

    def validateKeywords(self, keywords, index):
        """
        Validate and pull information from the keywords

        :param dict keywords: Keys are class constant keywords

        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        for key in keywords.keys():
            if key not in self.ALL_KEYS:
                allowed = ', '.join(self.ALL_KEYS)
                raise InvalidStageFileError(
                    f'Stage {index}: {key} is an invalid key. '
                    f'Valid keys are: {allowed}.')

    @classmethod
    def writeParameterLine(cls, keywords, datafile):
        """
        Write a line with these settings to the given file

        :param dict keywords: The current settings

        :param file datafile: The file object to write to
        """
        # Unknown keys here indicate a programming error, not a bad user file
        for key in keywords.keys():
            if key not in cls.ALL_KEYS:
                raise RuntimeError(f'{key} is not a known parameter for '
                                   f'{cls.TAG} lines')
        line = msutils.keyword_dict_to_string(keywords)
        datafile.write(f'{cls.TAG}\n{line}\n')
class CustomScriptLine(ParameterLine):
    """
    Holds general information about a custom script

    Used in the meta workflow driver but not the Jaguar multistage workflow
    """

    # The header line for this info in the stage file
    TAG = CUSTOM
    # A string, the input type for this stage
    INPUT = 'input'
    # A string, output type for this stage
    OUTPUT = 'output'
    # boolean True or False
    TRAJECTORY = 'trajectory'
    # Path to the script
    PATH = 'path'
    # Whether to use JC or a subprocess to run the script
    USE_JC = 'use_jc'
    # All keys this class recognizes on the parameter line
    ALL_KEYS = {INPUT, OUTPUT, TRAJECTORY, PATH, USE_JC}

    # Recognized file types for the input/output keywords
    MAEGZ = 'maegz'
    CMS = 'cms'
    OTHER = 'other'
    ALLOWED_INPUT = {MAEGZ, CMS, OTHER}
    ALLOWED_OUTPUT = {MAEGZ, CMS}

    # GUI uses this path value to indicate that it is in an intermediate state
    # and the driver path should not be checked
    DO_NOT_CHECK_PATH = 'Not_located'

    def validateKeywords(self, keywords, index):
        """
        Validate and pull information from the keywords

        :param dict keywords: Keys are class constant keywords

        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        super().validateKeywords(keywords, index)
        # Validate the input and output types
        msg = ('Stage {index}: {ftype} is not a valid {ktype} type. '
               'Allowed types: {allowed}')
        ktypes = (self.INPUT, self.OUTPUT)
        allowed_vals = (self.ALLOWED_INPUT, self.ALLOWED_OUTPUT)
        for ktype, allowed in zip(ktypes, allowed_vals):
            ftype = keywords.get(ktype, self.CMS)
            if ftype not in allowed:
                astr = ', '.join(allowed)
                error = msg.format(index=index,
                                   ftype=ftype,
                                   ktype=ktype,
                                   allowed=astr)
                raise InvalidStageFileError(error)
        # Store the keyword values
        self.input_type = keywords.get(self.INPUT, self.CMS)
        self.output_type = keywords.get(self.OUTPUT, self.CMS)
        # parse_yes_no_setting returns None if the keyword was not given
        self.trajectory = parse_yes_no_setting(keywords.get(self.TRAJECTORY),
                                               self.TRAJECTORY)
        self.use_jc = parse_yes_no_setting(keywords.get(self.USE_JC),
                                           self.USE_JC)
        # Ensure the usage of trajectory is OK
        if self.output_type == self.MAEGZ and self.trajectory:
            # User specified True
            msg = (f'Stage {index}: Trajectories from stages with output type '
                   f'{self.MAEGZ} cannot be used by later steps')
            raise InvalidStageFileError(msg)
        if self.trajectory is None:
            # User did not specify, default is True
            self.trajectory = True
        # Check for a valid path (note - we don't know the script name yet, so
        # we can't validate that the script is in the directory). We don't want
        # to do this check if we are running under job control because in that
        # case the script should already have been copied into the job directory
        # and this original path was only needed for local start of the job.
        path = keywords.get(self.PATH, '.')
        if path != self.DO_NOT_CHECK_PATH and not jobcontrol.get_backend():
            if os.path.isfile(path):
                # User included the script name in the path, remove it
                path = os.path.dirname(path)
            elif not os.path.exists(path):
                msg = ("The given path for the script's directory does not "
                       f"exist {path}")
                raise InvalidStageFileError(msg)
        self.path = path
class StageInfoLine(ParameterLine):
    """
    Holds general information about a stage: name, parent, etc.

    Used in the meta workflow driver but not the Jaguar multistage workflow
    """

    # The header line for this info in the stage file
    TAG = INFO
    # A string, the name of this stage
    NAME = 'name'
    # A string, referring to the name of the parent stage
    PARENT = 'parent'
    # boolean True or False
    MAIN = 'main'
    # Names of other stages to wait for completion before starting this step
    REQUIRES = 'requires'
    # All keys this class recognizes on the parameter line
    ALL_KEYS = {NAME, PARENT, MAIN, REQUIRES}

    def __init__(self, line, index):
        """
        Create a StageInfoLine object

        :param str line: The line to parse from the input file

        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        super().__init__(line, index)
        # Will be replaced later with the stage that is this stage's parent
        self.parent_stage = None
        self.requires_stages = []

    def validateKeywords(self, keywords, index):
        """
        Validate and pull information from the keywords

        :param dict keywords: Keys are class constant keywords

        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        super().validateKeywords(keywords, index)
        # Fall back to a generic "stage_N" name if none was given
        self.name = jobutils.clean_string(
            keywords.get(self.NAME, f'{GENERIC_STAGE_TAG}{index}'))
        self.parent_name = keywords.get(self.PARENT)
        if self.parent_name:
            self.parent_name = jobutils.clean_string(self.parent_name)
        self.main = parse_yes_no_setting(keywords.get(self.MAIN), self.MAIN)
        # REQUIRES is a comma-separated list of stage names
        requires = keywords.get(self.REQUIRES)
        if requires:
            self.requires_names = set(requires.split(','))
        else:
            self.requires_names = set()
class SimulationParams:
    """
    Holds information about an MD simulation stage
    """

    # Recognized simulation types
    MD = 'md'
    BROWNIE = 'brownie'
    MS_RELAX = 'matsci_relaxation'
    COMPRESSIVE_RELAX = 'compress_relaxation'
    SEMI_CRYSTALLINE1_RELAX = 'semi_crystal_relaxation1'
    SEMI_CRYSTALLINE2_RELAX = 'semi_crystal_relaxation2'
    # User-facing keyword names; *_STRINGER are the MSJStringer equivalents
    TYPE = 'type'
    ENSEMBLE = 'ensemble'
    TIME = 'time'
    TEMP = 'temp'
    PRESSURE = 'pressure'
    TIMESTEP = 'timestep'
    TRJINT = 'trj_interval'
    TRJINT_STRINGER = 'trajectory_dot_interval'
    SEED = 'seed'
    SEED_STRINGER = 'random_seed'
    ANALYSIS = 'analysis'
    AVERAGE = 'average'
    ANISOTROPIC = 'anisotropic'
    ALL_KEYS = [
        ENSEMBLE, TIME, TEMP, PRESSURE, SEED, ANALYSIS, AVERAGE, TIMESTEP,
        TRJINT, ANISOTROPIC
    ]
    # The relaxation protocols only accept a temperature
    ALLOWED_KEYS = {
        MD: ALL_KEYS,
        BROWNIE: ALL_KEYS,
        MS_RELAX: [TEMP],
        COMPRESSIVE_RELAX: [TEMP],
        SEMI_CRYSTALLINE1_RELAX: [TEMP],
        SEMI_CRYSTALLINE2_RELAX: [TEMP]
    }

    def __init__(self, line):
        """
        Create a Simulation instance from a line of text

        Expected tab-delimited format: type=stype temp=300...

        :param str line: The line of text to create the instance from

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        try:
            keywords = msutils.keyword_string_to_dict(line)
        except ValueError as msg:
            raise InvalidStageFileError(msg)
        # Keywords are case-insensitive; normalize keys to lowercase
        self.keywords = {x.lower(): y for x, y in keywords.items()}
        self.validateKeywords()

    def validateKeywords(self):
        """
        Validate and parse information from the keywords

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        self.stype = self.keywords.pop(self.TYPE, self.MD)
        analysis = self.keywords.pop(self.ANALYSIS, None)
        self.analysis = parse_yes_no_setting(analysis, self.ANALYSIS)
        ave_msg = 'The value for "average" must be a positive integer <= 100'
        try:
            self.average = int(self.keywords.pop(self.AVERAGE, 0))
        except ValueError:
            raise InvalidStageFileError(ave_msg)
        if self.average < 0 or self.average > 100:
            raise InvalidStageFileError(ave_msg)
        # Ensure the type is a valid simulation type
        try:
            valid_params = self.ALLOWED_KEYS[self.stype]
        except KeyError:
            allowed = ', '.join(self.ALLOWED_KEYS.keys())
            msg = (f'{self.stype} is not an allowed type. Allowed types are '
                   f'{allowed}.')
            raise InvalidStageFileError(msg)
        # Ensure the keywords are valid for this simulation type
        for key in self.keywords.keys():
            if key not in valid_params:
                valid_string = ', '.join(valid_params)
                msg = (f'{key} is not an allowed parameter for type '
                       f'{self.stype}. Allowed parameters are {valid_string}')
                raise InvalidStageFileError(msg)
        # Translate user-facing keywords to Stringer-facing keywords
        # Random seed - use parserutils to handle the value 'random'
        rseed = self.keywords.pop(self.SEED, None)
        if rseed:
            rseed = str(parserutils.type_random_seed(rseed))
            self.keywords[self.SEED_STRINGER] = rseed
        # Trajectory interval - use the class constant rather than repeating
        # the literal keyword
        if self.TRJINT in self.keywords:
            self.keywords[self.TRJINT_STRINGER] = self.keywords.pop(
                self.TRJINT)
        # Anisotropy
        anisotropic = self.keywords.pop(self.ANISOTROPIC, False)
        if anisotropic:
            anisotropic = parse_yes_no_setting(anisotropic, self.ANISOTROPIC)
        if anisotropic:
            msj_isokey = desmondutils.MSJStringer.ISOTROPY
            self.keywords[msj_isokey] = dconst.IsotropyPolicy.ANISOTROPIC
        # The user gives timestep in femtoseconds, we use picoseconds
        if self.TIMESTEP in self.keywords:
            step = float(self.keywords[self.TIMESTEP]) / 1000
            self.keywords[self.TIMESTEP] = str(step)
        # Make sure the capitalization of the ensemble is correct
        ensemble = self.keywords.get(self.ENSEMBLE)
        if ensemble:
            if ensemble.lower() == 'npgt':
                self.keywords[self.ENSEMBLE] = 'NPgT'
            else:
                self.keywords[self.ENSEMBLE] = ensemble.upper()
class CommandLine:
    """
    Holds information about a workflow command line
    """

    # Placeholder tokens replaced at runtime with job-specific values
    INPUT_MASK = '$input'
    TRAJ_MASK = '$trj'

    def __init__(self, line):
        """
        Create a CommandLine instance from a line of text

        Expected tab-delimited format: [$SCHRODINGER/run] driver_path command
        line tokens

        Any string in the command that depends on a job-specific name should be
        given as simply $input. For instance, in a command such as::

            $SCHRODINGER/run driver.py -flob hobnob -j jobname.txt jobname.maegz

        The command should be provided is::

            $SCHRODINGER/run driver.py -flob hobnob -j $input.txt $input.maegz

        `$input` will be replaced at runtime with the name of the job

        If a trajectory path from the previous step is part of the command, that
        should be given as just $trj (i.e. -trj $trj)

        Command line values that have a space in them - such as an ASL string -
        should be put inside double quotes: "mol.num 7"

        :param str line: The line of text to create the instance from

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        # We need to use shlex.split to keep quoted arguments (such as an ASL)
        # together
        try:
            tokens = shlex.split(line.strip())
        except ValueError as err:
            raise InvalidStageFileError(f'Error parsing command line: {err}')
        for index, token in enumerate(tokens):
            # Skip $SCHRODINGER/run (including if $SCHRODINGER is an explicit
            # path with spaces in it)
            if token.endswith('.py'):
                self.flags = tokens[index:]
                break
        else:
            # No .py token found; keep the full token list
            self.flags = tokens
        self.driver = os.path.basename(self.flags[0])
class ParentStageData(object):
    """
    Holds and manipulates data about a parent stage
    """

    def __init__(self, line):
        """
        Create a ParentStageData instance from a line of text

        Expected tab-delimited format: parent_stage_# [WAVEFUNCTION] [HESSIAN]
        [ANALYSIS]

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.upper().strip().split(DELIM)
        try:
            self.stage = int(tokens.pop(0))
        except (IndexError, ValueError, TypeError):
            raise InvalidStageFileError('The first value in the %s data line '
                                        'must be the integer index of a '
                                        'parent stage.' % PARENT)
        # Consume each recognized option flag; anything left over is invalid
        options = set(tokens)
        try:
            options.remove(WAVEFUNCTION)
            self.use_wavefunction = True
        except KeyError:
            self.use_wavefunction = False
        try:
            options.remove(HESSIAN)
            self.use_hessian = True
        except KeyError:
            self.use_hessian = False
        try:
            options.remove(ANALYSIS)
            self.use_analysis = True
        except KeyError:
            self.use_analysis = False
        if options:
            invalids = ', '.join(options)
            raise InvalidStageFileError('Invalid options on the %s data line: '
                                        '%s' % (PARENT, invalids))
        # Analysis parents and data-passing parents are mutually exclusive
        if (self.use_wavefunction or self.use_hessian) and self.use_analysis:
            msg = ('Parents used for the purposes of analysis are not allowed '
                   'to pass on information, like their geometry, wavefunction, '
                   'and/or Hessian, to a child job.')
            raise InvalidStageFileError(msg)
        self.inherited = not self.use_analysis

    @staticmethod
    def writeInheritableParentDataToFile(parent, wavefunction, hessian,
                                         datafile):
        """
        Write inheritable parent stage data to a file in a format that this
        class can read in.

        :type parent: int
        :param parent: The stage number of the parent stage

        :type wavefunction: bool
        :param wavefunction: Whether to use the wavefunction from the parent
            stage

        :type hessian: bool
        :param hessian: Whether to use the hessian from the parent stage

        :type datafile: file
        :param datafile: The file to write the data to
        """
        datafile.write(PARENT + '\n')
        values = [str(parent)]
        if wavefunction:
            values.append(WAVEFUNCTION)
        if hessian:
            values.append(HESSIAN)
        line = DELIM.join(values)
        datafile.write('%s\n' % line)

    @staticmethod
    def writeNonInheritableParentDataToFile(parents, datafile):
        """
        Write noninheritable parent stage data to a file in a format that this
        class can read in.

        :type parents: list
        :param parents: the stage numbers of the parent stages

        :type datafile: file
        :param datafile: the file to write the data to
        """
        datafile.write(PARENT + '\n')
        for idx in parents:
            values = [str(idx), ANALYSIS]
            line = DELIM.join(values)
            datafile.write('%s\n' % line)
class GeomConstraint(object):
    """
    Holds and manipulates data about geometry constraints
    """

    def __init__(self, line):
        """
        Create a GeomConstraint instance from a line of text

        Expected tab-delimited format: entry_id target value index index ...

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        geo_fmt = ('The format of the tab-delimited %s data line is: "entry_id '
                   'target type index index ..." where entry_id is the entry '
                   'ID of the entry the constraint applies to, target is the '
                   'floating point target for the constraint or "%s" if there '
                   'is no target, type is the integer constraint type and '
                   'index is the integer atom index the constraint '
                   'applies to. Give a single index for atom constraints, two '
                   'indexes for bond constraints, etc. Instead, got: %s' %
                   (GEOM_CONSTRAINTS, NONE, line))
        try:
            self.eid = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(geo_fmt)
        try:
            target = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(geo_fmt)
        # A target of 'None' means the constraint has no target value
        if target == NONE:
            self.target = None
        else:
            try:
                self.target = float(target)
            except ValueError:
                raise InvalidStageFileError(geo_fmt)
        try:
            self.ctype = int(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(geo_fmt)
        # All remaining tokens are atom indices
        self.indexes = [int(x) for x in tokens]

    @staticmethod
    def writeData(eid, target, ctype, idxs, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type target: float or None
        :param target: the target value for the constraint if there is one

        :type ctype: int
        :param ctype: the Jaguar constraint type

        :type idxs: list
        :param idxs: contains indices of atoms defining the constraint

        :type afile: file
        :param afile: the file to write the data to
        """
        astr = DELIM.join(str(idx) for idx in idxs)
        # Some constraints have None as the target value; write the NONE
        # marker explicitly. (The previous try/except ValueError around
        # str(target) was dead code - str() never raises ValueError - and
        # only worked because str(None) happens to equal NONE.)
        target = NONE if target is None else str(target)
        afile.write('{eid}{tab}{targ}{tab}{ctype}{tab}{inds}\n'.format(
            eid=eid, tab=DELIM, targ=target, ctype=ctype, inds=astr))

    @staticmethod
    def writeModelDataToFile(model, datafile, eid):
        """
        Write geometry constraint data from a model to a file in a format that
        this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.optimization_tab.
            ContraintCoordinatesModel`
        :param model: The model containing restraints to write

        :type datafile: file
        :param datafile: The file to write the data to

        :type eid: str
        :param eid: The entry id for this geometry constraint
        """
        for index, data in enumerate(model.coords):
            # Only write the section header once, before the first line
            if not index:
                datafile.write(GEOM_CONSTRAINTS + '\n')
            GeomConstraint.writeData(eid, data.target_value,
                                     data.coordinate_type, data.atom_indices,
                                     datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write geometry constraint data from a dictionary to a file in a format
        that this class can read in

        :type adict: dictionary
        :param adict: keys are entry IDs, values are lists of GeomConstraint

        :type datafile: file
        :param datafile: The file to write the data to
        """
        for index, datas in enumerate(adict.values()):
            # Only write the section header once, before the first entry
            if not index:
                datafile.write(GEOM_CONSTRAINTS + '\n')
            for data in datas:
                GeomConstraint.writeData(data.eid, data.target, data.ctype,
                                         data.indexes, datafile)

    def applyToJaguarInput(self, jagin):
        """
        Apply this geometry constraint to a JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply this constraint to
        """
        jagin.setConstraint(self.ctype, self.indexes, self.target)
class ActiveCoord(object):
    """
    Holds and manipulates data about active coordinates
    """

    def __init__(self, line):
        """
        Create an ActiveCoord instance from a line of text

        Expected tab-delimited format: entry_id type index index ...

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        act_fmt = ('The format of the tab-delimited %s data line is: "entry_id '
                   'type index index ..." where entry_id is the entry '
                   'ID of the entry the active coordinate applies to, '
                   'type is the integer active coordinate type and '
                   'index is the integer atom index the active coordinate '
                   'applies to. Give a single index for an active atom, two '
                   'indexes for an active bond, etc. Instead, got: %s' %
                   (ACTIVE_COORDINATES, line))
        try:
            self.eid = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(act_fmt)
        try:
            self.ctype = int(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(act_fmt)
        # All remaining tokens are atom indices
        self.indexes = [int(x) for x in tokens]

    @staticmethod
    def writeData(eid, ctype, idxs, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type ctype: int
        :param ctype: the Jaguar active coordinate type

        :type idxs: list
        :param idxs: contains indices of atoms defining the active coordinate

        :type afile: file
        :param afile: the file to write the data to
        """
        astr = DELIM.join(str(idx) for idx in idxs)
        afile.write('{eid}{tab}{ctype}{tab}{inds}\n'.format(eid=eid,
                                                            tab=DELIM,
                                                            ctype=ctype,
                                                            inds=astr))

    @staticmethod
    def writeModelDataToFile(model, datafile, eid):
        """
        Write active coordinate data using a geometry constraint model to a
        file in a format that this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.optimization_tab.
            ContraintCoordinatesModel`
        :param model: The model containing active coordinates to write

        :type datafile: file
        :param datafile: The file to write the data to

        :type eid: str
        :param eid: The entry id for this active coordinate
        """
        for index, data in enumerate(model.coords):
            # Only write the section header once, before the first line
            if not index:
                datafile.write(ACTIVE_COORDINATES + '\n')
            ActiveCoord.writeData(eid, data.coordinate_type, data.atom_indices,
                                  datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write active coordinate data from the given dictionary to a file in a
        format that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of ActiveCoord

        :type datafile: file
        :param datafile: The file to write the data to
        """
        for index, datas in enumerate(adict.values()):
            # Only write the section header once, before the first entry
            if not index:
                datafile.write(ACTIVE_COORDINATES + '\n')
            for data in datas:
                ActiveCoord.writeData(data.eid, data.ctype, data.indexes,
                                      datafile)

    def applyToJaguarInput(self, jagin):
        """
        Apply this active coordinate to a JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply this active coordinate to
        """
        jagin.setActiveCoord(self.ctype, self.indexes)
class AtomBasis(object):
    """
    Holds and manipulates data about by-atom basis sets
    """

    def __init__(self, line):
        """
        Create a AtomBasis instance from a line of text

        Expected tab-delimited format: entry_id index basis_set

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        ab_format = ('The format of an tab-delimited %s data line is: '
                     '"entry_id index basis" where entry_id is the entry ID '
                     'the data applies to, index is the integer index of the '
                     'atom the data applies to and basis is the name of the '
                     'basis set that applies to that atom. Instead got: %s' %
                     (ATOM_BASIS, line))
        try:
            self.eid = tokens[0]
        except IndexError:
            raise InvalidStageFileError(ab_format)
        try:
            self.num = int(tokens[1])
        except (ValueError, IndexError):
            raise InvalidStageFileError(ab_format)
        try:
            self.basis = tokens[2]
        except IndexError:
            raise InvalidStageFileError(ab_format)

    @staticmethod
    def writeData(eid, anum, basis, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type anum: int
        :param anum: the atom number

        :type basis: str
        :param basis: the basis

        :type afile: file
        :param afile: the file to write the data to
        """
        afile.write("{eid}{tab}{num}{tab}{basis}\n".format(eid=eid,
                                                           tab=DELIM,
                                                           num=anum,
                                                           basis=basis))

    @staticmethod
    def writeModelDataToFile(model, datafile):
        """
        Write by-atom basis set data from a model to a file in a format that
        this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.sub_tab_widgets.
            basis_set_widgets.BasisSetModel`
        :param model: The model containing restraints to write

        :type datafile: file
        :param datafile: The file to write the data to
        """
        for index, row in enumerate(model._rows):
            # Only write the section header once, before the first row
            if not index:
                datafile.write(ATOM_BASIS + '\n')
            AtomBasis.writeData(row.entry_id, row.atom_num, row.basis,
                                datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write by-atom basis set data from a dictionary to a file in a format
        that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of AtomBasis

        :type datafile: file
        :param datafile: The file to write the data to
        """
        for index, datas in enumerate(adict.values()):
            # Only write the section header once, before the first entry
            if not index:
                datafile.write(ATOM_BASIS + '\n')
            for data in datas:
                AtomBasis.writeData(data.eid, data.num, data.basis, datafile)

    def applyToJaguarInput(self, jagin):
        """
        Apply this basis set to a JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply this atom basis set to
        """
        jagin.setAtomicBasis(self.num, self.basis)
class ChargeConstraint(object):
    """
    Holds and manipulates data about by-atom charge constraints
    """

    def __init__(self, line):
        """
        Create a ChargeConstraint instance from a line of text

        Expected tab-delimited format: entry_id index basis_set

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        chg_format = ('The format for a tab-delimited %s data line is: '
                      '"entry_id charge index:weight index:weight ..." where '
                      'entry_id is the ID of the entry it applies to, charge '
                      'is the floating point charge, and each index:weight '
                      'pair is the integer index of an atom and weight is the '
                      'floating point weight for that atom. Multiple '
                      'index:weight pairs can be given. Instead got: %s' %
                      (CHARGE_CONSTRAINTS, line))
        try:
            self.eid = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(chg_format)
        try:
            self.charge = float(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(chg_format)
        # Remaining tokens are index:weight pairs
        self.weights = dict()
        for token in tokens:
            try:
                sindex, sweight = token.split(':')
                self.weights[int(sindex)] = float(sweight)
            except ValueError:
                raise InvalidStageFileError(chg_format)

    @staticmethod
    def writeData(eid, charge, weights, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type charge: float
        :param charge: the charge

        :type weights: dict
        :param weights: keys are atom indices, values are float weights

        :type afile: file
        :param afile: the file to write the data to
        """
        astr = DELIM.join(['%d:%.6f' % p for p in weights.items()])
        afile.write("{eid}{tab}{chg:.4f}{tab}{wts}\n".format(eid=eid,
                                                             tab=DELIM,
                                                             chg=charge,
                                                             wts=astr))

    @staticmethod
    def writeModelDataToFile(model, datafile):
        """
        Write by-atom charge constraint data from a model to a file in a
        format that this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.sub_tab_widgets.
            basis_set_widgets.ChargeConstraintsModel`
        :param model: The model containing restraints to write

        :type datafile: file
        :param datafile: The file to write the data to
        """
        for index, row in enumerate(model._rows):
            # Only write the section header once, before the first row
            if not index:
                datafile.write(CHARGE_CONSTRAINTS + '\n')
            ChargeConstraint.writeData(row.entry_id, row.charge,
                                       row.weightsByNum(), datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write by-atom charge constraint data from a dictionary to a file in a
        format that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of ChargeConstraint

        :type datafile: file
        :param datafile: The file to write the data to
        """
        for index, datas in enumerate(adict.values()):
            # Only write the section header once, before the first entry
            if not index:
                datafile.write(CHARGE_CONSTRAINTS + '\n')
            for data in datas:
                ChargeConstraint.writeData(data.eid, data.charge, data.weights,
                                           datafile)

    def applyToJaguarInput(self, jagin):
        """
        Apply this charge constraint to a JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply this constraint to
        """
        jagin.appendChargeConstraints(self.charge, self.weights)
class StageKeywords(object):
    """
    Holds and manipulates data about keywords
    """

    def __init__(self, line):
        """
        Create a StageKeywords instance from a line of text

        Expected tab-delimited format: entry_id keyword=value keyword=value ...

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        key_fmt = ('The format for a %s data line is: "entry_id keyword=value '
                   'keyword=value ..." where entry_id is the entry ID of the '
                   'entry the keywords apply to, followed by any number of '
                   'keyword=value pairs. Instead got: %s' % (KEYWORDS, line))
        # Both the entry ID and the keyword string must be present; a line
        # without a tab previously raised a bare IndexError because tokens[1]
        # was accessed outside the try block
        try:
            self.eid = tokens[0]
            keystring = tokens[1]
        except IndexError:
            raise InvalidStageFileError(key_fmt)
        try:
            self.keywords = msutils.keyword_string_to_dict(keystring)
        except (IndexError, ValueError):
            raise InvalidStageFileError(key_fmt)

    @staticmethod
    def writeKeywordsToFile(keywords, datafile):
        """
        Write keyword data to a file in a format that this class can read in

        :type keywords: dict
        :param keywords: keys are entry IDs, values dicts with Jaguar
            (key, value) pairs

        :type datafile: file
        :param datafile: The file to write the data to
        """
        datafile.write(KEYWORDS + '\n')
        for eid, keydict in keywords.items():
            keystring = msutils.keyword_dict_to_string(keydict)
            datafile.write('{eid}{tab}{keys}\n'.format(eid=eid,
                                                       tab=DELIM,
                                                       keys=keystring))

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write keyword data from the given dictionary to a file in a format
        that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of StageKeywords

        :type datafile: file
        :param datafile: The file to write the data to
        """
        # Merge all keyword dicts for each entry before writing; later
        # StageKeywords in the list override earlier ones
        new_adict = {}
        for eid, datas in adict.items():
            new_adict[eid] = {}
            for data in datas:
                new_adict[eid].update(data.keywords)
        StageKeywords.writeKeywordsToFile(new_adict, datafile)

    def applyToJaguarInput(self, jagin):
        """
        Apply these keywords to a JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply these keywords to
        """
        jagin.setValues(self.keywords)
class ExtraSectionData(object):
    """
    Holds and manipulates text for extra sections
    """

    def __init__(self):
        """
        Create an ExtraSectionData instance

        Use addLine to add a line of text
        """
        # Accumulated block of text; one newline-terminated line per
        # addLine call
        self.text = ""

    def addLine(self, line):
        """
        Add a line of text to the extra section text block

        :type line: str
        :param line: The line of text to add to the extra section block
        """
        # The trailing \n was stripped when the line was read in, so restore
        # it as the line is appended
        self.text = self.text + line + '\n'

    def addToJaguarInput(self, input_file):
        """
        Add this block of text at the bottom of a jaguar input file

        :type input_file: file
        :param input_file: The input file to add the text to
        """
        if not self.text:
            return
        input_file.write(self.text + '\n')

    @staticmethod
    def writeSectionsToFile(sections, datafile):
        """
        Write extra sections data to a file in a format that this class can
        read in

        :type sections: str
        :param sections: The extra text to add

        :type datafile: file
        :param datafile: The file to write the data to
        """
        if not sections:
            return
        for chunk in (EXTRA_SECTIONS, sections):
            datafile.write(chunk + '\n')
class OverrideKeywords(object):
    """
    Holds and manipulates data about override keywords - these are keywords
    that either can't be set by the GUI or override the values set in the GUI.
    They apply to all structures
    """

    def __init__(self, line):
        """
        Create a OverrideKeywords instance from a line of text

        Expected tab-delimited format: keyword=value keyword=value ...

        :type line: str
        :param line: The line of text to create the instance from
        """
        bad_format = ('The format for a tab-delimited %s data line is: '
                      '"keyword=value keyword=value ...". Instead got: %s' %
                      (OVERRIDE_KEYWORDS, line))
        try:
            self.keywords = msutils.keyword_string_to_dict(line)
        except (IndexError, ValueError):
            raise InvalidStageFileError(bad_format)

    @staticmethod
    def writeKeyStringToFile(keystring, datafile):
        """
        Write override keyword data to a file in a format that this class can
        read in

        :type keystring: str
        :param keystring: the string of keyword=value pairs to write

        :type datafile: file
        :param datafile: The file to write the data to
        """
        datafile.write(OVERRIDE_KEYWORDS + '\n')
        # Convert the whitespace-delimited string to the tab-delimited form
        # the reader expects
        datafile.write('%s\n' % DELIM.join(keystring.split()))

    def applyToJaguarInput(self, jagin):
        """
        Apply these keywords to a JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply these keywords to
        """
        jagin.setValues(self.keywords)
def get_property_keys_from_keywords(keywords):
    """
    Return Jaguar output structure property keys that are created from the
    given input keywords.

    :type keywords: dict
    :param keywords: Jaguar keywords

    :rtype: list
    :return: structure property keys
    """
    keys = []
    # frequency calculations produce a zero point energy property
    if keywords.get('ifreq'):
        keys.append(jaguarworkflows.ZERO_POINT_ENERGY_PROP)
    # build the temperature and pressure grids (start + i * step, n points)
    # used for the per-condition thermochemistry properties
    temp_start = float(keywords.get('tmpini', DEFAULT_TEMP_START))
    temp_step = float(keywords.get('tmpstp', DEFAULT_TEMP_STEP))
    temp_n = int(keywords.get('ntemp', DEFAULT_TEMP_N))
    press_start = float(keywords.get('press', DEFAULT_PRESS_START))
    press_step = float(keywords.get('press_step', DEFAULT_PRESS_STEP))
    press_n = int(keywords.get('npress', DEFAULT_PRESS_N))
    temperatures = [temp_start + i * temp_step for i in range(temp_n)]
    pressures = [press_start + i * press_step for i in range(press_n)]
    # one set of thermochemistry keys per temperature/pressure combination
    # NOTE(review): these are added regardless of the ifreq setting - confirm
    # whether downstream consumers expect them only for frequency jobs
    for temp in temperatures:
        for press in pressures:
            internal = jaguarworkflows.get_internal_energy_key(temp, press)
            enthalpy = jaguarworkflows.get_enthalpy_key(temp, press)
            free_energy = jaguarworkflows.get_free_energy_key(temp, press)
            entropy = jaguarworkflows.get_entropy_key(temp, press)
            keys.extend([internal, enthalpy, free_energy, entropy])
    itddft = keywords.get('itddft')
    isolv = int(keywords.get('isolv', 0))
    igeopt = int(keywords.get('igeopt', 0))
    if itddft and igeopt > 0:
        # TDDFT geometry optimization: ground and excited state energies,
        # plus solution-phase variants when a solvation model is requested
        keys.append(jaguarworkflows.GAS_PHASE_GROUND_ENERGY_PROP)
        keys.append(jaguarworkflows.GAS_EXCITED_ENERGY_PROP)
        if isolv:
            keys.append(jaguarworkflows.SOLUTION_GROUND_ENERGY_PROP)
            keys.append(jaguarworkflows.SOL_EXCITED_ENERGY_PROP)
            # isolv == 7 selects the PCM solvation energy property
            if isolv == 7:
                keys.append(jaguarworkflows.GROUND_PCM_SOLVATION_ENERGY_PROP)
            else:
                keys.append(jaguarworkflows.GROUND_SOLVATION_ENERGY_PROP)
    else:
        keys.append(jaguarworkflows.GAS_PHASE_ENERGY_PROP)
        if isolv:
            keys.append(jaguarworkflows.SOLUTION_ENERGY_PROP)
            if isolv == 7:
                keys.append(jaguarworkflows.PCM_SOLVATION_ENERGY_PROP)
            else:
                keys.append(jaguarworkflows.SOLVATION_ENERGY_PROP)
    # for iuhf == 2 below we should actually key off of the multiplicity
    # to set one or the other but that property depends on the structure so
    # for now just offer both options
    iuhf = int(keywords.get('iuhf', 2))
    if itddft:
        if iuhf == 0:
            # restricted TDDFT: singlet/triplet excitations only appear when
            # explicitly requested
            if keywords.get('rsinglet', 0):
                keys.append(jaguarworkflows.LOWEST_SINGLET_EXCITATION_PROP)
            if keywords.get('rtriplet', 0):
                keys.append(jaguarworkflows.LOWEST_TRIPLET_EXCITATION_PROP)
        elif iuhf == 1:
            keys.append(jaguarworkflows.LOWEST_EXCITATION_PROP)
        elif iuhf == 2:
            # iuhf == 2 means either may apply (see note above), offer both
            keys.append(jaguarworkflows.LOWEST_SINGLET_EXCITATION_PROP)
            keys.append(jaguarworkflows.LOWEST_EXCITATION_PROP)
    # restricted (or either): single HOMO/LUMO pair
    if iuhf == 0 or iuhf == 2:
        keys.extend([
            jaguarworkflows.HOMO_ENERGY_PROP, jaguarworkflows.LUMO_ENERGY_PROP
        ])
    # unrestricted (or either): separate alpha and beta frontier orbitals
    if iuhf == 1 or iuhf == 2:
        keys.extend([
            jaguarworkflows.ALPHA_HOMO_ENERGY_PROP,
            jaguarworkflows.ALPHA_LUMO_ENERGY_PROP,
            jaguarworkflows.BETA_HOMO_ENERGY_PROP,
            jaguarworkflows.BETA_LUMO_ENERGY_PROP
        ])
    return keys
class AnalyzeStageData(object):
    """
    Holds and manipulates data about an analysis stage
    """

    def __init__(self, line):
        """
        Create an Analyze instance from a line of text

        Expected tab-delimited format: stage_idx property_key property_key
        float str stage_idx stage_idx...

        :type line: str
        :param line: The line of text to create the instance from

        :raise InvalidStageFileError: if the line does not match the expected
            format
        """
        afmt = (
            'The format of the tab-delimited {analyze} data line is: '
            '"stage_idx property_key property_key float str stage_idx '
            'stage_idx ..." where the first stage_idx is the parent stage '
            'index from which to use the structure that will hold the '
            'calculated property, the first property_key is the property key '
            'for the calculated property, the second property_key is the parent'
            ' property key that is used to calculated the new property, '
            'the float is a multiplicative prefactor for the parent '
            'property, the string is "None" if there is only a single '
            'parent otherwise it can be {minf}, {maxf}, {avgf}, or {stdf} '
            'to calculate the corresponding value from multiple parents, '
            'the final stage indices are the parent stage indices from '
            'which to get the properties. Instead got: {line}.').format(
                analyze=ANALYZE,
                minf=MINF,
                maxf=MAXF,
                avgf=AVGF,
                stdf=STDF,
                line=line)
        tokens = line.strip().split(DELIM)
        # first token: 1-based index of the parent stage whose structure will
        # hold the calculated property
        try:
            self.parent_st_idx = int(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(afmt)
        if self.parent_st_idx < 1:
            raise InvalidStageFileError(afmt)
        # next two tokens: the new property key and the parent property key
        try:
            self.key = tokens.pop(0)
            self.parent_key = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(afmt)
        # both keys must look like r_<family>_<name>: an 'r' prefix and at
        # least two underscores
        if not (self.key.startswith('r') and self.parent_key.startswith('r')):
            raise InvalidStageFileError(afmt)
        if self.key.count('_') < 2 or self.parent_key.count('_') < 2:
            raise InvalidStageFileError(afmt)
        # next token: multiplicative prefactor for the parent property,
        # must be a non-zero float
        try:
            self.prefactor = float(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(afmt)
        if self.prefactor == 0:
            raise InvalidStageFileError(afmt)
        # next token: the reduction function name, one of FUNCTION_DICT's
        # keys (None, Min, Max, Avg, Std)
        try:
            self.function = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(afmt)
        if self.function not in list(FUNCTION_DICT.keys()):
            raise InvalidStageFileError(afmt)
        # remaining tokens: 1-based parent stage indices that supply the
        # parent property values
        self.parent_idxs = []
        while tokens:
            try:
                parent_idx = int(tokens.pop(0))
            except (IndexError, ValueError):
                raise InvalidStageFileError(afmt)
            if parent_idx < 1:
                raise InvalidStageFileError(afmt)
            self.parent_idxs.append(parent_idx)
        # a reduction function requires multiple parents, and multiple
        # parents require a reduction function
        if len(self.parent_idxs) == 1 and self.function != NONE:
            raise InvalidStageFileError(afmt)
        if len(self.parent_idxs) > 1 and self.function == NONE:
            raise InvalidStageFileError(afmt)

    @staticmethod
    def writeAnalyzeDataToFile(data, datafile):
        """
        Write analyze data to a file in a format that this class can read in.

        :type data: list
        :param data: contains (parent_st_idx, key, parent_key, prefactor,
            function, parent_idx, parent_idx, ...) tuples

        :type datafile: file
        :param datafile: the file to write the data to
        """
        datafile.write(ANALYZE + '\n')
        for atuple in data:
            aline = DELIM.join(str(x) for x in atuple)
            datafile.write(aline + '\n')
        datafile.write('\n')

    def getThermoKeys(self, parent_st_dict):
        """
        Return the thermochemistry keys for this analyze stage term.

        :type parent_st_dict: dict
        :param parent_st_dict: contains parent index, structure pairs

        :rtype: list
        :return: the thermochemistry keys

        :raise MissingDataError: if a parent has no keys matching the
            thermochemistry wildcard, or the parents share none
        """
        # For a given energy type if the parent key is not a thermochemistry
        # wildcard, like 'r_j_Total_Free_Energy_(au)_*K_*atm' (where '*' is
        # literal), then return
        if jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT not in self.parent_key:
            return []
        # the prefix shared by all temperature/pressure variants of this
        # energy type
        energy_starter = self.parent_key.replace(
            jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT, '')
        # For each parent structure collect the given energy type for all
        # available temperatures and pressures, if a parent has no keys of
        # the given energy type then raise an error
        all_keys = []
        for parent_idx in self.parent_idxs:
            parent_st = parent_st_dict[parent_idx]
            keys = []
            for key in parent_st.property:
                if key.startswith(energy_starter):
                    keys.append(key)
            if not keys:
                raise MissingDataError(f'Parent {parent_idx} has no '
                                       f'keys matching {self.parent_key}.')
            all_keys.append(keys)
        # multiple parents are used for min, max, avg, and std, this wildcard
        # automation seems to make sense only for shared temperatures and
        # pressures
        keys = set(all_keys[0])
        for _keys in all_keys[1:]:
            keys = keys.intersection(_keys)
        if not keys:
            raise MissingDataError(
                f'Parents {self.parent_idxs} have no '
                f'keys matching {self.parent_key} in common.')
        return list(keys)

    def _getPropertyTerm(self, parent_st_dict, parent_key):
        """
        Return the property term for this analyze stage term.

        :type parent_st_dict: dict
        :param parent_st_dict: contains parent index, structure pairs

        :type parent_key: str
        :param parent_key: the parent key for the property of interest

        :rtype: float
        :return: the property term

        :raise MissingDataError: if a parent structure lacks the parent key
        """
        # gather the parent property value from every parent structure
        parent_values = []
        for parent_idx in self.parent_idxs:
            parent_st = parent_st_dict[parent_idx]
            try:
                parent_value = parent_st.property[parent_key]
            except KeyError:
                raise MissingDataError(f'Parent {parent_idx} is missing '
                                       f'the key {parent_key}.')
            parent_values.append(parent_value)
        # reduce the collected values (None/Min/Max/Avg/Std) and scale by
        # the prefactor
        value = FUNCTION_DICT[self.function](parent_values)
        value *= self.prefactor
        return value
def write_stages_file(stages, file_path):
    """
    Write stages to a file with the given path.

    :type stages: list
    :param stages: contains StageData

    :type file_path: str
    :param file_path: the file path
    """
    with open(file_path, 'w') as stage_fh:
        for stage in stages:
            stage_fh.write(NEW_STAGE + '\n')
            if stage.analyze_data:
                # an analysis stage always has at least one parent and
                # possibly several, none of which are inherited from
                parent_idxs = [pdata.stage for pdata in stage.parent_data]
                ParentStageData.writeNonInheritableParentDataToFile(
                    parent_idxs, stage_fh)
                rows = []
                for adata in stage.analyze_data:
                    row = [
                        adata.parent_st_idx, adata.key, adata.parent_key,
                        adata.prefactor, adata.function
                    ]
                    row.extend(adata.parent_idxs)
                    rows.append(tuple(row))
                AnalyzeStageData.writeAnalyzeDataToFile(rows, stage_fh)
                continue
            # a non-analysis stage has either no parent or a single parent
            # that is inherited from
            if stage.parent_data:
                pdata = stage.parent_data[0]
                ParentStageData.writeInheritableParentDataToFile(
                    pdata.stage, pdata.use_wavefunction, pdata.use_hessian,
                    stage_fh)
            # handle every data type in DATA_CLASSES in a single pass
            for dtype, dclass in StageData.DATA_CLASSES.items():
                by_eid = stage.entry_data.get(dtype)
                if by_eid:
                    dclass.writeDictDataToFile(by_eid, stage_fh)
            if stage.override_keywords:
                keystring = msutils.keyword_dict_to_string(
                    stage.override_keywords.keywords)
                OverrideKeywords.writeKeyStringToFile(keystring, stage_fh)
            if stage.extra_sections.text:
                ExtraSectionData.writeSectionsToFile(stage.extra_sections.text,
                                                     stage_fh)
class StageData(object):
    """
    Hold and manipulate all the settings for a stage in the workflow
    """

    # Maps each entry-data line type to the class that parses and writes it
    DATA_CLASSES = {
        ATOM_BASIS: AtomBasis,
        CHARGE_CONSTRAINTS: ChargeConstraint,
        GEOM_CONSTRAINTS: GeomConstraint,
        ACTIVE_COORDINATES: ActiveCoord,
        KEYWORDS: StageKeywords
    }

    def __init__(self, index):
        """
        Create a StageData instance

        :type index: int
        :param index: The 1-based index of this stage
        """
        self.index = index
        # The keys of entry_data are data types, the values of entry_data are
        # dictionaries. The keys of those value dictionaries are entry id and
        # the values are lists of data objects. For instance, to get the list
        # of geometry constraints for entry id EID, use
        # self.entry_data[GEOM_CONSTRAINTS][eid]
        self.entry_data = defaultdict(lambda: defaultdict(list))
        self.parent_data = []
        self.extra_sections = ExtraSectionData()
        self.override_keywords = None
        self.analyze_data = []
        self.command_line = None
        self.simulation_params = None
        self.info = StageInfoLine("", index)
        self.custom_script_info = None

    def parseDataLine(self, line, ltype):
        """
        Parse a data line

        :type line: str
        :param line: The line of data to parse

        :type ltype: str
        :param ltype: The type of data in this line. Should be a module
            constant PARENT, EXTRA_SECTIONS, ANALYZE, or one of the
            DATA_CLASSES keys

        :raise InvalidStageFileError: if the line references an invalid
            parent stage or duplicates override keywords
        """
        if ltype == PARENT:
            aparent = ParentStageData(line)
            pdex = aparent.stage
            # a parent must come from an earlier stage
            if pdex >= self.index:
                raise InvalidStageFileError(
                    'A parent for stage %d must be '
                    'an earlier stage. Got %d instead.' % (self.index, pdex))
            self.parent_data.append(aparent)
        elif ltype == EXTRA_SECTIONS:
            self.extra_sections.addLine(line)
        elif ltype == OVERRIDE_KEYWORDS:
            if self.override_keywords:
                raise InvalidStageFileError(
                    'Only one line of override keywords is allowed per stage')
            self.override_keywords = OverrideKeywords(line)
        elif ltype == ANALYZE:
            aanalyze = AnalyzeStageData(line)
            # all analyze parents must also come from earlier stages
            for idx in aanalyze.parent_idxs:
                if idx >= self.index:
                    msg = (
                        'Some parents, {parent_idxs}, used in the analyze '
                        'stage, {index}, are not from earlier stages.').format(
                            parent_idxs=aanalyze.parent_idxs, index=self.index)
                    raise InvalidStageFileError(msg)
            self.analyze_data.append(aanalyze)
        elif ltype == COMMAND_LINE:
            self.command_line = CommandLine(line)
        elif ltype == SIMULATION:
            self.simulation_params = SimulationParams(line)
        elif ltype == INFO:
            self.info = StageInfoLine(line, index=self.index)
        elif ltype == CUSTOM:
            self.custom_script_info = CustomScriptLine(line, self.index)
        else:
            data = self.DATA_CLASSES[ltype](line)
            # Note that because we used defaultdicts, we don't have to worry
            # about whether the keys already exist in the dicts or not
            self.entry_data[ltype][data.eid].append(data)

    def applyEntryData(self, jagin, eid):
        """
        Apply all entry data for entry eid to the given JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply the data to

        :type eid: str
        :param eid: The ID of the entry whose data should be applied
        """
        for entry_data in self.entry_data.values():
            for data in entry_data[eid]:
                data.applyToJaguarInput(jagin)
        # override keywords apply last so they win over per-entry keywords
        if self.override_keywords:
            self.override_keywords.applyToJaguarInput(jagin)

    def getKeywords(self, eid=None):
        """
        Get the keywords for this stage for the given eid. If no eid is given,
        keywords for an arbitrary entry will be returned.

        :type eid: str or None
        :param eid: If str, keywords for this entry will be supplied. If None,
            an arbitrary entry will be chosen.

        :rtype: dict
        :return: keys are keywords, values are values for that keyword
        """
        if eid:
            keyword_info = self.entry_data[KEYWORDS][eid][0]
        else:
            # FIX: the previous implementation called dict.pop() with no
            # argument (a TypeError since dict.pop requires a key) and then
            # wrote the popped value back at the wrong nesting level,
            # corrupting entry_data. Peek at an arbitrary entry without
            # mutating the dictionary instead.
            eid, keyword_infos = next(iter(self.entry_data[KEYWORDS].items()))
            keyword_info = keyword_infos[0]
        return keyword_info.keywords

    def getPropertyKeys(self, st=None):
        """
        Return output structure property keys that are created by this stage.

        :type st: schrodinger.structure.Structure or None
        :param st: if given and the stage is an analysis stage then the
            thermochemistry wildcards are considered

        :rtype: list
        :return: structure property keys
        """
        if self.analyze_data:
            # all data in analyze_data have the same base key
            base_key = self.analyze_data[0].key
            if st:
                return [key for key in st.property if key.startswith(base_key)]
            else:
                return [base_key]
        else:
            # merge the keywords of every entry (plus overrides) and derive
            # the property keys they imply
            all_keywords = {}
            for keywords in self.entry_data[KEYWORDS].values():
                all_keywords.update(keywords[0].keywords)
            if self.override_keywords:
                all_keywords.update(self.override_keywords.keywords)
            return get_property_keys_from_keywords(all_keywords)
class JMSWorkFlow(jaguarworkflows.WorkFlow):
    """
    A Jaguar Multistage WorkFlow object that controls all the steps for an
    entry
    """

    def __init__(self, *args, **kwargs):
        """
        Create a JMSWorkFlow instance

        :type stages: list
        :param stages: A list of StageData objects, one for each step in the
            workflow

        :type smap_name: str
        :param smap_name: The name of the master smap file

        :type hierarchical: bool
        :param hierarchical: in the output structure file hierarchically group
            structures by stage using a job name and original structure title
            header

        See parent class for additional documentation
        """
        # pop our kwargs before delegating to the parent constructor
        self.smap_name = kwargs.pop('smap_name', None)
        self.stages = kwargs.pop('stages', None)
        hierarchical = kwargs.pop('hierarchical', True)
        jaguarworkflows.WorkFlow.__init__(self, *args, **kwargs)
        if hierarchical:
            # prefer the job control job name when running under a backend,
            # otherwise fall back to the name from the command line options
            if self.backend:
                group_name = self.backend.getJob().Name
            else:
                group_name = self.options.name
            hierarchy = '%s->%s' % (group_name, self.base_name)
            self.properties[mm.M2IO_DATA_SUBGROUPID] = hierarchy
            self.child_properties[mm.M2IO_DATA_SUBGROUPID] = hierarchy

    def getSteps(self):
        """
        Create all the steps for this workflow, one for each stage
        """
        eid = self.struct.property[msprops.ENTRY_ID_PROP]
        for stage in self.stages:
            # skip stages that do not define a Jaguar
            # keywords section for the given structure
            keywords_dict = stage.entry_data.get(KEYWORDS)
            if keywords_dict and not keywords_dict.get(eid):
                continue
            # an analysis stage can not be the first stage
            if stage.analyze_data and not self.steps:
                continue
            parent = noninheritable_parents = None
            if stage.parent_data and self.steps:
                if len(stage.parent_data) == 1:
                    # there is a single parent which may or may not
                    # be inherited from
                    single_parent = stage.parent_data[0]
                    # steps are stored in stage order, so stage N is at
                    # index N - 1
                    step = self.steps[single_parent.stage - 1]
                    if single_parent.inherited:
                        parent = step
                    else:
                        noninheritable_parents = [step]
                else:
                    # there are multiple parents but none of which
                    # are inherited from
                    noninheritable_parents = [
                        self.steps[x.stage - 1] for x in stage.parent_data
                    ]
            self.steps.append(
                JMSStep(stage,
                        self,
                        parent=parent,
                        noninheritable_parents=noninheritable_parents))
[docs]class JMSStep(jaguarworkflows.Step): """ A step in the Jaguar Multistage Workflow """
[docs] def __init__(self, stage, *args, **kwargs): """ Create a JMSStep instance :type stage: `StageData` :param stage: The settings for this step See parent class for additional documentation """ self.stage = stage kwargs['step_name'] = 'Stage %d' % self.stage.index jaguarworkflows.Step.__init__(self, *args, **kwargs) self.entry_id = self.workflow.struct.property[msprops.ENTRY_ID_PROP] self.job_name = self.workflow.base_name if self.stage.info.name != NO_STAGE_NAME: self.job_name += f'_{self.stage.info.name}' if self.stage.analyze_data: parent_st_idx = self.stage.analyze_data[0].parent_st_idx self.job_name += '_analysis_' + str(parent_st_idx) # For file names that need to be recorded in the smap file self.smap_names = []
def _getParentStructureDict(self): """ Return a dictionary of structures from parent stages keyed by parent index. :rtype: dict :return: contains parent index, structure pairs """ parent_st_dict = {} for parent in self.stage.parent_data: idx = parent.stage step = self.workflow.steps[idx - 1] st = step.results.getMaeStructure() parent_st_dict[idx] = st return parent_st_dict def _createAnalysisStageOutputFiles(self, st, parent_st_idx): """ Create analysis stage output files. :type st: schrodinger.structure.Structure :param st: the output structure for the analysis stage :type parent_st_idx: int :param parent_st_idx: the parent structure index """ out_files = [] analyze_mae_file = self.job_name + '.01.mae' st.write(analyze_mae_file) out_files.append(analyze_mae_file) parent_job_name = self.workflow.steps[parent_st_idx - 1].job_name # the following is needed in case analysis stages # are chosen as parents for non-analysis stages # where wavefunction and/or Hessian data may be # inherited analyze_in_file = self.job_name + '.01.in' shutil.copy(parent_job_name + '.01.in', analyze_in_file) out_files.append(analyze_in_file) # the following is needed to ensure that normal modes # may be viewed on the analysis output structure for ext in SMAP_ELIGIBLE_EXTENSIONS: pafile = parent_job_name + ext if os.path.exists(pafile): afile = self.job_name + ext shutil.copy(pafile, afile) out_files.append(afile) self.smap_names.append(afile) if self.workflow.backend: for afile in out_files: self.workflow.backend.addOutputFile(afile)
[docs] def getThermoExts(self, parent_st_dict): """ Return the thermochemistry extensions for this stage. :type parent_st_dict: dict :param parent_st_dict: contains parent index, structure pairs :rtype: list :return: the thermochemistry extensions """ # an analysis stage is composed of several analyze_data terms, # collectively the stage can have zero, one, or multiple # terms using thermochemistry wildcards, in the case of multiple # they can in fact be for different energy types, for each term # collect all available temperature and pressure key extensions all_exts = [] for analyze_data in self.stage.analyze_data: exts = [] for thermo_key in analyze_data.getThermoKeys(parent_st_dict): temp = jaguarworkflows.get_temperature(thermo_key) press = jaguarworkflows.get_pressure(thermo_key) exts.append(jaguarworkflows.get_temp_press_key_ext(temp, press)) if exts: all_exts.append(exts) # if there aren't any terms with wildcards return if not all_exts: return [] # an analysis stage featuring terms at different temperatures and # pressures is reserved for manual creation, for the wildcard # automation here allow only temperature and pressure combinations # that are shared by all terms exts = set(all_exts[0]) for _exts in all_exts[1:]: exts = exts.intersection(_exts) if not exts: raise MissingDataError( 'The analysis terms have no extensions ' f'matching {jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT} in common.') return list(exts)
    def start(self):
        """
        Start the job - create the input and write it, adding necessary output
        files to make sure they get copied back

        For analysis stages no subjob is run; properties are instead computed
        from parent step results and written to an output structure file.
        """
        if self.stage.analyze_data:
            self.log('Starting analyze stage')
            parent_st_dict = self._getParentStructureDict()
            # the output structure is taken from the single designated parent
            parent_st_idx = self.stage.analyze_data[0].parent_st_idx
            st = parent_st_dict[parent_st_idx]
            st.property[PARENT_ST_IDX_KEY] = parent_st_idx
            st.property[WORKFLOW_STAGE_KEY] = self.stage.index
            # turn any thermochemistry wildcards into explict temperature
            # and pressure key extensions that are available for all property
            # terms and all parents for each term, if there aren't any
            # wildcards then this is a standard run so just make a no-op list
            # of None, if wildcards are used but some parent structure for some
            # term is for some reason missing data then log an error, continue
            # with an explicit wildcard property, and have it skipped by the
            # break statement below
            try:
                thermo_exts = self.getThermoExts(parent_st_dict)
            except MissingDataError as err:
                self.log(str(err))
                thermo_exts = []
            if not thermo_exts:
                thermo_exts = [None]
            # for each thermochemistry extension evaluate the property as a sum
            # over terms, if there were no wildcards or there was a wildcard but
            # data was missing then the outer loop is performed once with a
            # value of None, if the key asked for is missing then skip the
            # analysis
            for thermo_ext in thermo_exts:
                for analyze_data in self.stage.analyze_data:
                    if thermo_ext:
                        parent_key = analyze_data.parent_key.replace(
                            jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT, thermo_ext)
                        key = f'{analyze_data.key}{thermo_ext}'
                    else:
                        parent_key = analyze_data.parent_key
                        key = analyze_data.key
                    try:
                        value = analyze_data._getPropertyTerm(
                            parent_st_dict, parent_key)
                    except MissingDataError as err:
                        # drop any partial sum already accumulated for this
                        # key and skip the remaining terms for this extension
                        st.property.pop(key, None)
                        self.log(str(err))
                        break
                    # accumulate the property as a sum over terms
                    st.property[key] = st.property.get(key, 0.) + value
            self._createAnalysisStageOutputFiles(st, parent_st_idx)
            self.results = jaguarworkflows.Results(self.job_name)
            self.finished = self.ok = True
            self.log('Finished analyze stage')
        else:
            jaguarworkflows.Step.start(self)
[docs] def getStructure(self): """ Overwrite the parent class method to return the structure from the inheritable parent step if an inheritable parent exists, otherwise return the original structure """ if not self.parent: struct = self.workflow.struct else: struct = jaguarworkflows.Step.getStructure(self) struct.property[WORKFLOW_STAGE_KEY] = self.stage.index return struct
[docs] def getInput(self): """ Overwrite the parent class method to avoid setting default keywords values and to apply data from the StageData class for this step :rtype: `schrodinger.application.jaguar.input.JaguarInput` or None :return: The JaguarInput object for this step, or None if it could not be created. """ struct = self.getStructure() msutils.remove_properties(struct, matches=['_j_']) try: jagin = jinput.JaguarInput(structure=struct, name=self.job_name) except mm.MmException as msg: self.ok = False self.finished = True self.log('failed to create Jagur input, step will not be run') self.log(f'Error was: {str(msg)}') return None self.stage.applyEntryData(jagin, self.entry_id) return jagin
[docs] def getJaguarRestartFileName(self): """ Get the name of the Jaguar restart file for this step - the restart file contains the wavefunction and hessian :rtype: str :return: The name of the Jaguar restart file for this step """ restart_name = self.job_name + '.01.in' if os.path.exists(restart_name): return restart_name
    def writeInput(self):
        """
        Overwrite the parent class method to copy data from the parent step and
        add extra section data from the StageData object
        """
        self.input.save()
        filename = self.input.filename

        def add_section_to_file(input_file, section):
            """
            Copy a section from a Jaguar restart file to a Jaguar input file

            :type input_file: file
            :param input_file: The input file to write to

            :type section: str
            :param section: The initial line of the section to write (ex.
                '&hess')
            """
            # NOTE(review): assumes the parent's restart file exists -
            # getJaguarRestartFileName returns None otherwise, which would
            # make open() raise; confirm parent steps always leave one behind
            restart_filename = self.parent.getJaguarRestartFileName()
            with open(restart_filename, 'r') as restart_file:
                in_section = False
                for line in restart_file:
                    if line.startswith(section):
                        in_section = True
                    if in_section:
                        input_file.write(line)
                    # a bare '&' line terminates a section in Jaguar files
                    if in_section and line.strip() == '&':
                        # End of desired section, all done
                        return

        # If requested, add guess and hessian sections from parent step. Note
        # that mmjag_append_sections_from_link exists and could be used for
        # this, but that function doesn't have the ability to add one or the
        # other - it always adds both. Also, mmjag_get_sect_text exists and
        # could be used to get the sections texts, but, alas, it segfaults.
        with open(filename, 'a') as input_file:
            if self.parent:
                if self.stage.parent_data[0].use_wavefunction:
                    add_section_to_file(input_file, '&guess')
                if self.stage.parent_data[0].use_hessian:
                    add_section_to_file(input_file, '&hess')
            self.stage.extra_sections.addToJaguarInput(input_file)
[docs] def finishProcessingJobControlJob(self): """ Add any files to the backend that the Jaguar subjob preserved """ super().finishProcessingJobControlJob() jc_job = self.job.getJob() if not jc_job: return for filename in jc_job.OutputFiles: # Record any files we need to write to the master smap file for ext in SMAP_ELIGIBLE_EXTENSIONS: if filename.endswith(ext): # For robust driver calculations, we only want to grab # smap-eligible files if they are in the main directory. # Subdirectory files are not the final versions if not self.robust or not os.path.dirname(filename): self.smap_names.append(filename)
[docs] def write(self, writer, **kwargs): """ In addition to the parent method, also compile any smap data into the master smap file. See parent method for additional documentation """ jaguarworkflows.Step.write(self, writer, **kwargs) # Add each smap datafile for this step to the master smap using the # index of this structure in the output mae file myindex = writer.written_count if self.smap_names: with open(self.workflow.smap_name, 'a') as master_smap: for fname in self.smap_names: # datafile lines are of the form 'filename: index' master_smap.write('%s: %d\n' % (fname, myindex))
def create_workflows(options,
                     jobq,
                     stages,
                     smap_name=None,
                     hierarchical=True,
                     workflow_class=None,
                     robust=True,
                     tmp_logger=None):
    """
    Create a workflow for each structure

    :type options: `argparse.Namespace`
    :param options: The command line options

    :type jobq: `schrodinger.job.queue.JobDJ`
    :param jobq: The JobDJ to run subjobs with

    :type stages: list
    :param stages: A list of `StageData` objects to create `JMSStep` from

    :type smap_name: str
    :param smap_name: The name of the master smap file

    :type hierarchical: bool
    :param hierarchical: in the output structure file hierarchically group
        structures by stage using a job name and original structure title
        header

    :type workflow_class: Jaguar multistage workflow
    :param workflow_class: Jaguar multistage workflow or any other custom
        jaguar workflow

    :param bool robust: If True, use the robust Jaguar driver to run Jaguar
        jobs. If false, use Jaguar directly.

    :type tmp_logger: logging.Logger or None
    :param tmp_logger: output logger or None if there isn't one

    :rtype: list
    :return: the created workflow objects, one per input structure
    """
    global logger
    try:
        reader = structure.StructureReader(options.input_file)
    except IOError:
        # log_error exits the process; temporarily swap the module-level
        # logger for tmp_logger (if given) so the message reaches the caller
        if tmp_logger:
            con_man = msutils.with_global_as(logger, tmp_logger)
        else:
            con_man = nullcontext()
        with con_man:
            log_error('Could not read input file: %s' % options.input_file)
    strcleaner = jobutils.StringCleaner()
    workflow_class = workflow_class or JMSWorkFlow
    logger = tmp_logger or logger
    workflows = []
    for index, struct in enumerate(reader, 1):
        # Ensure atom naming is consistent with GUI atom names (MATSCI-4159)
        jinput.apply_jaguar_atom_naming(struct)
        workflows.append(
            workflow_class(struct,
                           options,
                           index,
                           jobq,
                           strcleaner=strcleaner,
                           logger=logger,
                           stages=stages,
                           smap_name=smap_name,
                           hierarchical=hierarchical,
                           robust=robust))
    return workflows
def parse_stage_data(data, meta=False):
    """
    Parse data in settings file format into a list of stages

    :type data: list or file
    :param data: The data to parse. Can be a list of strings with each item a
        line of data, or an open file

    :param bool meta: If True, this is a meta workflow-like data file. If
        False, this is a jaguarworkflows-like data file

    :rtype: list
    :return: A list of StageData objects

    :raise InvalidStageFileError: if there is an issue
    """
    stages = []
    stage = ltype = None
    for line in data:
        line = line.strip()
        uline = line.upper()
        if line.startswith('#') or not line:
            # Comment or blank
            continue
        elif uline == NEW_STAGE:
            # Start a new stage
            stage = StageData(len(stages) + 1)
            stages.append(stage)
        elif uline in DATATAGS:
            # Start a new data type
            ltype = line
        else:
            # Parse a data line - requires both a current stage and a current
            # data type to have been seen already
            if not ltype or not stage:
                # fixed typo in error message: "Unrecoginzed" -> "Unrecognized"
                raise InvalidStageFileError(
                    'Unrecognized format for settings file on line: \n%s' %
                    line)
            try:
                stage.parseDataLine(line, ltype)
            except InvalidStageFileError as msg:
                raise InvalidStageFileError('Error reading settings file:\n%s' %
                                            str(msg))
    validate_stages(stages, meta=meta)
    return stages
def read_stage_datafile(filename, meta=False):
    """
    Read in a settings file.

    :param str filename: The name of the settings file to read

    :param bool meta: If True, this is a meta workflow-like data file. If
        False, this is a jaguarworkflows-like data file

    :rtype: list
    :return: A list of StageData objects

    :raise InvalidStageFileError: if there is an issue
    """
    with open(filename, 'r') as datafile:
        return parse_stage_data(datafile, meta=meta)
def validate_jaguarlike_stages(stages):
    """
    Validate the information for jaguarworkflows-like stages

    :param list stages: contains StageData

    :raise InvalidStageFileError: if there is an issue
    """
    for stage in stages:
        # only parents flagged as inherited supply data to this stage
        parents = [parent for parent in stage.parent_data if parent.inherited]
        if len(parents) > 1:
            msg = ('Invalid data for Stage {idx}. Stages can either '
                   'have a single parent from which data is inherited or '
                   'multiple parents from which no data is inherited.').format(
                       idx=stage.index)
            raise InvalidStageFileError(msg)
        if parents and stage.analyze_data:
            msg = ('An analyze stage can only involve parents for the '
                   'purposes of analysis.')
            raise InvalidStageFileError(msg)
        # the following three sets are used to validate parenting in
        # analysis
        #
        # the following are all unique parent stage indices in the
        # PARENT stage section
        pps = set(x.stage for x in stage.parent_data)
        # the following are all unique parent stage indices from the
        # parent indices part of all terms in the ANALYZE stage section
        aps = set(y for x in stage.analyze_data for y in x.parent_idxs)
        # the following are all unique parent stage indices from the
        # parent structure indices part of all terms in the ANALYZE stage
        # section
        asps = set(x.parent_st_idx for x in stage.analyze_data)
        # every stage referenced in ANALYZE must also appear in PARENT
        if stage.analyze_data and (not aps.issubset(pps) or
                                   not asps.issubset(pps)):
            msg = ('An analyze stage requires defining the corresponding '
                   'parent stages.')
            raise InvalidStageFileError(msg)
        # asps is empty for non-analyze stages, so this check only ever
        # triggers for analyze stages
        if len(asps) > 1:
            msg = ('Only a single parent structure may be used for an '
                   'analyze stage.')
            raise InvalidStageFileError(msg)
        if stage.analyze_data and (stage.entry_data or
                                   stage.extra_sections.text or
                                   stage.override_keywords):
            msg = ('Jaguar jobs can not be run in analyze stages.')
            raise InvalidStageFileError(msg)
def validate_metalike_stages(stages):
    """
    Validate the stage information for meta workflows-like stages

    :param list stages: contains StageData

    :raise InvalidStageFileError: if there is an issue
    """
    for index, stage in enumerate(stages):
        # Assign parent stages
        # copy so the discards below do not mutate the stage's own data
        requires_names = stage.info.requires_names.copy()
        for pstage in stages[:index]:
            if pstage.info.name == stage.info.name:
                msg = ('Duplicate stage names are not allowed: '
                       f'{stage.info.name}')
                raise InvalidStageFileError(msg)
            if pstage.info.name == stage.info.parent_name:
                stage.info.parent_stage = pstage
            elif pstage.info.name in requires_names:
                stage.info.requires_stages.append(pstage)
                requires_names.discard(pstage.info.name)
        header = f'Stage number {index+1} ({stage.info.name})'
        # Verify all parent and required stages were assigned
        if stage.info.parent_name and not stage.info.parent_stage:
            msg = (f'{header} has a parent '
                   f'named {stage.info.parent_name}, but no prior stage has '
                   'that name.')
            raise InvalidStageFileError(msg)
        if requires_names:
            # any names still present were never matched to a prior stage
            not_found = ','.join(requires_names)
            msg = (f'{header} requires the '
                   'following stages but no prior stage with those names are '
                   f'found: {not_found}')
            raise InvalidStageFileError(msg)
        # Validate that custom scripts exist in the given directory
        if stage.custom_script_info and stage.command_line:
            if not os.path.exists(stage.command_line.driver):
                cinfo = stage.custom_script_info
                if cinfo.path != cinfo.DO_NOT_CHECK_PATH:
                    dpath = os.path.join(cinfo.path, stage.command_line.driver)
                    if not os.path.exists(dpath):
                        msg = (f'Cannot find the driver for {header} in the '
                               f'current directory or at {dpath}')
                        raise InvalidStageFileError(msg)
def validate_stages(stages, meta=False):
    """
    Validate stages.

    :param list stages: contains StageData

    :param bool meta: If True, this is a meta workflow-like data file. If
        False, this is a jaguarworkflows-like data file

    :raise InvalidStageFileError: if there is an issue
    """
    if not stages:
        raise InvalidStageFileError('No stages found in the stage file')
    # dispatch to the validator matching the data file flavor
    validator = validate_metalike_stages if meta else validate_jaguarlike_stages
    validator(stages)
def log_error(msg):
    """
    Add a message to the log file and exit with an error code

    :type msg: str
    :param msg: The message to log
    """
    log(msg)
    # record a final timestamped line so the log shows when the run ended
    log('Finished', timestamp=True)
    sys.exit(1)
def log(msg, timestamp=False, pad=False, pad_below=False):
    """
    Add a message to the log file

    :type msg: str
    :param msg: The message to log

    :type timestamp: bool
    :param timestamp: Whether to print a timestamp with the message

    :type pad: bool
    :param pad: Whether to pad above this message with a blank line

    :type pad_below: bool
    :param pad_below: Whether to pad below this message with a blank line
    """
    if timestamp:
        msg = f'{msg} at {time.ctime()}'
    if pad:
        textlogger.log(logger, "")
    textlogger.log(logger, msg)
    if pad_below:
        textlogger.log(logger, "")
def create_smap(basename, output_name, smap_dict=None):
    """
    Create the master smap file that will map property files (.vib, .vis, etc)
    to structures in the compiled structure file

    :type basename: str
    :param basename: The base name of all job files

    :type output_name: str
    :param output_name: The name of the output structure file

    :type smap_dict: dict
    :param smap_dict: keys are file names, values are indices, the values are
        not entry IDs but rather the counting indices (1-based) of the
        structure in the given output_name file that the given file name key
        is associated with, values can also be lists of indices for example
        if a given file name is used for multiple indices

    :rtype: str
    :return: The name of the smap file created
    """
    smap_dict = smap_dict or {}
    smap_name = basename + '.smap'
    with open(smap_name, 'w') as sfile:
        sfile.write('# smap version 1.0\n')
        sfile.write(output_name + '\n')
        for file_name, idxs in smap_dict.items():
            # normalize a bare index to a single-item list
            if not isinstance(idxs, list):
                idxs = [idxs]
            sfile.writelines(f'{file_name}: {idx}\n' for idx in idxs)
    if smap_dict:
        # no jobcontrol backend at this point; finalize with backend=None
        finalize_smap(smap_name, None)
    return smap_name
def finalize_smap(smap_name, backend):
    """
    Finish the master smap file and add it to the jobcontrol backend if
    necessary. A file with no data lines is removed instead.

    :type smap_name: str
    :param smap_name: The name of the master smap file

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The job control backend or None if there is no backend
    """
    # If there are any lines with a colon, the smap file contains data
    with open(smap_name, 'r') as sfile:
        keep_smap = any(':' in line for line in sfile)
    if not keep_smap:
        fileutils.force_remove(smap_name)
        return
    with open(smap_name, 'a') as sfile:
        sfile.write('#end\n')  # newline here is critical
    if backend:
        backend.addOutputFile(smap_name)