"""
Utilities for Jaguar multistage workflow.
Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import shlex
import shutil
import sys
import time
from collections import defaultdict
from contextlib import nullcontext
import numpy
from schrodinger import structure
from schrodinger.application.desmond import constants as dconst
from schrodinger.application.jaguar import input as jinput
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import msprops
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
# Section/tag names recognized in a stage file
NEW_STAGE = 'NEW_STAGE'
CUSTOM = 'CUSTOM'
COMMAND_LINE = 'WORKFLOW'
SIMULATION = 'SIMULATION'
INFO = 'INFO'
PARENT = 'PARENT'
KEYWORDS = 'KEYWORDS'
ATOM_BASIS = 'ATOM_BASIS'
CHARGE_CONSTRAINTS = 'CHARGE_CONSTRAINTS'
GEOM_CONSTRAINTS = 'GEOM_CONSTRAINTS'
ACTIVE_COORDINATES = 'ACTIVE_COORDINATES'
EXTRA_SECTIONS = 'EXTRA_SECTIONS'
OVERRIDE_KEYWORDS = 'OVERRIDE_KEYWORDS'
ANALYZE = 'ANALYZE'
# All recognized data-section tags within a stage definition
DATATAGS = set([
    PARENT, KEYWORDS, ATOM_BASIS, CHARGE_CONSTRAINTS, GEOM_CONSTRAINTS,
    ACTIVE_COORDINATES, EXTRA_SECTIONS, OVERRIDE_KEYWORDS, ANALYZE, INFO,
    COMMAND_LINE, SIMULATION, CUSTOM
])
# Field delimiter used on stage-file data lines
DELIM = '\t'
TRUE = 'true'
# Prefix used when auto-generating a stage name from its index
GENERIC_STAGE_TAG = 'stage_'
NO_STAGE_NAME = '_NO_STAGE_NAME_'
# Option flags allowed on a PARENT data line (see ParentStageData)
WAVEFUNCTION = 'WAVEFUNCTION'
HESSIAN = 'HESSIAN'
ANALYSIS = 'ANALYSIS'
# Token used on GEOM_CONSTRAINTS lines when a constraint has no target value
NONE = 'None'
# Labels for the statistical reduction functions in FUNCTION_DICT
MINF = 'Min'
MAXF = 'Max'
AVGF = 'Avg'
STDF = 'Std'
# Maps a reduction label to the callable that performs it; NONE just takes
# the first value of the sequence
FUNCTION_DICT = {
    NONE: lambda x: x[0],
    MINF: min,
    MAXF: max,
    AVGF: numpy.mean,
    STDF: numpy.std
}
SMAP_ELIGIBLE_EXTENSIONS = ['.vis', '.vib', '_vib.spm']
# Module-level logger; populated by set_up_logger
logger = None
# Structure property keys used to track workflow provenance
WORKFLOW_STAGE_KEY = 'i_matsci_Workflow_Stage'
PARENT_ST_IDX_KEY = 'i_matsci_Parent_Structure_Stage_Index'
# Defaults matching Jaguar's thermochemistry keywords (tmpini/tmpstp/ntemp,
# press/press_step/npress)
DEFAULT_TEMP_START = 298.15  # K
DEFAULT_TEMP_STEP = 10.  # K
DEFAULT_TEMP_N = 1
DEFAULT_PRESS_START = 1.  # atm
DEFAULT_PRESS_STEP = 1.  # atm
DEFAULT_PRESS_N = 1
# reserved for Jaguar structure properties that lack units in
# the property key, '(au)' (Hartree) is used in other Jaguar
# property keys and is used to be consistent
JAGUAR_PROP_UNITS_DICT = {
    jaguarworkflows.GAS_PHASE_ENERGY_PROP: '(au)',
    jaguarworkflows.HOMO_ENERGY_PROP: '(au)',
    jaguarworkflows.LUMO_ENERGY_PROP: '(au)',
    jaguarworkflows.ALPHA_HOMO_ENERGY_PROP: '(au)',
    jaguarworkflows.ALPHA_LUMO_ENERGY_PROP: '(au)',
    jaguarworkflows.BETA_HOMO_ENERGY_PROP: '(au)',
    jaguarworkflows.BETA_LUMO_ENERGY_PROP: '(au)',
    jaguarworkflows.LOWEST_EXCITATION_PROP: '(eV)',
    jaguarworkflows.LOWEST_SINGLET_EXCITATION_PROP: '(eV)',
    jaguarworkflows.LOWEST_TRIPLET_EXCITATION_PROP: '(eV)',
    jaguarworkflows.SOLUTION_ENERGY_PROP: '(au)',
    jaguarworkflows.GAS_PHASE_GROUND_ENERGY_PROP: '(au)',
    jaguarworkflows.GAS_EXCITED_ENERGY_PROP: '(au)',
    jaguarworkflows.SOLUTION_GROUND_ENERGY_PROP: '(au)',
    jaguarworkflows.SOL_EXCITED_ENERGY_PROP: '(au)'
}
def set_up_logger(related_filename):
    """
    Set up the module-level logger used in this module.

    :type related_filename: str
    :param related_filename: the base name of this file name
        will be used as the base name of the log file name
        if not running under job control, otherwise the job
        name will be used
    """
    global logger
    # create_logger also returns the log file name, which is unused here
    logger, _ = textlogger.create_logger(related_filename=related_filename)
def parse_yes_no_setting(setting, keyword):
    """
    Translate an English word into a boolean if possible

    :param str setting: The text (true/false/yes/no/on/off etc) to convert
    :param str keyword: The associated keyword to show in error messages

    :rtype: bool
    :return: True if the setting translates to Truthy, False if it translates
        to Falsey

    :raise InvalidStageFileError: If setting can't be translated
    """
    try:
        return msutils.setting_to_bool(setting)
    except ValueError:
        msg = (f'Cannot interpret the value of {keyword}={setting} as a '
               'yes/no condition')
        # Suppress the internal ValueError context so the user sees only
        # the actionable stage-file error
        raise InvalidStageFileError(msg) from None
class InvalidStageFileError(Exception):
    """ Class for any exception that occurs when reading in a settings file """
class MissingDataError(Exception):
    """ Raised when an expected structure property for Analysis is not found """
class ParameterLine:
    """ Base class for lines that take keyword=value parameters """

    # The header line for this info in the stage file
    TAG = 'BASE'
    # All keys this class recognizes on the parameter line
    ALL_KEYS = set()

    def __init__(self, line, index):
        """
        Create a ParameterLine object

        :param str line: The line to parse from the input file
        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        self.validateKeywords(self.getKeywords(line), index)

    def getKeywords(self, line):
        """
        Parse the line into a set of keyword-value pairs

        :param str line: The line to parse from the input file

        :rtype: dict
        :return: Keys are lowercase keywords, values are values

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        try:
            raw_keywords = msutils.keyword_string_to_dict(line.strip())
        except ValueError as msg:
            raise InvalidStageFileError(msg)
        return {keyword.lower(): value for keyword, value in
                raw_keywords.items()}

    def validateKeywords(self, keywords, index):
        """
        Validate and pull information from the keywords

        :param dict keywords: Keys are class constant keywords
        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        for keyword in keywords:
            if keyword in self.ALL_KEYS:
                continue
            allowed = ', '.join(self.ALL_KEYS)
            raise InvalidStageFileError(
                f'Stage {index}: {keyword} is an invalid key. '
                f'Valid keys are: {allowed}.')

    @classmethod
    def writeParameterLine(cls, keywords, datafile):
        """
        Write a line with these settings to the given file

        :param dict keywords: The current settings
        :param file datafile: The file object to write to
        """
        unknown = next((key for key in keywords if key not in cls.ALL_KEYS),
                       None)
        if unknown is not None:
            raise RuntimeError(f'{unknown} is not a known parameter for '
                               f'{cls.TAG} lines')
        line = msutils.keyword_dict_to_string(keywords)
        datafile.write(f'{cls.TAG}\n{line}\n')
class CustomScriptLine(ParameterLine):
    """
    Holds general information about a custom script

    Used in the meta workflow driver but not the Jaguar multistage workflow
    """

    # The header line for this info in the stage file
    TAG = CUSTOM
    # A string, the input type for this stage
    INPUT = 'input'
    # A string, output type for this stage
    OUTPUT = 'output'
    # boolean True or False
    TRAJECTORY = 'trajectory'
    # Path to the script
    PATH = 'path'
    # Whether to use JC or a subprocess to run the script
    USE_JC = 'use_jc'
    # All keys this class recognizes on the parameter line
    ALL_KEYS = {INPUT, OUTPUT, TRAJECTORY, PATH, USE_JC}
    # File-type tokens allowed for the input/output keywords
    MAEGZ = 'maegz'
    CMS = 'cms'
    OTHER = 'other'
    ALLOWED_INPUT = {MAEGZ, CMS, OTHER}
    ALLOWED_OUTPUT = {MAEGZ, CMS}
    # GUI uses this path value to indicate that it is in an intermediate state
    # and the driver path should not be checked
    DO_NOT_CHECK_PATH = 'Not_located'

    def validateKeywords(self, keywords, index):
        """
        Validate and pull information from the keywords

        :param dict keywords: Keys are class constant keywords
        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        super().validateKeywords(keywords, index)
        # Validate the input and output types
        msg = ('Stage {index}: {ftype} is not a valid {ktype} type. '
               'Allowed types: {allowed}')
        ktypes = (self.INPUT, self.OUTPUT)
        allowed_vals = (self.ALLOWED_INPUT, self.ALLOWED_OUTPUT)
        for ktype, allowed in zip(ktypes, allowed_vals):
            # CMS is the default file type for both input and output
            ftype = keywords.get(ktype, self.CMS)
            if ftype not in allowed:
                astr = ', '.join(allowed)
                error = msg.format(index=index,
                                   ftype=ftype,
                                   ktype=ktype,
                                   allowed=astr)
                raise InvalidStageFileError(error)
        # Store the keyword values
        self.input_type = keywords.get(self.INPUT, self.CMS)
        self.output_type = keywords.get(self.OUTPUT, self.CMS)
        # NOTE(review): the None-is-default handling below implies
        # parse_yes_no_setting passes None through unchanged when the keyword
        # is unset - confirm against msutils.setting_to_bool
        self.trajectory = parse_yes_no_setting(keywords.get(self.TRAJECTORY),
                                               self.TRAJECTORY)
        self.use_jc = parse_yes_no_setting(keywords.get(self.USE_JC),
                                           self.USE_JC)
        # Ensure the usage of trajectory is OK
        if self.output_type == self.MAEGZ and self.trajectory:
            # User specified True
            msg = (f'Stage {index}: Trajectories from stages with output type '
                   f'{self.MAEGZ} cannot be used by later steps')
            raise InvalidStageFileError(msg)
        if self.trajectory is None:
            # User did not specify, default is True
            self.trajectory = True
        # Check for a valid path (note - we don't know the script name yet, so
        # we can't validate that the script is in the directory). We don't want
        # to do this check if we are running under job control because in that
        # case the script should already have been copied into the job directory
        # and this original path was only needed for local start of the job.
        path = keywords.get(self.PATH, '.')
        if path != self.DO_NOT_CHECK_PATH and not jobcontrol.get_backend():
            if os.path.isfile(path):
                # User included the script name in the path, remove it
                path = os.path.dirname(path)
            elif not os.path.exists(path):
                msg = ("The given path for the script's directory does not "
                       f"exist {path}")
                raise InvalidStageFileError(msg)
        self.path = path
class StageInfoLine(ParameterLine):
    """
    Holds general information about a stage: name, parent, etc.

    Used in the meta workflow driver but not the Jaguar multistage workflow
    """

    # The header line for this info in the stage file
    TAG = INFO
    # A string, the name of this stage
    NAME = 'name'
    # A string, referring to the name of the parent stage
    PARENT = 'parent'
    # boolean True or False
    MAIN = 'main'
    # Names of other stages to wait for completion before starting this step
    REQUIRES = 'requires'
    # All keys this class recognizes on the parameter line
    ALL_KEYS = {NAME, PARENT, MAIN, REQUIRES}

    def __init__(self, line, index):
        """
        Create a StageInfoLine object

        :param str line: The line to parse from the input file
        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        super().__init__(line, index)
        # Filled in later once the stage objects for the parent and required
        # stages are known
        self.parent_stage = None
        self.requires_stages = []

    def validateKeywords(self, keywords, index):
        """
        Validate and pull information from the keywords

        :param dict keywords: Keys are class constant keywords
        :param int index: The 1-based index of this stage

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        super().validateKeywords(keywords, index)
        # Fall back to an auto-generated name based on the stage index
        fallback = f'{GENERIC_STAGE_TAG}{index}'
        self.name = jobutils.clean_string(keywords.get(self.NAME, fallback))
        self.parent_name = keywords.get(self.PARENT)
        if self.parent_name:
            self.parent_name = jobutils.clean_string(self.parent_name)
        self.main = parse_yes_no_setting(keywords.get(self.MAIN), self.MAIN)
        requires = keywords.get(self.REQUIRES)
        self.requires_names = set(requires.split(',')) if requires else set()
class SimulationParams:
    """ Holds information about an MD simulation stage """

    # Simulation type tokens
    MD = 'md'
    BROWNIE = 'brownie'
    MS_RELAX = 'matsci_relaxation'
    COMPRESSIVE_RELAX = 'compress_relaxation'
    SEMI_CRYSTALLINE1_RELAX = 'semi_crystal_relaxation1'
    SEMI_CRYSTALLINE2_RELAX = 'semi_crystal_relaxation2'
    # Keyword tokens accepted on the SIMULATION line
    TYPE = 'type'
    ENSEMBLE = 'ensemble'
    TIME = 'time'
    TEMP = 'temp'
    PRESSURE = 'pressure'
    TIMESTEP = 'timestep'
    TRJINT = 'trj_interval'
    # Stringer-facing equivalents of user-facing keywords
    TRJINT_STRINGER = 'trajectory_dot_interval'
    SEED = 'seed'
    SEED_STRINGER = 'random_seed'
    ANALYSIS = 'analysis'
    AVERAGE = 'average'
    ANISOTROPIC = 'anisotropic'
    ALL_KEYS = [
        ENSEMBLE, TIME, TEMP, PRESSURE, SEED, ANALYSIS, AVERAGE, TIMESTEP,
        TRJINT, ANISOTROPIC
    ]
    # Keywords allowed for each simulation type; the relaxation protocols
    # only accept a temperature
    ALLOWED_KEYS = {
        MD: ALL_KEYS,
        BROWNIE: ALL_KEYS,
        MS_RELAX: [TEMP],
        COMPRESSIVE_RELAX: [TEMP],
        SEMI_CRYSTALLINE1_RELAX: [TEMP],
        SEMI_CRYSTALLINE2_RELAX: [TEMP]
    }

    def __init__(self, line):
        """
        Create a Simulation instance from a line of text

        Expected tab-delimited format:
            type=stype temp=300...

        :param str line: The line of text to create the instance from

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        try:
            keywords = msutils.keyword_string_to_dict(line)
        except ValueError as msg:
            raise InvalidStageFileError(msg)
        self.keywords = {x.lower(): y for x, y in keywords.items()}
        self.validateKeywords()

    def validateKeywords(self):
        """
        Validate and parse information from the keywords

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        self.stype = self.keywords.pop(self.TYPE, self.MD)
        analysis = self.keywords.pop(self.ANALYSIS, None)
        self.analysis = parse_yes_no_setting(analysis, self.ANALYSIS)
        ave_msg = 'The value for "average" must be a positive integer <= 100'
        try:
            self.average = int(self.keywords.pop(self.AVERAGE, 0))
        except ValueError:
            raise InvalidStageFileError(ave_msg)
        if self.average < 0 or self.average > 100:
            raise InvalidStageFileError(ave_msg)
        # Ensure the type is a valid simulation type
        try:
            valid_params = self.ALLOWED_KEYS[self.stype]
        except KeyError:
            allowed = ', '.join(self.ALLOWED_KEYS.keys())
            msg = (f'{self.stype} is not an allowed type. Allowed types are '
                   f'{allowed}.')
            raise InvalidStageFileError(msg)
        # Ensure the keywords are valid for this simulation type
        for key in self.keywords.keys():
            if key not in valid_params:
                valid_string = ', '.join(valid_params)
                msg = (f'{key} is not an allowed parameter for type '
                       f'{self.stype}. Allowed parameters are {valid_string}')
                raise InvalidStageFileError(msg)
        # Translate user-facing keywords to Stringer-facing keywords
        # Random seed - use parserutils to handle the value 'random'
        rseed = self.keywords.pop(self.SEED, None)
        if rseed:
            rseed = str(parserutils.type_random_seed(rseed))
            self.keywords[self.SEED_STRINGER] = rseed
        # Trajectory interval - use the class constant rather than a
        # duplicated literal so this stays consistent with TRJINT_STRINGER
        if self.TRJINT in self.keywords:
            self.keywords[self.TRJINT_STRINGER] = self.keywords.pop(
                self.TRJINT)
        # Anisotropy
        anisotropic = self.keywords.pop(self.ANISOTROPIC, False)
        if anisotropic:
            anisotropic = parse_yes_no_setting(anisotropic, self.ANISOTROPIC)
        if anisotropic:
            msj_isokey = desmondutils.MSJStringer.ISOTROPY
            self.keywords[msj_isokey] = dconst.IsotropyPolicy.ANISOTROPIC
        # The user gives timestep in femtoseconds, we use picoseconds
        if self.TIMESTEP in self.keywords:
            step = float(self.keywords[self.TIMESTEP]) / 1000
            self.keywords[self.TIMESTEP] = str(step)
        # Make sure the capitalization of the ensemble is correct
        ensemble = self.keywords.get(self.ENSEMBLE)
        if ensemble:
            if ensemble.lower() == 'npgt':
                self.keywords[self.ENSEMBLE] = 'NPgT'
            else:
                self.keywords[self.ENSEMBLE] = ensemble.upper()
class CommandLine:
    """ Holds information about a workflow command line """

    # Placeholder replaced at runtime with the job name
    INPUT_MASK = '$input'
    # Placeholder replaced at runtime with the previous step's trajectory path
    TRAJ_MASK = '$trj'

    def __init__(self, line):
        """
        Create a CommandLine instance from a line of text

        Expected tab-delimited format:
            [$SCHRODINGER/run] driver_path command line tokens

        Any string in the command that depends on a job-specific name should be
        given as simply $input. For instance, in a command such as::

            $SCHRODINGER/run driver.py -flob hobnob -j jobname.txt jobname.maegz

        The command should be provided is::

            $SCHRODINGER/run driver.py -flob hobnob -j $input.txt $input.maegz

        `$input` will be replaced at runtime with the name of the job

        If a trajectory path from the previous step is part of the command, that
        should be given as just $trj (i.e. -trj $trj)

        Command line values that have a space in them - such as an ASL string -
        should be put inside double quotes: "mol.num 7"

        :param str line: The line of text to create the instance from

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        # We need to use shlex.split to keep quoted arguments (such as an ASL)
        # together
        try:
            tokens = shlex.split(line.strip())
        except ValueError as err:
            raise InvalidStageFileError(f'Error parsing command line: {err}')
        # An empty command line previously leaked a bare IndexError below;
        # raise the stage-file error instead
        if not tokens:
            raise InvalidStageFileError(
                'The command line contains no tokens')
        for index, token in enumerate(tokens):
            # Skip $SCHRODINGER/run (including if $SCHRODINGER is an explicit
            # path with spaces in it)
            if token.endswith('.py'):
                self.flags = tokens[index:]
                break
        else:
            self.flags = tokens
        self.driver = os.path.basename(self.flags[0])
class ParentStageData(object):
    """ Holds and manipulates data about a parent stage """

    def __init__(self, line):
        """
        Create a ParentStageData instance from a line of text

        Expected tab-delimited format:
            parent_stage_# [WAVEFUNCTION] [HESSIAN] [ANALYSIS]

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.upper().strip().split(DELIM)
        try:
            self.stage = int(tokens.pop(0))
        except (IndexError, ValueError, TypeError):
            raise InvalidStageFileError('The first value in the %s data line '
                                        'must be the integer index of a '
                                        'parent stage.' % PARENT)
        options = set(tokens)
        # Record which optional flags were present, then strip the known
        # ones so anything left over is invalid
        self.use_wavefunction = WAVEFUNCTION in options
        self.use_hessian = HESSIAN in options
        self.use_analysis = ANALYSIS in options
        options.difference_update({WAVEFUNCTION, HESSIAN, ANALYSIS})
        if options:
            invalids = ', '.join(options)
            raise InvalidStageFileError('Invalid options on the %s data line: '
                                        '%s' % (PARENT, invalids))
        if (self.use_wavefunction or self.use_hessian) and self.use_analysis:
            msg = ('Parents used for the purposes of analysis are not allowed '
                   'to pass on information, like their geometry, wavefunction, '
                   'and/or Hessian, to a child job.')
            raise InvalidStageFileError(msg)
        # Analysis-only parents do not pass information on to the child
        self.inherited = not self.use_analysis

    @staticmethod
    def writeInheritableParentDataToFile(parent, wavefunction, hessian,
                                         datafile):
        """
        Write inheritable parent stage data to a file in a format that
        this class can read in.

        :type parent: int
        :param parent: The stage number of the parent stage

        :type wavefunction: bool
        :param wavefunction: Whether to use the wavefunction from the parent
            stage

        :type hessian: bool
        :param hessian: Whether to use the hessian from the parent stage

        :type datafile: file
        :param datafile: The file to write the data to
        """
        datafile.write(PARENT + '\n')
        values = [str(parent)]
        for flag, option in ((wavefunction, WAVEFUNCTION), (hessian, HESSIAN)):
            if flag:
                values.append(option)
        datafile.write('%s\n' % DELIM.join(values))

    @staticmethod
    def writeNonInheritableParentDataToFile(parents, datafile):
        """
        Write noninheritable parent stage data to a file in a format that
        this class can read in.

        :type parents: list
        :param parents: the stage numbers of the parent stages

        :type datafile: file
        :param datafile: the file to write the data to
        """
        datafile.write(PARENT + '\n')
        for idx in parents:
            datafile.write('%s\n' % DELIM.join([str(idx), ANALYSIS]))
class GeomConstraint(object):
    """ Holds and manipulates data about geometry constraints """

    def __init__(self, line):
        """
        Create a GeomConstraint instance from a line of text

        Expected tab-delimited format:
            entry_id target value index index ...

        :type line: str
        :param line: The line of text to create the instance from

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        tokens = line.strip().split(DELIM)
        geo_fmt = ('The format of the tab-delimited %s data line is: "entry_id '
                   'target type index index ..." where entry_id is the entry '
                   'ID of the entry the constraint applies to, target is the '
                   'floating point target for the contraint or "%s" if there '
                   'is no target, type is the integer constraint type and '
                   'index is the integer atom index the constraint '
                   'applies to. Give a single index for atom constraints, two '
                   'indexes for bond constraints, etc. Instead, got: %s' %
                   (GEOM_CONSTRAINTS, NONE, line))
        try:
            self.eid = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(geo_fmt)
        try:
            target = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(geo_fmt)
        if target == NONE:
            # Constraints without a target value use the NONE token
            self.target = None
        else:
            try:
                self.target = float(target)
            except ValueError:
                raise InvalidStageFileError(geo_fmt)
        try:
            self.ctype = int(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(geo_fmt)
        # Remaining tokens are the atom indexes the constraint applies to
        self.indexes = [int(x) for x in tokens]

    @staticmethod
    def writeData(eid, target, ctype, idxs, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type target: float or None
        :param target: the target value for the constraint if there is one

        :type ctype: int
        :param ctype: the Jaguar constraint type

        :type idxs: list
        :param idxs: contains indices of atoms defining the constraint

        :type afile: file
        :param afile: the file to write the data to
        """
        astr = DELIM.join(str(idx) for idx in idxs)
        # str() never raises ValueError, so the old try/except here was dead
        # code that only worked because str(None) happens to equal the NONE
        # token; handle the no-target case explicitly (output is unchanged)
        target = NONE if target is None else str(target)
        afile.write('{eid}{tab}{targ}{tab}{ctype}{tab}{inds}\n'.format(
            eid=eid, tab=DELIM, targ=target, ctype=ctype, inds=astr))

    @staticmethod
    def writeModelDataToFile(model, datafile, eid):
        """
        Write geometry constraint data from a model to a file in a format that
        this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.optimization_tab.
            ContraintCoordinatesModel`
        :param model: The model containing restraints to write

        :type datafile: file
        :param datafile: The file to write the data to

        :type eid: str
        :param eid: The entry id for this geometry constraint
        """
        for index, data in enumerate(model.coords):
            # Write the section header only once, before the first line
            if not index:
                datafile.write(GEOM_CONSTRAINTS + '\n')
            GeomConstraint.writeData(eid, data.target_value,
                                     data.coordinate_type, data.atom_indices,
                                     datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write geometry constraint data from a dictionary to a file in a
        format that this class can read in

        :type adict: dictionary
        :param adict: keys are entry IDs, values are lists of GeomConstraint

        :type datafile: file
        :param datafile: The file to write the data to
        """
        for index, datas in enumerate(adict.values()):
            # Write the section header only once, before the first entry
            if not index:
                datafile.write(GEOM_CONSTRAINTS + '\n')
            for data in datas:
                GeomConstraint.writeData(data.eid, data.target, data.ctype,
                                         data.indexes, datafile)
class ActiveCoord(object):
    """ Holds and manipulates data about active coordinates """

    def __init__(self, line):
        """
        Create an ActiveCoord instance from a line of text

        Expected tab-delimited format:
            entry_id type index index ...

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        act_fmt = ('The format of the tab-delimited %s data line is: "entry_id '
                   'type index index ..." where entry_id is the entry '
                   'ID of the entry the active coordinate applies to, '
                   'type is the integer active coordinate type and '
                   'index is the integer atom index the active coordinate '
                   'applies to. Give a single index for an active atom, two '
                   'indexes for an active bond, etc. Instead, got: %s' %
                   (ACTIVE_COORDINATES, line))
        # split() always yields at least one token, so only the type can be
        # missing
        if len(tokens) < 2:
            raise InvalidStageFileError(act_fmt)
        self.eid = tokens[0]
        try:
            self.ctype = int(tokens[1])
        except ValueError:
            raise InvalidStageFileError(act_fmt)
        # Remaining tokens are the atom indexes of the active coordinate
        self.indexes = [int(x) for x in tokens[2:]]

    @staticmethod
    def writeData(eid, ctype, idxs, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type ctype: int
        :param ctype: the Jaguar active coordinate type

        :type idxs: list
        :param idxs: contains indices of atoms defining the active
            coordinate

        :type afile: file
        :param afile: the file to write the data to
        """
        astr = DELIM.join(map(str, idxs))
        afile.write('{eid}{tab}{ctype}{tab}{inds}\n'.format(eid=eid,
                                                            tab=DELIM,
                                                            ctype=ctype,
                                                            inds=astr))

    @staticmethod
    def writeModelDataToFile(model, datafile, eid):
        """
        Write active coordinate data using a geometry constraint model
        to a file in a format that this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.optimization_tab.
            ContraintCoordinatesModel`
        :param model: The model containing active coordinates to write

        :type datafile: file
        :param datafile: The file to write the data to

        :type eid: str
        :param eid: The entry id for this active coordinate
        """
        wrote_header = False
        for data in model.coords:
            # The section header goes out once, just before the first line
            if not wrote_header:
                datafile.write(ACTIVE_COORDINATES + '\n')
                wrote_header = True
            ActiveCoord.writeData(eid, data.coordinate_type, data.atom_indices,
                                  datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write active coordinate data from the given dictionary
        to a file in a format that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of ActiveCoord

        :type datafile: file
        :param datafile: The file to write the data to
        """
        wrote_header = False
        for datas in adict.values():
            # The section header goes out once, just before the first entry
            if not wrote_header:
                datafile.write(ACTIVE_COORDINATES + '\n')
                wrote_header = True
            for data in datas:
                ActiveCoord.writeData(data.eid, data.ctype, data.indexes,
                                      datafile)
class AtomBasis(object):
    """ Holds and manipulates data about by-atom basis sets """

    def __init__(self, line):
        """
        Create a AtomBasis instance from a line of text

        Expected tab-delimited format:
            entry_id index basis_set

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        ab_format = ('The format of an tab-delimited %s data line is: '
                     '"entry_id index basis" where entry_id is the entry ID '
                     'the data applies to, index is the integer index of the '
                     'atom the data applies to and basis is the name of the '
                     'basis set that applies to that atom. Instead got: %s' %
                     (ATOM_BASIS, line))
        try:
            self.eid = tokens[0]
            num_text = tokens[1]
            self.basis = tokens[2]
            self.num = int(num_text)
        except (ValueError, IndexError):
            raise InvalidStageFileError(ab_format)

    @staticmethod
    def writeData(eid, anum, basis, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type anum: int
        :param anum: the atom number

        :type basis: str
        :param basis: the basis

        :type afile: file
        :param afile: the file to write the data to
        """
        afile.write("{eid}{tab}{num}{tab}{basis}\n".format(eid=eid,
                                                           tab=DELIM,
                                                           num=anum,
                                                           basis=basis))

    @staticmethod
    def writeModelDataToFile(model, datafile):
        """
        Write by-atom basis set data from a model to a file in a format that
        this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.sub_tab_widgets.
            basis_set_widgets.BasisSetModel`
        :param model: The model containing restraints to write

        :type datafile: file
        :param datafile: The file to write the data to
        """
        wrote_header = False
        for row in model._rows:
            # The section header goes out once, just before the first row
            if not wrote_header:
                datafile.write(ATOM_BASIS + '\n')
                wrote_header = True
            AtomBasis.writeData(row.entry_id, row.atom_num, row.basis, datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write by-atom basis set data from a dictionary to a file in a format
        that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of AtomBasis

        :type datafile: file
        :param datafile: The file to write the data to
        """
        wrote_header = False
        for datas in adict.values():
            # The section header goes out once, just before the first entry
            if not wrote_header:
                datafile.write(ATOM_BASIS + '\n')
                wrote_header = True
            for data in datas:
                AtomBasis.writeData(data.eid, data.num, data.basis, datafile)
class ChargeConstraint(object):
    """ Holds and manipulates data about by-atom charge constraints """

    def __init__(self, line):
        """
        Create a ChargeConstraint instance from a line of text

        Expected tab-delimited format:
            entry_id index basis_set

        :type line: str
        :param line: The line of text to create the instance from
        """
        tokens = line.strip().split(DELIM)
        chg_format = ('The format for a tab-delimited %s data line is: '
                      '"entry_id charge index:weight index:weight ..." where '
                      'entry_id is the ID of the entry it applies to, charge '
                      'is the floating point charge, and each index:weight '
                      'pair is the integer index of an atom and weight is the '
                      'floating point weight for that atom. Multiple '
                      'index:weight pairs can be given. Instead got: %s' %
                      (CHARGE_CONSTRAINTS, line))
        try:
            self.eid = tokens.pop(0)
            self.charge = float(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(chg_format)
        # Map of atom index -> floating point weight
        self.weights = dict()
        for token in tokens:
            try:
                sindex, sweight = token.split(':')
            except ValueError:
                raise InvalidStageFileError(chg_format)
            try:
                self.weights[int(sindex)] = float(sweight)
            except ValueError:
                raise InvalidStageFileError(chg_format)

    @staticmethod
    def writeData(eid, charge, weights, afile):
        """
        Write the given data to file.

        :type eid: str
        :param eid: the entry ID

        :type charge: float
        :param charge: the charge

        :type weights: dict
        :param weights: keys are atom indices, values are float weights

        :type afile: file
        :param afile: the file to write the data to
        """
        astr = DELIM.join('%d:%.6f' % pair for pair in weights.items())
        afile.write("{eid}{tab}{chg:.4f}{tab}{wts}\n".format(eid=eid,
                                                             tab=DELIM,
                                                             chg=charge,
                                                             wts=astr))

    @staticmethod
    def writeModelDataToFile(model, datafile):
        """
        Write by-atom charge constraint data from a model to a file in a format
        that this class can read in

        :type model: `schrodinger.application.jaguar.gui.tabs.sub_tab_widgets.
            basis_set_widgets.ChargeConstraintsModel`
        :param model: The model containing restraints to write

        :type datafile: file
        :param datafile: The file to write the data to
        """
        wrote_header = False
        for row in model._rows:
            # The section header goes out once, just before the first row
            if not wrote_header:
                datafile.write(CHARGE_CONSTRAINTS + '\n')
                wrote_header = True
            ChargeConstraint.writeData(row.entry_id, row.charge,
                                       row.weightsByNum(), datafile)

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write by-atom charge constraint data from a dictionary to a file in a
        format that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of ChargeConstraint

        :type datafile: file
        :param datafile: The file to write the data to
        """
        wrote_header = False
        for datas in adict.values():
            # The section header goes out once, just before the first entry
            if not wrote_header:
                datafile.write(CHARGE_CONSTRAINTS + '\n')
                wrote_header = True
            for data in datas:
                ChargeConstraint.writeData(data.eid, data.charge, data.weights,
                                           datafile)
class StageKeywords(object):
    """ Holds and manipulates data about keywords """

    def __init__(self, line):
        """
        Create a StageKeywords instance from a line of text

        Expected tab-delimited format:
            entry_id keyword=value keyword=value ...

        :type line: str
        :param line: The line of text to create the instance from

        :raise `InvalidStageFileError`: If something is wrong with the line
        """
        tokens = line.strip().split(DELIM)
        key_fmt = ('The format for a %s data line is: "entry_id keyword=value '
                   'keyword=value ..." where entry_id is the entry ID of the '
                   'entry the keywords apply to, followed by any number of '
                   'keyword=value pairs. Instead got: %s' % (KEYWORDS, line))
        try:
            self.eid = tokens[0]
            # Accessing tokens[1] inside the try block ensures a line that is
            # missing the keyword section raises InvalidStageFileError rather
            # than leaking a bare IndexError
            keystring = tokens[1]
            self.keywords = msutils.keyword_string_to_dict(keystring)
        except (IndexError, ValueError):
            raise InvalidStageFileError(key_fmt)

    @staticmethod
    def writeKeywordsToFile(keywords, datafile):
        """
        Write keyword data to a file in a format that this class can read in

        :type keywords: dict
        :param keywords: keys are entry IDs, values dicts with Jaguar
            (key, value) pairs

        :type datafile: file
        :param datafile: The file to write the data to
        """
        datafile.write(KEYWORDS + '\n')
        for eid, keydict in keywords.items():
            keystring = msutils.keyword_dict_to_string(keydict)
            datafile.write('{eid}{tab}{keys}\n'.format(eid=eid,
                                                       tab=DELIM,
                                                       keys=keystring))

    @staticmethod
    def writeDictDataToFile(adict, datafile):
        """
        Write keyword data from the given dictionary to a file in a format
        that this class can read in

        :type adict: dict
        :param adict: keys are entry IDs, values are lists of StageKeywords

        :type datafile: file
        :param datafile: The file to write the data to
        """
        # Merge the keyword dicts of all StageKeywords for each entry; later
        # entries in the list override earlier ones for duplicate keys
        new_adict = {}
        for eid, datas in adict.items():
            new_adict[eid] = {}
            for data in datas:
                new_adict[eid].update(data.keywords)
        StageKeywords.writeKeywordsToFile(new_adict, datafile)
class OverrideKeywords(object):
    """
    Holds and manipulates data about override keywords - these are keywords
    that either can't be set by the GUI or override the values set in the GUI.
    They apply to all structures
    """

    def __init__(self, line):
        """
        Create a OverrideKeywords instance from a line of text

        Expected tab-delimited format:
            keyword=value keyword=value ...

        :type line: str
        :param line: The line of text to create the instance from
        """
        try:
            self.keywords = msutils.keyword_string_to_dict(line)
        except (IndexError, ValueError):
            key_fmt = ('The format for a tab-delimited %s data line is: '
                       '"keyword=value keyword=value ...". Instead got: %s' %
                       (OVERRIDE_KEYWORDS, line))
            raise InvalidStageFileError(key_fmt)

    @staticmethod
    def writeKeyStringToFile(keystring, datafile):
        """
        Write override keyword data to a file in a format that this class can
        read in

        :type keystring: str
        :param keystring: the string of keyword=value pairs to write

        :type datafile: file
        :param datafile: The file to write the data to
        """
        datafile.write(OVERRIDE_KEYWORDS + '\n')
        # Re-join on the delimiter so the pairs are written tab-separated
        datafile.write('%s\n' % DELIM.join(keystring.split()))
def get_property_keys_from_keywords(keywords):
    """
    Return Jaguar output structure property keys that are
    created from the given input keywords.

    :type keywords: dict
    :param keywords: Jaguar keywords

    :rtype: list
    :return: structure property keys
    """
    keys = []
    # frequency jobs produce the zero point energy plus thermochemistry
    # properties over the requested temperature/pressure grid
    # NOTE(review): keywords values appear to be raw strings here (see the
    # int()/float() casts below), so a literal '0' would still be truthy —
    # assumed that 'ifreq' is only present when frequencies are requested
    if keywords.get('ifreq'):
        keys.append(jaguarworkflows.ZERO_POINT_ENERGY_PROP)
        # build the temperature and pressure grids from the Jaguar
        # tmpini/tmpstp/ntemp and press/press_step/npress keywords
        temp_start = float(keywords.get('tmpini', DEFAULT_TEMP_START))
        temp_step = float(keywords.get('tmpstp', DEFAULT_TEMP_STEP))
        temp_n = int(keywords.get('ntemp', DEFAULT_TEMP_N))
        press_start = float(keywords.get('press', DEFAULT_PRESS_START))
        press_step = float(keywords.get('press_step', DEFAULT_PRESS_STEP))
        press_n = int(keywords.get('npress', DEFAULT_PRESS_N))
        temperatures = [temp_start + i * temp_step for i in range(temp_n)]
        pressures = [press_start + i * press_step for i in range(press_n)]
        # one internal energy/enthalpy/free energy/entropy key per (T, P)
        for temp in temperatures:
            for press in pressures:
                internal = jaguarworkflows.get_internal_energy_key(temp, press)
                enthalpy = jaguarworkflows.get_enthalpy_key(temp, press)
                free_energy = jaguarworkflows.get_free_energy_key(temp, press)
                entropy = jaguarworkflows.get_entropy_key(temp, press)
                keys.extend([internal, enthalpy, free_energy, entropy])
    itddft = keywords.get('itddft')
    isolv = int(keywords.get('isolv', 0))
    igeopt = int(keywords.get('igeopt', 0))
    # a TDDFT geometry optimization reports separate ground/excited state
    # energies; anything else reports a single total energy
    if itddft and igeopt > 0:
        keys.append(jaguarworkflows.GAS_PHASE_GROUND_ENERGY_PROP)
        keys.append(jaguarworkflows.GAS_EXCITED_ENERGY_PROP)
        if isolv:
            keys.append(jaguarworkflows.SOLUTION_GROUND_ENERGY_PROP)
            keys.append(jaguarworkflows.SOL_EXCITED_ENERGY_PROP)
            # isolv == 7 is the PCM solvation model; other nonzero values
            # use the standard solvation energy property
            if isolv == 7:
                keys.append(jaguarworkflows.GROUND_PCM_SOLVATION_ENERGY_PROP)
            else:
                keys.append(jaguarworkflows.GROUND_SOLVATION_ENERGY_PROP)
    else:
        keys.append(jaguarworkflows.GAS_PHASE_ENERGY_PROP)
        if isolv:
            keys.append(jaguarworkflows.SOLUTION_ENERGY_PROP)
            if isolv == 7:
                keys.append(jaguarworkflows.PCM_SOLVATION_ENERGY_PROP)
            else:
                keys.append(jaguarworkflows.SOLVATION_ENERGY_PROP)
    # for iuhf == 2 below we should actually key off of the multiplicity
    # to set one or the other but that property depends on the structure so
    # for now just offer both options
    iuhf = int(keywords.get('iuhf', 2))
    if itddft:
        if iuhf == 0:
            # restricted: singlet/triplet excitations are requested
            # explicitly via rsinglet/rtriplet
            if keywords.get('rsinglet', 0):
                keys.append(jaguarworkflows.LOWEST_SINGLET_EXCITATION_PROP)
            if keywords.get('rtriplet', 0):
                keys.append(jaguarworkflows.LOWEST_TRIPLET_EXCITATION_PROP)
        elif iuhf == 1:
            # unrestricted: only a single lowest excitation is reported
            keys.append(jaguarworkflows.LOWEST_EXCITATION_PROP)
        elif iuhf == 2:
            keys.append(jaguarworkflows.LOWEST_SINGLET_EXCITATION_PROP)
            keys.append(jaguarworkflows.LOWEST_EXCITATION_PROP)
    # restricted calculations report HOMO/LUMO; unrestricted report
    # separate alpha/beta orbital energies; iuhf == 2 offers both
    if iuhf == 0 or iuhf == 2:
        keys.extend([
            jaguarworkflows.HOMO_ENERGY_PROP, jaguarworkflows.LUMO_ENERGY_PROP
        ])
    if iuhf == 1 or iuhf == 2:
        keys.extend([
            jaguarworkflows.ALPHA_HOMO_ENERGY_PROP,
            jaguarworkflows.ALPHA_LUMO_ENERGY_PROP,
            jaguarworkflows.BETA_HOMO_ENERGY_PROP,
            jaguarworkflows.BETA_LUMO_ENERGY_PROP
        ])
    return keys
class AnalyzeStageData(object):
    """ Holds and manipulates data about an analysis stage """

    def __init__(self, line):
        """
        Create an Analyze instance from a line of text

        Expected tab-delimited format:
        stage_idx property_key property_key float str stage_idx stage_idx...

        :type line: str
        :param line: The line of text to create the instance from

        :raise InvalidStageFileError: if the line is malformed
        """
        afmt = (
            'The format of the tab-delimited {analyze} data line is: '
            '"stage_idx property_key property_key float str stage_idx '
            'stage_idx ..." where the first stage_idx is the parent stage '
            'index from which to use the structure that will hold the '
            'calculated property, the first property_key is the property key '
            'for the calculated property, the second property_key is the parent'
            ' property key that is used to calculated the new property, '
            'the float is a multiplicative prefactor for the parent '
            'property, the string is "None" if there is only a single '
            'parent otherwise it can be {minf}, {maxf}, {avgf}, or {stdf} '
            'to calculate the corresponding value from multiple parents, '
            'the final stage indices are the parent stage indices from '
            'which to get the properties. Instead got: {line}.').format(
                analyze=ANALYZE,
                minf=MINF,
                maxf=MAXF,
                avgf=AVGF,
                stdf=STDF,
                line=line)
        tokens = line.strip().split(DELIM)
        # parent structure stage index (1-based): the stage whose structure
        # will carry the calculated property
        try:
            self.parent_st_idx = int(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(afmt)
        if self.parent_st_idx < 1:
            raise InvalidStageFileError(afmt)
        # the new property key and the parent property key it derives from
        try:
            self.key = tokens.pop(0)
            self.parent_key = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(afmt)
        # both keys must look like real-valued ('r'-prefixed) Maestro
        # property keys of the form r_<family>_<name>
        if not (self.key.startswith('r') and self.parent_key.startswith('r')):
            raise InvalidStageFileError(afmt)
        if self.key.count('_') < 2 or self.parent_key.count('_') < 2:
            raise InvalidStageFileError(afmt)
        # multiplicative prefactor applied to the reduced parent value;
        # zero is rejected as it would make the term meaningless
        try:
            self.prefactor = float(tokens.pop(0))
        except (IndexError, ValueError):
            raise InvalidStageFileError(afmt)
        if self.prefactor == 0:
            raise InvalidStageFileError(afmt)
        # reducing function name (None/Min/Max/Avg/Std) applied over the
        # parent property values
        try:
            self.function = tokens.pop(0)
        except IndexError:
            raise InvalidStageFileError(afmt)
        if self.function not in list(FUNCTION_DICT.keys()):
            raise InvalidStageFileError(afmt)
        # all remaining tokens are the parent stage indices supplying data
        self.parent_idxs = []
        while tokens:
            try:
                parent_idx = int(tokens.pop(0))
            except (IndexError, ValueError):
                raise InvalidStageFileError(afmt)
            if parent_idx < 1:
                raise InvalidStageFileError(afmt)
            self.parent_idxs.append(parent_idx)
        # a reducing function only makes sense for multiple parents, and
        # multiple parents require a reducing function
        if len(self.parent_idxs) == 1 and self.function != NONE:
            raise InvalidStageFileError(afmt)
        if len(self.parent_idxs) > 1 and self.function == NONE:
            raise InvalidStageFileError(afmt)

    @staticmethod
    def writeAnalyzeDataToFile(data, datafile):
        """
        Write analyze data to a file in a format that this class can read in.

        :type data: list
        :param data: contains (parent_st_idx, key, parent_key, prefactor,
            function, parent_idx, parent_idx, ...) tuples

        :type datafile: file
        :param datafile: the file to write the data to
        """
        datafile.write(ANALYZE + '\n')
        for atuple in data:
            aline = DELIM.join(str(x) for x in atuple)
            datafile.write(aline + '\n')
        # trailing blank line terminates the section
        datafile.write('\n')

    def getThermoKeys(self, parent_st_dict):
        """
        Return the thermochemistry keys for this analyze stage term.

        :type parent_st_dict: dict
        :param parent_st_dict: contains parent index, structure pairs

        :rtype: list
        :return: the thermochemistry keys

        :raise MissingDataError: if a parent lacks keys of the requested
            energy type or the parents share no such keys
        """
        # For a given energy type if the parent key is not a thermochemistry
        # wildcard, like 'r_j_Total_Free_Energy_(au)_*K_*atm' (where '*' is
        # literal), then return
        if jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT not in self.parent_key:
            return []
        energy_starter = self.parent_key.replace(
            jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT, '')
        # For each parent structure collect the given energy type for all
        # available temperatures and pressures, if a parent has no keys of
        # the given energy type then raise an error
        all_keys = []
        for parent_idx in self.parent_idxs:
            parent_st = parent_st_dict[parent_idx]
            keys = []
            for key in parent_st.property:
                if key.startswith(energy_starter):
                    keys.append(key)
            if not keys:
                raise MissingDataError(f'Parent {parent_idx} has no '
                                       f'keys matching {self.parent_key}.')
            all_keys.append(keys)
        # multiple parents are used for min, max, avg, and std, this wildcard
        # automation seems to make sense only for shared temperatures and
        # pressures
        keys = set(all_keys[0])
        for _keys in all_keys[1:]:
            keys = keys.intersection(_keys)
        if not keys:
            raise MissingDataError(
                f'Parents {self.parent_idxs} have no '
                f'keys matching {self.parent_key} in common.')
        return list(keys)

    def _getPropertyTerm(self, parent_st_dict, parent_key):
        """
        Return the property term for this analyze stage term.

        :type parent_st_dict: dict
        :param parent_st_dict: contains parent index, structure pairs

        :type parent_key: str
        :param parent_key: the parent key for the property of interest

        :rtype: float
        :return: the property term

        :raise MissingDataError: if any parent lacks the requested key
        """
        # gather the parent values, reduce them with the configured
        # function (None/Min/Max/Avg/Std), then scale by the prefactor
        parent_values = []
        for parent_idx in self.parent_idxs:
            parent_st = parent_st_dict[parent_idx]
            try:
                parent_value = parent_st.property[parent_key]
            except KeyError:
                raise MissingDataError(f'Parent {parent_idx} is missing '
                                       f'the key {parent_key}.')
            parent_values.append(parent_value)
        value = FUNCTION_DICT[self.function](parent_values)
        value *= self.prefactor
        return value
def write_stages_file(stages, file_path):
    """
    Write stages to a file with the given path.

    :type stages: list
    :param stages: contains StageData

    :type file_path: str
    :param file_path: the file path
    """
    with open(file_path, 'w') as afile:
        for stage in stages:
            afile.write(NEW_STAGE + '\n')
            if not stage.analyze_data:
                # for non-analysis stages there will be no parents or one
                # parent that is inherited from
                if stage.parent_data:
                    data = stage.parent_data[0]
                    ParentStageData.writeInheritableParentDataToFile(
                        data.stage, data.use_wavefunction, data.use_hessian,
                        afile)
                # handle all data types in DATA_CLASSES at the same time;
                # each class knows how to serialize its own section
                for atype, aclass in StageData.DATA_CLASSES.items():
                    adict = stage.entry_data.get(atype)
                    if adict:
                        aclass.writeDictDataToFile(adict, afile)
                if stage.override_keywords:
                    astr = msutils.keyword_dict_to_string(
                        stage.override_keywords.keywords)
                    OverrideKeywords.writeKeyStringToFile(astr, afile)
                if stage.extra_sections.text:
                    ExtraSectionData.writeSectionsToFile(
                        stage.extra_sections.text, afile)
            else:
                # for analysis there will always be at least a single parent
                # and possibly multiple parents but none of which are inherited
                # from
                idxs = [data.stage for data in stage.parent_data]
                ParentStageData.writeNonInheritableParentDataToFile(idxs, afile)
                # serialize each analyze term back into the tuple form
                # expected by writeAnalyzeDataToFile
                datas = []
                for data in stage.analyze_data:
                    datas.append(
                        tuple([
                            data.parent_st_idx, data.key, data.parent_key,
                            data.prefactor, data.function
                        ] + data.parent_idxs))
                AnalyzeStageData.writeAnalyzeDataToFile(datas, afile)
class StageData(object):
    """
    Hold and manipulate all the settings for a stage in the workflow
    """

    # Maps a settings-file data-type tag to the class that parses a single
    # line of that type
    DATA_CLASSES = {
        ATOM_BASIS: AtomBasis,
        CHARGE_CONSTRAINTS: ChargeConstraint,
        GEOM_CONSTRAINTS: GeomConstraint,
        ACTIVE_COORDINATES: ActiveCoord,
        KEYWORDS: StageKeywords
    }

    def __init__(self, index):
        """
        Create a StageData instance

        :type index: int
        :param index: The 1-based index of this stage
        """
        self.index = index
        # The keys of entry_data are data types, the values of entry_data are
        # dictionaries. The keys of those value dictionaries are entry id and
        # the values are lists of data objects. For instance, to get the list
        # of geometry constraints for entry id EID, use
        # self.entry_data[GEOM_CONSTRAINTS][eid]
        self.entry_data = defaultdict(lambda: defaultdict(list))
        self.parent_data = []
        self.extra_sections = ExtraSectionData()
        self.override_keywords = None
        self.analyze_data = []
        self.command_line = None
        self.simulation_params = None
        self.info = StageInfoLine("", index)
        self.custom_script_info = None

    def parseDataLine(self, line, ltype):
        """
        Parse a data line

        :type line: str
        :param line: The line of data to parse

        :type ltype: str
        :param ltype: The type of data in this line. Should be a module
            constant PARENT, EXTRA_SECTIONS, ANALYZE, or one of the
            DATA_CLASSES keys

        :raise InvalidStageFileError: if the line data is invalid
        """
        if ltype == PARENT:
            aparent = ParentStageData(line)
            pdex = aparent.stage
            # a stage can only inherit from a stage that runs before it
            if pdex >= self.index:
                raise InvalidStageFileError(
                    'A parent for stage %d must be '
                    'an earlier stage. Got %d instead.' % (self.index, pdex))
            self.parent_data.append(aparent)
        elif ltype == EXTRA_SECTIONS:
            self.extra_sections.addLine(line)
        elif ltype == OVERRIDE_KEYWORDS:
            if self.override_keywords:
                raise InvalidStageFileError(
                    'Only one line of override keywords is allowed per stage')
            self.override_keywords = OverrideKeywords(line)
        elif ltype == ANALYZE:
            aanalyze = AnalyzeStageData(line)
            # analysis parents must also come from earlier stages
            for idx in aanalyze.parent_idxs:
                if idx >= self.index:
                    msg = (
                        'Some parents, {parent_idxs}, used in the analyze '
                        'stage, {index}, are not from earlier stages.').format(
                            parent_idxs=aanalyze.parent_idxs, index=self.index)
                    raise InvalidStageFileError(msg)
            self.analyze_data.append(aanalyze)
        elif ltype == COMMAND_LINE:
            self.command_line = CommandLine(line)
        elif ltype == SIMULATION:
            self.simulation_params = SimulationParams(line)
        elif ltype == INFO:
            self.info = StageInfoLine(line, index=self.index)
        elif ltype == CUSTOM:
            self.custom_script_info = CustomScriptLine(line, self.index)
        else:
            data = self.DATA_CLASSES[ltype](line)
            # Note that because we used defaultdicts, we don't have to worry
            # about whether the keys already exist in the dicts or not
            self.entry_data[ltype][data.eid].append(data)

    def applyEntryData(self, jagin, eid):
        """
        Apply all entry data for entry eid to the given JaguarInput object

        :type jagin: `schrodinger.application.jaguar.input.JaguarInput`
        :param jagin: The JaguarInput object to apply the data to

        :type eid: str
        :param eid: The ID of the entry whose data should be applied
        """
        for entry_data in self.entry_data.values():
            for data in entry_data[eid]:
                data.applyToJaguarInput(jagin)
        # override keywords apply to all structures and win over any
        # entry-specific keyword values
        if self.override_keywords:
            self.override_keywords.applyToJaguarInput(jagin)

    def getKeywords(self, eid=None):
        """
        Get the keywords for this stage for the given eid. If no eid is given,
        keywords for an arbitrary entry will be returned.

        :type eid: str or None
        :param eid: If str, keywords for this entry will be supplied. If None,
            an arbitrary entry will be chosen.

        :rtype: dict
        :return: keys are keywords, values are values for that keyword
        """
        if eid:
            keyword_info = self.entry_data[KEYWORDS][eid][0]
        else:
            # Bug fix: the previous implementation called dict.pop() with no
            # arguments (a TypeError), mis-unpacked the result, and wrote it
            # back into entry_data keyed by entry ID rather than data type,
            # corrupting entry_data. Peek at an arbitrary entry instead,
            # without mutating entry_data.
            eid, infos = next(iter(self.entry_data[KEYWORDS].items()))
            keyword_info = infos[0]
        return keyword_info.keywords

    def getPropertyKeys(self, st=None):
        """
        Return output structure property keys that are
        created by this stage.

        :type st: schrodinger.structure.Structure or None
        :param st: if given and the stage is an analysis stage then
            the thermochemistry wildcards are considered

        :rtype: list
        :return: structure property keys
        """
        if self.analyze_data:
            # all data in analyze_data have the same base key
            base_key = self.analyze_data[0].key
            if st:
                return [key for key in st.property if key.startswith(base_key)]
            else:
                return [base_key]
        else:
            # merge all per-entry keywords (plus any overrides) and derive
            # the property keys those keywords would produce
            all_keywords = {}
            for keywords in self.entry_data[KEYWORDS].values():
                all_keywords.update(keywords[0].keywords)
            if self.override_keywords:
                all_keywords.update(self.override_keywords.keywords)
            return get_property_keys_from_keywords(all_keywords)
class JMSWorkFlow(jaguarworkflows.WorkFlow):
    """
    A Jaguar Multistage WorkFlow object that controls all the steps for an entry
    """

    def __init__(self, *args, **kwargs):
        """
        Create a JSMWorkFlow instance

        :type stages: list
        :param stages: A list of StageData objects, one for each step in the
            workflow

        :type smap_name: str
        :param smap_name: The name of the master smap file

        :type hierarchical: bool
        :param hierarchical: in the output structure file hierarchically group
            structures by stage using a job name and original structure title
            header

        See parent class for additional documentation
        """
        self.smap_name = kwargs.pop('smap_name', None)
        self.stages = kwargs.pop('stages', None)
        use_hierarchy = kwargs.pop('hierarchical', True)
        jaguarworkflows.WorkFlow.__init__(self, *args, **kwargs)
        if not use_hierarchy:
            return
        # group under the job control job name when running under a backend,
        # otherwise under the command-line job name
        group_name = (self.backend.getJob().Name
                      if self.backend else self.options.name)
        hierarchy = '%s->%s' % (group_name, self.base_name)
        self.properties[mm.M2IO_DATA_SUBGROUPID] = hierarchy
        self.child_properties[mm.M2IO_DATA_SUBGROUPID] = hierarchy

    def getSteps(self):
        """
        Create all the steps for this workflow, one for each stage
        """
        eid = self.struct.property[msprops.ENTRY_ID_PROP]
        for stage in self.stages:
            # skip stages that do not define a Jaguar
            # keywords section for the given structure
            keywords_dict = stage.entry_data.get(KEYWORDS)
            if keywords_dict and not keywords_dict.get(eid):
                continue
            # an analysis stage can not be the first stage
            if stage.analyze_data and not self.steps:
                continue
            parent = None
            noninheritable_parents = None
            if stage.parent_data and self.steps:
                # stage indices are 1-based; a single parent may be
                # inherited from, multiple parents never are
                parent_steps = [
                    self.steps[pdata.stage - 1] for pdata in stage.parent_data
                ]
                if len(parent_steps) == 1 and stage.parent_data[0].inherited:
                    parent = parent_steps[0]
                else:
                    noninheritable_parents = parent_steps
            self.steps.append(
                JMSStep(stage,
                        self,
                        parent=parent,
                        noninheritable_parents=noninheritable_parents))
class JMSStep(jaguarworkflows.Step):
    """
    A step in the Jaguar Multistage Workflow
    """

    def __init__(self, stage, *args, **kwargs):
        """
        Create a JMSStep instance

        :type stage: `StageData`
        :param stage: The settings for this step

        See parent class for additional documentation
        """
        self.stage = stage
        kwargs['step_name'] = 'Stage %d' % self.stage.index
        jaguarworkflows.Step.__init__(self, *args, **kwargs)
        self.entry_id = self.workflow.struct.property[msprops.ENTRY_ID_PROP]
        # fold the stage name (when given) and any analysis parent structure
        # index into the job name so subjob files are uniquely named
        self.job_name = self.workflow.base_name
        if self.stage.info.name != NO_STAGE_NAME:
            self.job_name += f'_{self.stage.info.name}'
        if self.stage.analyze_data:
            parent_st_idx = self.stage.analyze_data[0].parent_st_idx
            self.job_name += '_analysis_' + str(parent_st_idx)
        # For file names that need to be recorded in the smap file
        self.smap_names = []

    def _getParentStructureDict(self):
        """
        Return a dictionary of structures from parent stages keyed
        by parent index.

        :rtype: dict
        :return: contains parent index, structure pairs
        """
        parent_st_dict = {}
        for parent in self.stage.parent_data:
            idx = parent.stage
            # stage indices are 1-based, the steps list is 0-based
            step = self.workflow.steps[idx - 1]
            st = step.results.getMaeStructure()
            parent_st_dict[idx] = st
        return parent_st_dict

    def _createAnalysisStageOutputFiles(self, st, parent_st_idx):
        """
        Create analysis stage output files.

        :type st: schrodinger.structure.Structure
        :param st: the output structure for the analysis stage

        :type parent_st_idx: int
        :param parent_st_idx: the parent structure index
        """
        out_files = []
        analyze_mae_file = self.job_name + '.01.mae'
        st.write(analyze_mae_file)
        out_files.append(analyze_mae_file)
        parent_job_name = self.workflow.steps[parent_st_idx - 1].job_name
        # the following is needed in case analysis stages
        # are chosen as parents for non-analysis stages
        # where wavefunction and/or Hessian data may be
        # inherited
        analyze_in_file = self.job_name + '.01.in'
        shutil.copy(parent_job_name + '.01.in', analyze_in_file)
        out_files.append(analyze_in_file)
        # the following is needed to ensure that normal modes
        # may be viewed on the analysis output structure
        for ext in SMAP_ELIGIBLE_EXTENSIONS:
            pafile = parent_job_name + ext
            if os.path.exists(pafile):
                afile = self.job_name + ext
                shutil.copy(pafile, afile)
                out_files.append(afile)
                self.smap_names.append(afile)
        # register the copied files so job control copies them back
        if self.workflow.backend:
            for afile in out_files:
                self.workflow.backend.addOutputFile(afile)

    def getThermoExts(self, parent_st_dict):
        """
        Return the thermochemistry extensions for this stage.

        :type parent_st_dict: dict
        :param parent_st_dict: contains parent index, structure pairs

        :rtype: list
        :return: the thermochemistry extensions

        :raise MissingDataError: if the terms share no common extensions
        """
        # an analysis stage is composed of several analyze_data terms,
        # collectively the stage can have zero, one, or multiple
        # terms using thermochemistry wildcards, in the case of multiple
        # they can in fact be for different energy types, for each term
        # collect all available temperature and pressure key extensions
        all_exts = []
        for analyze_data in self.stage.analyze_data:
            exts = []
            for thermo_key in analyze_data.getThermoKeys(parent_st_dict):
                temp = jaguarworkflows.get_temperature(thermo_key)
                press = jaguarworkflows.get_pressure(thermo_key)
                exts.append(jaguarworkflows.get_temp_press_key_ext(temp, press))
            if exts:
                all_exts.append(exts)
        # if there aren't any terms with wildcards return
        if not all_exts:
            return []
        # an analysis stage featuring terms at different temperatures and
        # pressures is reserved for manual creation, for the wildcard
        # automation here allow only temperature and pressure combinations
        # that are shared by all terms
        exts = set(all_exts[0])
        for _exts in all_exts[1:]:
            exts = exts.intersection(_exts)
        if not exts:
            raise MissingDataError(
                'The analysis terms have no extensions '
                f'matching {jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT} in common.')
        return list(exts)

    def start(self):
        """
        Start the job - create the input and write it, adding necessary output
        files to make sure they get copied back
        """
        if self.stage.analyze_data:
            # analysis stages compute properties locally from parent results
            # instead of launching a Jaguar subjob
            self.log('Starting analyze stage')
            parent_st_dict = self._getParentStructureDict()
            parent_st_idx = self.stage.analyze_data[0].parent_st_idx
            st = parent_st_dict[parent_st_idx]
            st.property[PARENT_ST_IDX_KEY] = parent_st_idx
            st.property[WORKFLOW_STAGE_KEY] = self.stage.index
            # turn any thermochemistry wildcards into explict temperature
            # and pressure key extensions that are available for all property
            # terms and all parents for each term, if there aren't any
            # wildcards then this is a standard run so just make a no-op list
            # of None, if wildcards are used but some parent structure for some
            # term is for some reason missing data then log an error, continue
            # with an explicit wildcard property, and have it skipped by the
            # break statement below
            try:
                thermo_exts = self.getThermoExts(parent_st_dict)
            except MissingDataError as err:
                self.log(str(err))
                thermo_exts = []
            if not thermo_exts:
                thermo_exts = [None]
            # for each thermochemistry extension evaluate the property as a sum
            # over terms, if there were no wildcards or there was a wildcard
            # but data was missing then the outer loop is performed once with a
            # value of None, if the key asked for is missing then skip the
            # analysis
            for thermo_ext in thermo_exts:
                for analyze_data in self.stage.analyze_data:
                    if thermo_ext:
                        parent_key = analyze_data.parent_key.replace(
                            jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT, thermo_ext)
                        key = f'{analyze_data.key}{thermo_ext}'
                    else:
                        parent_key = analyze_data.parent_key
                        key = analyze_data.key
                    try:
                        value = analyze_data._getPropertyTerm(
                            parent_st_dict, parent_key)
                    except MissingDataError as err:
                        # drop any partial sum already accumulated for this
                        # key and skip the remaining terms
                        st.property.pop(key, None)
                        self.log(str(err))
                        break
                    st.property[key] = st.property.get(key, 0.) + value
            self._createAnalysisStageOutputFiles(st, parent_st_idx)
            self.results = jaguarworkflows.Results(self.job_name)
            # mark this step complete immediately - no subjob was launched
            self.finished = self.ok = True
            self.log('Finished analyze stage')
        else:
            jaguarworkflows.Step.start(self)

    def getStructure(self):
        """
        Overwrite the parent class method to return the structure from the
        inheritable parent step if an inheritable parent exists, otherwise
        return the original structure
        """
        if not self.parent:
            struct = self.workflow.struct
        else:
            struct = jaguarworkflows.Step.getStructure(self)
        # tag the structure with the stage that produced it
        struct.property[WORKFLOW_STAGE_KEY] = self.stage.index
        return struct

    def getJaguarRestartFileName(self):
        """
        Get the name of the Jaguar restart file for this step - the restart
        file contains the wavefunction and hessian

        :rtype: str or None
        :return: The name of the Jaguar restart file for this step, or None
            (implicitly) if the file does not exist
        """
        restart_name = self.job_name + '.01.in'
        if os.path.exists(restart_name):
            return restart_name

    def finishProcessingJobControlJob(self):
        """
        Add any files to the backend that the Jaguar subjob preserved
        """
        super().finishProcessingJobControlJob()
        jc_job = self.job.getJob()
        if not jc_job:
            return
        for filename in jc_job.OutputFiles:
            # Record any files we need to write to the master smap file
            for ext in SMAP_ELIGIBLE_EXTENSIONS:
                if filename.endswith(ext):
                    # For robust driver calculations, we only want to grab
                    # smap-eligible files if they are in the main directory.
                    # Subdirectory files are not the final versions
                    if not self.robust or not os.path.dirname(filename):
                        self.smap_names.append(filename)

    def write(self, writer, **kwargs):
        """
        In addition to the parent method, also compile any smap data into the
        master smap file.

        See parent method for additional documentation
        """
        jaguarworkflows.Step.write(self, writer, **kwargs)
        # Add each smap datafile for this step to the master smap using the
        # index of this structure in the output mae file
        myindex = writer.written_count
        if self.smap_names:
            with open(self.workflow.smap_name, 'a') as master_smap:
                for fname in self.smap_names:
                    # datafile lines are of the form 'filename: index'
                    master_smap.write('%s: %d\n' % (fname, myindex))
def create_workflows(options,
                     jobq,
                     stages,
                     smap_name=None,
                     hierarchical=True,
                     workflow_class=None,
                     robust=True,
                     tmp_logger=None):
    """
    Create a workflow for each structure

    :type options: `argparse.Namespace`
    :param options: The command line options

    :type jobq: `schrodinger.job.queue.JobDJ`
    :param jobq: The JobDJ to run subjobs with

    :type stages: list
    :param stages: A list of `StageData` objects to create `JMSStep` from

    :type smap_name: str
    :param smap_name: The name of the master smap file

    :type hierarchical: bool
    :param hierarchical: in the output structure file hierarchically group
        structures by stage using a job name and original structure title
        header

    :type workflow_class: Jaguar multistage workflow
    :param workflow_class: Jaguar multistage workflow or any other custom
        jaguar workflow

    :param bool robust: If True, use the robust Jaguar driver to run Jaguar
        jobs. If false, use Jaguar directly.

    :type tmp_logger: logging.Logger or None
    :param tmp_logger: output logger or None if there isn't one
    """
    global logger
    try:
        reader = structure.StructureReader(options.input_file)
    except IOError:
        # route the error through the supplied logger (if any) before
        # exiting via log_error
        if tmp_logger:
            error_context = msutils.with_global_as(logger, tmp_logger)
        else:
            error_context = nullcontext()
        with error_context:
            log_error('Could not read input file: %s' % options.input_file)
    cleaner = jobutils.StringCleaner()
    flow_class = workflow_class or JMSWorkFlow
    logger = tmp_logger or logger
    flows = []
    for flow_idx, struct in enumerate(reader, 1):
        # Ensure atom naming is consistent with GUI atom names (MATSCI-4159)
        jinput.apply_jaguar_atom_naming(struct)
        flows.append(
            flow_class(struct,
                       options,
                       flow_idx,
                       jobq,
                       strcleaner=cleaner,
                       logger=logger,
                       stages=stages,
                       smap_name=smap_name,
                       hierarchical=hierarchical,
                       robust=robust))
    return flows
def parse_stage_data(data, meta=False):
    """
    Parse data in settings file format into a list of stages

    :type data: list or file
    :param data: The data to parse. Can be a list of strings with each item a
        line of data, or an open file

    :param bool meta: If True, this is a meta workflow-like data file. If
        False, this is a jaguarworkflows-like data file

    :rtype: list
    :return: A list of StageData objects

    :raise InvalidStageFileError: if there is an issue
    """
    stages = []
    stage = ltype = None
    for line in data:
        line = line.strip()
        uline = line.upper()
        if line.startswith('#') or not line:
            # Comment or blank
            continue
        elif uline == NEW_STAGE:
            # Start a new stage
            stage = StageData(len(stages) + 1)
            stages.append(stage)
        elif uline in DATATAGS:
            # Start a new data type. Store the upper-cased tag: tags are
            # matched case-insensitively above, so storing the raw line (as
            # the previous implementation did) caused an uncaught KeyError
            # in parseDataLine for lower/mixed-case tags.
            ltype = uline
        else:
            # Parse a data line
            if not ltype or not stage:
                # fixed typo in the error message ('Unrecoginzed')
                raise InvalidStageFileError(
                    'Unrecognized format for settings file on line: \n%s' %
                    line)
            try:
                stage.parseDataLine(line, ltype)
            except InvalidStageFileError as msg:
                raise InvalidStageFileError('Error reading settings file:\n%s' %
                                            str(msg))
    validate_stages(stages, meta=meta)
    return stages
def read_stage_datafile(filename, meta=False):
    """
    Read in a settings file. Logs an error and exits if an error occurs while
    reading the file.

    :param str filename: The name of the settings file to read

    :param bool meta: If True, this is a meta workflow-like data file. If
        False, this is a jaguarworkflows-like data file

    :rtype: list
    :return: A list of StageData objects

    :raise InvalidStageFileError: if there is an issue
    """
    # parse_stage_data consumes the open file line by line
    with open(filename, 'r') as datafile:
        return parse_stage_data(datafile, meta=meta)
def validate_jaguarlike_stages(stages):
    """
    Validate the information for jaguarworkflows-like stages

    :param list stages: contains StageData

    :raise InvalidStageFileError: if there is an issue
    """
    for stage in stages:
        inherited = [pdata for pdata in stage.parent_data if pdata.inherited]
        # at most one parent may be inherited from
        if len(inherited) > 1:
            msg = ('Invalid data for Stage {idx}. Stages can either '
                   'have a single parent from which data is inherited or '
                   'multiple parents from which no data is inherited.').format(
                       idx=stage.index)
            raise InvalidStageFileError(msg)
        # analyze stages never inherit wavefunction/hessian data
        if inherited and stage.analyze_data:
            msg = ('An analyze stage can only involve parents for the '
                   'purposes of analysis.')
            raise InvalidStageFileError(msg)
        # the following three sets are used to validate parenting in
        # analysis
        #
        # the following are all unique parent stage indices in the
        # PARENT stage section
        pps = set(pdata.stage for pdata in stage.parent_data)
        # the following are all unique parent stage indices from the
        # parent indices part of all terms in the ANALYZE stage section
        aps = set(idx for adata in stage.analyze_data
                  for idx in adata.parent_idxs)
        # the following are all unique parent stage indices from the
        # parent structure indices part of all terms in the ANALYZE stage
        # section
        asps = set(adata.parent_st_idx for adata in stage.analyze_data)
        if stage.analyze_data and not (aps.issubset(pps) and
                                       asps.issubset(pps)):
            msg = ('An analyze stage requires defining the corresponding '
                   'parent stages.')
            raise InvalidStageFileError(msg)
        if len(asps) > 1:
            msg = ('Only a single parent structure may be used for an '
                   'analyze stage.')
            raise InvalidStageFileError(msg)
        # analyze stages may not also define Jaguar job input
        has_jaguar_input = (stage.entry_data or stage.extra_sections.text or
                            stage.override_keywords)
        if stage.analyze_data and has_jaguar_input:
            msg = ('Jaguar jobs can not be run in analyze stages.')
            raise InvalidStageFileError(msg)
def validate_stages(stages, meta=False):
    """
    Validate stages.

    :param list stages: contains StageData

    :param bool meta: If True, this is a meta workflow-like data file. If
        False, this is a jaguarworkflows-like data file

    :raise InvalidStageFileError: if there is an issue
    """
    if not stages:
        raise InvalidStageFileError('No stages found in the stage file')
    # dispatch to the validator matching the data file flavor
    validator = validate_metalike_stages if meta else validate_jaguarlike_stages
    validator(stages)
def log_error(msg):
    """
    Add a message to the log file and exit with an error code

    :type msg: str
    :param msg: The message to log

    :raise SystemExit: always - this function terminates the process with
        exit code 1 after logging
    """
    log(msg)
    # log a final timestamped line so the log shows when the run ended
    log('Finished', timestamp=True)
    sys.exit(1)
def log(msg, timestamp=False, pad=False, pad_below=False):
    """
    Add a message to the log file

    :type msg: str
    :param msg: The message to log

    :type timestamp: bool
    :param timestamp: Whether to print a timestamp with the message

    :type pad: bool
    :param pad: Whether to pad above this message with a blank line

    :type pad_below: bool
    :param pad_below: Whether to pad below this message with a blank line
    """
    if timestamp:
        msg = f'{msg} at {time.ctime()}'
    # blank lines above/below are emitted through the same module logger
    if pad:
        textlogger.log(logger, "")
    textlogger.log(logger, msg)
    if pad_below:
        textlogger.log(logger, "")
def create_smap(basename, output_name, smap_dict=None):
    """
    Create the master smap file that will map property files (.vib, .vis, etc)
    to structures in the compiled structure file

    :type basename: str
    :param basename: The base name of all job files

    :type output_name: str
    :param output_name: The name of the output structure file

    :type smap_dict: dict
    :param smap_dict: keys are file names, values are indices, the
        values are not entry IDs but rather the counting indices (1-based)
        of the structure in the given output_name file that the given
        file name key is associated with, values can also be lists of
        indices for example if a given file name is used for multiple
        indices

    :rtype: str
    :return: The name of the smap file created
    """
    smap_dict = smap_dict or {}
    smap_name = basename + '.smap'
    with open(smap_name, 'w') as sfile:
        sfile.write('# smap version 1.0\n')
        sfile.write(output_name + '\n')
        for file_name, idxs in smap_dict.items():
            # a scalar index is treated as a one-item list
            if not isinstance(idxs, list):
                idxs = [idxs]
            sfile.writelines(f'{file_name}: {idx}\n' for idx in idxs)
    # when data was written, terminate the file (no backend is available in
    # this code path)
    if smap_dict:
        finalize_smap(smap_name, None)
    return smap_name
def finalize_smap(smap_name, backend):
    """
    Finish the master smap file and add it to the jobcontrol backend if
    necessary

    :type smap_name: str
    :param smap_name: The name of the master smap file

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The job control backend or None if there is no backend
    """
    # If there are any lines with a colon, the smap file contains data
    with open(smap_name, 'r') as sfile:
        keep_smap = any(':' in line for line in sfile)
    if not keep_smap:
        # nothing mapped - remove the useless file
        fileutils.force_remove(smap_name)
        return
    with open(smap_name, 'a') as sfile:
        sfile.write('#end\n')  # newline here is critical
    if backend:
        backend.addOutputFile(smap_name)