Source code for schrodinger.application.matsci.permittivity

'''
Permittivity module for permittivity workflow.

Copyright Schrodinger, LLC. All rights reserved.
'''

# Contributor: Teng Zhang

import collections
import copy
import glob
import itertools
import math
import os
import re
import shutil
import time
import timeit
import warnings
from contextlib import contextmanager

import networkx
import numpy as np
from ruamel import yaml
from scipy import constants
from scipy import signal

import schrodinger
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.jaguar import input as jaguar_input
from schrodinger.application.jaguar import output as jaguar_output
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import codeutils
from schrodinger.application.matsci import equilibrium_md as emd
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msconst
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import \
    genetic_optimization as gnt_opm
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
from schrodinger.utils import imputils
from schrodinger.utils import sea
from schrodinger.utils import units

topo = codeutils.get_safe_package('desmond.topo')
traj = codeutils.get_safe_package('desmond.traj')

PERMITTIVITY = 'permittivity'
TASK = 'task'
PERMITTIVITY_TASK = f'{PERMITTIVITY}_%s_{TASK}'

POLYMER_TEMPLATES = 'polymer_templates'
INITIATORS = 'initiators'
TERMINATORS = 'terminators'
H_MAE = 'H.mae'

POLYMER_BUILDER_PATH = ['polymer_builder_gui_dir', 'polymer_builder_driver.py']
DISORDER_SYSTEM_BUILDER_PATH = [
    'disordered_system_builder_gui_dir', 'disordered_system_builder_driver.py'
]

MMSHARE_SCRIPTS_PATH = fileutils.get_mmshare_scripts_dir()
PERMITTIVITY_GUI_DIR = 'permittivity_gui_dir'
JAGUAR_RUN_PATH = ['run']
REFRACTIVE_INDEX_PATH = [
    PERMITTIVITY_GUI_DIR, 'permittivity_refractive_index_driver.py'
]
STATIC_PERMITTIVITY_PATH = [
    PERMITTIVITY_GUI_DIR, 'permittivity_static_permittivity_driver.py'
]
JAGUAR_MERGE_PATH = [
    PERMITTIVITY_GUI_DIR, 'permittivity_merge_polarizability_driver.py'
]
MD_POST_DENSITY_PATH = [PERMITTIVITY_GUI_DIR, 'permittivity_density_driver.py']
MD_POST_AVE_CELL_PATH = [
    PERMITTIVITY_GUI_DIR, 'permittivity_ave_cell_driver.py'
]
MD_POST_ANALYZE_SIMULATION = ['analyze_simulation.py']
MD_POST_DIPOLE_PATH = [PERMITTIVITY_GUI_DIR, 'permittivity_dipole_driver.py']
PERMITTIVITY_POSITIONAL_PATH = [
    PERMITTIVITY_GUI_DIR, 'permittivity_positional_driver.py'
]

INPUT_CMS = emd.INPUT_CMS
INPUT_MAE = emd.INPUT_MAE
INPUT_TRJ = '-input_trj'
INPUT_ENE = '-input_ene'
INPUT_DIPOLE_FILE = '-dipole_file'
TMP_DIR = 'tmp_dir'
FLAG_INPUTFIIE_TASKTYPE_DRIVERPATHS = '-inputfile_tasktype_driverpaths'
FLAG_PARALLEL = '-parallel'
FLAG_REPLICA_NUM = '-replica_num'
FLAG_RESTART = '-restart'
FLAG_INITIAL_DENSITY = '-initial_density'

PREPEND_RUN = 'prepend_run'
SCHRODINGER_RUN = ['$SCHRODINGER', 'run']
SCHRODINGER_JAGUAR = ['$SCHRODINGER', 'jaguar']
SUBCLASS_DRIVER_PATH = 'driver_path'
CMD_FLAGS = 'cmd_flags'
XTRA_FLAGS = 'xtra_flags'
REPLICA = 'replica'
REPLICA_TYPE = 'replica_type'
REPLICA_FLAG = 'replica_flag'
REPLICA_NUM = 'replica_num'
MERGE = 'merge'
BRANCH = 'branch'
REMOVE = 'remove'
# This is not a valid user-input replica type, as it is an intermediate one
# defined at runtime between merge and branch points. (removed afterwards)
THREADING = 'threading'

ANY = 'any'
NAME = 'name'
TYPE = 'type'
DRIVER_PATH = 'driver_path'
LOCATOR_TYPES = [ANY, NAME, TYPE, DRIVER_PATH]
SPECIFIC_LOCATOR_TYPES = [NAME, TYPE, DRIVER_PATH]

WORKFLOW = 'Workflow'
NONE_TASK_NAMES = [WORKFLOW]
POLYMER_BUILDER_PROTOCOL = 'Polymer Builder Protocol'
MD_SIMULATION_PROTOCOL = 'MD Simulation Protocol'
MD_POST_DENSITY_PROTOCOL = 'MD Density Protocol'
JAGUAR_CALCULATION_PROTOCOL = 'Jaguar Calculation Protocol'
JAGUAR_FINER_CALCULATION_PROTOCOL = 'Jaguar Finer Calculation Protocol'
JAGUAR_POLARIZABILITY_PROTOCOL = 'Jaguar Polarizability Protocol'
JAGUAR_MERGE_PROTOCOL = 'Jaguar Merge Protocol'
REFRACTIVE_INDEX_PROTOCOL = 'Refractive Index Protocol'
MD_POST_AVE_CELL_PROTOCOL = 'MD Average Cell Protocol'
MD_ANALYZE_SIMULATION_PROTOCOL = 'MD Analyze Simulation Protocol'
MD_POST_DIPOLE_PROTOCOL = 'MD Dipole Protocol'
SUSCEPTIBILITY_MERGE_PROTOCOL = 'Susceptibility Merge Protocol'
COMPLEX_PERMITTIVITY_PROTOCOL = 'Complex Permittivity Protocol'
DISORDER_SYSTEM_BUILDER_PROTOCOL = 'Disorder System Builder Protocol'
STATIC_PERMITTIVITY_PROTOCOL = 'Static Permittivity Protocol'

CMS_TASK_TYPES = [
    MD_SIMULATION_PROTOCOL, MD_POST_DENSITY_PROTOCOL, MD_POST_AVE_CELL_PROTOCOL,
    MD_POST_DIPOLE_PROTOCOL, COMPLEX_PERMITTIVITY_PROTOCOL,
    MD_ANALYZE_SIMULATION_PROTOCOL
]
MAE_TASK_TYPES = [
    JAGUAR_POLARIZABILITY_PROTOCOL, DISORDER_SYSTEM_BUILDER_PROTOCOL,
    JAGUAR_CALCULATION_PROTOCOL, JAGUAR_FINER_CALCULATION_PROTOCOL,
    JAGUAR_MERGE_PROTOCOL
]
MULTI_FILES_TASK_TYPES = [
    REFRACTIVE_INDEX_PROTOCOL, STATIC_PERMITTIVITY_PROTOCOL
]

TASK_TYPES = CMS_TASK_TYPES + MAE_TASK_TYPES + MULTI_FILES_TASK_TYPES

INDEX = 'index'
IS_PRODUCTION = 'is_production'
IS_RELAXATION = 'is_relaxation'
USE_CUSTOMIZED_RELAXATION = 'use_customized_relaxation'
MD_STAGE_MARKERS = [IS_PRODUCTION, IS_RELAXATION, USE_CUSTOMIZED_RELAXATION]
DESMOND_YES = 'yes'
DESMOND_NO = 'no'
DESMOND_TRUE = desmondutils.DESMOND_TRUE
ANNEALING = 'annealing'

MSJ = emd.MSJ
YAML = 'yaml'
YAML_EXT = f'.{YAML}'
GZ = 'gz'
MAE = emd.MAE
MAEGZ = f"{MAE}{GZ}"
CMS = emd.CMS
FILE = 'file'
CSV = 'csv'
NPZ = 'npz'
TRJ_EXT = '_trj'
ENE_EXT = '.ene'
FILE_PROP_EXTENSIONS = [GZ, MAE, MAEGZ, CMS, FILE, CSV, NPZ, TRJ_EXT]
INPUT_FILE = emd.INPUT_FILE
TERMINAL_INFILE = f'$in{FILE}'
TERMINAL_OUTFILE_BASENAME = '$outfile_basename'
CMS_OUT_EXT = msconst.CMS_OUT_EXT
POLYMER_OUT_EXT = f'-polymer.{MAEGZ}'
MAE_OUT_EXT = f'-out.{MAE}'
CSV_OUT_EXT = f'-out.{CSV}'
NPZ_OUT_EXT = f'-out.{NPZ}'

SUBJOB_OUT_EXTS = [CMS_OUT_EXT, POLYMER_OUT_EXT, MAE_OUT_EXT]

INPUT_READERS = {INPUT_MAE: structure.Structure.read, INPUT_CMS: cms.Cms}

DEFAULT_REFRACTIVE_INDEX_PROTOCOLS = 'default_refractive_index_protocols.yaml'
DEFAULT_ABBE_NUMBER_PROTOCOLS = 'default_abbe_number_protocols.yaml'
DEFAULT_STATIC_PERMITTIVITY_PROTOCOLS = 'default_static_permittivity_protocols.yaml'
DEFAULT_COMPLEX_PERMITTIVITY_PROTOCOLS = 'default_complex_permittivity_protocols.yaml'
DEFAULT_AN_SP_PROTOCOLS = 'default_an_sp_protocols.yaml'
DEFAULT_AN_SP_CP_PROTOCOLS = 'default_an_sp_cp_protocols.yaml'
DEFAULT_PERMITTIVITY_PROTOCOLS = 'default_permittivity_protocols.yaml'

PERMITTIVITY = 'permittivity'
REFRACTIVE_INDEX = 'refractive_index'
ABBE_NUMBER = 'abbe_number'
STATIC_PERMITTIVITY = 'static_permittivity'
COMPLEX_PERMITTIVITY = 'complex_permittivity'
# PERMITTIVITY has all known calculations without some validations, and thus is
# not customer facing
WORKFLOW_TYPES = [
    REFRACTIVE_INDEX, ABBE_NUMBER, STATIC_PERMITTIVITY, COMPLEX_PERMITTIVITY
]
JAGUAR_WORKFLOW_TYPES = [REFRACTIVE_INDEX, ABBE_NUMBER, STATIC_PERMITTIVITY]
IPOLAR_WORKFLOW_TYPES = [
    REFRACTIVE_INDEX, STATIC_PERMITTIVITY, COMPLEX_PERMITTIVITY
]
DIPOLE_WORKFLOW_TYPES = [STATIC_PERMITTIVITY, COMPLEX_PERMITTIVITY]
SINGLE_MOLECUE_WORKFLOW_TYPES = JAGUAR_WORKFLOW_TYPES.copy()
TRAJECTORY_AUTOCORRELATION_WORKFLOW_TYPES = [COMPLEX_PERMITTIVITY]

# yapf: disable
WORKFLOW_TYPE_YAML_MAP = [
 [[REFRACTIVE_INDEX], DEFAULT_REFRACTIVE_INDEX_PROTOCOLS],
 [[ABBE_NUMBER], DEFAULT_ABBE_NUMBER_PROTOCOLS],
 [[STATIC_PERMITTIVITY], DEFAULT_STATIC_PERMITTIVITY_PROTOCOLS],
 [[COMPLEX_PERMITTIVITY], DEFAULT_COMPLEX_PERMITTIVITY_PROTOCOLS],
 [[REFRACTIVE_INDEX, ABBE_NUMBER],DEFAULT_ABBE_NUMBER_PROTOCOLS],
 [[REFRACTIVE_INDEX, STATIC_PERMITTIVITY], DEFAULT_STATIC_PERMITTIVITY_PROTOCOLS],
 [[REFRACTIVE_INDEX, COMPLEX_PERMITTIVITY], DEFAULT_COMPLEX_PERMITTIVITY_PROTOCOLS],
 [[ABBE_NUMBER, STATIC_PERMITTIVITY], DEFAULT_AN_SP_PROTOCOLS],
 [[ABBE_NUMBER, COMPLEX_PERMITTIVITY], DEFAULT_AN_SP_CP_PROTOCOLS],
 [[STATIC_PERMITTIVITY, COMPLEX_PERMITTIVITY], DEFAULT_COMPLEX_PERMITTIVITY_PROTOCOLS],
 [[ABBE_NUMBER, STATIC_PERMITTIVITY, COMPLEX_PERMITTIVITY], DEFAULT_AN_SP_CP_PROTOCOLS],
 [[REFRACTIVE_INDEX, STATIC_PERMITTIVITY, COMPLEX_PERMITTIVITY], DEFAULT_COMPLEX_PERMITTIVITY_PROTOCOLS],
 [[REFRACTIVE_INDEX, ABBE_NUMBER, COMPLEX_PERMITTIVITY], DEFAULT_AN_SP_CP_PROTOCOLS],
 [[REFRACTIVE_INDEX, ABBE_NUMBER, STATIC_PERMITTIVITY], DEFAULT_AN_SP_PROTOCOLS],
 [[REFRACTIVE_INDEX, ABBE_NUMBER, STATIC_PERMITTIVITY, COMPLEX_PERMITTIVITY], DEFAULT_AN_SP_CP_PROTOCOLS],
 [[PERMITTIVITY], DEFAULT_PERMITTIVITY_PROTOCOLS]]
# yapf: enable
WORKFLOW_TYPE_YAML_MAP = {
    tuple(sorted(x)): y for x, y in WORKFLOW_TYPE_YAML_MAP
}

JAGUAR = 'jaguar'
POSITIONAL = 'positional'
DELIMITER_BTW = ';'
DELIMITER_WI = ','
NONE = 'None'
EV = 'eV'
RELAXATION_PROTOCOLS = 'relaxation_protocols'
NONUNIFORM_FOURIER_TRANSFORM_FAILED_MSG = "Nonuniform fourier transform failed."
WAM_TYPE = jobutils.WAM_TYPES.MS_AMORPHOUS_DIELECTRIC

logger = None


[docs]def log_debug(msg): """ Add a message to debug logger file. :type msg: str :param msg: The message to debug logging """ global logger if logger is None: logger = textlogger.create_debug_logger(__name__) logger.debug(msg)
[docs]@contextmanager def elapsed_timer(): """ Handy way to print time consumption for the execution of a few python line codes. example to print line number and time elapsed: from inspect import currentframe with elapsed_timer() as elapsed:: xxx print(currentframe().f_lineno, elapsed()) xxx print(currentframe().f_lineno, elapsed()) :return: float :rtype: time elapsed in second """ start = timeit.default_timer() elapser = lambda: timeit.default_timer() - start yield elapser
[docs]def path_join(paths, linux=True): """ Wrapper around os.path.join to allow forced '/' as the path separator. :param linux: Force using '/' as the separator if True :type linux: bool :return: The str paths :rtype: str """ if linux: return '/'.join(paths) return os.path.join(*paths)
[docs]def permittivity_transform(tau0, beta, tau_max=20, tau_num=None, omega_range=(1E6, 1E14), omega_num=None, simpson_quadrature=True, angular_frequency=False): """ Based on the parameters in the KWW equation, non-uniform grid in tau and omega are used to transform the time domain (tau) signal to frequency domain (omega). Note: the KWW is the equation 1 and implemented transform is equation 7 in the following paper. Further Considerations of Non Symmetrical Dielectric Relaxation Behaviour arising from a Simple Empirical Decay Function. BY G. WILLIAM AND D. C. WATT :param tau0: The tau0 in unit ps is the decay constant in KWW equation :type tau0: float :param beta: The beta in KWW equation, which must be positive :type beta: float :param tau_max: Tau upper limit in tau (time) domain :type tau_max: float :param tau_num: Total number of data points in the tau (time) domain :type tau_num: int or None :param omega_range: Output frequency domain range :type omega_range: tuple of two floats :param omega_num: Total number of data points in the frequency domain :type omega_num: int or None :param simpson_quadrature: If True, simpson rule with quadratic interpolation is used to reduce the oscillation :type simpson_quadrature: bool :param angular_frequency: If True, the first column is angular frequency else regular frequency :type angular_frequency: bool :raise ValueError: Non-uniform fourier transform yields nan (e.g. beta is negative) :raise AssertionError: beta <= 0 :return: Three columns are frequency, storage permittivity, and loss permittivity :rtype: 3xn numpy.array """ assert beta > 0, NONUNIFORM_FOURIER_TRANSFORM_FAILED_MSG tau0 *= constants.pico # picosecond to second if tau_num is None: tau_num = int(8E5) if omega_num is None: omega_num = int(5E2) if simpson_quadrature: if tau_num % 2: # simpson quadrature requires even num of data points tau_num += 1 # // instead of / is needed as int helps to build up the list weighting_factors = [1] + [4, 2] * (tau_num // 2 - 1) + [1] scaling_factor = 3. else: weighting_factors = [1] * tau_num scaling_factor = 1. x_eq7 = np.linspace(0, tau_max, num=tau_num, endpoint=False) exp_x = np.exp(-1 * x_eq7) * weighting_factors omega_min, omega_max = omega_range omega_log10 = np.linspace(np.log10(omega_min), np.log10(omega_max), num=omega_num, endpoint=False) omega = np.power(10, omega_log10) exp_iwx = np.exp(-1j * omega * tau0 * x_eq7[:, np.newaxis]**(1. / beta)) permittivity = np.dot(exp_x, exp_iwx) * tau_max / tau_num / scaling_factor freq = omega if angular_frequency else omega / np.pi / 2 out_data = np.array([freq, np.real(permittivity), -np.imag(permittivity)]) out_data = np.transpose(out_data) if np.isnan(out_data).any(): raise ValueError(NONUNIFORM_FOURIER_TRANSFORM_FAILED_MSG) return out_data
[docs]def get_properties_from_inputfiles(opts, input_required_properties, input_readers=None, startwith=False): """ Get properties from the input files. If the requested doesn't exist, a KeyError error is raised. :param opts: The parsed command line options :type opts: Named tuple :param input_required_properties: Keys are struct type and values are property keys. :type input_required_properties: dict :param input_readers: Keys are struct type and values are readers for different struct types. :type input_readers: dict :param startwith: If True, property keys starting with the requested string are considered matches. :type startwith: bool :raise KeyError: When the struct misses one required property :return: Requested property keys and obtained values :rtype: dict """ if input_readers is None: input_readers = INPUT_READERS properties = {} for infile_atrr_name in [INPUT_MAE, INPUT_CMS]: input_file_found = getattr(opts, infile_atrr_name, None) if input_file_found is None: continue input_reader = input_readers.get(infile_atrr_name, None) if input_reader is None: continue required_properties = input_required_properties.get( infile_atrr_name, None) if required_properties is None: continue st_ish = input_reader(input_file_found) for pkey in required_properties: if startwith: props = { x: st_ish.property[x] for x in st_ish.property.keys() if x.startswith(pkey) } if not props: raise KeyError( f"{input_file_found} doesn't properties starting with {pkey}." ) else: try: props = {pkey: st_ish.property[pkey]} except KeyError: raise KeyError(f"{input_file_found} doesn't have {pkey}.") properties.update(props) return properties
[docs]def is_file_property(property_name): """ Whether the property is related to a file. :param property_name: A property :type property_name: str :return: Whether the property should point to a file :rtype: bool """ for extension in FILE_PROP_EXTENSIONS: if property_name.lower().endswith(extension): return True return False
[docs]class chdir(fileutils.chdir): """ Make dir, change, and exit. If this indeed creates a dir, it will remove the dir on exit with rm_created_dir=True. """
[docs] def __init__(self, dirname, rm_created_dir=False, always_create=False): """ :param dirname: The directory name to make and cd to :type dirname: str :param rm_created_dir: If True and this class creates a directory, the directory will be removed on exit. :type rm_created_dir: bool :param always_create: If True and the input dirname exists, creates a directory, with a different name. For example 'my_dir' exists, the newly created dir will be 'my_dir.1'. If 'my_dir.1' exists, the newly create dir will be 'my_dir.2'. :type always_create: bool """ super().__init__(dirname) self.always_create = always_create self.rm_created_dir = rm_created_dir self.successful_mkdir = False
def __enter__(self): """ :return: On object of this class :rtype: 'chdir' """ self._createDir() super().__enter__() return self def _createDir(self): """ Create the directory and save the state. If always_create is requested, change the directory name until it can be successfully created. """ while not self.successful_mkdir: try: os.mkdir(self.dirname) except FileExistsError: if self.always_create: self.dirname = self.getNextDirname() else: break else: self.successful_mkdir = True def __exit__(self, *args): super().__exit__(*args) if self.successful_mkdir and self.rm_created_dir: fileutils.force_rmtree(self.dirname, ignore_errors=True)
[docs] def getNextDirname(self, midfix='.', zfill_width=0): """ Get the next dirname based on the input path and current existing directories. :param path: Path to a directory :type path: str :param midfix: This string split the basename and number <root><midfix><number> :type midfix: str :param zfill_width: Pad a numeric string S with zeros on the left, to fill a field of the specified width :type zfill_width: int :return: The next dirname based on the input path and all dirs in the relevant folder. :rtype: str """ # Decompose the path: splitted_path = self.dirname.split(midfix) if len(splitted_path) == 1: query_root = splitted_path[0] else: query_root = midfix.join(splitted_path[:-1]) # Search directory for files whose prefixes are of form # <query_root><midfix><N>, where N is an integer: query_glob = ''.join([query_root, midfix, '*']) # starting index for number in filename prefix: start_num = len(query_root) + len(midfix) max_number_found = 0 for pname in glob.iglob(query_glob): try: number = int(pname[start_num:]) except ValueError: # something other than a number was found starting at start_num continue else: max_number_found = max(max_number_found, number) return ''.join( [query_root, midfix, str(max_number_found + 1).zfill(zfill_width)])
[docs]def get_config(config_yaml): """ Get the Config instance from the input yaml file. :param config_yaml: The input configure yaml file :type config_yaml: str :return: The Config instance with property processed. :rtype: Config """ config = Config(config_yaml) config.load() config.setWorkflow() config.setNameToTypeMap() config.sortStages() return config
[docs]def get_config_from_options(options): """ Get the Config instance from the command line options. :param options: Command line options :type options: `argparse.Namespace` :return: The Config instance with property processed. :rtype: Config """ config_file = Config.get_file_from_options(options) config = get_config(config_file) return config
[docs]class Config(object): """ Class to store the config file, parse workflow, provide method to get fields. """
[docs] def __init__(self, config_yaml): """ :param config_yaml: The config file :type config_yaml: str """ self.config_yaml = config_yaml self.config_data = None self.workflow = None self.task_by_name = {} self.task_by_type = {} self.name_to_type = {}
[docs] @staticmethod def get_file(filename=None, product_dir=None): """ Get the permittivity config yaml from the mmshare data dir. :param filename: The yaml filename :type filename: str :param product_dir: Subdir name inside the mmshare_data_dir :type product_dir: str :return: The full path and filename for the input yaml :rtype: str """ if filename is None: filename = DEFAULT_PERMITTIVITY_PROTOCOLS if product_dir is None: product_dir = PERMITTIVITY mmshare_data_dir = fileutils.get_mmshare_data_dir() return os.path.join(mmshare_data_dir, product_dir, filename)
[docs] @classmethod def get_file_from_options(cls, options): """ Return the permittivity config file based on command line inputs. :param options: The command parsered options :type options: `argparse.Namespace` :return: The path to a config yaml :rtype: str """ if options.config_yaml: return options.config_yaml config_filename = WORKFLOW_TYPE_YAML_MAP[options.workflow_type] config_filepath = cls.get_file(filename=config_filename) return config_filepath
[docs] def load(self): """ Load configure file, set the task protocols by name and type NOTE: If no type found, name is used as the type Note: If the task type have multiple stages and each stage has its own index field, the sequence of the stages is sorted according to the index field. task_name has higher priority than task_type type, as multiple task names can share the same task type. """ with open(self.config_yaml) as f_hrp: self.config_data = yaml.load(f_hrp, Loader=TupleSafeLoader) for task_name_with_type, task_protocol in self.config_data.items(): task_name, task_type = self.splitNameAndType(task_name_with_type) protocol = WorkflowProtocol( task_protocol) if task_name == WORKFLOW else TaskProtocol( task_protocol) self.task_by_name[task_name] = protocol # yaml setting block has 'task name' is used instead of # 'task name (task type)' task_type = task_type if task_type else task_name self.task_by_type[task_type] = protocol
[docs] def setWorkflow(self): """ workflow is a nested data structure of list, tuple, and dict. Itself looks like an unbalanced tree data type. list gives linear job dependency as desmond msj Tuple gives independence between each task, and all these subjobs can share parent from the same level or one level up list. Dict links additional parent jobs. NOTE: dict is unhashable and cannot be used as the key of a dict. Each key or value is either a TASK NAME or TASK NAME (int,int,..) whose (int, int,..) defines the task id so that the same task name can be used multiple times, and the additional dependence by dict can indentify unique task. (This is not tested or may not be well implemented) The same task name can appear several times in the workflow, which means that the same task (with the same settings) run multiple times in the workflow. However, the dict value must link to a single task ( either unique task name or with task id) The workflow section describes the task sequence and dependence. Each string corresponds to a task name (may with task id), and each task name must have a top level section in configure file to define the settings. The title of each task setting section describe the name and type. For example, TASK NAME (TASK TYPE): [flag1 : value1, flag2 : value2] The TASK TYPE is used to map for backend class """ self.workflow = self.getTaskProtocol(task_name=WORKFLOW)
[docs] def getTaskNames(self): """ Get all the task names in the workflow block :return: tasknames in the workflow :rtype: set """ return self.workflow.getTaskNames()
[docs] def sortStages(self, index_field=INDEX): """ Sort the stage order according to the index field. :param index_field: str of None :type index_field: Use this index field to sort all the stages in a task protocol """ for task_protocal in self.config_data.values(): try: task_protocal.sort(key=lambda x: x.get(index_field)) except (AttributeError, TypeError): # {} or None as in the list; not all have index key pass
[docs] def splitNameAndType(self, name_and_type): """ Split the task section title into task name and task type. 'task name (task type)' or 'task name' alone in front of the task setting section. :param name_with_prop: 'task name (task type)' or 'task name' :type name_with_prop: str :return: (task name, take type) or (task name , None) :rtype: (str, str) or (str , None) """ matches = re.search(r'\(.*?\)', name_and_type) if not matches: return name_and_type.strip(), None matched_str = matches.group(0) # removes the () in "(a task type)" prop = matched_str[1:-1] name = name_and_type.split(matched_str)[0].strip() return name, prop
[docs] def setNameToTypeMap(self): """ Based on the config file, get a dict map from task name to task type. If 'task name' instead of 'task name (task type)' is found, task type is assumed to be the same as task name. :param config_yaml: A configure filename with path :type config_yaml: str :return: Map from task name to task type :rtype: dict """ name_and_types = self.config_data.keys() task_names = [x for x in name_and_types if x not in NONE_TASK_NAMES] for task_name_with_type in task_names: task_name, task_type = self.splitNameAndType(task_name_with_type) if task_type is None: task_type = task_name self.name_to_type[task_name] = task_type
[docs] def getTaskProtocol(self, task_name=None, task_type=None): """ Get the protocol for selected task by name or type. If both provided, select by type. :param task_name: The task name of a stage. the xxx as in the 'xxx (task type)' before the setting block. :type task_name: str or None :param task_type: The task type of a stage. the xxx as in the 'task type (xxx)' before the setting block. :type task_type: str or None :param index_field: str of None :type index_field: Use this index field to sort all the stages in a task protocol :return: list of list for workflow block; list of list for desmond multiple stages; dict for regular single stage :rtype: dict, list of str, or list of list """ assert any([task_name, task_type]) if task_name: return self.task_by_name.get(task_name) return self.task_by_type.get(task_type)
[docs]class TupleSafeLoader(yaml.SafeLoader): """ Yaml Loader that recognizes tuple tags. """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.add_constructor(u'tag:yaml.org,2002:python/tuple', lambda x, y: tuple(self.construct_sequence(y)))
[docs]class ConfigWriter(object):
[docs] def __init__(self, options, jobname): """ Class to write out config yaml. :param options: Commandline options :type options: `argparse.Namespace` :param jobname: The jobname based on which the yaml filename is set :type jobname: str or False """ self.options = options self.jobname = jobname self.config_file = None self.config = None self.config_updated = None self.task_names = [] self.task_types = [] self.driver_paths = []
[docs] def run(self): """ Main method to drive the functionality. """ self.loadWorkflow() self.loadAndUpdateTaskProtocol() self.validateGpuHosts() self.checkInputfileTasktypeDriverpaths() self.write()
[docs] def loadWorkflow(self): """ Load the config yaml based on commandline options, and parse the file. """ self.config_file = Config.get_file_from_options(self.options) self.config = get_config(self.config_file) if not self.config.workflow: raise KeyError( f"{self.config_file} doesn't have a '{WORKFLOW}' block defined." ) self.task_names = self.config.getTaskNames() self.config_updated = {WORKFLOW: self.config.workflow.protocol}
[docs] def loadAndUpdateTaskProtocol(self): """ Update the task protocols based command line input, and set task type and driver path. """ for index, task_name in enumerate(sorted(self.task_names)): try: task_type = self.config.name_to_type[task_name] except KeyError: raise ValueError( f"{task_name} in {WORKFLOW} block doesn't have " f"a task type block to define settings. ({self.config_file})" ) self.task_types.append(task_type) task_protocol = self.config.getTaskProtocol(task_name=task_name) task_subclass = WorkflowParser.getTaskSubClass( task_type, task_protocol) # If the user provides a yaml file, use as is. Otherwise, based on the # user's overall seed, set a different one for each sub task seed = None if self.options.config_yaml else index + self.options.seed # 'MD Simulation Production' 'MD Simulation Relax and Npt' task_protocol = task_subclass.optionsToTaskProtocols(self.options, task_name, seed=seed) driver_path = getattr(task_subclass, 'DRIVER_PATH', None) driver_path = task_protocol.getDriverPath(driver_path=driver_path) if driver_path: self.driver_paths.append(driver_path) task_full_task = f"{task_name} ({task_type})" if task_type else task_name self.config_updated.update({task_full_task: task_protocol.protocol})
[docs] def validateGpuHosts(self): """ Validate GPU availability against the type of protocols. (whether desmond tasks exist) DEPRECATED due to MATSCI-9604 """ if MD_SIMULATION_PROTOCOL not in self.task_types: return # Note: more generial way to also validate whether users define a gpu task is_debug = schrodinger.in_dev_env() if is_debug: return hostname = jobutils.get_jobhost_name() is_host_known = jobutils.is_hostname_known(hostname) if not (is_host_known or hostname in [None, 'localhost']): raise TypeError(f"Unknown HOST name ({hostname}).")
# MATSCI-9604: localhost returns a job id and launches the job to remote, # but the remote host fails the validation due to unknown reason. # is_gpu_avail = jobutils.is_jobhost_gpu_available() # Mac dev environ doesn't have desmond GPU on local for debug # if not is_gpu_avail: # raise TypeError("Desmond simulation steps are found, but subhost " # "is not a GPU host.")
[docs] def checkInputfileTasktypeDriverpaths(self): """ Validate whether the restart flag for input file task type driver paths is followed by properly formatted values. DEPRECATED as MATSCI-8523 is planned as the customer facing api """ if self.options.inputfile_tasktype_driverpaths is None or not isinstance( self.options.inputfile_tasktype_driverpaths, list): return for infile_job_info in self.options.inputfile_tasktype_driverpaths: if infile_job_info.locator_type == TYPE and \ infile_job_info.locator_value not in self.task_types: raise ValueError( f"{infile_job_info.locator_value} after " f"{FLAG_INPUTFIIE_TASKTYPE_DRIVERPATHS} doesn't have " f"a task type block to define settings in {self.config_file}" ) elif infile_job_info.locator_type == NAME and \ infile_job_info.locator_value not in self.task_names: raise ValueError( f"{infile_job_info.locator_value} after " f"{FLAG_INPUTFIIE_TASKTYPE_DRIVERPATHS} doesn't have " f"a task name block to define settings in {self.config_file}" ) elif infile_job_info.locator_type == DRIVER_PATH and \ infile_job_info.locator_value not in self.driver_paths: raise ValueError( f"{infile_job_info.locator_value} after " f"{FLAG_INPUTFIIE_TASKTYPE_DRIVERPATHS} doesn't have " f"a defined task driver in {self.config_file}.")
[docs] def write(self): """ Write out the updated config yaml to disk if users don't specific their input config yaml. """ if not self.jobname: # we are in get_job_spec_from_args() return # If config file provided by users, we use the original filename if self.options.config_yaml: # Users provide a config file, and use it as is. # The above check works as validator of the config_yaml and options. config_yaml = self.options.config_yaml config_basename = os.path.basename(config_yaml) if os.path.exists( config_yaml) and not os.path.exists(config_basename): # Users pass their customized config file and thus should be aware # of the config yaml. The copying to job launch dir only happens # when running without -(SUB)HOST and without the config file in # the current folder. shutil.copy(config_yaml, config_basename) return # If config file from distribution, we use the jobname config_yaml = self.jobname + parserutils.YAML_EXT # Users use command flags, which have partial information of the # task settings. Load some default config files, combine it # with users' input, and write a complete final config file. with open(config_yaml, 'w') as fh_config: yaml.dump(self.config_updated, fh_config, default_flow_style=False)
[docs]class Protocol(object):
[docs] def __init__(self, protocol): """ The protocol for the workflow or one task. :param protocol: dict for workflow and regular task protocol; list for multistage desmond protocol. :type protocol: dict or list """ self.protocol = protocol
[docs]class WorkflowProtocol(Protocol):
[docs] def __init__(self, protocol): """ The protocol for the workflow. :param protocol: dict for workflow and regular task protocol; list for multistage desmond protocol. :type protocol: dict or list """ super().__init__(protocol)
[docs] def indexUnbalancedWorkflow(self): """ Generate an unique tuple of integers for each task (item) in the workflow. Return the map from tuple to task name. :return: The map from un unique tuple to task name :rtype: dict """ indexed_items = {} indexes = [0] while indexes: try: val = self.getItemByIndex(indexes) while emd.DiffusivityMsdAnalysis.isIterable(val): indexes.append(0) val = self.getItemByIndex(indexes) if isinstance(val, dict): val = list(val.keys())[0] indexed_items[tuple(indexes)] = val except IndexError: indexes.pop() try: indexes[-1] += 1 except IndexError: break return indexed_items
[docs] def getItemByIndex(self, index): """ Get the item in the workflow protocol of this index. :param index: The indexes of the item :type index: tuple or list of int :return: The item saved in the workflow yaml with this index :rtype: str, list, or dict """ indexes = list(index) nested = copy.deepcopy(self.protocol) while indexes: index = indexes.pop(0) nested = nested[index] return nested
[docs] def getParentTasknames(self, index): """ Get the parent task names for the current task pointed by this index. Parents can be defined via three methods: parent / child by nested list or tuple: outer is parent; inner is child parent / child by list: former is the parent; later is the child parent / child by dict: key is the child; values are parents. These methods are equivalent in setting the job dependency. :param index: The index of the current item :type index: tuple, list, or generator for integers :return: The task names of parent jobs :rtype: list of str """ node_indexes = list(index) node_indexes = node_indexes.copy() parent_tasknames = self.getParentTasknameByIndexFromDict(node_indexes) one_level_up_node_index = node_indexes.copy() node_inner_index = one_level_up_node_index.pop() node_in_data_struct = self.getItemByIndex(one_level_up_node_index) if isinstance(node_in_data_struct, list) and node_inner_index != 0: # parent / child by list: former is the parent; later is the child # and the current one has former in the same list # Note: the former can be dict, list, or tuple pre_data_struct = node_in_data_struct[node_inner_index - 1] if not emd.DiffusivityMsdAnalysis.isIterable(pre_data_struct): # Now the former is either list or tuple pre_data_struct = [pre_data_struct] # Now the values in list or tuple are strings (task names) pre_top_level_tasknames = self.getFirstLevelTasknames( pre_data_struct) if isinstance(pre_data_struct, tuple): # Pre data structure has independent runs at top top level parent_tasknames += list(pre_top_level_tasknames) else: # Pre data structure has list of jobs at top top level parent_tasknames.append(pre_top_level_tasknames[-1]) else: # This is the first subjob at this level, or # subjobs on the same level are independent pre_one_level_up_node_index = one_level_up_node_index # Before 'pre_one_level_up_node_index[-1] -= 1' directly to make it # the previous item of the one level outer of the current node index, # we need to consider the situation when the one level outer of the # current node index is still in a tuple, which mean the the previous # time of this one level outer of the current node index is an # independent job with respect to the one level up thing. # FIXME: currently, it doesn't handle list inside tuple inside tuple, # which need recursive algorithm for this three or N levels of iterables pre_two_level_up_node_index = pre_one_level_up_node_index.copy() try: # This is complicated due to 'list inside tuple' pre_two_level_up_node_index.pop() node_in_data_struct2 = self.getItemByIndex( pre_two_level_up_node_index) if isinstance(node_in_data_struct2, tuple): pre_one_level_up_node_index = pre_two_level_up_node_index except IndexError: # No two-level outer for the current outer level, # don't modify pre_one_level_up_node_index pass try: # No previous item for this outer level pre_one_level_up_node_index[-1] -= 1 except IndexError: return parent_tasknames if pre_one_level_up_node_index[-1] < 0: # int in the node index should starts from 0. # Negative int points to no new parents task return parent_tasknames pre_one_level_up_data_struct = self.getItemByIndex( pre_one_level_up_node_index) parent_task_names = self.flattenWithoutDict( pre_one_level_up_data_struct) # Only append the last, which is more favorable for the concept of list. # If the parent_task_names are from a list, the last is the direct parent # of this job. If the parent_task_names are tuple, we sort of randomly # pick one as the single parent. 
# The simulations are whether list in a tuple in list or # list in a tuple in tuple situation, # a situation, quite hard to read from hand written text or think of. # In addition, too complicated text situation may have another easy way # to write the text, For example, multiple nested tuples physically # mean all subjobs are all in parallel, equivalent to a single tuple. # For example, multiple nested lists physically # mean all subjobs are all in serial, equivalent to a single list. # Thus, for the first implementation, this "parent / child by nested # list or tuple" and "parent / child by list" togather only assign a # single parent task. # Multiple parent tasks are available through "parent / child by dict" # Hope this gives syntax flexibility and covers most cases. parent_tasknames.append(parent_task_names[-1]) return parent_tasknames
[docs] def getParentTasknameByIndexFromDict(self, index): """ Get the parent task names of the current task indicated by the dict format. (the key and value of the item in the workflow pointed by the index are child and parents). If str instead of dict, return empty. :param index: The indexes of the item :type index: tuple or list of int :raise TypeError: when it is not a str or dict, meaning this not a leaf item in the workflow tree data struct. :return: The task name for this item :rtype: str """ item_with_dict = self.getItemByIndex(index) if emd.DiffusivityMsdAnalysis.isIterable(item_with_dict): raise TypeError(f"{item_with_dict} is not dict or str.") return list(item_with_dict.values()) if isinstance( item_with_dict, dict) else []
[docs] def flattenWithoutDict(self, an_iterable_item): """ Robust function to flatten list recursively. If a dict presents as the value, use the key of the dict. The difference between the above flatten is that this function don't return a dict as one value in the list. Instead, it uses the dict key to represent the dict. :param an_iterable_item: The iterable item to be flatten. (an iterable that describes the whole or part of the workflow) :type an_iterable_item: list, tuple or set :return: The flatten items in a list ([uniterable value 1, uniterable value 2, uniterable value 3 ...]) :rtype: list of str values """ items_with_dict = emd.DiffusivityMsdAnalysis.flatten(an_iterable_item) return [ list(x.keys())[0] if isinstance(x, dict) else x for x in items_with_dict ]
[docs] def getFirstLevelTasknames(self, task_stages): """ Get the first level tasknames without dict. For example, [a, [b, [c]], {d:f}, [e]] returns [a, d] :param task_stages: An iterable that describes the whole or part of the workflow. :type task_stages: list, tuple, or set :return: The first level tasknames :rtype: list of string """ tasknames = [ taskname for taskname in task_stages if not emd.DiffusivityMsdAnalysis.isIterable(taskname) ] return self.flattenWithoutDict(tasknames)
[docs] def getTaskNames(self): """ A set of all task name in the workflow section. :return: set of task names :rtype: set """ return set(self.flattenWithoutDict(self.protocol))
[docs] def getTaskName(self, index): """ Get the task name of this item in the workflow. :param index: Index of an item containing a task name. :type index: list or tuple of int :raise TypeError: When it is not a str or dict, meaning this not a leaf item in the workflow tree data struct. :return: The task name for this item :rtype: str """ item_with_dict = self.getItemByIndex(index) if emd.DiffusivityMsdAnalysis.isIterable(item_with_dict): raise TypeError(f"{item_with_dict} is not dict or str.") return list(item_with_dict.keys())[0] if isinstance( item_with_dict, dict) else item_with_dict
[docs]class TaskProtocol(Protocol): """ Wrapper for task protocol: regular driver or desmond multi-stages. """
[docs] def __init__(self, protocol): """ The protocol for the workflow or one task. :param protocol: The settings for a task: dict for task protocol; list for multistage desmond protocol. :type protocol: dict or list """ super().__init__(protocol)
[docs] def getDriverPath(self, driver_path=None): """ Return the driver path of the task :param driver_path: The driver path of the task :type driver_path: None or str :return: The driver path of the task :rtype: str or None """ try: driver_path = self.protocol[SUBCLASS_DRIVER_PATH] except (KeyError, TypeError): # For multistage task_protocol is a list instead of a dict return driver_path else: # Linux driver path is used on windows machines return path_join(driver_path)
[docs] def getPreScript(self, pre_script=SCHRODINGER_RUN): """ Return the shell script before regular driver. For example, $SCHRODINGER/run :param pre_script: The shell script before regular driver :type pre_script: None or list :return: The shell script before regular driver :rtype: None or str """ pre_script = self.protocol.get(PREPEND_RUN, pre_script) return path_join(pre_script) if pre_script else None
[docs] def getFlagBlock(self, label=CMD_FLAGS): """ Get the flag block with desired label. :param label: The block header label to search :type label: str :return: The flag block for a certain task in the config yaml :rtype: None or list """ try: flag_block = self.protocol.get(label) except AttributeError: # Multistage protocol is a list instead of a dict return None else: return flag_block
[docs] def getInterpFlagBlock(self, label=CMD_FLAGS): """ Get the flag block with desired label and interpret the block into dict data type. :param label: The block header label to search :type label: str :return: Keys are flags without '-', and values are None for positional flag, NONE for optional 'store_true' or 'store_false' flag, 'none' or 'any_value' for regular optional 'store' flag :rtype: dict """ flag_block = self.getFlagBlock(label=label) return {} if flag_block is None else self.interpretFlagBlock(flag_block)
[docs] def getReplicaKeywords(self): """ Return the replica keyword list or None. :return: The replica keywords :rtype: None or list :raise ValueError: If more than one replica setting blocks are found. """ xtra_flag_block = self.getFlagBlock(label=XTRA_FLAGS) if not xtra_flag_block: return replica_keywords = [x for x in xtra_flag_block if x.get(REPLICA_TYPE)] if not replica_keywords: return if len(replica_keywords) != 1: raise ValueError( f"Duplicated replica settings found in {self.protocol}") return replica_keywords[0]
[docs] def rmReplicaKeywords(self): """ Remove the replica keywords from xtra flag block. """ xtra_flag_block = self.getFlagBlock(label=XTRA_FLAGS) if not xtra_flag_block: return for val in xtra_flag_block: val.pop(REPLICA_TYPE, None) val.pop(REPLICA_NUM, None) val.pop(REPLICA_FLAG, None)
[docs] def getSeed(self, seed_flag): """ Return the random seed for this task. :return: The random seed in the config or the default value :rtype: int """ iflag_block = self.getInterpFlagBlock() if not iflag_block: return parserutils.RANDOM_SEED_DEFAULT return iflag_block.get(seed_flag, parserutils.RANDOM_SEED_DEFAULT)
[docs] def getCmdFromFlagBlock(self): """ Return the command list from the cmd flag block. :return: The command list built from the task protocol cmd flag block :rtype: None or list """ flag_block = self.getFlagBlock(label=CMD_FLAGS) if not flag_block: return interpreted = self.interpretFlagBlock(flag_block) # positional arguments keep the same count and order cmd = [x for x in flag_block if not isinstance(x, dict)] for flag, val in interpreted.items(): if val is None: # positional continue flag = '-' + flag if val == NONE: cmd += [flag] # store_ture or store_false else: cmd += [flag, val] # regular optional flag with value return cmd
[docs] def interpretFlagBlock(self, flag_block): """ Interpret the flag block from yaml parsed values to a dict data type. :param flag_block: a list of dict and str contains flags and values :type flag_block: list of dict and str. In .yaml, {flag: NONE} means 'store_true' or 'store_false' optional arguments; [flag] means positional arguments; {flag: 'none'; flag: 'value'} means regular optional arguments. :return: Keys are flags without '-', and values are None for positional flag, NONE for optional 'store_true' or 'store_false' flag, 'none' or 'any_value' for regular optional 'store' flag :rtype: dict :raise ValueError: if the item in cmd block is not string or dict. """ interpreted = {} for cmd_flag_and_val in flag_block: if isinstance(cmd_flag_and_val, dict): for flag, val in cmd_flag_and_val.items(): interpreted[flag] = str(val) elif isinstance(cmd_flag_and_val, str): # We assume that the same positional flag doesn't appear twice # And this only works for python3 as it is an ordered dict. interpreted[cmd_flag_and_val] = None else: raise ValueError(f"{cmd_flag_and_val} is unknown.") return interpreted
[docs] def update(self, user_input, block_label=CMD_FLAGS): """ This function updates the task_protocol according to a user input dict. The update happens for optional arg: store, store_true and store_false. The flag to locate the optional block must be either CMD_FLAGS or XTRA_FLAGS. The logic is to put all subdriver recognizable optional flags in CMD_FLAGS, and put other flags in XTRA_FLAGS. In short, flags in CMD_FLAGS are passed to subjob driver directly. Flags in XTRA_FLAGS can be used to control worflow (not used in subdriver) or be reformatted and passed to subdriver in a file format depending on specific implementation. If user_input = {'animal': 'cat'}, the 'cat' will be used for drivers. If user_input = {'animal': None}, auto set: drivers with animal key use their own defaults set by the driver or config yaml. If user_input = {'is_rainy': True}, drivers with is_rainy key adds -is_rainy to the cmd. If user_input = {'is_rainy': False}, drivers with is_rainy key remove -is_rainy in the cmd. If user_input = {'is_rainy': None}, auto set: drivers with is_rainy key check config yaml for whether to add -is_rainy in cmd. Current implementation doesn't support the positional argument substitution, as this is a replacements of certain list values and positional argument are input/output filenames that are changed by updateInfileAndCommand(). :param user_input: dict for regular single stage :type user_input: dict :param block_label: The flag block in task_protocol defined by this flag is updated (either CMD_FLAGS or XTRA_FLAGS) :type block_label: str :param user_input: {flag: value} :type user_input: dict """ assert block_label in [CMD_FLAGS, XTRA_FLAGS] defaults_to_add, defaults_to_remove = self.parserUserInput(user_input) if not any([defaults_to_add, defaults_to_remove]): return flag_block = self.getFlagBlock(block_label) if not flag_block: if defaults_to_add: flag_block = [] self.protocol[block_label] = flag_block else: # no list block in the yaml and users don't want add anything return for dict_block in flag_block: if isinstance(dict_block, dict): break else: if not defaults_to_add: # no dict block in the yaml and users don't want add anything return dict_block = {} flag_block.append(dict_block) dict_block.update(defaults_to_add) for default_to_remove in defaults_to_remove: dict_block.pop(default_to_remove, None) if not dict_block and not flag_block.remove(dict_block): self.protocol.pop(block_label)
[docs] def parserUserInput(self, user_input): """ Parse the user input dict data and return a dict for values to be added and a list for values to be removed. :param user_input: The dict to be parsed :type user_input: dict :return: Values to be added, values to be removed :rtype: dict, list """ defaults_to_add = {} defaults_to_remove = [] for flag, value in user_input.items(): if value is None: continue elif value is True: defaults_to_add[flag] = NONE elif value is False: defaults_to_remove.append(flag) else: # Any key value pairs. For example, {forcefield: OPLS3e} defaults_to_add[flag] = value return defaults_to_add, defaults_to_remove
[docs]class StepBase(object): OUTFILE_EXT = MAE_OUT_EXT TERMINAL_INFILE_NUM = 1 DRIVER_PATH = None NUM_CRU_PROP = 'i_matsci_Num_Cru' REPLICA_TAG = "Replica %s: "
[docs] def __init__(self, basename, config_yaml, task_name=None, infile=TERMINAL_INFILE, outfile=None, additional_infiles=None, master_logger=None, node_index=None): """ :param basename: The basename of this subjob :type basename: str :param config_yaml: The config yaml file contain all job config :type config_yaml: str :param task_name: Task name of this sub job :type task_name: str :param infile: The input filename :type infile: str :param outfile: The output filename :type outfile: str :param additional_infiles: This records additional input files (e.g. the second, third, and so on) :type additional_infiles: list of str :type master_logger: `logging.Logger` :param master_logger: The logger to use for recording messages into master job -driver.log. :type node_index: The node index :param node_index: tuple """ self.jobname = basename self.cmd_dir = basename self.config_yaml = os.path.basename(config_yaml) self.task_name = task_name self.infile = infile self.additional_infiles = additional_infiles if additional_infiles else [] self.master_logger = master_logger self.node_index = node_index self.duration = 0. self.replica_index = int(self.node_index[-1]) if self.node_index \ else None self.outfile = outfile if outfile else basename + self.OUTFILE_EXT self.backend = jobcontrol.get_backend() self.first_lvl_child_jobs = [] self.first_lvl_parent_jobs = [] if self.backend and self.outfile: jobutils.add_outfile_to_backend(self.outfile, self.backend) self.config = get_config(self.config_yaml) self.setLogTag() self.task_protocol = self.config.getTaskProtocol( task_name=self.task_name) self.updateTaskReplicaSettings() self._command = self._getCommand()
[docs] def log(self, msg, add_log_tag=True): """ Log information to the master driver.log :param msg: The info to print :type msg: str :param add_log_tag: If True, additional tags is logged in front of the msg for the replica jobs. :type add_log_tag: bool """ if self.master_logger is None: return if add_log_tag: msg = self.log_tag + msg self.master_logger.info(msg)
[docs] def setLogTag(self): """ Set the log tag based on node index. """ self.log_tag = "" if self.node_index is None or WorkflowParser.REPLICA_DELIM not in self.node_index: return index = self.node_index.index(WorkflowParser.REPLICA_DELIM) exts = self.node_index[index:] rindexes = ''.join(map(str, exts)).strip(WorkflowParser.REPLICA_DELIM) rindexes = rindexes.replace(WorkflowParser.REPLICA_DELIM, ',') self.log_tag = self.REPLICA_TAG % rindexes
[docs] def updateTaskReplicaSettings(self): """ Update the task protocol due to replica. """ pass
[docs] def setup(self, copy_infile=True, copy_additional_files=True, more_files=None): """ Setup things before the job starts. For example, logging format, and file transfer. :param copy_infile: Copy the input files of the class to current folder :type copy_infile: bool :param copy_additional_files: Copy the additional files of the class to current folder :type copy_additional_files: bool :param more_files: Use this arguments to copy customized files :type more_files: None or list of str """ self.duration -= time.time() self.copyFiles(copy_infile=copy_infile, copy_additional_files=copy_additional_files, more_files=more_files) log_debug(f'Setup self.infile is {self.infile}') if self._command: log_debug(f"Subjob command: {' '.join(self._command)}")
[docs] def copyFiles(self, copy_infile=True, copy_additional_files=True, more_files=None): """ Copy files to current dir. :param copy_infile: Copy the input files of the class to current folder :type copy_infile: bool :param copy_additional_files: Copy the additional files of the class to current folder :type copy_additional_files: bool :param more_files: Use this arguments to copy customized files :type more_files: None or list of str """ all_files = more_files if more_files else [] if copy_infile: all_files.append(self.infile) if copy_additional_files: all_files += self.additional_infiles for a_infile_path in all_files: if not os.path.isabs(a_infile_path): a_infile_path = os.path.join(os.pardir, a_infile_path) try: shutil.copy(a_infile_path, os.curdir) except FileNotFoundError: log_debug(f'The input file {a_infile_path} is not found.') except shutil.SameFileError: # This happens when the job is restarted in the same dir without # -HOST flag pass else: file_basename = os.path.basename(a_infile_path) jobutils.add_outfile_to_backend( os.path.join(self.cmd_dir, file_basename), self.backend)
[docs] def getDownStreamPolymerJob(self): """ Search and return the down-stream polymer job :return: A subtask step with polymer driver in the cmd :rtype: 'Step' or None """ job = self polymer_driver = path_join(*POLYMER_BUILDER_PATH) while job.first_lvl_child_jobs: job = job.first_lvl_child_jobs[0] if polymer_driver in job._command: return job
def _getCommand(self, host='localhost:1', append_prerun=True, pre_script=SCHRODINGER_RUN, driver_path=None): """ Construct command based on the config yaml and task name. :param host: Host name for this subjob. :type host: str :param append_prerun: If True, str (e.g. $SCHRODINGER/run) is prepended to the cmd :type append_prerun: bool :param pre_script: This is the prepended str. None means default ($SCHRODINGER/run) :type pre_script: str or None :param driver_path: The driver path after $SCHRODINGER/run :type driver_path: str :return: A command that launchapi can launch :rtype: list of str """ cmdlines = [] if append_prerun: prepend_run = self.task_protocol.getPreScript(pre_script=pre_script) if prepend_run: cmdlines += [prepend_run] if not driver_path: driver_path = self.task_protocol.getDriverPath( driver_path=self.DRIVER_PATH) if driver_path: cmdlines += [driver_path] cmd_from_flag_block = self.task_protocol.getCmdFromFlagBlock() if cmd_from_flag_block: cmdlines += cmd_from_flag_block if parserutils.FLAG_GPU in cmdlines: # The smart distribution is off, and thus, without localhost # the subjob will be submitted to the subhost, which in this driver # must be a gpu host. cmdlines.remove(parserutils.FLAG_GPU) elif host: cmdlines += ['-HOST', host] cmdlines += ['-JOBNAME', self.jobname] terminal_infile_count = cmdlines.count(TERMINAL_INFILE) if not terminal_infile_count and TERMINAL_INFILE == self.infile: n_before_infiles = 2 if append_prerun else 1 cmdlines = cmdlines[:n_before_infiles] + [ TERMINAL_INFILE ] * self.TERMINAL_INFILE_NUM + cmdlines[n_before_infiles:] elif terminal_infile_count: # This is only for initiating class manually with specific input file # When under jobcontrol, the TERMINAL_INFILE should be in the initial # cmd and later be automatically configured according to parent jobs # in finalization. terminal_infile_index = cmdlines.index(TERMINAL_INFILE) cmdlines[terminal_infile_index] = self.infile else: cmdlines += [self.infile] if cmdlines.count(TERMINAL_OUTFILE_BASENAME): terminal_outbase_index = cmdlines.index(TERMINAL_OUTFILE_BASENAME) cmdlines[terminal_outbase_index] = self.jobname return cmdlines
[docs] def getFirstMatchedfilename(self, filenames, exts): """ Get the filename that ends with the first extension. If multiple filenames end with that extension return the first match. :param filenames: list of filenames :type filenames: list of string :param exts: list of extensions :type exts: list of string :return: The matched filename or None :rtype: string or None """ for out_ext in exts: for filename in filenames: if filename.endswith(out_ext): return filename
[docs] @staticmethod def finalizeStep(job): """ Update child job's command with the parent job output, and set output files if not set by the subjob itself. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: current job """ job.duration += time.time() job.duration /= 3600 # Seconds to hours job.log('', add_log_tag=False) job.setOutfile() log_debug( f"Current job dir is {job.cmd_dir} and structure is {job.outfile}") log_debug(f"Total duration: {job.duration:.4f} h") if not job.outfile: return jobutils.add_outfile_to_backend(job.outfile, job.backend) par_outfile_with_dir = os.path.join(job.cmd_dir, job.outfile) for first_lvl_child_job in job.first_lvl_child_jobs: first_lvl_child_job.updateInfileAndCommand(par_outfile_with_dir) log_debug(f"Subjob dir is {first_lvl_child_job.cmd_dir}")
[docs] def setOutfile(self): """ Set the job outfile with an existing file based on the jobcontrol job, if not set. :raise FileNotFoundError: Failed to set an existing outfile """ if self.outfileExist(): return cjob = self.getJob() try: self.outfile = cjob.StructureOutputFile except AttributeError: # some scripts (e.g., analyze_simulation.py) don't have a structure # or cms set by StructureOutputFile pass else: if self.outfileExist(): return self.outfile = self.getFirstMatchedfilename(cjob.OutputFiles, SUBJOB_OUT_EXTS) if self.outfileExist(): return raise FileNotFoundError( f"Either {self.outfile} or {cjob.StructureOutputFile} found. " f"No found files in {cjob.OutputFiles} ends with {SUBJOB_OUT_EXTS}")
[docs] def outfileExist(self): """ Whether the outfile exists :return: Whether the file exists. :rtype: bool """ return self.outfile and os.path.exists(self.outfile)
[docs]    def updateInfileAndCommand(self, par_outfile_with_dir):
        """
        Update the input file in the command and the input file attributes of
        this child job to the output file path of the parent job.

        :param par_outfile_with_dir: The output file from the parent job that
            will be used as the input file of this child job.
        :type par_outfile_with_dir: str
        """
        infile_count = self._command.count(TERMINAL_INFILE)
        log_debug(
            f'Original command: {self._command} and the count is {infile_count}'
        )
        if not infile_count:
            return
        infile_index = self._command.index(TERMINAL_INFILE)
        self._command[infile_index] = os.path.basename(par_outfile_with_dir)
        if infile_count == 1:
            self.infile = par_outfile_with_dir
        else:
            self.additional_infiles.append(par_outfile_with_dir)
        log_debug(f'Updated command: {self._command}')
[docs] @classmethod def getSeedFlag(cls): """ Return the flag for random seed. Should be overwritten by the subclass only when subtask reads in this flag :return: The flag to set the random seed (without the '-') :rtype: str or None """ return None
[docs] @classmethod def getForceFieldFlag(cls): """ Return the flag for force field. Should be overwritten by the subclass only when subtask reads in this flag :return: The flag to set the force field (without the '-') :rtype: str or None """ return None
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ This is the class method to generate task_protocol based on command line options and default values from config yaml. This method should be overwritten by subclass method as this one returns the protocol in the default configure yaml. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name of the current stage :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ config = get_config_from_options(options) task_protocol = config.getTaskProtocol(task_name=task_name) if not task_protocol: # This helps when the string leading the setting block is xxx # instead of xxx (xxx). # In other words, TASK_NAME only instead of TASK_NAME (TASK_NAME) task_protocol = config.getTaskProtocol(task_type=task_name) task_user_input = { cls.getSeedFlag(): seed, cls.getForceFieldFlag(): options.forcefield } task_user_input.pop(None, None) task_protocol.update(task_user_input) return task_protocol
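# Illustrative sketch (not part of the workflow): optionsToTaskProtocols above
# collects per-task user input keyed by the seed and force field flags; a
# subclass that does not define one of those flags returns None for it, and
# popping the None key drops that entry before the protocol update. The helper
# below restates the pattern with hypothetical names and values.
def _merge_optional_flags(seed_flag, ff_flag, seed, forcefield):
    """Return a dict containing only the flags the subtask actually defines."""
    user_input = {seed_flag: seed, ff_flag: forcefield}
    # Undefined flags show up under the None key; drop them in one call
    user_input.pop(None, None)
    return user_input


# For example, _merge_optional_flags('seed', None, 123, 'ffname') keeps only
# {'seed': 123} because the (hypothetical) subtask has no force field flag.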
[docs]class Step(StepBase, jobutils.RobustSubmissionJob):
    """
    Overwrite the parent classes, and set the subjob name, subjob dir, etc.
    """
[docs]    def __init__(self,
                 basename,
                 config_yaml=DEFAULT_PERMITTIVITY_PROTOCOLS,
                 task_name=None,
                 infile=TERMINAL_INFILE,
                 outfile=None,
                 additional_infiles=None,
                 master_logger=None,
                 node_index=None):
        """
        :param basename: The base name of this subjob
        :type basename: str
        :param config_yaml: The config yaml file containing all job config
        :type config_yaml: str
        :param task_name: Task name of this sub job
        :type task_name: str
        :param infile: The input filename
        :type infile: str
        :param outfile: The output filename
        :type outfile: str
        :param additional_infiles: This records additional input files (e.g.
            the second, third, and so on)
        :type additional_infiles: list of str
        :param master_logger: The logger to use for recording messages into
            the master job -driver.log
        :type master_logger: `logging.Logger`
        :param node_index: The node index
        :type node_index: tuple
        """
        StepBase.__init__(self,
                          basename,
                          config_yaml,
                          task_name=task_name,
                          infile=infile,
                          outfile=outfile,
                          additional_infiles=additional_infiles,
                          master_logger=master_logger,
                          node_index=node_index)
        jobutils.RobustSubmissionJob.__init__(self,
                                              self._command,
                                              name=self.jobname,
                                              command_dir=self.cmd_dir)
        self.addFinalizer(self.finalizeStep, run_dir=self.cmd_dir)
[docs]class PolymerBuilder(Step): """ Class for customized polymer builder. """ DRIVER_PATH = path_join(POLYMER_BUILDER_PATH) POLYMER_BUILDER = 'polymer_builder' DEFAULT_JOBNAME = PERMITTIVITY_TASK % POLYMER_BUILDER FLAG_NUM_CRU = '-num_cru' SEQUENCE_PROP = msprops.SEQUENCE_PROP FLAG_POLYMER_CRU_NUM = '-polymer_cru_num' DEFAULT_NUMBER_OF_POLYMERS = 25
[docs] def __init__(self, *arg, **kwargs): super().__init__(*arg, **kwargs) ifbk = self.task_protocol.getInterpFlagBlock() ncru = ifbk.get(self.FLAG_NUM_CRU[1:], self.DEFAULT_NUMBER_OF_POLYMERS) self.num_cru = int(ncru) if self.num_cru != 0: return # Polymer of zero cru (H2) is requested self.task_protocol.update({x: False for x in ifbk.keys()}) self._command = self._getCommand(driver_path=path_join( [PERMITTIVITY_GUI_DIR, 'permittivity_polymer_builder_driver.py']))
[docs] def updateTaskReplicaSettings(self): """ Update the task settings due to replica. """ if self.replica_index is None: return replicakeywords = self.task_protocol.getReplicaKeywords() self.num_cru = replicakeywords[REPLICA_FLAG][self.replica_index] self.task_protocol.update({self.FLAG_NUM_CRU[1:]: self.num_cru})
[docs] @staticmethod def getTemplates(moiety_dir=None, moiety_filename=None): """ Return the polymer template file or dir :param moiety_dir: Moiety dirname inside the polymer template dir :type moiety_dir: str :param moiety_filename: Filename inside the moiety dir :type moiety_filename: str :return: A polymer template file or dir :rtype: str """ path_args = [ fileutils.get_mmshare_data_dir(), PERMITTIVITY, POLYMER_TEMPLATES ] if moiety_dir: path_args.append(moiety_dir) if moiety_filename: path_args.append(moiety_filename) return os.path.join(*path_args)
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ Update the polymer driver commands based on users' input. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ task_protocol = super().optionsToTaskProtocols(options, task_name, seed=seed) task_protocol.update({cls.FLAG_NUM_CRU[1:]: options.polymer_cru_num}) if options.max_oligomer_size: cru_nums = list(range(0, options.max_oligomer_size + 1)) cru_nums.append(options.polymer_cru_num) user_input = { REPLICA_TYPE: BRANCH, REPLICA_FLAG: cru_nums, REPLICA_NUM: len(cru_nums) } task_protocol.update(user_input, block_label=XTRA_FLAGS) return task_protocol
[docs] @classmethod def getForceFieldFlag(cls): """ Return the flag for force field. Should be overwritten by the subclass only when subtask reads in this flag :return: The flag to set the force field (without the '-') :rtype: str or None """ return parserutils.FLAG_FORCEFIELD[1:]
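# Illustrative sketch (not part of the workflow): when -max_oligomer_size is
# given, PolymerBuilder.optionsToTaskProtocols above branches the task into
# replicas, one per oligomer length from 0 (the zero-cru H2 case) up to the
# maximum, plus the full polymer length requested on the command line. The
# helper below reproduces that replica list with hypothetical argument names.
def _oligomer_cru_numbers(max_oligomer_size, polymer_cru_num):
    """Return the per-replica repeat-unit counts used for the branch replicas."""
    cru_nums = list(range(0, max_oligomer_size + 1))
    cru_nums.append(polymer_cru_num)
    return cru_nums


# For example, _oligomer_cru_numbers(3, 25) gives [0, 1, 2, 3, 25]: the
# oligomer series plus the full-length polymer.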
[docs]class DisorderSystemBuilder(Step): """ Class for customized disorder system builder. """ DSB_DRIVER = imputils.import_module_from_file( path_join([MMSHARE_SCRIPTS_PATH] + DISORDER_SYSTEM_BUILDER_PATH)) DRIVER_PATH = path_join(DISORDER_SYSTEM_BUILDER_PATH) DISORDER_SYSTEM = 'disorder_system' DEFAULT_JOBNAME = PERMITTIVITY_TASK % DISORDER_SYSTEM FLAG_MOLECULES = DSB_DRIVER.MOLECULES_FLAG FLAG_COMPOSITION = DSB_DRIVER.COMPOSITION_FLAG FLAG_DENSITY = DSB_DRIVER.DENSITY_FLAG FLAG_MONOMER_CHARGE = parserutils.FLAG_MONOMER_CHARGE DEFAULT_DENSITY = 0.5 DEFAULT_NUMBER_OF_MOLECULES = 500
[docs] @classmethod def getSeedFlag(cls): """ Return the flag for random seed. :return: The flag to set the random seed (without the '-') :rtype: str """ return parserutils.FLAG_RANDOM_SEED[1:]
[docs] @classmethod def getForceFieldFlag(cls): """ Return the flag for force field. Should be overwritten by the subclass only when subtask reads in this flag :return: The flag to set the force field (without the '-') :rtype: str or None """ return parserutils.FLAG_FORCEFIELD[1:]
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ Process the stages, for example, update relaxation stages, update the productions stage, and remove productions markers. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ task_protocol = super().optionsToTaskProtocols(options, task_name, seed=seed) mol_num = jobutils.get_option(options, cls.FLAG_MOLECULES) density = jobutils.get_option(options, FLAG_INITIAL_DENSITY) dsb_protocol = { cls.FLAG_MOLECULES[1:]: mol_num, cls.FLAG_COMPOSITION[1:]: mol_num, cls.FLAG_DENSITY[1:]: density } if options.monomer_ff_q: dsb_protocol[cls.FLAG_MONOMER_CHARGE[1:]] = True task_protocol.update(dsb_protocol) if options.replica_num and options.replica_num > 1: control_user_input = { REPLICA_TYPE: BRANCH, REPLICA_NUM: options.replica_num } task_protocol.update(control_user_input, block_label=XTRA_FLAGS) return task_protocol
[docs] def updateTaskReplicaSettings(self): """ Update the task protocol due to replica. """ if self.replica_index is None: return seed_flag = self.getSeedFlag() if not seed_flag: return orig_seed = self.task_protocol.getSeed(seed_flag) task_user_input = {seed_flag: int(orig_seed) + self.replica_index} self.task_protocol.update(task_user_input)
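# Illustrative sketch (not part of the workflow): DisorderSystemBuilder branch
# replicas reuse the same protocol but offset the random seed by the zero-based
# replica index (see updateTaskReplicaSettings above), so each cell is packed
# from a different random stream. A minimal restatement with hypothetical
# names:
def _replica_seeds(base_seed, replica_num):
    """Return one random seed per replica, offset by the replica index."""
    return [int(base_seed) + index for index in range(replica_num)]


# For example, _replica_seeds(2007, 3) gives [2007, 2008, 2009].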
[docs]class MultiStageMd(Step): """ Multi-stage simulations with relaxation. """ BROWNIAN_DYNAMICS = 'Brownian Dynamics' MOLECULAR_DYNAMICS = 'Molecular Dynamics' AVERAGE_CELL = 'Average Cell' MATSCI_ANALYSIS = 'Matsci Analysis' # MSJ keywords SIMULATE = 'simulate' CFG_FILE = 'cfg_file' # These are the argument keys in MDMSJStringer, which may not be the # exact string in msj. For example, temp vs temperature TIME = desmondutils.MSJStringer.TIME TEMP = 'temp' PRESS = desmondutils.MSJStringer.PRESSURE TIMESTEP = desmondutils.MSJStringer.TIMESTEP ENSEMBLE = desmondutils.MSJStringer.ENSEMBLE DIPOLE_INTERVAL = 'dipole_moment_dot_interval' SIM_TYPE = 'sim_type' TRJ_INTVL = 'trajectory.interval' SEED = 'seed' FLAG_TO_KEY = { parserutils.FLAG_MD_TIME: TIME, parserutils.FLAG_MD_PRESS: PRESS, parserutils.FLAG_MD_TEMP: TEMP, parserutils.FLAG_MD_TIMESTEP: TIMESTEP, parserutils.FLAG_MD_ENSEMBLE: ENSEMBLE, parserutils.FLAG_MD_TRJ_INT: TRJ_INTVL } DESMOND_UNIT_CONVERTERS = { parserutils.FLAG_MD_TIME: lambda x: x * units.NANO2PICO, parserutils.FLAG_MD_TIMESTEP: lambda x: [ x * units.FEMTO2PICO, x * units.FEMTO2PICO, x * units.FEMTO2PICO * 3. ] } TRJ_WRITE_VEL = desmondutils.MSJStringer.TRJ_WRITE_VEL LAST_STAGE_DICT = desmondutils.MSJStringer.MSJ_LAST FLOAT_FIELDS = [TIME, TEMP, PRESS] # Need to remove these fields before converting to desmond msj # SIM_TYPE defines the stage type, and will be removed on converting to msj DUMMY_FIELDS = [INDEX] + MD_STAGE_MARKERS MSJSTRINGERS = { BROWNIAN_DYNAMICS: desmondutils.BrownieMSJStringer, MOLECULAR_DYNAMICS: desmondutils.MDMSJStringer, AVERAGE_CELL: desmondutils.AveCellMSJStringer, MATSCI_ANALYSIS: desmondutils.MSAnalysisMSJStringer } DEFAULT_MSJ_HEADER = 'task { task = "desmond:auto"}\n' HOFMANN = 'Hofmann' COMPRESSIVE = 'Compressive' ADD_NEW = 'Add New...' HOFMANN_PROTOCOL = f'{HOFMANN.lower()}.yaml' COMPRESSIVE_PROTOCOL = f'{COMPRESSIVE.lower()}.{MSJ}' RELAXATION_MAP = { HOFMANN: HOFMANN_PROTOCOL, COMPRESSIVE: COMPRESSIVE_PROTOCOL, } RELAXATION_MAP = { x: os.path.join(RELAXATION_PROTOCOLS, y) for x, y in RELAXATION_MAP.items() } OUTFILE_EXT = CMS_OUT_EXT
[docs] def __init__(self, *arg, **kwarg): """ See parent class. """ # basename is the only arg: e.g., 'md_simulation_relax_and_npt_111' self.msj = arg[0] + f'.{MSJ}' super().__init__(*arg, **kwarg)
[docs]    def setup(self):
        """
        Overwrite the parent class method.
        """
        super().setup()
        self.createMSJ()
[docs] def createMSJ(self): """ Create msj for desmond. """ stages, msj_str = self.getStages() msj_stages = self.parseMsjStr(msj_str) self.copyCfgFiles(msj_stages) self.updateReplicaSeed(stages, msj_stages) desmondutils.create_msj(stages, filename=self.msj, task_stage=False, msj_string_header=str(msj_stages)) msj_filepath = os.path.join(self.cmd_dir, self.msj) jobutils.add_outfile_to_backend(msj_filepath, self.backend)
[docs] def getStages(self): """ Get desmond stages from config yaml. :return: The MSJSTRINGERS contains desmond stage information, the orig_msj_str is for msj string provided by users. :rtype: list of desmondutils.MSJStringer, str """ in_stages = self.task_protocol.protocol # For the safety if case that these markers appear in the cmd config yaml [y.pop(x, None) for x in self.DUMMY_FIELDS for y in in_stages] orig_msj_str = '' out_stages = [] for in_stage in in_stages: in_stage = in_stage.copy() msj_str = in_stage.get(MSJ, '') if msj_str: # Only one single in_stage with MSJ is tested, and thus we # recommend to use one single MSJ in one stage without mixing. # The accruments of MSJ is open for future extension. orig_msj_str += msj_str continue for float_field in self.FLOAT_FIELDS: try: # The temperature of an annealing stage is a list if in_stage[ ANNEALING] == DESMOND_TRUE and float_field == self.TEMP: continue except KeyError: pass field_value = in_stage.pop(float_field, None) try: in_stage[float_field] = float(field_value) except (TypeError, ValueError): if field_value: log_debug( f"{field_value} as {float_field} cannot be converted to float." ) sim_type = in_stage.pop(self.SIM_TYPE, self.MOLECULAR_DYNAMICS) msj_stringer = self.MSJSTRINGERS[sim_type] out_stage = msj_stringer(**in_stage) out_stages.append(out_stage) return out_stages, orig_msj_str
[docs] def parseMsjStr(self, msj_str): """ Parse MSJ string, set task stage and customize settings. :param msj_str: A string in MSJ format :type msj_str: str :return: Modified msj string with task stage :rtype: schrodinger.application.desmond.multisim.Msj """ if not msj_str: # Use a default header if not provided msj_str = self.DEFAULT_MSJ_HEADER from schrodinger.application.desmond.multisim import parser try: msj_stages = parser.parse(string=msj_str) except AssertionError as err: log_debug(str(err)) # Failed to parse the one from customers, and thus use the default msj_str = self.DEFAULT_MSJ_HEADER msj_stages = parser.parse(string=msj_str) try: msj_stages.stage[0].task except AttributeError: # customers' msj has no task stage msj_str = self.DEFAULT_MSJ_HEADER + msj_str msj_stages = parser.parse(string=msj_str) # In case that users input a msj for relaxation, modify the GPU task stage # for backward compatibility msj_stages.stage[0].task.update( {'set_family': { 'desmond': { 'bigger_rclone': 'false' } }}) return msj_stages
[docs] def copyCfgFiles(self, msj_stages): """ Copy the config files into the subtask dir. :param msj_stages: The msj that may contain cfg files :type msj_stages: 'schrodinger.application.desmond.multisim.parser.Msj' """ sim_stages = [x for x in msj_stages.stage if self.SIMULATE in x] sim_stages = [x for x in sim_stages if self.CFG_FILE in x.simulate] cfg_files = set([x.simulate.cfg_file.val for x in sim_stages]) cfg_files = set([os.path.basename(x) for x in cfg_files]) for cfg_file in cfg_files: shutil.copy(os.path.join(os.pardir, cfg_file), cfg_file) cfg_filepath = os.path.join(self.cmd_dir, cfg_file) jobutils.add_outfile_to_backend(cfg_filepath, self.backend)
[docs]    def updateReplicaSeed(self, stages, msj_stages):
        """
        Update the random seed in the msj so that each replica has a different
        value.

        :param stages: The desmond MSJSTRINGERS containing desmond information
        :type stages: list of desmondutils.MSJStringer
        :param msj_stages: The msj header with task stage (may have other
            simulation stages)
        :type msj_stages: schrodinger.application.desmond.multisim.Msj
        """
        if self.replica_index is None:
            return
        if self.modifyReplicaStages(stages):
            return
        if self.modifyReplicaMsjHeader(msj_stages):
            return
        self.modifyReplicaFirstStage(stages)
[docs] def modifyReplicaStages(self, stages): """ Search the desmond stages for seed, and tune them based on the replica index. :param stages: The desmond MSJSTRINGERS contains desmond information :type stages: list of the MSJSTRINGERS :return: Whether some seeds in the stages are modified :rtype: bool """ stage_seed_modified = False for stage in stages: seed = stage.data.get(MdStageModifier.RANDOMIZE_VELOCITY_DOT_SEED) if seed is None: continue seed += self.replica_index stage.data[MdStageModifier.RANDOMIZE_VELOCITY_DOT_SEED] = seed stage_seed_modified = True return stage_seed_modified
[docs] def modifyReplicaMsjHeader(self, msj_stages): """ Modify the replica Msj header. :param msj_stages: The msj header with task stage (may have other simulation stages) :type msj_stages: schrodinger.application.desmond.multisim.Msj :return: Whether the msj_stages are modified :rtype: bool """ if len(msj_stages.stage) < 2: return False sim_stages = [x for x in msj_stages.stage if 'simulate' in x] rand_vel_stages = [ x for x in sim_stages if 'randomize_velocity' in x.simulate ] seed_stages = [ x for x in rand_vel_stages if self.SEED in x.simulate.randomize_velocity ] if seed_stages: stage = seed_stages[0] elif sim_stages: stage = sim_stages[0] else: return False try: seed = stage.simulate.randomize_velocity.seed.val + self.replica_index except AttributeError: # seed_stage.simulate doesn't have a randomize_velocity.seed attribute seed = parserutils.RANDOM_SEED_DEFAULT + self.replica_index stage.simulate.set_value(MdStageModifier.RANDOMIZE_VELOCITY_DOT_SEED, seed) return True
[docs] def modifyReplicaFirstStage(self, stages): """ Modify the first desmond stages to add seed based on the replica index :param stages: The desmond MSJSTRINGERS contains desmond information :type stages: list of the MSJSTRINGERS """ if not stages: return seed = parserutils.RANDOM_SEED_DEFAULT + self.replica_index stages[0].data[MdStageModifier.RANDOMIZE_VELOCITY_DOT_SEED] = seed
    def _getCommand(self):
        """
        Return the cmd to run the job.

        :rtype: list of str
        :return: The command line options for a subjob.
        """
        cmd = desmondutils.get_multisim_command(TERMINAL_INFILE,
                                                self.outfile,
                                                msj_name=self.msj,
                                                job_name=f'{self.jobname}')
        return cmd
[docs] @classmethod def optionsToStage(cls, options): """ Convert command line options to a stage like dict. :param options: The command parsered options :type options: `argparse.Namespace` :return: Key and values for a desmond stage :rtype: dict """ stage_dict = {IS_PRODUCTION: DESMOND_YES} for cmd_flag, desmond_key in cls.FLAG_TO_KEY.items(): desmond_value = jobutils.get_option(options, cmd_flag) if not desmond_value: continue unit_funct = cls.DESMOND_UNIT_CONVERTERS.get(cmd_flag) if unit_funct: desmond_value = unit_funct(desmond_value) stage_dict[desmond_key] = desmond_value return stage_dict
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ Process the stages, for example, update relaxation stages, update the productions stage, and remove productions markers. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ task_protocol = MultiStageMd.getAllStages(options, task_name) # Here, orig_stages may have multiple or zero stages marked by production # filled with command stage info; they may have command line relaxation # stages or yaml typed stages before the production. (at least no one marked # with 'use customized relaxation' anymore. cls.modifyStageSettings(task_protocol, options, seed=seed) return task_protocol
[docs] @classmethod def getRelaxationProtocolFile(cls, relaxation_option): """ Return the relaxation protocol file based on the relaxation option. :param relaxation_option: File with path or a protocol name :type relaxation_option: str :return: Relaxation file with path :rtype: str """ if relaxation_option not in cls.RELAXATION_MAP: return relaxation_option relaxation_filename = cls.RELAXATION_MAP[relaxation_option] return Config.get_file(relaxation_filename)
[docs] @classmethod def getAllStages(cls, options, task_name, time=100.): """ Construct the stages in general by looking at the config yaml, commandline relaxation, and productions. Note: only when stages are marked by USE_CUSTOMIZED_RELAXATION and options.relaxation_protocol is provided, the stages in the relaxation_protocol is used to overwrite the stages in options.config_yaml. :param options: The object holding all the cmd options :type options: `argparse.Namespace` :param task_name: The task name of the stage :type task_name: str :param time: A short md threshold threshold for debug mode (simulation time longer than this is reduced to this value for fast testing) :type time: float :return: task_protocol.protocol is a list of stages :rtype: 'TaskProtocol' """ # Read the config yaml and replace the relaxation stages if needed config = get_config_from_options(options) task_protocol = config.getTaskProtocol(task_name=task_name) stage_indexes_to_replace = MdStageModifier.getStageIndicesByMarker( task_protocol.protocol, marker=USE_CUSTOMIZED_RELAXATION, remove_marker=True) if not stage_indexes_to_replace: if not schrodinger.in_dev_env(): return task_protocol for stage in task_protocol.protocol: if stage.get(IS_RELAXATION) == DESMOND_YES: continue # Use short time in debug mode for quick testing if float(stage.get(cls.TIME, 0.)) > time: stage[cls.TIME] = time return task_protocol # USE_CUSTOMIZED_RELAXATION use the cmd relaxation # to replace this one and the stages before this one relaxation_file = cls.getRelaxationProtocolFile( options.relaxation_protocol) relax_stages_from_file = cls.getRelaxationStages(relaxation_file) if not relax_stages_from_file: return task_protocol first_index = stage_indexes_to_replace[0] stages_after_relax = task_protocol.protocol[first_index + 1:] task_protocol.protocol = relax_stages_from_file + stages_after_relax return task_protocol
[docs] @staticmethod def getRelaxationStages(relaxation_file): """ Get the relaxation stages from relaxation protocol file and return it as desmond stages. :param relaxation_file: A file containing relaxation protocol :type relaxation_file: str :return: list of desmond stages :rtype: list """ from schrodinger.application.desmond import stage # noqa: F401, required for cmj.py if not relaxation_file: return [] if relaxation_file.endswith('.yaml'): config = get_config(relaxation_file) task_protocol = config.getTaskProtocol( task_type=MD_SIMULATION_PROTOCOL) return task_protocol.protocol elif relaxation_file.endswith(f'.{MSJ}'): try: stages = cmj.parse_msj(relaxation_file) except (SyntaxError, RuntimeError) as msg: raise ValueError('Unable to read msj file %s :\n%s' % (relaxation_file, msg)) msj_str = cmj.write_msj(stages, to_str=True) stages = [{MSJ: msj_str}] return stages
[docs] @classmethod def modifyStageSettings(cls, task_protocol, options, seed=None): """ Modify the stages based on command line options, including update the temperature and pressure of the first flexible stages before each production stage, pass velocity to NVE production stage. :param task_protocol: task_protocol.protocol is a list of stages :type task_protocol: 'TaskProtocol' :param options: The parsed command line options :type options: `argparse.Namespace` :param seed: Random seed to randomize the initial velocities :type seed: int or None """ md_stage_modifier = MdStageModifier(task_protocol, options, seed=seed) md_stage_modifier.run()
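# Illustrative sketch (not part of the workflow): MultiStageMd.optionsToStage
# above converts command-line MD settings to the values Desmond expects, e.g.
# simulation time from ns to ps and the timestep from fs to ps, with the third
# (commonly the "far") timestep set to three times the first. The numeric
# factors below restate those conversions and are assumptions of this sketch,
# not the module's unit helpers.
def _md_time_ns_to_ps(md_time_ns):
    """Convert a simulation time in ns to the ps value used in the msj."""
    return md_time_ns * 1000.0


def _md_timestep_fs_to_ps(timestep_fs):
    """Return the three Desmond timesteps in ps from a single fs value."""
    step_ps = timestep_fs * 1.0e-3
    return [step_ps, step_ps, step_ps * 3.0]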
[docs]class MdStageModifier(object): TEMPERATURE = desmondutils.MDMSJStringer.TEMP RANDOMIZE_VELOCITY = 'randomize_velocity' FIRST = 'first' RANDOMIZE_VELOCITY_DOT_FIRST = f'{RANDOMIZE_VELOCITY}.{FIRST}' RANDOMIZE_VELOCITY_DOT_SEED = desmondutils.MSJStringer.SEED LAST_RELAX_STAGE_TRJ_INTVL = 100. OTHER_RELAX_STAGE_TRJ_INTVL = 500.
[docs] def __init__(self, task_protocol, options, seed=None): """ :param task_protocol: task_protocol.protocol is a list of stages :type task_protocol: 'TaskProtocol' :param options: The parsed command line options :type options: `argparse.Namespace` :param seed: Random seed to randomize the initial velocities :type seed: int or None """ self.task_protocol = task_protocol self.options = options self.seed = seed self.setStages() self.relax_stage_indexes = [] self.production_stage_indexes = []
[docs] def run(self): """ Main method to drive the functionality. """ self.setRelaxAndProductionIndexes() self.setRelaxRandomVel() if not any([self.relax_stage_indexes, self.production_stage_indexes]): # No stages marked by relaxation or production. Use settings as is. return if self.is_msj: # MSJ stages are relaxation stages and the driver reads yaml protocol self.duplicateRelaxStage() self.updateLastStage() self.saveMsjStages() return self.updateProductStageTempAndPress() self.updateRelaxStageTempAndPress() self.connectNVEInitialVelocity() self.updateTrjIntvel() self.duplicateRelaxStage() self.updateLastStage() self.removeCom() self.updateDipoleInterval() self.removeDummyFields()
[docs] def setStages(self): """ Set stages. If the protocol contains only one stage and the only item in this stage list is a dict with 'msj' being the only key, the stage is parsed by standard msj parser. Otherwise, each stage is a list. """ protocol = self.task_protocol.protocol self.is_msj = len(protocol) == 1 and MSJ in protocol[0] if not self.is_msj: self.stages = protocol return # MSJ file must be a relaxation protocol, and the production settings # will be constructed from command flags. from schrodinger.application.desmond.multisim import parser self.stages = parser.parse(string=protocol[0][MSJ]).stage
[docs] def setRelaxAndProductionIndexes(self): """ Set relaxation and production stage indexes according to markers. Note: if only relaxation stages are found, the stage right after the last relaxation is assumed to be the production stages. """ if self.is_msj: # MSJ is always a relaxation protocol self.relax_stage_indexes = [len(self.stages) - 1] self.production_stage_indexes = [len(self.stages)] return self.production_stage_indexes = self.getStageIndicesByMarker( self.stages, marker=IS_PRODUCTION) self.relax_stage_indexes = self.getStageIndicesByMarker( self.stages, marker=IS_RELAXATION) if not self.production_stage_indexes and self.relax_stage_indexes: stage_indexes_after_relax = [self.relax_stage_indexes[-1] + 1] non_product_stage_indexes = self.getStageIndicesByMarker( self.stages, marker=IS_PRODUCTION, expected_value=DESMOND_NO) self.production_stage_indexes = [ x for x in stage_indexes_after_relax if x not in non_product_stage_indexes ] if self.production_stage_indexes and not self.relax_stage_indexes: stage_indexes_before_product = [ self.production_stage_indexes[0] - 1 ] non_relax_stage_indexes = self.getStageIndicesByMarker( self.stages, marker=IS_RELAXATION, expected_value=DESMOND_NO) self.relax_stage_indexes = [ x for x in stage_indexes_before_product if x not in non_relax_stage_indexes ]
[docs] def setRelaxRandomVel(self): """ Set the randomize velocity seed for the first relaxation stage. """ if not self.stages or not self.relax_stage_indexes: return if self.is_msj: first_md_stage_index = 1 if TASK in str(self.stages[0]) else 0 stage = self.stages[first_md_stage_index] value = sea.sea.Map({self.FIRST: 0.0}) stage.simulate.set_value(self.RANDOMIZE_VELOCITY, value) if self.seed is None: return stage.simulate.randomize_velocity.set_value(MultiStageMd.SEED, self.seed) return self.stages[0][self.RANDOMIZE_VELOCITY_DOT_FIRST] = 0.0 if self.seed is None: return self.stages[0][self.RANDOMIZE_VELOCITY_DOT_SEED] = self.seed
[docs]    @staticmethod
    def getStageIndicesByMarker(stages,
                                marker=IS_PRODUCTION,
                                expected_value=DESMOND_YES,
                                remove_marker=False):
        """
        Indexes of the stages marked with certain markers. For example,
        IS_PRODUCTION marks stages as production stages; IS_RELAXATION marks
        stages as relaxation stages; if USE_CUSTOMIZED_RELAXATION marks some
        stages, the first with this marker and those before this one will be
        replaced by customized relaxation stages defined by users.

        :param stages: Multistages for desmond serial simulations
        :type stages: list
        :param marker: The marker key to look up in each stage
        :type marker: str
        :param expected_value: The expected value pointed to by the marker
        :type expected_value: str
        :param remove_marker: Remove the marker from the stage, if True
        :type remove_marker: bool

        :return: The indexes of all the stages marked with 'xxx:yes'
        :rtype: list of int
        """
        # NOTE: Mac and Linux parse 'yes' differently: string / bool
        if expected_value == DESMOND_YES:
            expected_values = set([True, DESMOND_YES])
        elif expected_value == DESMOND_NO:
            expected_values = set([False, DESMOND_NO])
        else:
            raise ValueError(
                f"{expected_value} is not {DESMOND_YES} or {DESMOND_NO}")
        stage_indexes = []
        for index, stage in enumerate(stages):
            if remove_marker:
                value = stage.pop(marker, None)
            else:
                value = stage.get(marker, None)
            if value in expected_values:
                stage_indexes.append(index)
        return stage_indexes
[docs] def updateProductStageTempAndPress(self): """ Update the temp and press of the production stage. """ if not self.production_stage_indexes: return # IS_PRODUCTION stages use cmd production stage info stage_from_cmd = MultiStageMd.optionsToStage(self.options) for prod_stage_index in self.production_stage_indexes: if prod_stage_index >= len(self.stages): # The stages are of purely relaxations continue # The users' input has higher priority, and use default when # users don't specify self.stages[prod_stage_index].update(stage_from_cmd)
[docs] def updateRelaxStageTempAndPress(self): """ Update the temperature and pressure settings of stages. If no NPT or NVT ahead of the NVE, randomize the NVE production velocity. """ if self.is_msj or self.options.md_ensemble in [emd.NPT]: # The production stage can adjust itself's temp and press return # Update the temperature and pressure of the stages ahead of the production # stage until reaching the closest NPT. for stage_index in self.production_stage_indexes: if stage_index == 0: continue relax_stages = self.stages[stage_index - 1::-1] ensembles = [ x.get(MultiStageMd.ENSEMBLE, None) for x in relax_stages ] has_npt = emd.NPT in ensembles for relax_stage in relax_stages: ensemble = relax_stage.get(MultiStageMd.ENSEMBLE) if ensemble in [emd.NVT, emd.NPT]: relax_stage[MultiStageMd.TEMP] = self.options.md_temp if ensemble == emd.NPT: # set the pressure of the first previous NPT relax_stage[MultiStageMd.PRESS] = self.options.md_press break if not has_npt: # NO NPT available, just set the temperature for the first # previous NVT break
[docs] def connectNVEInitialVelocity(self): """ Seamlessly connect the NVE production stage to the previous stages by writing out velocities and skip the randomizing initial velocities. """ if not self.options.md_ensemble == emd.NVE: return for stage_index in self.production_stage_indexes: try: production_stage = self.stages[stage_index] except IndexError: # This md stages have no production stage production_stage = {} for pre_stage in self.stages[stage_index - 1::-1]: ensemble = pre_stage.get(MultiStageMd.ENSEMBLE) if ensemble in [emd.NVT, emd.NPT]: # Don't use the NVE production velocity but pass the previous # NPT or NVT velocities to the NVE production. # Also write out the velocity of the last stage pre_stage[MultiStageMd.TRJ_WRITE_VEL] = DESMOND_TRUE production_stage[self.RANDOMIZE_VELOCITY_DOT_FIRST] = 'inf' break else: # If no NPT or NVT ahead of the NVE, randomize the NVE production # velocity production_stage[self.RANDOMIZE_VELOCITY_DOT_FIRST] = 0.
[docs]    def duplicateRelaxStage(self):
        """
        Update and duplicate the last relaxation stage settings.
        """
        if not self.relax_stage_indexes:
            return
        last_relax_stage_index = self.relax_stage_indexes[-1]
        last_relax_stage = self.stages[last_relax_stage_index]
        ps_time = self.options.md_time_last_relaxation * units.NANO2PICO
        if self.is_msj:
            last_relax_stage.simulate.set_value(self.TEMPERATURE,
                                                self.options.md_temp)
            last_relax_stage.simulate.set_value(MultiStageMd.PRESS,
                                                self.options.md_press)
            for key in MultiStageMd.LAST_STAGE_DICT.keys():
                try:
                    # This isn't the last stage in output
                    last_relax_stage.simulate.del_key(key)
                except KeyError:
                    pass
            self.stages.append(copy.deepcopy(last_relax_stage))
            self.stages[-1].simulate.set_value(MultiStageMd.TIME, ps_time)
            self.stages[-1].simulate.set_value(MultiStageMd.ENSEMBLE, emd.NPT)
            return
        # The last relaxation stage has an intermediate trajectory output
        # interval, which may be useful for the average cell stage
        last_relax_stage[
            MultiStageMd.TRJ_INTVL] = self.LAST_RELAX_STAGE_TRJ_INTVL
        self.stages.append(copy.deepcopy(last_relax_stage))
        # Update the relaxation stage time
        self.stages[-1][MultiStageMd.TIME] = ps_time
        self.stages[-1][MultiStageMd.ENSEMBLE] = emd.NPT
[docs] def updateLastStage(self): """ Update the settings of the last stage. """ if not self.stages: return if self.is_msj: for key, val in MultiStageMd.LAST_STAGE_DICT.items(): # '""' to "" self.stages[-1].simulate.set_value(key, val[1:-1]) return self.stages[-1].update(MultiStageMd.LAST_STAGE_DICT)
[docs] def saveMsjStages(self): """ Save the msj stages to the task protocol """ # task = {task = "desmond:auto"} to task {task = "desmond:auto"} msj_vals = [str(x).replace('=', '', 1) for x in self.stages] self.task_protocol.protocol = [{MSJ: '\n'.join(msj_vals)}]
[docs] def updateTrjIntvel(self): """ Update the trajectory output interval for the stages before the last relaxation stage. """ if not self.relax_stage_indexes: return last_relax_stage_index = self.relax_stage_indexes[-1] for relax_stage_index in range(last_relax_stage_index): relax_stage = self.stages[relax_stage_index] # Most relaxation stages don't need large trajectory output relax_stage[ MultiStageMd.TRJ_INTVL] = self.OTHER_RELAX_STAGE_TRJ_INTVL
[docs] def removeCom(self): """ Add remove center of mass string to relaxation and production stages. """ stage_indexes = [] if self.relax_stage_indexes: stage_indexes.append(self.relax_stage_indexes[-1]) if self.production_stage_indexes: for production_stage_index in self.production_stage_indexes: if production_stage_index >= len(self.stages): continue stage_indexes.append(production_stage_index) for stage_index in stage_indexes: stage = self.stages[stage_index] stage[f"backend.{emd.MDSIM_PLUGIN_REMOVE_COM_MOTION_FIRST}"] = 0.0 stage[ f"backend.{emd.MDSIM_PLUGIN_REMOVE_COM_MOTION_INTERVAL}"] = 0.0
[docs] def updateDipoleInterval(self): """ Update the dipole interval in production run """ if not self.production_stage_indexes: return for stage_index in self.production_stage_indexes: try: production_stage = self.stages[stage_index] except IndexError: # This md stages have no production stage production_stage = {} production_stage[MultiStageMd.DIPOLE_INTERVAL] = \ self.options.dipole_record_int
[docs] def removeDummyFields(self): """ Remove the intermediate arguments that desmond doesn't recognize. """ [y.pop(x, None) for x in MultiStageMd.DUMMY_FIELDS for y in self.stages]
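# Illustrative sketch (not part of the workflow): yaml loaders may hand a stage
# marker back as the string 'yes' or as the boolean True (see the note in
# MdStageModifier.getStageIndicesByMarker above), so marker matching has to
# accept both spellings. The literal 'yes' below is a local assumption standing
# in for the module's DESMOND_YES constant.
def _marked_stage_indexes(stages, marker, yes_values=(True, 'yes')):
    """Return the indexes of the stage dicts whose marker is set to yes."""
    return [
        index for index, stage in enumerate(stages)
        if stage.get(marker) in yes_values
    ]


# For example, _marked_stage_indexes([{'is_production': 'yes'}, {}],
# 'is_production') gives [0].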
[docs]class MdPostAnalysis(Step): """ Base class to hold molecular dynamics post analysis functionality. """ TEMP_PROP = 'r_matsci_Temperature(K)' TEMP_STD_PROP = 'r_matsci_stdev_Temperature(K)'
[docs]    def setup(self, copy_ene=False, write_cms=False):
        """
        Overwrite the parent class method.
        """
        if copy_ene:
            self.addEnefileToAdditionalInfiles()
        if write_cms:
            self.writeCms()
        super().setup(copy_infile=not write_cms)
[docs] def addEnefileToAdditionalInfiles(self): """ Add the ene file as one of the additional files. """ for infile in [self.infile] + self.additional_infiles: if not infile.endswith(CMS_OUT_EXT): continue cms_infile_path = os.path.join(os.pardir, infile) ene_file, orig_cms = self.getRelatedJobFile(cms_infile_path) log_debug(f'setup ene_file: {ene_file}') try: ene_file_from_master_job_dir = os.path.relpath( ene_file, os.pardir) except ValueError: # the two paths are on different drivers, for example, # ene file is on 'C:' drive, and the job starts on 'E:' drive ene_file_from_master_job_dir = ene_file self.additional_infiles.append(ene_file_from_master_job_dir)
[docs] def writeCms(self): """ Write out cms files to current dir and each of them has a 'msprops.ORIGINAL_CMS_PROP' property pointing to the original cms file path. """ for infile in [self.infile] + self.additional_infiles: if not infile.endswith(CMS_OUT_EXT): continue cms_infile_path = os.path.join(os.pardir, infile) cms_model = cms.Cms(cms_infile_path) # FIXME: this isn't supported when post trj analysis is directly after # another post trj analysis. # But this can be supported by doing: # continue jumping from current cms to original cms (as long they are # different) until finding the first one with trajectory around. # The saved original cms needs to abs path as there will be one in the # subjob launch dir (the same name as the one in the master job original # launch dir). original_abs_infile_path = os.path.abspath(cms_infile_path) cms_model.set_cts_property(msprops.ORIGINAL_CMS_PROP, original_abs_infile_path) file_basename = os.path.basename(infile) cms_model.write(file_basename) jobutils.add_outfile_to_backend( os.path.join(self.cmd_dir, file_basename), self.backend) try: self.additional_infiles.remove(infile) except ValueError: pass log_debug(f'{msprops.ORIGINAL_CMS_PROP}: {infile}')
[docs] @staticmethod def autocorrelation(time_serial_data, normalize=False): """ Calculate the auto correlation of the input numpy.array data. :param time_serial_data: Autocorrelation of each column of the data :type time_serial_data: n x m numpy.array :param normalize: If True, the auto correlation data will be normalized :type normalize: bool :return: Autocorrelated data (by column) :rtype: n x m numpy.array """ nrow, ncol = time_serial_data.shape data_autocorrelated = np.zeros((nrow, ncol)) dat_start = nrow - 1 dat_end = dat_start + nrow num_accu = np.linspace(nrow, 1, num=nrow) for icol in range(ncol): with warnings.catch_warnings(): warnings.simplefilter("ignore", FutureWarning) correlated = signal.fftconvolve(time_serial_data[:, icol], time_serial_data[::-1, icol], mode='full') data_autocorrelated[:, icol] = correlated[dat_start:dat_end] data_autocorrelated[:, icol] /= num_accu if normalize: data_autocorrelated[:, icol] /= data_autocorrelated[0, icol] return data_autocorrelated
[docs] @staticmethod def autocorrelationWithSum(time_serial_data, normalize=False): """ Calculate the auto correlation of the input numpy.array data. :param time_serial_data: Autocorrelation of each column of the data :type time_serial_data: n x m numpy.array :param normalize: If True, the auto correlation data will be normalized :type normalize: bool :return: Autocorrelated data (by column) :rtype: n x m numpy.array """ data = MdPostAnalysis.autocorrelation(time_serial_data, normalize=False) data_sum = np.sum(data, axis=1) data_with_sum = np.concatenate((data, data_sum.reshape((-1, 1))), axis=1) if not normalize: return data_with_sum nrow, ncol = data_with_sum.shape for icol in range(ncol): data_with_sum[:, icol] /= data_with_sum[0, icol] return data_with_sum
[docs] @staticmethod def getRelatedFile(cms_file, file_ext=ENE_EXT): """ Get the requested file based on cms file name and file extension :param cms_file: The cms file whose related file is searched. :type cms_file: str :param file_ext: the extension of file to search for. The file extension must include the separator as well. For example, to get ene file, file_ext must be ".ene" :type file_ext: str :return: Found file with correct extension :rtype: str or None """ def get_first_filepath(filepatterns): """ Return the first existing file. :param filepatterns: File patterns to search :type filepatterns: list of str :return: A found file :rtype: str or None """ for filepattern in filepatterns: filepaths = glob.glob(filepattern) for filepath in filepaths: if os.path.isfile(filepath): return filepath if not file_ext: return out_ext = '-out' basename, ext = fileutils.splitext(cms_file) if basename.endswith(out_ext): basename = basename[:-len(out_ext)] # For safety only, certain ene filename may be slightly different # (cannot reproduce now, but did happened once for a second run in # the same dir and without -HOST/-SUBHOST flags: {basename}_1.ene) filepatterns = [ '%s%s' % (basename, file_ext), '%s*%s' % (basename, file_ext), os.path.join(os.path.dirname(cms_file), '*%s' % file_ext) ] filepath = get_first_filepath(filepatterns) return filepath
[docs] @staticmethod def getRelatedJobFile(input_cms, orig_cms=None, file_ext=ENE_EXT): """ Search existing ene file and return the ene found based on the input_cms. Existing ene file may be found in current folder, in the input cms folder or in the PROP_CMS ('s_m_original_cms_file') folder when ene is not copied from the PROP_CMS folder to the current folder. :param input_cms: INPUT_CMS defined for INPUT_TRJ searching. :type input_cms: str :param orig_cms: The original cms defined by users. :type orig_cms: str :param file_ext: the extension of file to search for. The file extension must include the separator as well. For example, to get ene file, file_ext must be ".ene" :type file_ext: str :raise KeyError, FileNotFoundError, ValueError: when ene file cannot be found. :rtype: str, str or None :return: Existing ene file if found, the original cms file that the ene file sits with or None (sit with the input_cms) """ no_ene_err_msg = f"Cannot locate a {file_ext} file for {input_cms}." afile = MdPostAnalysis.getRelatedFile(input_cms, file_ext=file_ext) if afile: return afile, None if not orig_cms or not os.path.exists(orig_cms): if not os.path.exists(input_cms): raise FileNotFoundError(f"{input_cms} not found)") # input_cms sets the path base and orig_cms may be relative to orig_cms msys_model, cms_model = topo.read_cms(input_cms) try: orig_cms = cms_model.property[msprops.ORIGINAL_CMS_PROP] except KeyError: raise KeyError( f"{no_ene_err_msg} ({msprops.ORIGINAL_CMS_PROP} not found)") if not os.path.isabs(orig_cms): # input_cms sets the path base and orig_cms may be relative to orig_cms if not os.path.isabs(input_cms): input_cms = os.path.abspath(input_cms) orig_cms = os.path.join(os.path.dirname(input_cms), orig_cms) # orig_cms may sit with the cms input relative to the original # launch dir, but may not be added as jobcontrol input file. orig_cms = fileutils.get_existing_filepath(orig_cms) if not orig_cms: raise FileNotFoundError( f"{no_ene_err_msg} ({orig_cms} not found)") afile = MdPostAnalysis.getRelatedFile(orig_cms, file_ext=file_ext) if not afile: raise FileNotFoundError(f"{no_ene_err_msg} (and {orig_cms})") return afile, orig_cms
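# Illustrative sketch (not part of the workflow): a small self-check of the
# FFT-based MdPostAnalysis.autocorrelation above against a direct O(N^2)
# average over time origins. The toy signal and all names below are assumptions
# of this sketch.
def _check_autocorrelation(nrow=64, ncol=2, seed=0):
    """Return True if the FFT autocorrelation matches a brute-force average."""
    rng = np.random.default_rng(seed)
    data = rng.standard_normal((nrow, ncol))
    fast = MdPostAnalysis.autocorrelation(data)
    slow = np.zeros_like(fast)
    for lag in range(nrow):
        # <x(t) x(t + lag)> averaged over the nrow - lag available time origins
        slow[lag] = (data[:nrow - lag] * data[lag:]).sum(axis=0) / (nrow - lag)
    return np.allclose(fast, slow)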
[docs]class DensityAnalysis(MdPostAnalysis): """ Class to do post desmond density analysis. """ DRIVER_PATH = path_join(MD_POST_DENSITY_PATH) DENSITY = 'density' DEFAULT_JOBNAME = PERMITTIVITY_TASK % DENSITY DENSITY_PROP = 'r_matsci_Density(g/cm3)' DENSITY_STD_PROP = 'r_matsci_stdev_Density(g/cm3)' DENSITY_REPLICA_PROP = 'r_matsci_Density_Replica_%i(g/cm3)' DENSITY_STD_REPLICA_PROP = 'r_matsci_stdev_Density_Replica_%i(g/cm3)' TEMP_REPLICA_PROP = 'r_matsci_Temperature_Replica_%i(K)' TEMP_STD_REPLICA_PROP = 'r_matsci_stdev_Temperature_Replica_%i(K)' DENSITY_REPLICA_PROP_REX = r'r_matsci_Density_Replica_(\d)\(g/cm3\)' TEMP_REPLICA_PROP_REX = r'r_matsci_Temperature_Replica_(\d)\(K\)' DENSITY_EXT = '_@_{density:.5g}(g/cm3)' OUTFILE_EXT = CMS_OUT_EXT
[docs]    def setup(self, copy_ene=True):
        """
        Overwrite the parent class method.
        """
        super().setup(copy_ene=copy_ene)
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: Current job """ super().finalizeStep(job) msys_model, cms_model = topo.read_cms(job.outfile) struct = cms_model.fsys_ct for prop_key, prop_val in struct.property.items(): match = re.match(job.DENSITY_REPLICA_PROP_REX, prop_key) if match is None or len(match.groups()) != 1: continue replica_index = int(match.groups()[0]) log_tag = job.REPLICA_TAG % str(replica_index) density_prop = job.DENSITY_REPLICA_PROP % replica_index density_std_prop = job.DENSITY_STD_REPLICA_PROP % replica_index density = struct.property[density_prop] density_std = struct.property[density_std_prop] job.log(f"{log_tag}The density of the system is {density:.4f} " f"g/cm3 with standard deviation of {density_std:.4f}") temp_prop = job.TEMP_REPLICA_PROP % replica_index temp_std_prop = job.TEMP_STD_REPLICA_PROP % replica_index temp = struct.property[temp_prop] temp_std = struct.property[temp_std_prop] job.log(f"{log_tag}The temperature of the system is {temp:.4f} K " f"with standard deviation of {temp_std:.4f}") job.log('', add_log_tag=False) density = struct.property[DensityAnalysis.DENSITY_PROP] temp = struct.property[DensityAnalysis.TEMP_PROP] density_std = struct.property[DensityAnalysis.DENSITY_STD_PROP] temp_std = struct.property[DensityAnalysis.TEMP_STD_PROP] job.log(f"The density of the system is {density:.4f} " f"g/cm3 with standard deviation of {density_std:.4f}") job.log(f"The temperature of the system is {temp:.4f} K " f"with standard deviation of {temp_std:.4f}") if density < 0.1: job.log("", add_log_tag=False) job.log("WARNING: the result may be inaccurate as the density of " "system is below 0.1 g/cm3, an indication of gas phase. ")
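# Illustrative sketch (not part of the workflow): the per-replica density and
# temperature properties written by DensityAnalysis above encode the replica
# index in the property name, e.g. 'r_matsci_Density_Replica_3(g/cm3)'. The
# helper below shows how that index is recovered with the class regex; the
# helper name is an assumption of this sketch.
def _replica_index_from_prop(prop_key):
    """Return the replica index embedded in a density replica property, or None."""
    match = re.match(DensityAnalysis.DENSITY_REPLICA_PROP_REX, prop_key)
    return int(match.group(1)) if match else None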
[docs]class MdPostTrajectoryAnalysis(MdPostAnalysis): """ Base class to hold molecular dynamics post trajectory analysis functionality. """
[docs] @staticmethod def getTrajectoryFromCms(input_cms, orig_cms=None): """ Search existing trajectory and return the trajectory found based on the input_cms. Existing trajectory (desmondutils.PROP_TRJ) may be found in current folder, sitting with input cms or sitting with the original cms defined by users or pointed to by PROP_CMS ('s_m_original_cms_file') when trajectory is not copied from the PROP_CMS folder to the current folder. :param input_cms: INPUT_CMS defined for INPUT_TRJ searching. :type input_cms: str :param orig_cms: The original cms defined by users. :type orig_cms: str :raise KeyError, FileNotFoundError, ValueError: when trajectory cannot be found. :rtype: str, str or None :return: existing trajectory if found, whether the input trajectory sits with the original cms """ no_trj_err_msg = f"Cannot locate a trajectory for {input_cms}." traj_dir = topo.find_traj_path_from_cms_path(input_cms) if traj_dir: return traj_dir, None if not orig_cms or not os.path.isfile(orig_cms): # orig_cms is not defined by users or the defined one doesn't exist msys_model, cms_model = topo.read_cms(input_cms) try: orig_cms = cms_model.property[msprops.ORIGINAL_CMS_PROP] except KeyError: raise KeyError( f"{no_trj_err_msg} ({msprops.ORIGINAL_CMS_PROP} not found)") if not os.path.isabs(orig_cms): # input_cms sets the path base and orig_cms may be relative to orig_cms if not os.path.isabs(input_cms): input_cms = os.path.abspath(input_cms) orig_cms = os.path.join(os.path.dirname(input_cms), orig_cms) # orig_cms may sit with the cms input relative to the original # launch dir, but may not be added as jobcontrol input file. orig_cms = fileutils.get_existing_filepath(orig_cms) if not orig_cms: raise FileNotFoundError(f"{no_trj_err_msg} ({orig_cms} not found)") traj_dir = topo.find_traj_path_from_cms_path(orig_cms) if not traj_dir: raise FileNotFoundError(f"{no_trj_err_msg} (and {orig_cms})") try: traj.read_traj(traj_dir) except Exception as e: raise ValueError('Cannot load trajectory file: %s' % e) return traj_dir, orig_cms
[docs] @classmethod def registerCmsAndTrjToJob(cls, options, job_builder, orig_cms=None): """ Register cms file and trajectory folder to job if needed. :param options: The parser to get the cms options from and set the trj options for. :type options: `argparse.Namespace` :param job_builder: JobSpecification object :type job_builder: `launchapi.JobSpecification` """ input_cms_abspath = os.path.abspath(options.input_cms) input_cms_filename = os.path.basename(options.input_cms) jobhost = os.environ.get('JOBHOST') log_debug(f"registerCmsAndTrjToJob: {jobhost}") if jobhost and jobhost != 'localhost': # Only copy the trajectory to remote host. jobutils.add_folder_to_job_builder(job_builder, options.input_trj) # getTrajectoryFromCms returns found_trj and whether trj sits with orig_cms elif not cls.getTrajectoryFromCms(options.input_cms, orig_cms=orig_cms)[1]: # This is localhost or no jobcontrol, don't copy the trajectory to save disk # space. Then the msprops.ORIGINAL_CMS_PROP needs to point to the original cms in # the launch dir. cms_model = cms.Cms(options.input_cms) cms_model.set_cts_property(msprops.ORIGINAL_CMS_PROP, input_cms_abspath) temp_dir = os.curdir orig_cms_in_cur_dir = os.path.isfile(input_cms_filename) if orig_cms_in_cur_dir: temp_dir = os.path.join(temp_dir, TMP_DIR) with chdir(temp_dir, always_create=orig_cms_in_cur_dir) as p_chdir: temp_dir_in = p_chdir.dirname input_cms_abspath = os.path.join(temp_dir_in, input_cms_filename) cms_model.write(input_cms_abspath) job_builder.setInputFile(input_cms_abspath, runtime_path=input_cms_filename)
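# Illustrative sketch (not part of the workflow): both getRelatedJobFile and
# getTrajectoryFromCms above resolve a possibly relative original-cms path
# against the directory of the input cms before looking for the companion ene
# file or trajectory. A minimal restatement of that path handling, with
# hypothetical names:
def _resolve_against_input(input_cms, orig_cms):
    """Return orig_cms as an absolute path, anchored at the input cms dir."""
    if os.path.isabs(orig_cms):
        return orig_cms
    input_dir = os.path.dirname(os.path.abspath(input_cms))
    return os.path.join(input_dir, orig_cms)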
[docs]class AveCellAnalysis(MdPostTrajectoryAnalysis): """ Class to do post cell average analysis on desmond trajectory. """ DRIVER_PATH = path_join(MD_POST_AVE_CELL_PATH) AVE_CELL = 'ave_cell' DEFAULT_JOBNAME = PERMITTIVITY_TASK % AVE_CELL OUTFILE_EXT = CMS_OUT_EXT
[docs]    def setup(self, copy_ene=True, write_cms=True):
        """
        Overwrite the parent class method.
        """
        super().setup(copy_ene=copy_ene, write_cms=write_cms)
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: current job """ super().finalizeStep(job) msys_model, cms_model = topo.read_cms(job.outfile) job.log('The average cell dimensions vector are:') job.log(f'a = {cms_model.box[0:3]}') job.log(f'b = {cms_model.box[3:6]}') job.log(f'c = {cms_model.box[6:]}')
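# Illustrative sketch (not part of the workflow): AveCellAnalysis.finalizeStep
# above reports the averaged cell as three lattice vectors taken from the
# 9-element cms box. The helper below just restates that slicing for a plain
# sequence; its name is an assumption of this sketch.
def _box_to_vectors(box):
    """Split a flat 9-element box [ax, ay, az, bx, ..., cz] into a, b, c."""
    return box[0:3], box[3:6], box[6:9]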
[docs]class AnalyzeSimulation(MdPostTrajectoryAnalysis): """ Class to analyze trajectory from desmond simulation using analyze_simulation.py """ DRIVER_PATH = path_join(MD_POST_ANALYZE_SIMULATION) ANALYZE_SIMULATION = 'analyze_simulation' DEFAULT_JOBNAME = PERMITTIVITY_TASK % ANALYZE_SIMULATION OUTFILE_EXT = CMS_OUT_EXT ST2_EXT = '.st2' OUT_ST2_EXT = '-out' + ST2_EXT INPUT_ST2 = 'input_st2' OUT_ST2_PROP = 's_matsci_St2_File'
[docs] def __init__(self, *arg, **kwarg): """ See parent class for documentation """ super().__init__(*arg, **kwarg) self.input_st2 = self.jobname + self.ST2_EXT self.out_st2 = self.jobname + self.OUT_ST2_EXT
[docs]    def setup(self):
        """
        Overwrite the parent class method.
        """
        super().setup(write_cms=True)
        self.appendTrjInOutst2ToCmd()
        self.createInputSt2()
[docs] def appendTrjInOutst2ToCmd(self): """ Append the trajectory, input st2, and output st2 information to the cmd. """ traj_folder, orig_cms = self.getTrajectoryFromCms( os.path.basename(self.infile)) self._command += [traj_folder, self.out_st2, self.input_st2]
[docs] def createInputSt2(self): """ Create input st2 file based on the configure yaml. """ task_protocol = self.config.getTaskProtocol(task_name=self.task_name) input_st2_str = task_protocol.protocol[self.INPUT_ST2] st2_sea = sea.Map(input_st2_str) with open(self.input_st2, "w") as f_st2: f_st2.write(str(st2_sea))
[docs] def setOutfile(self): """ Set the outfile of a job. """ cms_model = cms.Cms(os.path.basename(self.infile)) cjob = self.getJob() out_st2 = self.getFirstMatchedfilename(cjob.OutputFiles, [self.OUT_ST2_EXT]) if out_st2: cms_model.set_cts_property(self.OUT_ST2_PROP, out_st2) self.outfile = self.jobname + self.OUTFILE_EXT cms_model.write(self.outfile) super().setOutfile()
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: Current job """ super().finalizeStep(job) msys_model, cms_model = topo.read_cms(job.outfile) job.log( f'The analyzed results are saved in {cms_model.property[job.OUT_ST2_PROP]}' )
[docs]class DipoleAnalysis(MdPostTrajectoryAnalysis): """ Class to do post dipole analysis on desmond trajectory. """ FLAG_SLICE_INTVL = '-slice_intvl' FLAG_NORMALIZATION = '-normalization' DRIVER_PATH = path_join(MD_POST_DIPOLE_PATH) DIPOLE = 'dipole' STATISTICS_CSV_EX = '_dp' + CSV_OUT_EXT STATISTICS_FILE_PROP = f"s_matsci_{DIPOLE.capitalize()}_Statistics_File" DEFAULT_JOBNAME = PERMITTIVITY_TASK % DIPOLE OUTFILE_EXT = CMS_OUT_EXT TIME_SERIAL = 'time_serial' AUTOCORRELATION = 'autocorrelation' COMPLEX_PERMITTIVITY = 'complex_permittivity' SUSCEPTIBILITY = 'susceptibility' TIME_SERIAL_CAPITALIZE = '_'.join( [x.capitalize() for x in TIME_SERIAL.split('_')]) TIME_SERIAL_FILE_PROP = f"s_matsci_{DIPOLE.capitalize()}_{TIME_SERIAL_CAPITALIZE}_File" AUTOCORRELATION_FILE_PROP = f"s_matsci_{DIPOLE.capitalize()}_{AUTOCORRELATION.capitalize()}_File" MD_COMPLEX_PERMITTIVITY_FILE_PROP = f"s_matsci_Md_{COMPLEX_PERMITTIVITY.capitalize()}_File" COMPLEX_PERMITTIVITY_FILE_PROP = f"s_matsci_{COMPLEX_PERMITTIVITY.capitalize()}_File" DIPOLE_SUSCEPTIBILITY_PROP = f"r_matsci_{DIPOLE.capitalize()}_{SUSCEPTIBILITY.capitalize()}" DIPOLE_SUSCEPTIBILITY_STD_PROP = f"r_matsci_stdev_{DIPOLE.capitalize()}_{SUSCEPTIBILITY.capitalize()}" SYSTEMWIDE_DIPOLE = 'systemwide_dipole' SYSTEMWIDE_DIPOLE_X_PROP = "r_matsci_Systemwide_Dipole_X" SYSTEMWIDE_DIPOLE_Y_PROP = "r_matsci_Systemwide_Dipole_Y" SYSTEMWIDE_DIPOLE_Z_PROP = "r_matsci_Systemwide_Dipole_Z" SYSTEMWIDE_DIPOLE_PROPS = [ SYSTEMWIDE_DIPOLE_X_PROP, SYSTEMWIDE_DIPOLE_Y_PROP, SYSTEMWIDE_DIPOLE_Z_PROP ] SYSTEMWIDE_DIPOLE_X_STD_PROP = "r_matsci_Systemwide_Dipole_X_SD" SYSTEMWIDE_DIPOLE_Y_STD_PROP = "r_matsci_Systemwide_Dipole_Y_SD" SYSTEMWIDE_DIPOLE_Z_STD_PROP = "r_matsci_Systemwide_Dipole_Z_SD" SYSTEMWIDE_DIPOLE_STD_PROPS = [ SYSTEMWIDE_DIPOLE_X_STD_PROP, SYSTEMWIDE_DIPOLE_Y_STD_PROP, SYSTEMWIDE_DIPOLE_Z_STD_PROP ] KWW = 'kww' TAU = 'tau' BETA = 'beta' TAU_START_PROP = 'r_matsci_Permittivity_Tau_Start_(ns)' TAU_END_PROP = 'r_matsci_Permittivity_Tau_End_(ns)' KWW_TAU_PROP = f"r_matsci_{KWW.capitalize()}_{TAU.capitalize()}_(ps)" KWW_BETA_PROP = f"r_matsci_{KWW.capitalize()}_{BETA.capitalize()}" TIME_SERIAL_LABELS = [ 'Time (ps)', 'Dipole_x (debye)', 'Dipole_y (debye)', 'Dipole_z (debye)', '<Dipole^2> (debye^2)' ] TAU_PS = 'Tau (ps)' DIPOLE_SQ = 'Dipole^2 (debye^2)' DIPOLE_SQ_MEAN = 'Dipole^2 - <Dipole>^2 (debye^2)' AUTOCORRELATION_LABELS = [ TAU_PS, 'Dipole_x^2 (debye^2)', 'Dipole_y^2 (debye^2)', 'Dipole_z^2 (debye^2)', DIPOLE_SQ, DIPOLE_SQ_MEAN ] TAU_PS_INDEX = AUTOCORRELATION_LABELS.index(TAU_PS) DIPOLE_SQ_INDEX = AUTOCORRELATION_LABELS.index(DIPOLE_SQ) DIPOLE_SQ_MEAN_INDEX = AUTOCORRELATION_LABELS.index(DIPOLE_SQ_MEAN) LABEL_MAP = { TIME_SERIAL: TIME_SERIAL_LABELS, AUTOCORRELATION: AUTOCORRELATION_LABELS } STD_DEV = "Std Dev of " SysDipole = collections.namedtuple('SysDipole', ['mean', 'sd'])
[docs]    def setup(self, copy_ene=False, write_cms=True):
        """
        Overwrite the parent class method.
        """
        super().setup(copy_ene=copy_ene, write_cms=write_cms)
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: Current job """ super().finalizeStep(job) struct = structure.Structure.read(job.outfile) try: job.log( "The relative electric susceptibility due to the systemwide dipole is " f"{struct.property[DipoleAnalysis.DIPOLE_SUSCEPTIBILITY_PROP]:.4f}" ) except KeyError: pass try: job.log( "Dielectric decay function of the systemwide dipole is saved as " f"{struct.property[DipoleAnalysis.AUTOCORRELATION_FILE_PROP]}") except KeyError: pass
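# Illustrative sketch (not part of the workflow): DipoleAnalysis above reports
# a relative electric susceptibility obtained from the systemwide dipole. The
# textbook fluctuation expression for that quantity is
#     chi = (<M^2> - <M>^2) / (3 * eps0 * V * kB * T);
# whether the dipole driver applies exactly this form and these unit
# conventions is an assumption of this sketch, as is the debye conversion.
def _susceptibility_from_dipole(msq_debye2, msq_mean_debye2, volume_ang3,
                                temp_k):
    """Return the relative susceptibility from systemwide dipole fluctuations.

    msq_debye2 is <M^2> and msq_mean_debye2 is <M>^2, both in debye^2; the
    cell volume is in Angstrom^3 and the temperature in K.
    """
    debye_to_cm = 3.33564e-30  # 1 debye in C*m (assumed conversion factor)
    fluct_si = (msq_debye2 - msq_mean_debye2) * debye_to_cm**2
    volume_si = volume_ang3 * 1.0e-30
    return fluct_si / (3.0 * constants.epsilon_0 * volume_si *
                       constants.Boltzmann * temp_k)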
[docs]class SusceptibilityMerge(DipoleAnalysis): """ Class to merge susceptibility. """
[docs]    def setup(self, copy_ene=False, write_cms=True):
        """
        Overwrite the parent class method.
        """
        super().setup(copy_ene=copy_ene, write_cms=write_cms)
        self.udpateCmd()
[docs] def udpateCmd(self): """ Update the command of the subjob: trim the input cms to one single; all the cms files are parsed for original cms and the original cms filenames are provided as the base filename without cms, csv, or npz. """ replica_keywords = self.task_protocol.getReplicaKeywords() if not replica_keywords or replica_keywords.get(REPLICA_TYPE) != MERGE: return log_debug(f"Original command: {' '.join(self._command)}") cms_files = [x for x in self._command if x.endswith(CMS_OUT_EXT)] base_filenames = [emd.FLAG_BASE_FILENAME] for cms_file in cms_files: msys_model, cms_model = topo.read_cms(cms_file) orig_cms = cms_model.property[msprops.ORIGINAL_CMS_PROP] base_filenames.append(orig_cms[:-len(CMS_OUT_EXT)]) cms_file_idxs = [self._command.index(x) for x in cms_files] cmd = self._command[:cms_file_idxs[0]] + self._command[ cms_file_idxs[-1]:] self._command = cmd + base_filenames log_debug(f"Subjob command: {' '.join(self._command)}")
[docs]    @staticmethod
    def finalizeStep(job):
        """
        Finalize this step: log the merged susceptibility and its standard
        deviation.

        :type job: An instance of a `BaseJob` subclass (such as `JobControlJob`
            or `SubprocessJob`)
        :param job: Current job
        """
        super().finalizeStep(job)
        struct = structure.Structure.read(job.outfile)
        susceptibility = struct.property.get(job.DIPOLE_SUSCEPTIBILITY_PROP,
                                             None)
        std_susceptibility = struct.property.get(
            job.DIPOLE_SUSCEPTIBILITY_STD_PROP, None)
        if std_susceptibility is not None:
            job.log(f"Susceptibility is {susceptibility:.4f} with "
                    f"{std_susceptibility:.4f} as standard deviation.")
[docs]class ComplexPermittivity(SusceptibilityMerge):
    """
    Class to do post complex permittivity analysis based on dipole
    autocorrelation output files (npz, csv).
    """

    MAX_DIEL_LOSS_VAL_PROP = 'r_matsci_Maximum_Dielectric_Loss'
    MAX_DIEL_LOSS_FREQ_PROP = 'r_matsci_Frequency_of_Maximum_Dielectric_Loss_(Hz)'
    # The smaller decay constant is good for the 1 ps default interval and the
    # physics within a few nanoseconds. On the other hand, the larger decay
    # constant is good for a 500 ps interval and the slow physics in the
    # 0.5 - 50 ns range
    KWW_INIT_GUESSES = [[10, 0.8], [30000, 0.8]]
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: Current job """ super().finalizeStep(job) struct = structure.Structure.read(job.outfile) file_prop = struct.property.get(job.MD_COMPLEX_PERMITTIVITY_FILE_PROP) if file_prop is None: job.log( "ERROR: KWW fitting, dielectric loss, and dielectric storage " "calculations based on molecular dynamics simulations failed.") return job.log(f"KWW tau is {struct.property[job.KWW_TAU_PROP]:.4f} ps; " f"KWW beta is {struct.property[job.KWW_BETA_PROP]:.4f}") loss_peak_val = struct.property[job.MAX_DIEL_LOSS_VAL_PROP] loss_peak_freq = struct.property[job.MAX_DIEL_LOSS_FREQ_PROP] job.log( f"The normalized loss permittivity shows a peak of {loss_peak_val:.4f} " f"at the frequency of {loss_peak_freq / 1E9:.4f} GHz.") job.log(f"Complex permittivity based on molecular dynamics simulations " f"is saved as {file_prop}")
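# Illustrative sketch: the [tau, beta] pairs in KWW_INIT_GUESSES seed a KWW
# (Kohlrausch-Williams-Watts) stretched exponential phi(t) = exp(-(t/tau)**beta).
# A minimal fit over a dipole autocorrelation decay, assuming times in ps and
# that the production driver simply tries both initial guesses, might look like:
#
#     import numpy as np
#     from scipy.optimize import curve_fit
#
#     def kww(t, tau, beta):
#         return np.exp(-(t / tau)**beta)
#
#     # tau_ps and decay are 1D arrays taken from the autocorrelation csv/npz
#     (tau_fit, beta_fit), _ = curve_fit(kww, tau_ps, decay, p0=[10, 0.8])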
[docs]class JaguarCalculation(Step): """ Class to do a Jaguar calculation with keywords. """ OPT_ATTR = 'jaguar_options' IN_EXT = '.in' DRIVER_PATH = path_join(JAGUAR_RUN_PATH) DEFAULT_JOBNAME = PERMITTIVITY_TASK % JAGUAR FLAG_PARALLEL = '-PARALLEL' BASIS = 'basis' # Optimization IGEOPT = 'igeopt' # Overall charge on molecule, excluding point charges set in a pointch section # (default is 0) MOLCHG = 'molchg' # Spin multiplicity: 1 for singlet, 2 for doublet, etc. (default is 1, except # for ihamtyp=0, when multip=2 by default). If the spin-orbit ZORA Hamiltonian # is used (relham=zora-so), this keyword indicates the degeneracy of the state # (1 for closed shells, 2 for open shell doublet; higher is not relevant). MULTIP = 'multip' # RHF (Restricted Hartree-Fock), UHF (Unrestricted Hartree-Fock), # ROHF (Restricted Open Shell Hartree-Fock) IUHF = 'iuhf' DFTNAME = 'dftname' IPOLAR = 'ipolar' IFDPOL = 'ifdpol' IACC_FDPOL = 'iacc_fdpol' B3LYP = 'B3LYP' DEF2_SVPD = 'DEF2-SVPD' IACCG = 'iaccg' # Return the geometry that gives the lowest energy on geometry convergence # failure, when nofail=1 NOFAIL = 'nofail' # Convergence cutoff for switching from pseudospectral to analytic integral # evaluation in a geometry optimization. NOPS_OPT_SWITCH = 'nops_opt_switch' # Maximum number of optimization iterations MAXITG = 'maxitg' MAXIT = 'maxit' INT_KEYWORDS = [IGEOPT, MOLCHG, MULTIP, IUHF, IPOLAR] KEYWORD_DEFAULTS = { DFTNAME: B3LYP, IGEOPT: 0, MOLCHG: 0, MULTIP: 1, IUHF: None, IPOLAR: None } POLYMER_INT_KEYWORDS_DIFF = {BASIS: DEF2_SVPD} FLAG_JAGUAR_OPTIONS = '-jaguar_options' # str "basis=6-31G** dftname=B3LYP igeopt=1" is parsed into the following DEFAULT_JAGUAR_OPTIONS = { BASIS: '6-31G**', DFTNAME: B3LYP, IGEOPT: 1, NOFAIL: 1, NOPS_OPT_SWITCH: 10, MAXIT: 300, MAXITG: 300 } MAX_ATOM_NUM = jaguarworkflows.get_jaguar_max_atoms() # MATSCI-10847: mm.MMELEMENTS_VALENCE_MAX is raised in _get_valence_electrons() # from mm.mmjag_read_file(self.filename) in class JaguarInput(object) MMELEMENTS_VALENCE_MAX = mm.MMELEMENTS_VALENCE_MAX def _getCommand(self, host='localhost:1', append_prerun=True, pre_script=SCHRODINGER_JAGUAR): """ Construct command based on the config yaml and task name. :param host: Host name for this subjob. :type host: str :param append_prerun: If True, str (e.g. $SCHRODINGER/run) is prepended to the cmd :type append_prerun: bool :param pre_script: This is the prepended str. None means the default ($SCHRODINGER/run) :type pre_script: str or None :return: A command that launchapi can launch :rtype: list of str """ return super()._getCommand(host=host, append_prerun=append_prerun, pre_script=pre_script)
[docs] def setup(self): """ See parent class. """ super().setup() self.customizeInputAndCommand() log_debug(f'Updated command: {self._command}')
[docs]    def customizeInputAndCommand(self):
        """
        The Jaguar stage requires its own input file instead of a mae file.
        This method writes out the Jaguar input file and updates the command
        with the Jaguar input file.
        """
        jaguar_infile = self.jobname + self.IN_EXT
        base_infile = os.path.basename(self.infile)
        self._command = [
            x if not x.endswith(base_infile) else jaguar_infile
            for x in self._command
        ]
        self._command = [
            x if x != '-JOBNAME' else '-jobname' for x in self._command
        ]
        struct = structure.Structure.read(base_infile)
        try:
            jinput = jaguar_input.JaguarInput(structure=struct,
                                              name=self.jobname)
        except mm.MmException as msg:
            # Raise a proper exception type; a bare string cannot be raised
            raise RuntimeError(f'Failed to create Jaguar input... ({str(msg)})')
        task_protocol = self.config.getTaskProtocol(task_name=self.task_name)
        task_protocol.rmReplicaKeywords()  # no replica info into jaguar file
        ixfb = task_protocol.getInterpFlagBlock(label=XTRA_FLAGS).copy()
        iuhf_value = ixfb.pop(self.IUHF, None)
        if iuhf_value and ixfb.get(self.MULTIP, 1) != 1:
            jinput.setValue(self.IUHF, iuhf_value)
        for keyword in self.INT_KEYWORDS:
            jaguar_value = ixfb.get(keyword, None)
            if jaguar_value is None or jaguar_value == NONE:
                continue
            ixfb[keyword] = int(jaguar_value)
        for key, value in ixfb.items():
            jinput.setValue(key, value)
        jinput.save()
[docs]    @staticmethod
    def finalizeStep(job):
        """
        Finalize this step. Can be overridden for lightweight post-processing.

        :type job: An instance of a `BaseJob` subclass (such as `JobControlJob`
            or `SubprocessJob`)
        :param job: current job

        :return: The Jaguar output data
        :rtype: `JaguarOutput` or None
        """
        super().finalizeStep(job)
        try:
            output = jaguar_output.JaguarOutput(job.name)
        except OSError:
            if job.outfile:
                job.log(f"{job.outfile} is found as the output.")
            return
        job.log(f"DFT functional: {output.functional}")
        job.log(f"DFT basis: {output.basis}")
        # Log the energy; the bare f-string in the original had no effect
        job.log(
            f"The energy of the last step is: {output.last_results.energy} hartrees"
        )
        return output
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ Update the task protocol according to the user input. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ task_protocol = super().optionsToTaskProtocols(options, task_name, seed=seed) if not hasattr(options, cls.OPT_ATTR): return task_protocol if not getattr(options, cls.OPT_ATTR): task_protocol.update({REPLICA_TYPE: REMOVE}, block_label=XTRA_FLAGS) return task_protocol task_protocol.update(getattr(options, cls.OPT_ATTR), block_label=XTRA_FLAGS) if options.parallel: task_protocol.update({cls.FLAG_PARALLEL[1:]: options.parallel}) return task_protocol
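# Illustrative sketch: the -jaguar_options value is a whitespace-separated list
# of key=value Jaguar keywords, as the DEFAULT_JAGUAR_OPTIONS comment above
# describes.  A minimal, hypothetical parser for such a string could be:
#
#     def parse_jaguar_options(option_str):
#         """Turn 'basis=6-31G** igeopt=1' into {'basis': '6-31G**', 'igeopt': '1'}."""
#         return dict(token.split('=', 1) for token in option_str.split())
#
#     parse_jaguar_options("basis=6-31G** dftname=B3LYP igeopt=1")
#     # -> {'basis': '6-31G**', 'dftname': 'B3LYP', 'igeopt': '1'}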
[docs]class JaguarFinerCalculation(JaguarCalculation): """ Class to do a Jaguar calculation with finer basis set and functional. """ OPT_ATTR = 'jaguar_finer_options' FLAG_JAGUAR_FINER_OPTIONS = '-jaguar_finer_options' DEFAULT_JAGUAR_OPTIONS = JaguarCalculation.DEFAULT_JAGUAR_OPTIONS.copy() DEFAULT_JAGUAR_OPTIONS[JaguarCalculation.BASIS] = 'cc-pVTZ-pp(-f)++' DEFAULT_JAGUAR_OPTIONS[JaguarCalculation.DFTNAME] = JaguarCalculation.B3LYP
[docs]class ElectricPolarizability(JaguarCalculation): """ Class to do jaguar calculation of electric polarizability. """ OPT_ATTR = 'elec_polarizability' TPP = gnt_opm.Jaguar.TPP ELECTRIC_POLARIZABILITY = 'electric_polarizability' DEFAULT_JOBNAME = PERMITTIVITY_TASK % ELECTRIC_POLARIZABILITY FLAG_JAGUAR_OPTIONS = JaguarCalculation.FLAG_JAGUAR_OPTIONS FLAG_ELEC_POLARIZABILITY = '-elec_polarizability' DEFAULT_IPOLAR = -1 DEFAULT_JAGUAR_OPTIONS = { JaguarCalculation.BASIS: 'cc-pVTZ-pp(-f)++', JaguarCalculation.DFTNAME: JaguarCalculation.B3LYP, JaguarCalculation.NOFAIL: 1, JaguarCalculation.MAXIT: 300, } FDPOL_FREQ1 = mm.MMJAG_RKEY_FDPOL_FREQ1 FDPOLAR_FREQ1 = 'fdpolar_freq1' PROP = 'electric_polarizability' KEY = 'r_matsci_Electric_Polarizability_(bohr^3)' ELECTRIC_POLARIZABILITY_PROP = KEY DELIM = '_@_' ELECTRIC_POLARIZABILITY_FREQ_PROP = f'{KEY}{DELIM}%.4e{EV}' POLAR_ALPHA = 'polar_alpha' FDPOLAR_ALPHA1 = 'fdpolar_alpha1' MITERTD = mm.MMJAG_IKEY_MITERTD DEFAULT_KWARGS = collections.OrderedDict() DEFAULT_KWARGS[TPP] = 1 DEFAULT_KWARGS[gnt_opm.Jaguar.JAGUAR_OUTPUT_ATTR] = POLAR_ALPHA FLAG_NUM_CRU = PolymerBuilder.FLAG_NUM_CRU ELECTRIC_POLARIZABILITY_SLOPE = 'r_matsci_Electric_Polarizability_Slope_(bohr^3/monomer)' ELECTRIC_POLARIZABILITY_INTERCEPT = 'r_matsci_Electric_Polarizability_Intercept_(bohr^3)' ELECTRIC_POLARIZABILITY_SLOPE_FREQ = f'{ELECTRIC_POLARIZABILITY_SLOPE}{DELIM}%.4e{EV}' ELECTRIC_POLARIZABILITY_INTERCEPT_FREQ = f'{ELECTRIC_POLARIZABILITY_INTERCEPT}{DELIM}%.4e{EV}' PLYM = 'plym'
[docs] def __init__(self, *arg, **kwargs): super().__init__(*arg, **kwargs) iflag_block = self.task_protocol.getInterpFlagBlock(label=XTRA_FLAGS) self.freq = iflag_block.get(self.FDPOL_FREQ1, None) if self.freq: self.freq = float(self.freq)
[docs] def updateTaskReplicaSettings(self): """ Update the task settings due to replica. """ replica_keywords = self.task_protocol.getReplicaKeywords() if not replica_keywords: return if self.replica_index is None: # The wavelength (e.g. NONE) lives in the replica block self.replica_index = 0 replica_vals = replica_keywords[REPLICA_FLAG] try: wavelength = replica_vals[self.replica_index] except IndexError: # branch on oligomers but not on wavelengths wavelength = replica_vals[0] if replica_vals else NONE # 1.00000 eV is 1239.84193 nm self.freq = None if wavelength == NONE else units.EV_IN_NM / wavelength if self.freq: user_input = { self.IACC_FDPOL: 3, self.IFDPOL: 1, self.MITERTD: 200, self.FDPOL_FREQ1: self.freq } else: user_input = { self.IACC_FDPOL: False, self.IFDPOL: False, self.FDPOL_FREQ1: False, JaguarCalculation.IPOLAR: ElectricPolarizability.DEFAULT_IPOLAR } self.task_protocol.update(user_input, block_label=XTRA_FLAGS) self.task_protocol.protocol.pop(CMD_FLAGS, None) # backwards compatible
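    # Illustrative note: the replica value handled by updateTaskReplicaSettings()
    # above is a wavelength in nm that is converted to a photon energy in eV for
    # the FDPOL_FREQ1 keyword via E(eV) = EV_IN_NM / lambda(nm).  Assuming
    # EV_IN_NM == 1239.84193, a 589 nm replica runs the finite-field
    # polarizability at 1239.84193 / 589 ~= 2.105 eV.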
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: Current job """ output = super().finalizeStep(job) if not output: return struct = structure.Structure.read(job.outfile) ele_prop = job.ELECTRIC_POLARIZABILITY_PROP if job.freq is None \ else job.ELECTRIC_POLARIZABILITY_FREQ_PROP % job.freq ele_val = output.polar_alpha if job.freq is None else output.fdpolar_alpha1 struct.property[ele_prop] = ele_val struct.write(job.outfile) if job.freq: job.log(f"The wavelength is {units.NM_X_EV / job.freq:.4g} nm") cru_num = len(struct.property.get(PolymerBuilder.SEQUENCE_PROP, '')) if cru_num: job.log(f"The constitutional repeat unit number is {cru_num}") job.log("The electric polarizability of the molecule is " f"{ele_val:.4f} bohr^3")
[docs] @classmethod def getWavAndPolar(cls, struct): """ Get the wavelength and polarizability. :param struct: The input structure to search properties. :type struct: 'structure.Structure' :return: Wavelength in nm, polarizability in bohr^3 :rtype: (float or None, float or None) """ return cls.getWavAndProp(struct, cls.ELECTRIC_POLARIZABILITY_PROP)
[docs] @classmethod def getWavAndProp(cls, struct, prop): """ Get the wavelength and wavelength-related property. :param struct: The input structure to search properties. :type struct: 'structure.Structure' :param prop: A property key :type prop: str :return: Wavelength in nm, a wavelength related property value :rtype: (float or None, float or None) """ freqs, values = cls.getWavAndProps(struct, prop) try: return freqs[0], values[0] except IndexError: return None, None
[docs] @classmethod def getWavAndPolars(cls, struct, include_ipolar=False): """ Get the wavelength and polarizability. :param struct: The input structure to search properties. :type struct: 'structure.Structure' :param include_ipolar: Whether to include the ipolar result, the coupled perturbed Hartree-Fock (CPHF) equations Cao, Y.; Friesner, R. A. Molecular (hyper)polarizabilities computed by pseudospectral methods. J. Chem. Phys. 2005 :type include_ipolar: bool :return: Wavelength in nm, polarizability in bohr^3 :rtype: list, list """ wavs, vals = cls.getWavAndProps(struct, cls.ELECTRIC_POLARIZABILITY_PROP) if not vals: # Return empty lists if no results found return wavs, vals if not include_ipolar and wavs[0] is None: wavs, vals = wavs[1:], vals[1:] return wavs, vals
[docs]    @classmethod
    def getWavAndProps(cls, struct, prop):
        """
        Get the wavelengths and wavelength-related properties. Note that
        property keys starting with the requested string are considered
        matches.

        :param struct: The input structure to search properties.
        :type struct: 'structure.Structure'

        :param prop: A property key without frequency info.
        :type prop: str

        :return: Wavelengths in nm, wavelength related property values
        :rtype: list, list
        """
        val = struct.property.get(prop, None)
        vals = [float(val)] if val is not None else []
        # None stands for the freq of ipolar=-1
        wavs = [None] if val is not None else []
        props = {
            x: struct.property[x]
            for x in struct.property.keys()
            if x.startswith(prop)
        }
        # grab the frequencies out of the property name
        data = {cls.getFreqFromPropkey(x): y for x, y in props.items()}
        data.pop(None, None)
        if not data:
            return wavs, vals
        wavs.extend([units.NM_X_EV / float(x) for x in data.keys()])
        vals.extend(list(map(float, data.values())))
        return wavs, vals
[docs] @classmethod def getFreqFromPropkey(cls, propkey): """ Get the frequency value from the property key. :return: The frequency of this property :rtype: float or None """ try: return float(propkey.split(cls.DELIM)[-1].replace(EV, '')) except ValueError: return None
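    # Illustrative note: frequency-dependent polarizabilities are stored under
    # keys built from ELECTRIC_POLARIZABILITY_FREQ_PROP, i.e. the base key, the
    # '_@_' delimiter, the photon energy in '%.4e' format, and the EV suffix.
    # Assuming EV == 'eV', a key could look like
    #
    #     'r_matsci_Electric_Polarizability_(bohr^3)_@_2.1050e+00eV'
    #
    # getFreqFromPropkey() recovers 2.105 (eV) from such a key, and
    # getWavAndProps() maps it back to a wavelength via units.NM_X_EV / 2.105,
    # about 589 nm if NM_X_EV is the same 1239.84 nm*eV conversion constant.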
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ Update the task protocol according to the user input. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ task_protocol = super().optionsToTaskProtocols(options, task_name, seed=seed) if options.workflow_type and set( options.workflow_type).intersection(JAGUAR_WORKFLOW_TYPES): # complex permittivity workflow has two-stage jaguar optimizations, # and we don't interrupt with a desmond minimizer task_protocol.update({cls.getForceFieldFlag(): False}) wavelengths = cls.getWavelengths(options) if wavelengths: replica_flag = [x if x == NONE else float(x) for x in wavelengths] user_input = { REPLICA_TYPE: BRANCH, REPLICA_FLAG: replica_flag, REPLICA_NUM: len(wavelengths) } task_protocol.update(user_input, block_label=XTRA_FLAGS) return task_protocol
[docs] @classmethod def getWavelengths(cls, options): """ Get the wavelengths based on the parsed command line options. :param options: The parsed command line options. :type options: `argparse.Namespace` :return: All wavelengths for calculation :rtype: list of floats and NONE """ wls = cls.getUserWavelengths(options) wls += cls.getAbbeWavelengths(options) wls = sorted(set(wls)) if options.workflow_type is None or set( options.workflow_type).isdisjoint(IPOLAR_WORKFLOW_TYPES): return wls if NONE not in wls: wls.append(NONE) return wls
[docs]    @staticmethod
    def getUserWavelengths(options, default_intvl=50):
        """
        Get the wavelengths from the user input limits.

        :param options: The parsed command line options
        :type options: `argparse.Namespace`

        :param default_intvl: Default wavelength interval
        :type default_intvl: float

        :return: The wavelengths based on the user input limits
        :rtype: list of floats
        """
        # refractive_index_wavelength is a list of
        # one value: the requested wavelength
        # two values: the starting and ending limits
        # three values: the starting limit, ending limit, and wavelength interval
        ri_wls = options.refractive_index_wavelength
        if not ri_wls:
            return []
        if len(ri_wls) == 1:
            return ri_wls[:]
        intvl = ri_wls[2] if len(ri_wls) == 3 else default_intvl
        if intvl < 0.1:
            intvl = 0.1
        start, end = sorted(ri_wls[:2])
        num = math.ceil((end - start) / intvl)
        return list(np.linspace(start, start + num * intvl, int(num + 1)))
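    # Illustrative note: a hypothetical user request of "400 700 50" for
    # -refractive_index_wavelength expands to a 400-700 nm grid in 50 nm steps:
    #
    #     start, end, intvl = 400, 700, 50
    #     num = math.ceil((end - start) / intvl)            # 6
    #     np.linspace(start, start + num * intvl, num + 1)  # 400, 450, ..., 700
    #
    # A single value is used as-is, and two values fall back on default_intvl.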
[docs]    @staticmethod
    def getAbbeWavelengths(options):
        """
        Get the wavelengths from the user's request on Abbe number.

        :param options: The parsed command line options
        :type options: `argparse.Namespace`

        :return: The wavelengths in nm for the Abbe number
        :rtype: list of float
        """
        if options.workflow_type is None or ABBE_NUMBER not in options.workflow_type:
            return []
        return options.abbe_number_wavelength[:]
[docs]class JaguarMerge(Step): """ Class to merge multiple Jaguar Calculations. """ DRIVER_PATH = path_join(JAGUAR_MERGE_PATH) JAGUAR_MERGE = 'jaguar_merge' DEFAULT_JOBNAME = PERMITTIVITY_TASK % JAGUAR_MERGE OUTFILE_EXT = MAE_OUT_EXT CSV_OUT_EXT = f"_polar{CSV_OUT_EXT}" WAV_EP_FILE_PROP = "s_matsci_Electric_Polarizability_File"
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ Update the merge commands based on users' input. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ task_protocol = super().optionsToTaskProtocols(options, task_name, seed=seed) task_protocol.update( {PolymerBuilder.FLAG_POLYMER_CRU_NUM[1:]: options.polymer_cru_num}) return task_protocol
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: an instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: current job """ super().finalizeStep(job) struct = structure.Structure.read(job.outfile) epolar = ElectricPolarizability polars = epolar.getWavAndProps(struct, epolar.KEY) polars = {x: y for x, y in zip(polars[0], polars[1])} slopes = epolar.getWavAndProps(struct, epolar.ELECTRIC_POLARIZABILITY_SLOPE) slopes = {x: y for x, y in zip(slopes[0], slopes[1])} intcps = epolar.getWavAndProps(struct, epolar.ELECTRIC_POLARIZABILITY_INTERCEPT) intcps = {x: y for x, y in zip(intcps[0], intcps[1])} for wav, polar in polars.items(): if wav is not None: job.log(f'At wavelength {wav:.1f} nm:') job.log(f'The electric polarizability is {polar:.4f} bohr^3') slope = slopes.get(wav) if slope: job.log( f'The electric polarizability of one monomer is {slope:.4f} bohr^3' ) intcp = intcps.get(wav) if intcp: job.log(f'The linear fit intercept for oligomer electric ' f'polarizabilities is {intcp:.4f} bohr^3')
[docs]class TwoFileInput(Step): """ Class to sort the two input files based on extension. """ TERMINAL_INFILE_NUM = 2
[docs] def setup(self): """ See parent class. """ super().setup() self.udpateCmd()
[docs]    def udpateCmd(self):
        """
        Update the command so that the mae file precedes the cms file, as the
        driver expects the mae structure as the first input.
        """
        two_files = [self.infile] + self.additional_infiles
        two_filenames = [os.path.basename(x) for x in two_files]
        if len(two_filenames) != 2:
            log_debug(f'Two filenames not found: {two_filenames}')
            return
        for filename in two_filenames:
            if fileutils.is_cms_file(filename):
                cms_filename = filename
            else:
                mae_filename = filename
        cms_index = self._command.index(cms_filename)
        mae_index = self._command.index(mae_filename)
        if mae_index > cms_index:
            # Driver requests mae to be the first input structure
            self._command[cms_index] = mae_filename
            self._command[mae_index] = cms_filename
        log_debug(f'Updated command: {self._command}')
[docs]class RefractiveIndex(TwoFileInput): """ Class to do refractive index calculation. """ DRIVER_PATH = path_join(REFRACTIVE_INDEX_PATH) REFRACTIVE_INDEX = 'refractive_index' FILE_EXT = '_ri' + CSV_OUT_EXT RI_CAP = '_'.join(x.capitalize() for x in REFRACTIVE_INDEX.split('_')) DEFAULT_JOBNAME = PERMITTIVITY_TASK % REFRACTIVE_INDEX REFRACTIVE_INDEX_PROP = 'r_matsci_Refractive_Index' DELIM = '_@_' NM = 'nm' MATSCI_ABBE_NUMBER = 'r_matsci_Abbe_Number' ABBE_NUMBER_PROP = f'{MATSCI_ABBE_NUMBER}{DELIM}%.1f{NM}_%.1f{NM}_%.1f{NM}' RELATIVE_PERMITTIVITY_INF_PROP = 'r_matsci_Relative_Permittivity_Inf' WAV_IR_FILE_PROP = f"s_matsci_Wavelength_{RI_CAP}_File" FLAG_ABBE_NUMBER_WAVELENGTH = '-abbe_number_wavelength' FLAG_REFRACTIVE_INDEX_WAVELENGTH = '-refractive_index_wavelength' AbbeNumber = collections.namedtuple('ABBE_NUMBER', ['val', 'wavs']) ABBE_NFREQ = 3
[docs] @classmethod def optionsToTaskProtocols(cls, options, task_name, seed=None): """ Update the refractive index commands based on users' input. :param options: The parsed command line options :type options: `argparse.Namespace` :param task_name: Task name :type task_name: str :param seed: Random seed to randomize the initial velocities :type seed: int or None :return: Updated task protocol :rtype: `TaskProtocol` """ task_protocol = super().optionsToTaskProtocols(options, task_name, seed=seed) if options.workflow_type and all([ ABBE_NUMBER in options.workflow_type, options.abbe_number_wavelength ]): wvs = ','.join(map(str, options.abbe_number_wavelength)) user_input = {cls.FLAG_ABBE_NUMBER_WAVELENGTH[1:]: wvs} task_protocol.update(user_input) return task_protocol
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: Current job """ super().finalizeStep(job) struct = structure.Structure.read(job.outfile) try: refractive_index = struct.property[ RefractiveIndex.REFRACTIVE_INDEX_PROP] except KeyError: pass else: job.log( f"The refractive index of the system is {refractive_index:.4f}") try: epsilon_inf = struct.property[job.RELATIVE_PERMITTIVITY_INF_PROP] except KeyError: pass else: job.log( f"The relative permittivity at high frequency (Epsilon Inf) is {epsilon_inf:.4f}" ) try: csv_file = struct.property[job.WAV_IR_FILE_PROP] except KeyError: pass else: csv_file = os.path.join(job.cmd_dir, csv_file) job.log( f"Frequency-dependent refractive index are saved as {csv_file}") abbe_num = job.getAbbeNum(struct) if abbe_num is not None: wavs = ', '.join(map(str, abbe_num.wavs)) job.log(f"The Abbe number at {wavs} nm is {abbe_num.val:.4f}") else: ifblock = job.task_protocol.getInterpFlagBlock() if job.FLAG_ABBE_NUMBER_WAVELENGTH[1:] in ifblock: job.log("ERROR: Abbe number calculation failed.")
[docs]    @staticmethod
    def getRefractiveIndex(struct, density, polar):
        """
        Get the refractive index from the structure, density, and
        polarizability.

        :param struct: Structure of a single molecule
        :type struct: 'structure.Structure'

        :param density: Density in g/cm^3
        :type density: float

        :param polar: The isotropically averaged polarizability in atomic units
            of bohr^3
        :type polar: float

        :return: Refractive index
        :rtype: float
        """
        try:
            gnt_opm.RefractiveIndex.setRefractiveIndexProperty(
                struct, density, polar)
        except ValueError:
            return
        else:
            return struct.property[gnt_opm.RefractiveIndex.KEY]
[docs]    @staticmethod
    def setAbbeNum(struct, abbe_num, wavs, ext=None):
        """
        Set the Abbe number value with wavelength information as a structure
        property.

        :param struct: The structure to save the property to
        :type struct: 'structure.Structure'

        :param abbe_num: The Abbe number value
        :type abbe_num: float

        :param wavs: The wavelengths at which the Abbe number is calculated
        :type wavs: list of three float

        :param ext: The str extension for the property
        :type ext: str
        """
        abbe_prop = RefractiveIndex.ABBE_NUMBER_PROP % tuple(sorted(wavs))
        if ext:
            abbe_prop += ext
        struct.property[abbe_prop] = abbe_num
[docs] @classmethod def getAbbeNum(cls, struct): """ Get Abbe number and wavelengths from a structure. :param struct: The structure to get Abbe number and wavelengths. :type struct: 'structure.Structure' :return: Abbe number and wavelengths :rtype: collections.namedtuple """ for prop in struct.property.keys(): if not (prop.startswith(cls.MATSCI_ABBE_NUMBER) and prop.endswith(cls.NM)): continue wavs = prop.split(cls.DELIM)[1].replace('_', '').split(cls.NM) wavs = [float(x) for x in wavs if x] abbe_num = cls.AbbeNumber(val=struct.property[prop], wavs=wavs) return abbe_num
[docs]    @classmethod
    def calAbbeNum(cls, data):
        """
        Calculate the Abbe number from refractive indexes.

        :param data: First column: wavelengths, second column: refractive
            indexes
        :type data: 3 x 2 array

        :return: The Abbe number and wavelength info
        :rtype: 'collections.namedtuple'
        """
        # Sort the wavelengths and refractive indexes by ascending wavelength
        sorted_idx = np.argsort(data[:, 0])
        data = data[sorted_idx, :]
        abbe_num = (data[1, 1] - 1) / (data[0, 1] - data[2, 1])
        # The namedtuple expects the wavelengths (first column), not the
        # refractive indexes
        wavs = np.ndarray.tolist(data[:, 0])
        return cls.AbbeNumber(val=abbe_num, wavs=wavs)
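# Illustrative note: calAbbeNum() implements V = (n_mid - 1) / (n_short - n_long)
# after sorting by ascending wavelength.  For example, with the common F/d/C
# lines and hypothetical refractive indexes:
#
#     import numpy as np
#     data = np.array([[486.1, 1.522],   # wavelength (nm), refractive index
#                      [587.6, 1.517],
#                      [656.3, 1.514]])
#     abbe = RefractiveIndex.calAbbeNum(data)
#     # abbe.val == (1.517 - 1) / (1.522 - 1.514) ~= 64.6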
[docs]class StaticPermittivity(TwoFileInput):
    """
    Class to do static permittivity calculation.
    """

    OUTFILE_EXT = CMS_OUT_EXT
    DRIVER_PATH = path_join(STATIC_PERMITTIVITY_PATH)
    STATIC = 'static'
    STATIC_PERMITTIVITY = f'{STATIC}_permittivity'
    DEFAULT_JOBNAME = PERMITTIVITY_TASK % STATIC
    RELATIVE_STATIC_PERMITTIVITY_PROP = 'r_matsci_Relative_Static_Permittivity'
    COMPLEX_PERMITTIVITY_DATA_HEADER = ','.join(
        ['Frequency (Hz)', 'Epsilon1', 'Epsilon2'])
[docs]    def setup(self):
        """
        See parent class.
        """
        super().setup()
        # All the output files from previous jobs serve as the input of the
        # current job, with one single cms file available
        infiles = self.additional_infiles + [self.infile]
        input_cms = [x for x in infiles if fileutils.is_cms_file(x)][0]
        if not os.path.isabs(input_cms):
            # The relative path is from the job dir instead of the subtask dir
            input_cms = os.path.join(os.pardir, input_cms)
        _, self.cms_model = topo.read_cms(input_cms)
        cmplx_fl = self.cms_model.property.get(
            DipoleAnalysis.MD_COMPLEX_PERMITTIVITY_FILE_PROP)
        if not cmplx_fl:
            return
        cmplx_path = os.path.join(os.path.dirname(input_cms), cmplx_fl)
        shutil.copy(cmplx_path, cmplx_fl)
[docs] @staticmethod def finalizeStep(job): """ Finalize this step. Can be overwritten for light weight post process. :type job: An instance of a `BaseJob` subclass (such as `JobControlJob` or `SubprocessJob`) :param job: Current job """ super().finalizeStep(job) cms_model = cms.Cms(job.outfile) static_permittivity = cms_model.property[ job.RELATIVE_STATIC_PERMITTIVITY_PROP] job.log( f"The relative static permittivity of the system is {static_permittivity:.4f}" ) complex_file = cms_model.property.get( DipoleAnalysis.COMPLEX_PERMITTIVITY_FILE_PROP) if complex_file: job.log(f'Complex permittivity data is saved as {complex_file}')
[docs]class WorkflowParser(object): TASK_NAME = 'task_name' TASK_TYPE = 'task_type' TASK_DRIVER_PATH = 'task_driver_path' SPECIFIC_RESTART_LOCATOR_MAP = { NAME: TASK_NAME, TYPE: TASK_TYPE, DRIVER_PATH: TASK_DRIVER_PATH } TASK_SUBCLASS = 'task_subclass' TASK_SUBJOB = 'task_subjob' TASK_REPLICA = 'task_replica' REPLICA_DELIM = 'r' CHILD_NODE_INDEXES = 'child_node_indexes' PARENT_NODE_INDEXES = 'parent_node_indexes' PROP_KEYS = [TASK_REPLICA, CHILD_NODE_INDEXES, PARENT_NODE_INDEXES] TASK_TYPE_TO_SUBCLASS = { POLYMER_BUILDER_PROTOCOL: PolymerBuilder, MD_SIMULATION_PROTOCOL: MultiStageMd, MD_POST_DENSITY_PROTOCOL: DensityAnalysis, MD_POST_AVE_CELL_PROTOCOL: AveCellAnalysis, MD_POST_DIPOLE_PROTOCOL: DipoleAnalysis, SUSCEPTIBILITY_MERGE_PROTOCOL: SusceptibilityMerge, COMPLEX_PERMITTIVITY_PROTOCOL: ComplexPermittivity, JAGUAR_CALCULATION_PROTOCOL: JaguarCalculation, JAGUAR_FINER_CALCULATION_PROTOCOL: JaguarFinerCalculation, JAGUAR_POLARIZABILITY_PROTOCOL: ElectricPolarizability, JAGUAR_MERGE_PROTOCOL: JaguarMerge, REFRACTIVE_INDEX_PROTOCOL: RefractiveIndex, STATIC_PERMITTIVITY_PROTOCOL: StaticPermittivity, DISORDER_SYSTEM_BUILDER_PROTOCOL: DisorderSystemBuilder, MD_ANALYZE_SIMULATION_PROTOCOL: AnalyzeSimulation }
[docs]    def __init__(self,
                 config_yaml,
                 basename,
                 infile_job_infos,
                 master_logger=None):
        """
        :param config_yaml: Config file
        :type config_yaml: str

        :param basename: Base name
        :type basename: str

        :param infile_job_infos: The input file information
        :type infile_job_infos: list of SimpleNamespace

        :param master_logger: The logger to use for recording messages into the
            master job -driver.log.
        :type master_logger: `logging.Logger`
        """
        self.config_yaml = config_yaml
        self.basename = basename
        self.infile_job_infos = infile_job_infos
        self.master_logger = master_logger
        self.workflow = None
        self.graphx = None
        self.indexed_task_names = {}
        self.config = get_config(config_yaml)
        self.spring_layout_pos = None
        self.unique_tasknames = True
[docs] def setUp(self): """ Parse the workflow section in the config yaml and setup the subjobs. """ self.initialize() self.addNodes() self.addEdges() self.draw() self.addSubclass() self.removeNodes() self.addSubjobs() self.setParentChildJobs() self.removeStartingJobs() self.setReplica() self.draw() self.setSubjobDependence() self.updateSartupJobs()
[docs]    def initialize(self):
        """
        Initialize the class by reading the workflow from the config file,
        creating the networkx graph, and indexing the workflow task names.
        """
        self.workflow = self.config.getTaskProtocol(task_name=WORKFLOW)
        self.graphx = networkx.DiGraph()
        self.indexed_task_names = self.workflow.indexUnbalancedWorkflow()
        self.task_name_to_index = {
            y: x for x, y in self.indexed_task_names.items()
        }
        task_names = list(self.indexed_task_names.values())
        self.unique_tasknames = len(task_names) == len(set(task_names))
[docs]    def addNodes(self):
        """
        Add each task name to the graph as a node keyed by a unique index.
        """
        for indexes, task_name in self.indexed_task_names.items():
            self.graphx.add_node(indexes, task_name=task_name)
            node = self.graphx.nodes[indexes]
            node[self.CHILD_NODE_INDEXES] = []
            node[self.PARENT_NODE_INDEXES] = []
[docs]    def addEdges(self):
        """
        Add edges between nodes according to the job dependence (parent /
        child).
        """
        for node_index in self.graphx.nodes:
            parent_tasknames = self.workflow.getParentTasknames(node_index)
            log_debug(
                f"{self.graphx.nodes[node_index]['task_name']}, {parent_tasknames}"
            )
            for parent_taskname in parent_tasknames:
                try:
                    parent_index = self.task_name_to_index[parent_taskname]
                except KeyError:
                    log_debug(
                        f"{parent_taskname} does not "
                        "appear as a workflow task but shows up as a parent task for "
                        f"{self.graphx.nodes[node_index][self.TASK_NAME]}.")
                    continue
                self.graphx.add_edge(parent_index, node_index)
        log_debug(f"Workflow nodes: {self.graphx.nodes}")
[docs] def addSubclass(self): """ Add task type, driver path, and subclass to each graph node. """ for node_index in self.graphx.nodes: node = self.graphx.nodes[node_index] task_name = node[self.TASK_NAME] task_type = self.config.name_to_type[task_name] node[self.TASK_TYPE] = task_type task_protocol = self.config.getTaskProtocol(task_name=task_name) task_subclass = self.getTaskSubClass(task_type, task_protocol) node[self.TASK_SUBCLASS] = task_subclass driver_path = task_protocol.getDriverPath( driver_path=task_subclass.DRIVER_PATH) node[self.TASK_DRIVER_PATH] = driver_path replica_keywords = task_protocol.getReplicaKeywords() node[self.TASK_REPLICA] = replica_keywords
[docs] def removeNodes(self): """ Remove the nodes marked with remove replica keyword. """ graphx = self.graphx.copy() for node_index in self.graphx.nodes: node = graphx.nodes[node_index] task_replica = node[self.TASK_REPLICA] if task_replica is None or task_replica.get(REPLICA_TYPE) != REMOVE: continue edges = [x for x in graphx.edges if node_index in x] edges = sorted(edges, key=lambda x: x.index(node_index)) par_index, chld_index = edges[1][0], edges[0][1] graphx.add_edge(par_index, chld_index) graphx.remove_node(node_index) self.graphx = graphx
[docs] def getLeafTaskNodes(self): """ Return the task node without dependents in an order that the leaf with the longest path to the root is returned first. :return: A list of nodes without a child. :rtype: list of nodes in networkx.Graph.nodes """ node_indexes = [x for x in self.graphx.nodes] leaf_node_indexes = [ x for x in node_indexes if not self.graphx.nodes[x][self.CHILD_NODE_INDEXES] ] root_node_indexes = [ x for x in node_indexes if not self.graphx.nodes[x][self.PARENT_NODE_INDEXES] ] path_len_to_root = collections.defaultdict(lambda: np.inf) for root_node_index, leaf_node_index in itertools.product( root_node_indexes, leaf_node_indexes): try: paths = networkx.shortest_path(self.graphx, source=root_node_index, target=leaf_node_index) except networkx.exception.NetworkXNoPath: paths_len = np.inf else: paths_len = len(paths) if paths_len < path_len_to_root[leaf_node_index]: path_len_to_root[leaf_node_index] = paths_len sorted_leaf_node_indexes = sorted(path_len_to_root, key=path_len_to_root.get, reverse=True) return [self.graphx.nodes[x] for x in sorted_leaf_node_indexes]
[docs]    def getFirstLevelParentNodes(self, node):
        """
        Return the first level parent nodes of this input node.

        :param node: The node whose parent nodes are requested
        :type node: dict as a node in networkx.Graph.nodes

        :return: The first level parent nodes
        :rtype: list of nodes (dict)
        """
        parent_node_indexes = node[self.PARENT_NODE_INDEXES]
        return [self.graphx.nodes[x] for x in parent_node_indexes]
[docs] def getOrderedTaskSubjobDirs(self, node): """ Return the task subjob dirs in the order from bottom to the top. :param node: A node in the job without a child. :type node: dict as a node in networkx.Graph.nodes :return: Key is the task name and value is the subdir of the task :rtype: OrderedDict """ subjob_name_to_dir = collections.OrderedDict() subjob_name_to_dir[node[self.TASK_NAME]] = node[ self.TASK_SUBJOB].cmd_dir all_nodes = [node] while all_nodes: node = all_nodes.pop(0) first_lvl_parent_nodes = self.getFirstLevelParentNodes(node) for first_lvl_parent_node in first_lvl_parent_nodes: all_nodes.append(first_lvl_parent_node) first_lvl_parent_task_subjob = first_lvl_parent_node[ self.TASK_SUBJOB] name = first_lvl_parent_node[self.TASK_NAME] command_dir = first_lvl_parent_task_subjob.cmd_dir subjob_name_to_dir[name] = command_dir or os.curdir return subjob_name_to_dir
[docs]    def addSubjobs(self):
        """
        Instantiate each task subclass and add the instance (subjob) to the
        node.
        """
        # Add each job to the workflow
        for node_index in self.graphx.nodes:
            node = self.graphx.nodes[node_index]
            task_subclass = node[self.TASK_SUBCLASS]
            task_name = node[self.TASK_NAME]
            sub_jobname = self.getSubjobname(node_index)
            subjob = task_subclass(sub_jobname,
                                   config_yaml=self.config_yaml,
                                   task_name=task_name,
                                   infile=TERMINAL_INFILE,
                                   master_logger=self.master_logger)
            node[self.TASK_SUBJOB] = subjob
[docs] def getSubjobname(self, node_index): """ Return the subjob name based on task name and node index. :param node_index: The index to get job info node :type node_index: tuple :return: Subjob name :rtype: str """ node = self.graphx.nodes[node_index] task_name = node[self.TASK_NAME] sub_jobname = task_name.replace(" ", "_").lower() base_index, replica_index = tuple(), tuple() try: rep_delim_index = node_index.index(self.REPLICA_DELIM) except ValueError: base_index = node_index else: base_index = node_index[:rep_delim_index] replica_index = node_index[rep_delim_index:] full_index = replica_index if not self.unique_tasknames: full_index = base_index + full_index if not full_index: return sub_jobname node_index_str = "".join(map(str, list(full_index))) sub_jobname = "_".join([sub_jobname, node_index_str]) return sub_jobname
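    # Illustrative note: node indexes are tuples, and everything after the
    # REPLICA_DELIM ('r') marker is the replica part.  For a hypothetical task
    # named "Dipole Analysis" with node_index == (2, 'r', 1), getSubjobname()
    # returns
    #
    #     "dipole_analysis_r1"    when all task names are unique
    #     "dipole_analysis_2r1"   when task names are duplicated (base index kept)
    #
    # Nodes with no replica part and a unique task name keep the plain name.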
[docs] def setParentChildJobs(self, clear_first=True): """ From the edges of the graph, save the info on the first level child jobs for each parent job and the first level parent jobs for each child job. """ if clear_first: self.clearFirstLevelParentChildJobs() for par_job_index, cur_job_index in self.graphx.edges: par_node = self.graphx.nodes[par_job_index] cur_node = self.graphx.nodes[cur_job_index] par_node[self.CHILD_NODE_INDEXES].append(cur_job_index) cur_node[self.PARENT_NODE_INDEXES].append(par_job_index)
[docs]    def clearFirstLevelParentChildJobs(self):
        """
        Clear the first level child job and first level parent job information.
        """
        for par_job_index, cur_job_index in self.graphx.edges:
            par_node = self.graphx.nodes[par_job_index]
            cur_node = self.graphx.nodes[cur_job_index]
            par_node[self.CHILD_NODE_INDEXES] = []
            cur_node[self.PARENT_NODE_INDEXES] = []
[docs] def removeStartingJobs(self): """ Remove the subjobs that are not defined as starting tasks by task type or task driver path. """ if len(self.infile_job_infos ) == 1 and self.infile_job_infos[0].locator_type == ANY: log_debug("Any input file types are accepted, and thus " "subjobs in the workflow remain intact.") return starting_task_names = set([ x.locator_value for x in self.infile_job_infos if x.locator_type == NAME ]) starting_task_types = set([ x.locator_value for x in self.infile_job_infos if x.locator_type == TYPE ]) starting_task_drivers = set([ x.locator_value for x in self.infile_job_infos if x.locator_type == DRIVER_PATH ]) log_debug(f"starting_task_names: {starting_task_names}") log_debug(f"starting_task_types: {starting_task_types}") log_debug(f"starting_task_drivers: {starting_task_drivers}") node_removed_num = 0 while True: incompatible_node_indexes = [] for node_index in self.graphx.nodes: node = self.graphx.nodes[node_index] if node[self.PARENT_NODE_INDEXES]: continue elif node[self.TASK_NAME] in starting_task_names: continue elif node[self.TASK_TYPE] in starting_task_types: continue elif node[self.TASK_DRIVER_PATH] and node[ self.TASK_DRIVER_PATH] in starting_task_drivers: continue incompatible_node_indexes.append(node_index) for child_node_index in node[self.CHILD_NODE_INDEXES]: child_node = self.graphx.nodes[child_node_index] child_node[self.PARENT_NODE_INDEXES].remove(node_index) if not incompatible_node_indexes: break log_debug( f"{[self.graphx.nodes[x][self.TASK_NAME] for x in incompatible_node_indexes]}" ) log_debug( f"task_name: {node[self.TASK_NAME] in starting_task_names}") log_debug( f"task_type: {node[self.TASK_TYPE] in starting_task_types}") log_debug( f"driver_path: {node[self.TASK_DRIVER_PATH] in starting_task_drivers}" ) self.graphx.remove_nodes_from(incompatible_node_indexes) node_removed_num += len(incompatible_node_indexes) log_debug(f"{node_removed_num} node removed due to input file type.")
[docs] def setSubjobDependence(self): """ Set the job dependence between each subjob pair. """ for par_job_index, cur_job_index in self.graphx.edges: par_node = self.graphx.nodes[par_job_index] par_job = par_node[self.TASK_SUBJOB] cur_node = self.graphx.nodes[cur_job_index] cur_job = cur_node[self.TASK_SUBJOB] cur_job.addPrereq(par_job) par_job.first_lvl_child_jobs.append(cur_job) cur_job.first_lvl_parent_jobs.append(par_job)
[docs] def JobinfoAndNodeMatched(self, jinfo, node): """ Whether the job information and task node match. :param jinfo: One job information for specific restart. :type jinfo: SimpleNamespace :param node: One task node in the networkx :type node: networkx.Graph.nodes :return: True if the jinfo matches the node settings :rtype: bool """ if jinfo.locator_type == ANY: return True for jloc, nloc in self.SPECIFIC_RESTART_LOCATOR_MAP.items(): if jinfo.locator_type == jloc and jinfo.locator_value == node[nloc]: return True return False
[docs] def updateSartupJobs(self): """ Set the input files for the start-up jobs. """ # Set the input files for the no-parent subjobs log_debug(f"infile_job_infos: {self.infile_job_infos}") for node_index in self.graphx.nodes: node = self.graphx.nodes[node_index] job = node[self.TASK_SUBJOB] if job.first_lvl_parent_jobs: continue try: jinfo = next(x for x in self.infile_job_infos if self.JobinfoAndNodeMatched(x, node)) except StopIteration: continue for infile in [jinfo.infile] + jinfo.additional_infiles: job.updateInfileAndCommand(infile)
[docs] def addReplicaNode(self, node, node_rindex): """ Add the replica node to the graph and set node properties. :param node: The node in graph the replica duplicates :type node: dict :param node_rindex: The nodex index of the replica :type node_rindex: tuple """ self.graphx.add_node(node_rindex) added_node = self.graphx.nodes[node_rindex] # Str properties added_node[self.TASK_TYPE] = node[self.TASK_TYPE] added_node[self.TASK_NAME] = node[self.TASK_NAME] added_node[self.TASK_DRIVER_PATH] = node[self.TASK_DRIVER_PATH] # List properties for prop_key in self.PROP_KEYS: prop_val = node.get(prop_key) if prop_val: prop_val = prop_val.copy() added_node[prop_key] = prop_val task_subclass = node[self.TASK_SUBCLASS] # Address passed added_node[self.TASK_SUBCLASS] = task_subclass # New instance sub_jobname = self.getSubjobname(node_rindex) subjob = task_subclass(sub_jobname, config_yaml=self.config_yaml, task_name=node[self.TASK_NAME], infile=node[self.TASK_SUBJOB].infile, master_logger=self.master_logger, node_index=node_rindex) added_node[self.TASK_SUBJOB] = subjob self.compatibleChildNodes(added_node) added_node[THREADING] = True
[docs]    def getReplicaType(self, node):
        """
        Get the replica type.

        :param node: The node in the networkx graph
        :type node: dict

        :return: The replica type
        :rtype: str or None
        """
        task_replica = node.get(WorkflowParser.TASK_REPLICA)
        if not task_replica:
            return None
        return task_replica.get(REPLICA_TYPE)
[docs]    def setReplicaType(self, node, ntype):
        """
        Set the replica type.

        :param node: The node in the networkx graph
        :type node: dict

        :param ntype: The replica type of the node
        :type ntype: str
        """
        task_replica = node.get(WorkflowParser.TASK_REPLICA)
        if not task_replica:
            node[WorkflowParser.TASK_REPLICA] = {REPLICA_TYPE: ntype}
            return
        task_replica[REPLICA_TYPE] = ntype
[docs]    def compatibleChildNodes(self, node):
        """
        Keep only the compatible child nodes.

        :param node: The node whose children's compatibility is checked.
        :type node: dict
        """
        if node[self.TASK_TYPE] != POLYMER_BUILDER_PROTOCOL:
            return
        polym_num_cru = max(node[self.TASK_REPLICA][REPLICA_FLAG])
        child_node_indexes = []
        for child_node_index in node[self.CHILD_NODE_INDEXES]:
            child_node = self.graphx.nodes[child_node_index]
            if child_node[self.TASK_TYPE] == JAGUAR_CALCULATION_PROTOCOL \
                    and node[self.TASK_SUBJOB].num_cru == polym_num_cru:
                # Polymer atom number usually exceeds the jaguar limit
                continue
            if child_node[self.TASK_TYPE] == DISORDER_SYSTEM_BUILDER_PROTOCOL \
                    and node[self.TASK_SUBJOB].num_cru < polym_num_cru:
                # Polymer cell instead of oligomer cell is built
                continue
            child_node_indexes.append(child_node_index)
        node[self.CHILD_NODE_INDEXES] = child_node_indexes
[docs]    def connectReplicaNode(self, node_rindex):
        """
        Add the edges between one replica node and the parent and child nodes
        of the original node. In addition, the mapping between the original
        node and the replica nodes is saved in each child job so that a later
        threading pass can use it before merging.

        NOTE: threading mode disconnects one single parent node.

        :param node_rindex: The node index of the replica node
        :type node_rindex: tuple
        """
        node = self.graphx.nodes[node_rindex]
        par_node_indexes = node[self.PARENT_NODE_INDEXES]
        if par_node_indexes:
            # Only single parent is supported
            threading = par_node_indexes[0].count(self.REPLICA_DELIM) == \
                node_rindex.count(self.REPLICA_DELIM)
            index = node_rindex[-1] if threading else 0
            try:
                par_node_index = par_node_indexes[index]
            except IndexError:
                # Polymer builder on disordered builder thread on REPLICA_DELIM
                # isn't a branching node (dsb has to branch from single parent)
                par_node_index = par_node_indexes[0]
            self.graphx.add_edge(par_node_index, node_rindex)
            log_debug(f'connectReplicaNode 1: {par_node_index}, {node_rindex}')
        for child_node_index in node[self.CHILD_NODE_INDEXES]:
            self.graphx.add_edge(node_rindex, child_node_index)
            log_debug(
                f'connectReplicaNode 2: {node_rindex}, {child_node_index}')
        self.setParentChildJobs()
[docs]    def disconnectOrigNode(self, node, node_index):
        """
        Disconnect the edges, and the first level parent / child bookkeeping,
        between the original node and all of its parents and children.

        :param node: The original node
        :type node: dict

        :param node_index: The original node index
        :type node_index: tuple
        """
        for par_node_index in node[self.PARENT_NODE_INDEXES]:
            self.graphx.remove_edge(par_node_index, node_index)
            self.graphx.nodes[par_node_index][self.CHILD_NODE_INDEXES].remove(
                node_index)
        for child_node_index in node[self.CHILD_NODE_INDEXES]:
            self.graphx.remove_edge(node_index, child_node_index)
            self.graphx.nodes[child_node_index][
                self.PARENT_NODE_INDEXES].remove(node_index)
[docs] def setReplica(self): """ Process the replica nodes (branch, thread, and merge) and treat original nodes properly. """ self.updateReplicaNodes() while self.setBranchReplicaNode(): while self.setThreadingReplicaNodes(): self.setParentChildJobs() self.setMergeReplicaNodes() self.logReplicaInfo()
[docs] def updateThreadingNodes(self): """ Update the threading nodes so that single branching is not considered as threading. """ tnodes = [ x for x in self.graphx.nodes if self.graphx.nodes[x].get(THREADING) ] tnodes = { x: self.graphx.nodes[x][self.CHILD_NODE_INDEXES] for x in tnodes } cnodes = list(itertools.chain.from_iterable(tnodes.values())) counter = collections.Counter(cnodes) tnodes = {x: sum(counter[z] for z in y) for x, y in tnodes.items()} for tnode, num in tnodes.items(): self.graphx.nodes[tnode][THREADING] = num > 1
[docs] def logReplicaInfo(self): """ Print all the node info including replica node for debugging. """ if not schrodinger.in_dev_env(): return for node_index in self.graphx.nodes: node = self.graphx.nodes[node_index] try: log_debug( f'replicaDebugPrinting {node_index} {node[self.TASK_NAME]} ' f'{node[REPLICA]} {node[self.PARENT_NODE_INDEXES]}') except KeyError: log_debug( f'replicaDebugPrinting {node_index} {node[self.TASK_NAME]}' f'{node[self.PARENT_NODE_INDEXES]}')
[docs] def updateReplicaNodes(self): """ Update replica nodes based on replica keywords. """ for node_index in self.graphx.nodes: rpl = self.graphx.nodes[node_index].get(self.TASK_REPLICA) if rpl and rpl.get(REPLICA_TYPE) == BRANCH and rpl.get( REPLICA_NUM, 0) <= 1: self.graphx.nodes[node_index].pop(self.TASK_REPLICA)
[docs]    def setBranchReplicaNode(self):
        """
        Locate one branching node, replace it with replica nodes, properly
        connecting and disconnecting edges, and update the first level parent
        and child jobs upstream and downstream of the original node.

        :return: The processed branch node, or None if no branch node is left.
        :rtype: dict or None
        """
        indexes = self.getRelicaNodeIndexes(node_type=BRANCH)
        if not indexes:
            return
        node_index = indexes[0]
        node = self.graphx.nodes[node_index]
        node[self.TASK_REPLICA][REPLICA_TYPE] = None
        for replica_index in range(node[self.TASK_REPLICA][REPLICA_NUM]):
            node_rindex = (*node_index, self.REPLICA_DELIM, replica_index)
            self.addReplicaNode(node, node_rindex)
            self.connectReplicaNode(node_rindex)
        self.disconnectOrigNode(node, node_index)
        self.graphx.remove_nodes_from([node_index])
        self.updateThreadingNodes()
        log_debug(f"setBranchReplicaNode {node_index} {node[self.TASK_NAME]}")
        return node
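    # Illustrative note: branching replaces one node with REPLICA_NUM copies
    # whose indexes append the REPLICA_DELIM and a replica counter.  For
    # example, a hypothetical branch node indexed (3,) with REPLICA_NUM == 3
    # becomes
    #
    #     (3, 'r', 0), (3, 'r', 1), (3, 'r', 2)
    #
    # Each copy is wired to the original parents/children by
    # connectReplicaNode(), and the original node is then removed.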
[docs]    def getRelicaNodeIndexes(self, node_type=BRANCH):
        """
        Get the node indexes of a certain replica type (e.g. branch or merge).

        :param node_type: The replica type to match, e.g. BRANCH for branching
            nodes or MERGE for nodes marked with the MERGE replica keyword
        :type node_type: str

        :return: Node indexes of the requested replica type
        :rtype: list of tuple

        NOTE: branching should happen before threading; an improper workflow
        design or a bug may lead to a branching node being encountered during
        threading. Currently, only the single branching node design is
        implemented and tested.
        """
        replica = {
            x: self.graphx.nodes[x].get(self.TASK_REPLICA)
            for x in self.graphx.nodes
        }
        return [
            x for x, y in replica.items()
            if y and y.get(REPLICA_TYPE) == node_type
        ]
[docs]    def setThreadingReplicaNodes(self):
        """
        Locate threading nodes (nodes downstream of a branching replica and
        before a merge node), replace them with replica nodes, properly
        connecting and disconnecting edges, and update the first level parent
        and child jobs upstream and downstream of the original nodes.

        :return: The original node indexes that were replaced and removed
            (empty if no threading node was processed).
        :rtype: set of tuple
        """
        nodes = {x: self.graphx.nodes[x] for x in self.graphx.nodes}
        tnodes = [x for x, y in nodes.items() if y.pop(THREADING, None)]
        exts = {x: x[x.index(self.REPLICA_DELIM):] for x in tnodes}
        chldns = {x: nodes[x][self.CHILD_NODE_INDEXES] for x in tnodes}
        chldns = {
            x: [z for z in y if self.getReplicaType(nodes[z]) != MERGE]
            for x, y in chldns.items()
        }
        chldns = {x: y for x, y in chldns.items() if y}
        rchldns = {(*z, *exts[x]): z for x, y in chldns.items() for z in y}
        for rindex, oindex in rchldns.items():
            onode = self.graphx.nodes[oindex]
            self.addReplicaNode(onode, rindex)
        for rindex in rchldns.keys():
            self.connectReplicaNode(rindex)
        nodes_to_remove = set(rchldns.values())
        self.graphx.remove_nodes_from(nodes_to_remove)
        log_debug(f'setThreadingReplicaNodes removed {nodes_to_remove}')
        return nodes_to_remove
[docs]    def setMergeReplicaNodes(self):
        """
        Merge the replica nodes. The last replica jobs of the different threads
        are the parents of one single merge job, which is the original job, so
        the merge job command is expanded to take one input file per parent.
        """
        for node_index in self.getRelicaNodeIndexes(node_type='merge'):
            node = self.graphx.nodes[node_index]
            cmd = node[self.TASK_SUBJOB]._command.copy()
            infile_index = cmd.index(TERMINAL_INFILE)
            par_job_num = len(node[self.PARENT_NODE_INDEXES])
            node[self.TASK_SUBJOB]._command = cmd[:infile_index] + [
                TERMINAL_INFILE
            ] * par_job_num + cmd[infile_index + 1:]
            log_debug(
                f'setMergingReplicaNodes update cmd {node[self.TASK_SUBJOB]._command}'
            )
[docs]    def addJobsToJobDj(self, jdj):
        """
        Add the leaf jobs to the JobDJ; all the jobs they depend on, as set by
        setSubjobDependence(), are added along with them.

        :type jdj: 'queue.JobDJ'
        :param jdj: The `JobDJ` instance to register jobs with, used for
            running commands/jobs in parallel under jobcontrol.
        """
        for leaf_node in self.getLeafTaskNodes():
            leaf_job = leaf_node[self.TASK_SUBJOB]
            jdj.addJob(leaf_job)
[docs] def draw(self): """ Draw the workflow with dependence and task name as png. """ if not schrodinger.in_dev_env(): return # write image file, Agg rather than QTAgg is supposed to be default backend (MATSCI-9819) # to prevent requiring graphics libraries in case running on a remote compute host from matplotlib import figure from matplotlib.backends.backend_agg import \ FigureCanvasAgg as FigureCanvas # Workflow layout could be possible with BLDMGR-4008, e.g., # from networkx.drawing.nx_agraph import graphviz_layout # pos = graphviz_layout(self.graphx, prog='dot') fig = figure.Figure([8, 8]) canvas = FigureCanvas(fig) ax = fig.add_subplot(111) file_ext = 'rworkflow.png' if self.spring_layout_pos else 'workflow.png' pos, labels = self._getPosAndLabels() networkx.drawing.nx_pylab.draw_networkx(self.graphx, pos=pos, labels=labels, ax=ax, node_color='w', font_color='b', node_size=600, font_size=12) scale = 1.5 ax.set_xlim([x * scale for x in ax.get_xlim()]) ax.set_ylim([x * scale for x in ax.get_ylim()]) ax.set_xticks([]) ax.set_yticks([]) ax.axis("off") fig.tight_layout() png_name = '_'.join([self.basename, file_ext]) canvas.print_figure(png_name) jobutils.add_outfile_to_backend(png_name, None)
def _getPosAndLabels(self): """ Get the positions and labels of the jobs in the graphx. The first time running this method sets the spring_layout_pos without replica. :return: index -> the node positions, index -> the node labels :rtype: dict, dict """ if not self.spring_layout_pos: pos = [[sum(x[1::2]), sum(x[::2])] for x in self.graphx.nodes] pos = {x: np.array(y) for x, y in zip(self.graphx.nodes, pos)} self.spring_layout_pos = networkx.spring_layout( self.graphx, pos=pos, k=5 / math.sqrt(self.graphx.order())) labels = { x: f"{self.workflow.getTaskName(x)} {','.join(map(str, x))}" for x in self.graphx.nodes } return self.spring_layout_pos.copy(), labels rlabels = {} for node_rindex in self.graphx.nodes: taskname = self.graphx.nodes[node_rindex][self.TASK_NAME] rlabels[node_rindex] = taskname lb_counter = collections.Counter(rlabels.values()) rlabels = {x: f'{y} ({lb_counter[y]})' for x, y in rlabels.items()} rpos = {} for node_rindex in self.graphx.nodes: try: rd_index = node_rindex.index(self.REPLICA_DELIM) except ValueError: node_index = node_rindex else: node_index = node_rindex[:rd_index] rpos[node_rindex] = self.spring_layout_pos[node_index] return rpos, rlabels
[docs] @classmethod def getTaskSubClass(cls, task_type, task_protocol): """ Get task subclass if the subclass is known by this driver, or a specific task driver path is provided in the config yaml setting block. :param task_type: The task type of this stage :type task_type: str :param task_protocol: The task protocol containing task information :type task_protocol: 'TaskProtocol' :raise TypeError: When the task_type is not supported :return: The corresponding task class :rtype: the class or subclass of 'Step' """ try: task_subclass = cls.TASK_TYPE_TO_SUBCLASS[task_type] except KeyError: if task_protocol.getDriverPath(): # Use generic subclass settings, e.g., $SCHRODINGER/run return Step raise TypeError(f"Task type {task_type} is not supported.") return task_subclass
[docs]    @classmethod
    def writeConfig(cls, options, jobname):
        """
        The command line options are a subset of all the flags, and thus a new
        config file is written based on the command line options and the config
        template.

        NOTE: if a config_yaml is explicitly passed from the command line, the
        full path of the final config file points to the original path from the
        command line.

        :param options: The parsed command line options
        :type options: `argparse.Namespace`

        :param jobname: The jobname based on which the config yaml is
            generated. If False, no yaml file is written out.
        :type jobname: str or False
        """
        config_writer = ConfigWriter(options, jobname)
        config_writer.run()