Source code for schrodinger.application.matsci.jwsteps

"""
Steps for use in Workflows

Copyright Schrodinger, LLC. All rights reserved.
"""

import glob
import math
import os
import pathlib
import shutil

from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.macromodel import utils as mmodutils
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import fragments
from schrodinger.application.matsci import \
    jaguar_multistage_workflow_utils as jmswfu
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import uq_utils
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import fileutils
from schrodinger.utils import imputils
from schrodinger.utils import sea


# Import drivers
def import_driver(dpath):
    scripts = fileutils.get_mmshare_scripts_dir()
    return imputils.import_module_from_file(os.path.join(scripts, dpath))


DO_NOT_USE_FLAG = 'do_not_use_this_flag'
SAVE_PROPS_PROPERTY = 's_matsci_save_props'
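
# Usage note (added for reference; not in the original source):
# BaseStep.createCommand() skips any flag whose value is DO_NOT_USE_FLAG, so
# passing, e.g., flags={'-pack': DO_NOT_USE_FLAG} to a step's __init__
# suppresses a '-pack' flag inherited from the class FLAGS dictionary (the
# '-pack' flag is only an illustrative example). Likewise,
# BaseStep.processOutput() reads the space-separated property names stored in
# SAVE_PROPS_PROPERTY on the first output structure and copies those
# properties onto the main workflow structure.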


class JobCreationError(Exception):
    """
    Raised when a job cannot be created
    """


class BaseWorkflow(jaguarworkflows.WorkFlow):
    """
    Base Workflow class

    This class does not assume the first step is the main step. Instead, set
    the main_step property to the main step. This is the step that will get
    all the workflow properties added to it. The main step will also be the
    only step in the main PT group. All other steps will be in a "subjobs"
    subgroup.
    """

    def __init__(self, *args, **kwargs):
        kwargs.setdefault('subhierarchy', 'subjobs')
        super().__init__(*args, **kwargs)


class SubdirectoryStepMixin:
    """
    A mixin to handle some common functionality for steps that run in
    subdirectories
    """

    def __init__(self, *args, **kwargs):
        """
        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        self.input_files = []

    def finishProcessingJobControlJob(self):
        """
        Add all the subjob files to the backend
        """
        backend = jobcontrol.get_backend()
        if backend:
            if self.USES_JC:
                for ifile in self.input_files:
                    ipath = os.path.join(self.job_name, ifile)
                    backend.addOutputFile(ipath)
                jobutils.add_subjob_files_to_backend(self.job,
                                                     path=self.job_name)
            else:
                # Subprocess jobs don't have a way to record files to copy
                # back, so just copy back the entire directory
                for afile in glob.iglob(f'{self.job_name}/*'):
                    backend.addOutputFile(afile)

    def setSubDir(self):
        """
        Determine the absolute path to this step's subdirectory so that we can
        always access it no matter what the current directory is
        """
        backend = jobcontrol.get_backend()
        if backend:
            job_path = backend.getJob().JobDir
        else:
            job_path = os.getcwd()
        self.subdir = os.path.join(job_path, self.job_name)
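
# Illustration (added; not in the original source): with a hypothetical job
# directory of /scratch/user/wf_job and a step job_name of "wf_job_opt",
# setSubDir() leaves self.subdir == '/scratch/user/wf_job/wf_job_opt'.
# Concrete steps combine this mixin with a jaguarworkflows step class, e.g.:
#
#     class MyStep(SubdirectoryStepMixin, jaguarworkflows.Step):
#         ...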


class JaguarOptimization(SubdirectoryStepMixin, jaguarworkflows.OptStep):
    """
    Class to run a Jaguar optimization and possibly thermochemistry

    This class overrides many of the parent class methods to enable the step
    to run in a subdirectory. The parent class expects the job to run in the
    main workflow job directory.
    """

    STEP_NAME = 'Optimization'
    JOB_BASE = 'opt'
    USES_JC = True

    def __init__(self, *args, **kwargs):
        """
        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        self.job_name = self.job_name or (self.workflow.base_name + '_' +
                                          self.JOB_BASE)
        self.step_name = self.step_name or self.STEP_NAME
        self.setSubDir()

    def writeInput(self):
        """
        Override the parent method to work in the subjob directory
        """
        for ext in ['.in', '.mae']:
            self.input_files.append(self.job_name + ext)
        return super().writeInput()

    def createJob(self):
        """
        Override the parent method to work in the subjob directory
        """
        qjob = super().createJob()
        qjob._command_dir = self.subdir
        return qjob

    def start(self):
        """
        Override the parent method to work in the subjob directory
        """
        os.makedirs(self.subdir, exist_ok=True)
        with fileutils.chdir(self.subdir):
            return super().start()

    def getOutput(self, *args, **kwargs):
        """
        Override the parent method to work in the subjob directory
        """
        os.makedirs(self.subdir, exist_ok=True)
        with fileutils.chdir(self.subdir):
            output = super().getOutput(*args, **kwargs)
        if self.results:
            self.results.path = os.path.join(self.subdir, self.results.path)
        return output

    def createSmapFile(self):
        """
        Create the smap file that allows the surfaces to automatically import
        when the structure is manually imported into Maestro
        """
        # Note that this method is not currently used, but is left here as an
        # example of how to fix subjobs run in a subdirectory so that surfaces
        # and vibrational files can be attached properly to the structures in
        # the main results file in the main job directory.
        # Writing the smap file manually is necessary due to JAGUAR-9464
        vis_stems = ('potential', 'density')

        def _write_lines(smap_name, st_name, stub, num):
            with open(smap_name, 'w') as sfile:
                sfile.write('# smap version 1.0\n')
                sfile.write(st_name + '\n')
                for stem in vis_stems:
                    sfile.write(f'{stub}_{stem}.vis: {num}\n')
                sfile.write('#end\n')

        # This writes the smap file for the subjob .01.mae file in the subjob
        # subdirectory. While not technically necessary, it makes it so the
        # user can import the structure manually and get the associated data.
        smap_name = self.job_name + '.01.smap'
        st_name = self.job_name + '.01.mae'
        stub = self.job_name
        # Here we use num = 1 with the expectation that this job produces a
        # single structure in the .01.mae file, and therefore the smap file
        # should point to the first (only) structure in that file.
        num = 1
        with fileutils.chdir(self.subdir):
            _write_lines(smap_name, st_name, stub, num)
        jobutils.add_outfile_to_backend(os.path.join(self.subdir, smap_name))

        # This writes the smap file for the main output structure file
        st_name = self.workflow.output_name
        stub = fileutils.get_basename(st_name)
        smap_name = stub + '.smap'
        # This number should be the index of the structure in the main results
        # (st_name) file. num=2 is completely made up for this example.
        num = 2
        _write_lines(smap_name, st_name, stub, num)
        # Now copy the .vis files to the main directory and add main directory
        # files to the backend
        for stem in vis_stems:
            fname = f'{self.job_name}_{stem}.vis'
            path = os.path.join(self.subdir, fname)
            new_name = f'{stub}_{stem}.vis'
            shutil.copy(path, new_name)
            jobutils.add_outfile_to_backend(new_name)
        jobutils.add_outfile_to_backend(smap_name)
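
# For reference (added comment; not in the original source), the .smap file
# written by JaguarOptimization.createSmapFile() via _write_lines() for a
# hypothetical job named "wf_opt" with num=1 would contain:
#
#     # smap version 1.0
#     wf_opt.01.mae
#     wf_opt_potential.vis: 1
#     wf_opt_density.vis: 1
#     #end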


class BaseResults:
    """
    A simple results class to provide the functionality from the
    jaguarworkflows.Results class that is still needed for non-Jaguar jobs
    """

    def __init__(self, parent):
        """
        Create a BaseResults class

        :param `jaguarworkflows.Step` parent: The class these results are for
        """
        self.parent = parent

    def getMaeStructure(self):
        """
        Get the resulting Maestro structure for the parent class
        """
        return self.parent.getStructForWriting()


class BaseStep(SubdirectoryStepMixin, jaguarworkflows.Step):
    """
    A base class for Steps that are not simple Jaguar jobs.

    Because this step fires off a Python script that itself fires off Jaguar
    jobs, we have to modify a number of methods that dealt with the Jaguar .in
    file directly or expected standard Jaguar output.
    """

    PROCS = 1
    RESULTS_CLASS = BaseResults
    STEP_NAME = 'Base Step'
    JOB_BASE = 'base'
    # The command line will be formed in the following order:
    # - driver path (if not None)
    # - flags in the self.flags dictionary, which is formed from:
    #   - The class variable FLAGS
    #   - flags passed in to the __init__ via the flags keyword (which
    #     override values in the class variable FLAGS)
    # - -TPP flag if FLAGS_ADD_TPP is True
    # - input name if FLAGS_ADD_INPUT_NAME is True
    # - values in the self.cmd_args list
    # - -JOBNAME flag if FLAGS_ADD_JOB_NAME is True
    DRIVER_PATH = 'driver_dir/driver.py'
    DRIVER = None
    # For "-flag value" type flags, use FLAG[flag] = value
    # For "-flag_with_no_value" flags, use FLAG[flag] = None
    FLAGS = {}
    # Add the string given by getInputName() after the command line flags
    FLAGS_ADD_INPUT_NAME = True
    # Preface the input name with this command line flag - leave blank if the
    # input name is a positional parameter rather than prefaced by a -flag
    FLAGS_INPUT_NAME_FLAG = ""
    # Add -JOBNAME self.job_name after the command line flags
    FLAGS_ADD_JOB_NAME = True
    # Add -TPP options.TPP after the command line flags
    FLAGS_ADD_TPP = False
    # Use -- right before input file name to ensure keywords are terminated
    FLAGS_ADD_DOUBLE_DASH = False
    # Step might create a trajectory based on flag settings
    CAN_CREATE_TRAJECTORY = False
    # If step requires CMS input
    REQUIRES_CMS_INPUT = False
    # If step requires TRAJECTORY input
    REQUIRES_TRAJECTORY_INPUT = False
    # If the step outputs CMS
    OUTPUTS_CMS = False
    # If the step uses Job Control
    USES_JC = True
    # Property names in SAVE_PROPS will be saved to the main workflow structure
    SAVE_PROPS = set()
    # Property names that start with anything in SAVE_PROP_STARTS will be
    # saved to the main workflow structure
    SAVE_PROP_STARTS = set()

    def __init__(self,
                 *args,
                 procs=PROCS,
                 tag=None,
                 flags=None,
                 archive=False,
                 monitor_globs=None,
                 full_command=None,
                 local_files=None,
                 input_file=None,
                 **kwargs):
        """
        Create a BaseStep object

        :param int procs: The number of processors to allocate to steps of
            this class

        :param tag: str(tag) will be added to the end of the job name and step
            name

        :param dict flags: Command line argument flags in addition to, or that
            override, the class FLAGS variable. Keys are flag names ("-flag"),
            values are values for that flag. For flags that take no value, use
            None as the value. To turn off (not include in the command line) a
            flag in the class FLAGS variable, use the DO_NOT_USE_FLAG constant
            as the value.

        :param bool archive: If True, instead of copying the subdirectory and
            individual files, will tar.gz up the entire subdirectory and copy
            that back.

        :param list monitor_globs: List of valid file globs (i.e. `["*.zip"]`)
            that should be monitored for and copied back to the original job
            directory when they appear. Note that the interaction of archive
            and monitor_globs is such that if both are specified, the result
            will be a subdirectory with the monitored files AND an archive of
            the entire subdirectory (including any monitored files that remain
            when this step completes).

        :param list full_command: the full command line for this step
            including the driver path, etc. but without $SCHRODINGER/run. The
            command will be used "as is" except for replacing some pre-defined
            tokens with job-specific strings - see the createCommand function.
            Each item of the list is a word of the command line invocation.

        :param list local_files: A list of file paths to copy into the job
            subdirectory before running the command

        :param str input_file: The name of the file containing the input
            structures/data. It is ignored if there is a parent for this step.

        See parent class for additional documentation
        """
        self.procs = procs
        super().__init__(*args, **kwargs)
        # self.results is set to None in the super class so set it afterwards
        self.results = self.RESULTS_CLASS(self)
        self.job_name = self.job_name or (self.workflow.base_name + '_' +
                                          self.JOB_BASE)
        self.step_name = self.step_name or self.STEP_NAME
        # Check tag not None instead of "if tag" because it could be 0
        if tag is None:
            self.tag = tag
        else:
            self.tag = str(tag)
            self.job_name += '_' + self.tag
            self.step_name += ' ' + self.tag
        self.setSubDir()
        self.flags = self.FLAGS.copy()
        if flags:
            self.flags.update(flags)
        # These are command line positional arguments that will appear after
        # all the items in self.flags and after the input file name (if
        # requested).
        self.cmd_args = []
        self.full_command = full_command
        self.archive = archive
        self.monitor_globs = monitor_globs
        self.files_monitored = set()
        self.cmd = []
        self.local_files = local_files or []
        # Associate with input file only if there is no parent
        self.input_file = None if self.parent else input_file

    def getParentalSubdirectories(self):
        """
        Get the subdirectory names for parents and required stages

        :rtype: str, str
        :return: The parent subdirectory, and a space-separated string of all
            required stage subdirectories. The subdirectory paths are relative
            to the overall job directory.
        """
        if self.parent:
            parent_subdir = os.path.basename(self.parent.subdir)
        else:
            parent_subdir = ""
        if self.noninheritable_parents:
            slist = []
            for nih_parent in self.noninheritable_parents:
                slist.append(os.path.basename(nih_parent.subdir))
            requires_subdirs = ' '.join(slist)
        else:
            requires_subdirs = ""
        return parent_subdir, requires_subdirs

    def addParentalSubdirProperties(self, struct):
        """
        Add parental subdirectory properties to the structure

        :param `schrodinger.structure.Structure` struct: The structure to add
            properties to
        """
        parent_subdir, requires_subdirs = self.getParentalSubdirectories()
        if parent_subdir:
            struct.property[msprops.PARENT_SUBDIRECTORY] = parent_subdir
        if requires_subdirs:
            struct.property[msprops.REQUIRES_SUBDIRECTORIES] = requires_subdirs

    def writeInput(self, struct):
        """
        Write the input file

        :param `structure.Structure` or None struct: The structure to write.
            If None, the file named by self.input_file is copied in as the
            input instead.
        """
        iname = self.getInputName()
        if self.input_file:
            # Copy the input file to the job folder
            shutil.copy(os.path.join(os.pardir, self.input_file), iname)
        else:
            self.addParentalSubdirProperties(struct)
            struct.write(iname)
        self.input_files.append(iname)

    def start(self):
        """
        Start the job - create the input and write it, adding necessary output
        files to make sure they get copied back
        """
        # First check to see if there is an existing output file for this
        # step - this might have been created in a previous aborted run.
        if self.getOutput(quiet=True):
            self.log('Completed output found, will not run again')
            self.finish()
            return
        os.makedirs(self.subdir, exist_ok=True)
        for lfile in self.local_files:
            shutil.copy(lfile, self.subdir)
        struct = self.getStructure()
        with fileutils.chdir(self.subdir):
            self.writeInput(struct)
            try:
                self.job = self.createJob()
            except JobCreationError:
                self.log('Will not be run')
                return
        self.workflow.jobq.addJob(self.job)
        self.log('Added to job queue')

    def removeIncomingProperties(self, struct):
        """
        Remove properties on the structure that may have come from previous
        steps and that we do not want to propagate through this step

        :param `structure.Structure` struct: The structure to remove
            properties from
        """
        if isinstance(struct, cms.Cms):
            # If this is a CMS-type structure, remove the trajectory property
            struct.remove_cts_property(desmondutils.PROP_TRJ)
        else:
            # This is an mae structure, perhaps from Jaguar
            msutils.remove_properties(struct, matches=['_j_'])

    def getStructure(self):
        """
        Get the starting structure for this step

        Overrides the parent class to get the initial structure for this
        workflow if the Step has no parent.

        :rtype: schrodinger.structure.Structure or None
        :return: The starting structure for this step. None if input is not a
            structure.
        """
        struct = super().getStructure()
        if not struct:
            struct = self.workflow.struct
        # struct may still be None if there is neither a parent input
        # structure nor a step input structure.
        if struct:
            self.removeIncomingProperties(struct)
        return struct

    def _getOutputData(self):
        """
        Return the list of output structures from this calculation

        :rtype: list or None
        :return: The output structures from this step, or None if the output
            file does not exist
        """
        oname = os.path.join(self.subdir, self.getOutputName())
        if not os.path.exists(oname):
            return
        output = list(structure.StructureReader(oname))
        return output

    def getOutput(self, quiet=False):
        """
        Read in the results of the calculation

        :type quiet: bool
        :param quiet: If True, no error messages will be printed. If False
            (default), error messages will be printed. Also, if True, self.ok
            will not be set to False if the output file cannot be read.

        :rtype: None or list
        :return: None if the calculation failed, or a list of output
            structures from a successful calculation.
        """
        if not self.ok:
            return None
        if not quiet:
            self.ok = False
        output = self._getOutputData()
        if not output:
            if not quiet:
                oname = os.path.join(os.path.basename(self.subdir),
                                     self.getOutputName())
                self.log(f'No output structures found, looked for {oname}')
            return None
        self.ok = True
        self.finished = True
        return output

    def getStructForWriting(self):
        """
        Get the structure from this step to write to the final Workflow file

        :rtype: `schrodinger.structure.Structure`
        :return: The structure to write
        """
        structs = self._getOutputData()
        if structs:
            return structs[0]
        else:
            return None

    def getInputName(self):
        """
        Get the name of the input structure file

        :rtype: str
        :return: The input structure file name
        """
        if self.input_file:
            _, extension = fileutils.splitext(self.input_file)
        else:
            extension = '.maegz'
        return self.job_name + extension

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '-out.maegz'

    def finish(self):
        """
        Finish this step, including processing any output
        """
        output = self.getOutput()
        self.processOutput(output)
        self.finished = True
        self.log('Work completed')

    def write(self, writer, props=None, hierarchy=None):
        """
        Add the final structure for this step to the output structure file

        :type writer: schrodinger.structure.StructureWriter
        :param writer: The writer to use to write the structure

        :type props: dict
        :param props: A dictionary of property/value pairs to add to the
            property dictionary of this object.

        :param list hierarchy: The project group hierarchy for this result -
            each item is a str
        """
        if self.ok and self.finished:
            struct = self.getStructForWriting()
            if not struct:
                self.log('Unable to obtain results')
                return
            struct.title = self.job_name
            # Ensure the structure does not have an entry name of Scratch,
            # which prevents the structure from being incorporated
            struct.property['s_m_entry_name'] = self.job_name
            if props:
                struct.property.update(props)
            if hierarchy:
                msutils.set_project_group_hierarchy(struct,
                                                    hierarchy,
                                                    collapsed=True)
            # Add a property to help downstream analysis find files from this
            # subjob when given the overall job directory
            subbase = os.path.basename(self.subdir)
            struct.property[msprops.SUBDIRECTORY_PROP] = subbase
            writer.append(struct)

    def createCommand(self):
        """
        Create the command line

        :rtype: list
        :return: The command line in list form

        :raise JobCreationError: If the command cannot be created
        """
        if self.full_command:
            # Use the command exactly as specified, replacing the input name
            input_name = self.getInputName()
            ibase = fileutils.splitext(input_name)[0]
            mask = jmswfu.CommandLine.INPUT_MASK
            cmd = [x.replace(mask, ibase) for x in self.full_command]
            # Replace the parent step trajectory info
            tmask = jmswfu.CommandLine.TRAJ_MASK
            if tmask in cmd:
                if not self.parent:
                    self.log(f'Command contains {tmask} but this step has no '
                             'parent so there is no parent trajectory.')
                    raise JobCreationError(f'No parent for {tmask}')
                try:
                    trajdir = self.parent.getTrajectoryName()
                except AttributeError:
                    trajdir = None
                if not trajdir:
                    self.log('Failed to create job because command contains '
                             f'{tmask} but parent step has no trajectory '
                             'information.')
                    raise JobCreationError(f'Unable to replace {tmask}')
                if not os.path.exists(trajdir):
                    self.log('Cannot find parent trajectory required for this '
                             f'step: {trajdir}')
                    raise JobCreationError(f'Unable to find {trajdir}')
                cmd = [x.replace(tmask, trajdir) for x in cmd]
        else:
            cmd = []
            if self.DRIVER_PATH:
                cmd += [self.DRIVER_PATH]
            for flag, value in self.flags.items():
                if value == DO_NOT_USE_FLAG:
                    continue
                cmd += [flag]
                if value is not None:
                    cmd += [str(value)]
            if (self.FLAGS_ADD_TPP and self.workflow.options.TPP and
                    self.workflow.options.TPP > 1):
                cmd += [parserutils.FLAG_TPP, str(self.workflow.options.TPP)]
            # Add -- to ensure parsing of the keywords ends before positional
            # arguments like the input file
            if self.FLAGS_ADD_DOUBLE_DASH:
                cmd += ['--']
            # Add the input file name
            if self.FLAGS_INPUT_NAME_FLAG:
                cmd += [self.FLAGS_INPUT_NAME_FLAG]
            if self.FLAGS_ADD_INPUT_NAME:
                cmd += [self.getInputName()]
            cmd.extend(self.cmd_args)
            if self.FLAGS_ADD_JOB_NAME:
                cmd += ['-JOBNAME', self.job_name]
        return cmd

    def createJob(self):
        """
        Create the job command and object

        :rtype: `jobutils.RobustSubmissionJob`
        :return: The job object to run this step
        """
        self.cmd = self.createCommand()
        return self.createQJobFromCommand(self.cmd)

    def createQJobFromCommand(self, cmd):
        """
        Create the JobDJ job from the list of command line arguments. The job
        will be set to run in the Step subdirectory.

        :param list cmd: The command line in list form

        :rtype: `jobutils.RobustSubmissionJob` or `queue.SubprocessJob`
        :return: A job object that can be added to JobDJ
        """
        if self.USES_JC:
            return jobutils.RobustSubmissionJob(cmd,
                                                command_dir=self.subdir,
                                                procs=self.procs)
        else:
            if self.procs > 1:
                raise JobCreationError('Cannot run steps that do not use job '
                                       'control but use more than 1 processor')
            return queue.SubprocessJob(cmd, command_dir=self.subdir)

    def processOutput(self, output):
        """
        Process the output of a job during the finish part of the step

        :param output: The output of the job. Type may vary in subclasses
        """
        if not output:
            return
        struct = output[0]
        propnames = struct.property.get(SAVE_PROPS_PROPERTY, "")
        userprops = set(propnames.split())
        if not any((self.SAVE_PROPS, userprops, self.SAVE_PROP_STARTS)):
            return
        for prop, value in struct.property.items():
            if prop in self.SAVE_PROPS or prop in userprops:
                self.workflow.properties[prop] = value
            else:
                for pstart in self.SAVE_PROP_STARTS:
                    if prop.startswith(pstart):
                        self.workflow.properties[prop] = value

    def archiveSubDir(self):
        """
        Archive this step's subdirectory and add it to the backend for copy
        back
        """
        aname = self.subdir + '.tar.gz'
        jobutils.archive_job_data(aname, [self.subdir])
        jobutils.add_outfile_to_backend(aname)

    def finishProcessingJobControlJob(self):
        """
        Override the parent method to archive the directory if requested
        """
        if self.archive:
            self.archiveSubDir()
        else:
            super().finishProcessingJobControlJob()

    def periodicMaintenance(self):
        """
        This method is periodically called while the workflow is running
        """
        self.monitorFiles()

    def monitorFiles(self):
        """
        Check for any requested files that need to be copied back immediately
        """
        if not self.monitor_globs:
            return
        backend = jobcontrol.get_backend()
        if not backend:
            return
        # Run the recursive glob from the subjob directory not the main
        # directory - we only want to find files for this step
        with fileutils.chdir(self.subdir):
            for mglob in self.monitor_globs:
                for mfile in pathlib.Path().rglob(mglob):
                    # Paths for jobcontrol must be relative to the main
                    # directory
                    mpath = os.path.join(self.subdir, mfile)
                    if mpath not in self.files_monitored:
                        backend.copyOutputFile(mpath)
                        self.files_monitored.add(mpath)
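
# Illustrative sketch (added; not part of the shipped module): the concrete
# steps below customize BaseStep almost entirely through class attributes. A
# hypothetical subclass, with made-up driver path and flag names, could look
# like:
#
#     class ExampleAnalysisStep(BaseStep):
#         STEP_NAME = 'Example analysis'
#         JOB_BASE = 'example'
#         DRIVER_PATH = 'example_gui_dir/example_driver.py'
#         FLAGS = {'-cutoff': 5.0, '-verbose': None}
#
# For a step whose job_name resolves to "wf_example", createCommand() would
# then assemble roughly (flag order follows the FLAGS dict):
#
#     ['example_gui_dir/example_driver.py', '-cutoff', '5.0', '-verbose',
#      'wf_example.maegz', '-JOBNAME', 'wf_example']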


class MacromodelConfSearchStep(BaseStep):
    """
    Step to perform a macromodel conformational search
    """

    STEP_NAME = 'ConfSearch'
    JOB_BASE = 'csearch'
    DRIVER_PATH = 'bmin'
    FLAGS = {}
    FLAGS_ADD_INPUT_NAME = False
    FLAGS_ADD_JOB_NAME = False

    def __init__(self, *args, **kwargs):
        """
        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        self.cmd_args.append(self.job_name)

    def getComUtil(self):
        """
        Get the macromodel com utility that holds the settings to write

        :rtype: `schrodinger.application.macromodel.utils.ComUtil`
        :return: The ComUtil object with settings
        """
        # Note - these settings reproduce the Conformational search GUI
        # default settings when no solvent is selected
        mcu = mmodutils.ComUtil(
            serial=True,
            ffld=desmondutils.OPLS3,
            # Energy window for saving structures (kJ/mol)
            demx_final=21.0,
            demx_prelim=42.0,
            # Write the EXNB line (cutoff parameters)
            exnb=False,
            # Don't write the NANT line (enantiomers)
            nant=True)
        mcu.serial_nonconf_input = True
        mcu.BDCO[5] = 41.5692
        # pure low modes explored
        mcu.LMCS[3] = 0
        # allowed approach distance in A
        mcu.LMCS[6] = 0
        mcu.DEMX[2] = 833
        mcu.MCNV[2] = 5
        # Do not use symmetry library
        mcu.MSYM[1] = 0
        # Number of steps per rotatable bond
        mcu.AUOP[5] = 100
        # tors, new setup
        mcu.AUTO[5] = 0.0
        # minimum ring size to setup
        mcu.AUTO[7] = 0
        # 'non-restrictive' sampling (MacroModel 9.0 default)
        mcu.AUTO[8] = 1
        # max energy diff required to skip geometric comp
        mcu.CRMS[5] = 0.0
        # max dist between 'equivalent' corresponding atoms
        mcu.CRMS[6] = 0.5
        # maximum dihedral angle change for polar hydrogens
        mcu.CRMS[7] = 0.0
        # minimize converge on gradient
        mcu.CONV[5] = 0.05
        # iterations
        mcu.MINI[3] = 2500
        # hessian cutoff, where appropriate
        mcu.MINI[6] = 0.0
        return mcu

    def writeCom(self):
        """
        Write out the Macromodel com file
        """
        mcu = self.getComUtil()
        iname = self.getInputName()
        cname = self.getComName()
        oname = self.getOutputName()
        mcu.mcmmlmod(mae_file=iname, com_file=cname, out_file=oname)
        self.input_files.append(cname)

    def getComName(self):
        """
        Get the name of the com file for this step

        :rtype: str
        :return: The name of the com file
        """
        return self.job_name + '.com'

    def writeInput(self, struct):
        """
        Override the parent method to also write the com file
        """
        super().writeInput(struct)
        self.writeCom()
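
# Note (added for reference; not in the original source): because
# FLAGS_ADD_INPUT_NAME and FLAGS_ADD_JOB_NAME are both False and __init__
# appends self.job_name to cmd_args, the generated command is simply
# ['bmin', <job_name>]; bmin is then expected to pick up the <job_name>.com
# file written by writeCom(), which references the structure file written by
# writeInput().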


class VoidStep(BaseStep):
    """
    A step for running the Distributed Voids driver
    """

    STEP_NAME = 'Distribute voids'
    JOB_BASE = 'voids'
    DRIVER_PATH = 'distribute_voids_gui_dir/distribute_voids_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {DRIVER.FLAG_VOID_PCT: 10, DRIVER.FLAG_FORCE: None}

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '-voids.maegz'


class BaseMetaJaguarWorkflowStep(BaseStep):
    """
    Base Step for MatSci workflows that themselves run Jaguar subjobs
    """

    FLAGS_ADD_TPP = True


class BDEStep(BaseMetaJaguarWorkflowStep):
    """
    A step for running the Bond Dissociation Energy workflow
    """

    KEYS = {
        'dftname': 'B3LYP',
        'basis': 'MIDIXL',
        'igeopt': 1,
        'iaccg': 3,
        'maxit': 100,
        'nofail': 1,
        'isymm': 0,
        'nops_opt_switch': 10
    }
    STEP_NAME = 'BDE'
    JOB_BASE = 'bde'
    DRIVER_PATH = 'bond_dissociation_gui_dir/bond_dissociation_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {'-states': 's0', '-bonds': None, '-allow_hx': 'no'}
    FLAGS_ADD_DOUBLE_DASH = True

    def __init__(self, *args, **kwargs):
        """
        Create a BDEStep instance.

        See parent class for additional documentation.
        """
        super().__init__(*args, **kwargs)
        keystring = msutils.keyword_dict_to_string(self.KEYS)
        self.flags['-keywords'] = keystring
        self.cmd_args.append(self.getOutputName())

    def getStructsOfType(self, stype):
        """
        A generator for structures of the given type from BDE results

        :param str stype: The type of structure to get - 'Reactants',
            'Reactions', 'Fragments'

        :rtype: Iterator
        :return: Iterator of `schrodinger.structure.Structure` for the
            requested type
        """
        structs = self.getOutput()
        if not structs:
            return
        for struct in structs:
            archy = msutils.get_project_group_hierarchy(st=struct)
            if archy[-1] == stype:
                yield struct

    def processOutput(self, output):
        """
        Find the minimum BDE and store it as a workflow property

        :param output: Unused, kept for API compatibility
        """
        struct = self.getStructForWriting()
        bde = struct.property.get(msprops.WEAKEST_BDE_PROP)
        if bde is not None:
            self.workflow.properties[msprops.WEAKEST_BDE_PROP] = bde

    def getStructForWriting(self):
        """
        Get the structure to write for this step. It will be the reactant
        structure

        :rtype: `schrodinger.structure.Structure`
        :return: The reactant structure
        """
        return next(self.getStructsOfType(fragments.REACTANTS))


class HOFStep(BaseMetaJaguarWorkflowStep):
    """
    A step for running the Heat of Formation workflow
    """

    PROCS = 2
    STEP_NAME = 'HOF'
    JOB_BASE = 'hof'
    DRIVER_PATH = 'jaguar'
    FLAGS = {
        'run': None,
        'deltah.py': None,
        '-zpe': None,
        '-scalfr': '0.9806',
        '-method_sp': 'M06-2X'
    }
    FLAGS_ADD_JOB_NAME = False
    HOF_PROP = 'r_j_Delta_H_of_Formation(298K)_(kcal/mol)'
    SAVE_PROPS = {HOF_PROP}

    def __init__(self, *args, high=False, **kwargs):
        """
        Create a HOFStep instance.

        :param bool high: True if we are computing heat of formation at the
            high level of accuracy

        See parent class for additional documentation.
        """
        super().__init__(*args, **kwargs)
        self.flags['-jobname'] = self.job_name
        if high:
            self.flags['-basis_sp'] = '6-311++G-3df-3pd'
        else:
            self.flags['-basis_sp'] = 'LACVP*'
            self.flags['-ps'] = None

    def outputStructurePath(self):
        """
        Get the path to the output structure file in the subjob results
        subdirectory

        :rtype: str
        :return: The path to the output structure file inside the _deltah
            subdirectory of the subjob's directory
        """
        oname = os.path.join(self.subdir, self.job_name + '_deltah',
                             self.getOutputName())
        return oname

    def _getOutputData(self):
        """
        Get the output structure

        :rtype: list
        :return: The one item of the list is the output structure
        """
        oname = self.outputStructurePath()
        if not os.path.exists(oname):
            return
        output = list(structure.StructureReader(oname))
        return output

    def getInputName(self):
        """
        Get the name of the input structure file

        :rtype: str
        :return: The input structure file name
        """
        # The HOF workflow oddly can't take .maegz files, only .mae
        return self.job_name + '.mae'

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '_deltah.mae'
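
# For reference (added comment; not in the original source): with the class
# FLAGS plus the flags set in __init__, the assembled command for a
# hypothetical job named "wf_hof" at the default accuracy level is roughly
# (flag order follows the dict, and -TPP would be inserted before the input
# name if the workflow requests more than one thread per process):
#
#     ['jaguar', 'run', 'deltah.py', '-zpe', '-scalfr', '0.9806',
#      '-method_sp', 'M06-2X', '-jobname', 'wf_hof', '-basis_sp', 'LACVP*',
#      '-ps', 'wf_hof.mae']
#
# i.e. the "driver" here is the jaguar launcher running the deltah.py script.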


class CustomMaeStep(BaseStep):
    """
    A step for running custom scripts that output a .mae file
    """

    STEP_NAME = 'Custom'
    JOB_BASE = 'custom'
    USES_JC = False


class BaseDesmondResults(BaseResults):
    """
    A simple results class to provide the functionality from the
    jaguarworkflows.Results class that is still needed for Desmond jobs
    """

    def getMaeStructure(self):
        """
        Get the resulting Maestro structure for the parent class
        """
        return self.parent.getCMSSystem()


class BaseDesmondStep(BaseStep):
    """
    Base class for steps that use Desmond as the main engine or produce a cms
    file
    """

    RESULTS_CLASS = BaseDesmondResults
    # If step requires CMS input
    REQUIRES_CMS_INPUT = True
    # If the step outputs CMS
    OUTPUTS_CMS = True

    def __init__(self, *args, maestro_in=False, **kwargs):
        """
        Create a BaseDesmondStep instance

        :param bool maestro_in: If true, the class will write a Maestro input
            file (.maegz) rather than a CMS (.cms) file

        See parent class for additional documentation
        """
        self.maestro_in = maestro_in
        if maestro_in:
            self.REQUIRES_CMS_INPUT = False
        super().__init__(*args, **kwargs)

    def addParentalSubdirProperties(self, struct):
        """
        Add parental subdirectory properties to the structure

        :type struct: `cms.Cms` or `schrodinger.structure.Structure`
        :param struct: The structure to add properties to
        """
        if isinstance(struct, cms.Cms):
            parent_subdir, requires_subdirs = self.getParentalSubdirectories()
            if parent_subdir:
                struct.set_cts_property(msprops.PARENT_SUBDIRECTORY,
                                        parent_subdir)
            if requires_subdirs:
                struct.set_cts_property(msprops.REQUIRES_SUBDIRECTORIES,
                                        requires_subdirs)
        else:
            super().addParentalSubdirProperties(struct)

    def _getOutputData(self):
        """
        Return the list of output structures from this calculation

        :rtype: list
        :return: The single item is the output structure from this step as a
            Cms system
        """
        oname = os.path.join(self.subdir, self.getOutputName())
        if not fileutils.is_cms_file(oname):
            # This handles steps like the Disordered System Builder whose
            # output type can change based on command line flags
            return super()._getOutputData()
        if not os.path.exists(oname):
            return
        output = [cms.Cms(oname)]
        return output

    def getCMSSystem(self):
        """
        Get the output CMS system

        :rtype: `cms.Cms`
        :return: The output system
        """
        return super().getStructForWriting()

    def getStructForWriting(self):
        """
        Get the output Maestro structure derived from the output CMS system

        :rtype: `schrodinger.structure.Structure`
        :return: The output Maestro structure
        """
        struct = super().getStructForWriting()
        try:
            struct = struct.fsys_ct
        except AttributeError:
            # This is not a CMS structure (some steps can output mae or cms
            # depending on options)
            pass
        # Desmond adds project group hierarchy properties, remove them
        props = [
            mm.M2IO_DATA_SUBGROUP_TITLE, mm.M2IO_DATA_SUBGROUPID,
            mm.M2IO_DATA_SUBGROUP_COLLAPSED
        ]
        msutils.remove_properties(struct, props=props)
        # Make the trajectory path point to the Step subdirectory
        trj = struct.property.get(desmondutils.PROP_TRJ)
        if trj:
            # Use / instead of os.path.join so it works on all platforms
            struct.property[desmondutils.PROP_TRJ] = f'{self.job_name}/{trj}'
        return struct

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '-out.cms'

    def getInputName(self):
        """
        Get the name of the input structure file

        :rtype: str
        :return: The input structure file name
        """
        if self.maestro_in or self.input_file:
            return BaseStep.getInputName(self)
        else:
            return self.job_name + '.cms'

    def getTrajectoryName(self):
        """
        Get the name of the output trajectory directory

        :rtype: str or None
        :return: The output trajectory directory name or None if this step
            does not create trajectories
        """
        if self.CAN_CREATE_TRAJECTORY:
            return os.path.join(self.subdir, self.job_name + '_trj')
        else:
            return None
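
# Note (added for reference; not in the original source):
# BaseStep.createCommand() replaces the jmswfu.CommandLine.TRAJ_MASK token in
# a user-supplied full_command with the parent step's getTrajectoryName()
# result, so a trajectory-consuming child of a BaseDesmondStep that has
# CAN_CREATE_TRAJECTORY = True would receive a path of the form
# <parent subdir>/<parent job_name>_trj.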


class DisorderedSystemBuilderStep(BaseDesmondStep):
    """
    A step that runs the Disordered System Builder.
    """

    STEP_NAME = 'Disordered system'
    JOB_BASE = 'dsb'
    DRIVER_PATH = ('disordered_system_builder_gui_dir/'
                   'disordered_system_builder_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {
        '-grid': None,
        '-pack': None,
        '-split_components': None,
        '-forcefield': desmondutils.OPLS3
    }
    # If step requires CMS input
    REQUIRES_CMS_INPUT = False

    def __init__(self, *args, **kwargs):
        """
        Create a DisorderedSystemBuilderStep instance

        See parent class for additional documentation
        """
        super().__init__(*args, maestro_in=True, **kwargs)
        self.cmd_args.append(self.job_name)

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        if self.DRIVER.NO_SYSTEM_FLAG in self.cmd:
            return self.job_name + '_amorphous.maegz'
        else:
            return self.job_name + '_system-out.cms'


class PolymerBuilderStep(BaseDesmondStep):
    """
    A step that runs the Polymer Builder.
    """

    STEP_NAME = 'Polymer Builder'
    JOB_BASE = 'polybuild'
    DRIVER_PATH = 'polymer_builder_gui_dir/polymer_builder_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    # If step requires CMS input
    REQUIRES_CMS_INPUT = False

    def __init__(self, *args, **kwargs):
        """
        Create a PolymerBuilderStep instance

        See parent class for additional documentation
        """
        super().__init__(*args, maestro_in=True, **kwargs)
        if not self.parent and not self.input_file:
            raise RuntimeError('The structure file must be provided')

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        if self.DRIVER.FLAG_NO_SYSTEM in self.cmd:
            return self.job_name + '-amcell.maegz'
        else:
            return self.job_name + '_system-out.cms'

    def getInputName(self):
        """
        Get the name of the input structure file

        :rtype: str
        :return: The input structure file name
        """
        # Polymer builder should always take in maegz file
        return self.job_name + '.maegz'

    def writeInput(self, *args, **kwargs):
        """
        Overrides the parent method to copy the full input structure file or
        list of structures from the parent rather than write a single
        structure.
        """
        iname = self.getInputName()
        # Get the output structures from the parent and write them. This set
        # of structures should be the polymer moiety (monomers + initiator +
        # terminator).
        if self.parent:
            struct_list = self.parent.getOutput()
        else:
            # Read structures from the input file
            struct_list = list(
                structure.StructureReader(
                    os.path.join(os.pardir, self.input_file)))
        for struct in struct_list:
            self.addParentalSubdirProperties(struct)
        with structure.StructureWriter(iname) as st_writer:
            st_writer.extend(struct_list)
        self.input_files.append(iname)

    def removeIncomingProperties(self, *args, **kwargs):
        """
        Override the parent method to do nothing since we don't have a single
        simple input structure and the properties do not survive the polymer
        builder.
        """
        return


class PrepForMDStep(BaseDesmondStep):
    """
    A step that runs Prepare for MD to create a Desmond system.
    """

    STEP_NAME = 'Prepare for MD'
    JOB_BASE = 'prepmd'
    DRIVER_PATH = 'prepare_for_md_gui_dir/prepare_for_md_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {
        parserutils.FLAG_FORCEFIELD: desmondutils.OPLS3,
        parserutils.SPLIT_COMPONENTS_FLAG: None
    }
    # If step requires CMS input
    REQUIRES_CMS_INPUT = False

    def __init__(self, *args, **kwargs):
        """
        Create a PrepForMDStep object

        See parent class for documentation
        """
        super().__init__(*args, maestro_in=True, **kwargs)
        self.struct_title = ""

    def getStructure(self):
        """
        See parent class for documentation
        """
        struct = super().getStructure()
        self.struct_title = struct.title
        return struct

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.struct_title + '_system-out.cms'


class MolecularDynamicsStep(BaseDesmondStep):
    """
    A step that runs one or more Desmond MD stages
    """

    STEP_NAME = 'Relaxation'
    JOB_BASE = 'relax'
    # The last percent of the frames that are used for analysis property
    # averaging
    ANALYSIS_FRAME_PERCENT = 20
    CAN_CREATE_TRAJECTORY = True

    def __init__(self, *args, analysis=False, average=None, **kwargs):
        """
        Create a MolecularDynamicsStep object

        :param bool analysis: Whether to perform a trajectory analysis as the
            last stage

        :param int average: What percent of the final stage trajectory to
            average the cell size over

        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        self.num_stages = 0
        self.analysis = analysis
        if average and average < 0:
            raise RuntimeError('average must be > 0')
        self.average = average

    def addVelocityIfNeeded(self, kwargs):
        """
        Add the velocity write keyword if there will be an analysis stage

        :param dict kwargs: dict of current keyword/value pairs - modified if
            velocity is needed
        """
        if self.analysis:
            kwargs[desmondutils.MSJStringer.TRJ_WRITE_VEL] = True

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects
        """
        # Note - it's important for velocities to be written if this stage
        # will have analysis done
        kwargs = {}
        self.addVelocityIfNeeded(kwargs)
        example = desmondutils.MDMSJStringer(last=True, **kwargs)
        return [example]

    def writeMSJFile(self):
        """
        Write the MSJ file for this step
        """
        stringers = self.getStringers()
        if self.analysis:
            stringers.append(desmondutils.MSAnalysisMSJStringer())
        if self.average:
            kwargs = {
                desmondutils.AveCellMSJStringer.PERCENT_TO_AVG: self.average
            }
            stringers.append(desmondutils.AveCellMSJStringer(**kwargs))
        mname = self.getMSJName()
        desmondutils.create_msj(stringers, mname)
        self.input_files.append(mname)
        # Add 1 for the initial "auto" stage
        self.num_stages = len(stringers) + 1

    def writeInput(self, struct):
        """
        Write the input structure file and msj file

        :param `structure.Structure` or None struct: The structure to write.
            If None, the input file will be copied in instead.
        """
        super().writeInput(struct)
        self.writeMSJFile()

    def getMSJName(self):
        """
        Get the name of the MSJ file

        :rtype: str
        :return: The MSJ file name
        """
        return self.job_name + '.msj'

    def createJob(self):
        """
        Form the command line and create a job for the queue

        :rtype: `jobutils.RobustSubmissionJob`
        :return: The job to add to the JobDJ
        """
        cmd = desmondutils.get_multisim_command(self.getInputName(),
                                                self.getOutputName(),
                                                msj_name=self.getMSJName(),
                                                job_name=self.job_name)
        return self.createQJobFromCommand(cmd)

    def processOutput(self, output):
        """
        Process the output of a job during the finish part of the step

        :param output: The output of the job. If it evaluates to False, no
            processing is done. Otherwise it is unused.
        """
        super().processOutput(output)
        if not output or not self.analysis:
            return
        # Define the properties to save
        properties = {
            'DensityResult': msprops.DENSITY_PROP,
            'VolumeResult': msprops.VOLUME_PROP,
            'EneCohesiveResult': msprops.COHESIVE_PROP,
            'HeatVaporizationResult': msprops.HEAT_VAP_PROP,
            'SolubilityParameterResult': msprops.SOLUBILITY_PROP,
            'SolubilityParameterVdwSqResult': msprops.SOLUBILITY_VDW_PROP,
            'SolubilityParameterEleSqResult': msprops.SOLUBILITY_ELE_PROP,
            'SpecificHeatResult': msprops.SPECIFIC_HEAT_PROP
        }
        # Read the output file
        tried_paths = []
        for propext in ('eaf', 'st2'):
            eaf_name = f'{self.job_name}_{self.num_stages}-out.{propext}'
            eaf_path = os.path.join(self.subdir, eaf_name)
            tried_paths.append(eaf_path)
            if os.path.exists(eaf_path):
                break
        else:
            self.log('Could not find analysis output file, tried:')
            for tpath in tried_paths:
                self.log(tpath)
            return
        with open(eaf_path, 'r') as eaf_file:
            analysis = sea.Map(eaf_file.read())
        total_frames = analysis.TrajectoryNumFrames.val
        # Average over the last ANALYSIS_FRAME_PERCENT of the frame data
        ave_frames = math.ceil(total_frames * self.ANALYSIS_FRAME_PERCENT /
                               100)
        # Compute all the properties and save them
        bulk = analysis.Keywords[0].Bulk
        for seaprop, stprop in properties.items():
            if stprop == msprops.SPECIFIC_HEAT_PROP:
                try:
                    average = bulk[seaprop][1].val
                except KeyError:
                    continue
            else:
                try:
                    sea_atoms = bulk[seaprop][-ave_frames:]
                except KeyError:
                    continue
                average = sum(x.val for x in sea_atoms) / ave_frames
            self.workflow.properties[stprop] = average
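
# Worked example (added for reference; not in the original source): with
# ANALYSIS_FRAME_PERCENT = 20 and a hypothetical analysis file reporting
# TrajectoryNumFrames = 1001, ave_frames = math.ceil(1001 * 20 / 100) = 201,
# so each bulk property (other than the specific heat, which is read directly
# from the result map) is averaged over its last 201 frame values.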


class BaseRelaxationStep(MolecularDynamicsStep):
    """
    A base class for relaxation steps
    """

    STEP_NAME = 'BaseRelaxation'
    JOB_BASE = 'baserelax'

    def __init__(self, *args, temp=300.0, **kwargs):
        """
        Create a base relaxation object

        :param float temp: Temperature for the final relaxation step

        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        self.temp = temp

    def getStringers(self):
        """
        Create the stringers that define each stage

        :raise NotImplementedError: Subclasses must override this method

        :rtype: list
        :return: A list of MSJStringer objects
        """
        raise NotImplementedError


class MatSciMDRelaxationStep(BaseRelaxationStep):
    """
    A step that runs a MatSci MD relaxation protocol
    """

    STEP_NAME = 'MSRelaxation'
    JOB_BASE = 'msrelax'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects
        """
        return desmondutils.get_materials_relaxation_stringers(
            temp=self.temp, compress_last=False)


class CompressiveRelaxationStep(BaseRelaxationStep):
    """
    A step that runs a compressive relaxation protocol
    """

    STEP_NAME = 'CompressRelaxation'
    JOB_BASE = 'comprelax'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects
        """
        return desmondutils.get_compressive_relaxation_stringers(
            temp=self.temp, compress_last=False)


class SemiCrystalRelaxation1Step(BaseRelaxationStep):
    """
    A step that runs the first Semi-Crystalline relaxation protocol
    """

    STEP_NAME = 'SemiCrystalRelaxation1'
    JOB_BASE = 'semicrystalrelax1'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects
        """
        return desmondutils.get_semicrystalline_relaxation1_stringers(
            temp=self.temp, compress_last=False)


class SemiCrystalRelaxation2Step(BaseRelaxationStep):
    """
    A step that runs the second Semi-Crystalline relaxation protocol
    """

    STEP_NAME = 'SemiCrystalRelaxation2'
    JOB_BASE = 'semicrystalrelax2'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects
        """
        return desmondutils.get_semicrystalline_relaxation2_stringers(
            temp=self.temp, compress_last=False)


class SingleMDStep(MolecularDynamicsStep):
    """
    A step that runs a single MD simulation
    """

    STEP_NAME = 'md'
    JOB_BASE = 'md'
    STRINGER_CLASS = desmondutils.MDMSJStringer

    def __init__(self, *args, params=None, **kwargs):
        """
        Create a SingleMDStep object

        :param dict params: Keyword arguments to pass to the `STRINGER_CLASS`

        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        self.params = params or {}
        # Don't compress this step
        if 'last' not in self.params:
            self.params['last'] = True

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects
        """
        # Note - it's important for velocities to be written if this stage
        # will have analysis done
        self.addVelocityIfNeeded(self.params)
        return [self.STRINGER_CLASS(**self.params)]


class SingleBrownieStep(SingleMDStep):
    """
    A step that runs a single Brownie simulation
    """

    STEP_NAME = 'brownie'
    JOB_BASE = 'brownie'
    STRINGER_CLASS = desmondutils.BrownieMSJStringer


class TgStep(BaseDesmondStep):
    """
    A step that runs a Tg calculation
    """

    STEP_NAME = 'Tg'
    JOB_BASE = 'tg'
    DRIVER_PATH = ('thermophysical_properties_gui_dir/'
                   'thermophysical_properties_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {
        DRIVER.MIN_TEMP_FLAG: 200,
        DRIVER.MAX_TEMP_FLAG: 600,
        DRIVER.TEMP_STEP_FLAG: 25,
        DRIVER.DENSITY_TIME_FLAG: 20,
        DRIVER.DENSITY_CYCLES_FLAG: 5,
        DRIVER.JOB_ORDER_FLAG: DRIVER.MAX_FIRST,
        DRIVER.LOAD_STRUC_FLAG: 200.0,
        DRIVER.INTERVAL_FLAG: 200.0
    }
    FLAGS_INPUT_NAME_FLAG = '-icms'
    SAVE_PROP_STARTS = {msprops.TG_DENSITY_LEAD}
    CAN_CREATE_TRAJECTORY = True

    def __init__(self, *args, melting=False, bilinear=False, **kwargs):
        """
        Create a TgStep object

        :param bool melting: If True, temps will run from low to high.
            Otherwise (default), they will run from high to low.

        :param bool bilinear: If True, the Tg fit will be done via the
            bilinear method. Otherwise (default), it will be done via a
            hyperbola fit

        See parent class for additional documentation
        """
        super().__init__(*args, **kwargs)
        self.melting = melting
        self.bilinear = bilinear
        if self.melting:
            self.flags['-job_order'] = self.DRIVER.MIN_FIRST

    def processOutput(self, output):
        """
        Process the output of a job during the finish part of the step. This
        computes the Tg via a fit of temperature vs density.

        :param list output: The output of the job. The first item should be
            the structure (Cms or Structure) that contains the density vs
            temperature properties.
        """
        super().processOutput(output)
        if not output:
            return
        data = []
        system = output[0]
        for key, value in system.property.items():
            if key.startswith(msprops.TG_DENSITY_LEAD):
                temp = float(key.split('_')[4])
                density = float(system.property[key])
                data.append((temp, density))
        if self.bilinear:
            tg = uq_utils.get_bilinear_prop_from_points(data)
        else:
            tg = uq_utils.get_hyperbola_prop_from_points(data)
        if not tg:
            self.log('Unable to obtain Tg value from fit to density data')
            return
        if self.melting:
            prop = msprops.TM_PROP
        else:
            prop = msprops.TG_PROP
        if self.tag:
            prop += '_' + self.tag
        self.workflow.properties[prop] = tg
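
# Note (added for reference; not in the original source): processOutput()
# above assumes density properties whose keys start with
# msprops.TG_DENSITY_LEAD and that encode the temperature as the fifth
# underscore-separated token of the key (key.split('_')[4]); each matching
# property contributes one (temperature, density) point to the Tg/Tm fit.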


class StressStrainStep(BaseDesmondStep):
    """
    A step that runs a Stress-Strain calculation
    """

    STEP_NAME = 'Stress'
    JOB_BASE = 'stress'
    DRIVER_PATH = 'stress_strain_gui_dir/stress_strain_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    SAVE_PROP_STARTS = {'r_matsci_Stress', 'r_matsci_Strain'}
    CAN_CREATE_TRAJECTORY = True


class PolymerCrosslinkStep(BaseDesmondStep):
    """
    A step that runs a polymer crosslink calculation
    """

    STEP_NAME = 'Crosslinking'
    JOB_BASE = 'xlink'
    FLAGS_INPUT_NAME_FLAG = '-icms'
    DRIVER_PATH = 'polymer_crosslink_gui_dir/polymer_crosslink_driver.py'
    DRIVER = import_driver(DRIVER_PATH)


class ElasticConstantsStressStep(BaseDesmondStep):
    """
    A step that runs a stress-based elastic constants calculation
    """

    STEP_NAME = 'Estress'
    JOB_BASE = 'estress'
    FLAGS_INPUT_NAME_FLAG = '-icms'
    DRIVER_PATH = 'elastic_constants_gui_dir/elastic_constants_driver2.py'
    DRIVER = import_driver(DRIVER_PATH)
    SAVE_PROPS = {
        msprops.EL_Y_MOD, msprops.EL_UNI_ANI, msprops.EL_P_RATIO,
        msprops.EL_LAMBDA, msprops.EL_MU, msprops.EL_SHEAR, msprops.EL_BULK
    }
    CAN_CREATE_TRAJECTORY = True


class ElasticConstantsStep(ElasticConstantsStressStep):
    """
    A step that runs a non-stress elastic constants calculation
    """

    STEP_NAME = 'Elastic'
    JOB_BASE = 'elastic'
    DRIVER_PATH = 'elastic_constants_gui_dir/elastic_constants_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    SAVE_PROP_STARTS = {'r_matsci_Strain'}
    # If the step outputs CMS
    OUTPUTS_CMS = False

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '_md-out.mae'


class PenetrantLoadingStep(BaseDesmondStep):
    """
    A step that runs a Penetrant Loading calculation
    """

    STEP_NAME = 'pload'
    JOB_BASE = 'pload'
    DRIVER_PATH = 'penetrant_loading_gui_dir/penetrant_loading_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    CAN_CREATE_TRAJECTORY = True

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + self.DRIVER.OUTFILE_EXTENSION

    def getTrajectoryName(self):
        """
        Get the name of the output trajectory directory

        :rtype: str
        :return: The output trajectory directory name
        """
        return os.path.join(self.subdir, self.job_name + '-multisim_trj')


class PolymerChainAnalysisStep(BaseDesmondStep):
    """
    A step that runs a Polymer Chain Analysis
    """

    STEP_NAME = 'pcan'
    JOB_BASE = 'pcan'
    DRIVER_PATH = ('polymer_chain_analysis_gui_dir/'
                   'polymer_chain_analysis_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    SAVE_PROPS = {
        DRIVER.PERSISTENCE_LENGTH, DRIVER.END_TO_END_DISTANCE,
        DRIVER.MEAN_SQUARED_END_DISTANCE, DRIVER.EXTENDED_CHAIN_LENGTH,
        DRIVER.RADIUS_OF_GYRATION, DRIVER.MEAN_SQUARED_RADIUS_OF_GYRATION,
        DRIVER.MEAN_FRACTIONAL_ANISOTROPY, DRIVER.END_DIST_FILE,
        DRIVER.END_N_DIST_FILE, DRIVER.RG_FILE, DRIVER.TORSIONAL_FILE,
        DRIVER.ORDER_FILE, DRIVER.START_TIME, DRIVER.END_TIME
    }
    # If step requires TRAJECTORY input
    REQUIRES_TRAJECTORY_INPUT = True

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '-trj.cms'


class StructureFactorStep(BaseDesmondStep):
    """
    A step that runs a Structure Factor calculation
    """

    STEP_NAME = 'sfactor'
    JOB_BASE = 'sfactor'
    DRIVER_PATH = 'structure_factor_gui_dir/structure_factor_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    SAVE_PROPS = {'s_matsci_sq_method', 'r_matsci_sq_resolution'}
    FLAGS_INPUT_NAME_FLAG = '-cms_file'
    # If step requires TRAJECTORY input
    REQUIRES_TRAJECTORY_INPUT = True


class SimulationProfileStep(BaseDesmondStep):
    """
    A step that runs a Simulation Profile calculation
    """

    STEP_NAME = 'profile'
    JOB_BASE = 'profile'
    DRIVER_PATH = ('trajectory_density_analysis_gui_dir/'
                   'trajectory_density_analysis_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS_INPUT_NAME_FLAG = '-cms_file'
    # If step requires TRAJECTORY input
    REQUIRES_TRAJECTORY_INPUT = True


class CustomCmsStep(BaseDesmondStep):
    """
    A step for running custom scripts that output a .cms file
    """

    STEP_NAME = 'Custom'
    JOB_BASE = 'custom'
    USES_JC = False


class CustomCmsToMaeStep(CustomCmsStep):
    """
    A step for running custom scripts that take in a .cms file and output
    .maegz
    """

    STEP_NAME = 'Custom'
    JOB_BASE = 'custom'
    OUTPUTS_CMS = False

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '-out.maegz'


class CustomTrajectoryStep(CustomCmsStep):
    """
    A step for running custom scripts that output a .cms file and create
    trajectories
    """

    CAN_CREATE_TRAJECTORY = True


class ClusterAnalysisStep(BaseDesmondStep):
    """
    A step that runs a Cluster Analysis calculation
    """

    STEP_NAME = 'cluster'
    JOB_BASE = 'cluster'
    DRIVER_PATH = 'cluster_analysis_gui_dir/cluster_analysis_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS_INPUT_NAME_FLAG = '-cms_file'
    # If the step outputs CMS
    OUTPUTS_CMS = False

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '_out.maegz'