Source code for schrodinger.application.desmond.stage.app.matsci

"""
Various multisim concrete stage classes.

Copyright Schrodinger, LLC. All rights reserved.

"""
import glob
import os
import re
from typing import Optional

import numpy as np

from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import util
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci.nano import xtal
from schrodinger.structutils import build
from schrodinger.structutils import structalign
from schrodinger.structutils.analyze import evaluate_asl
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess

RestrainTypes = constants.RestrainTypes

__all__ = [
    'Aacg_SiteMap_Multijob', 'AverageCell', 'MatSciAnalysis', 'DeformCell',
    'SolvateSlabBuilder', 'ScaleEffectiveSolvent'
]


[docs]class Aacg_SiteMap_Multijob(cmj.StageBase): """ This class runs multiple SiteMap jobs for mixed resolution cosolvent simulations """ NAME = "aacg_sitemap" SITEMAP_CMD = os.path.join(envir.CONST.SCHRODINGER, 'sitemap') PARAM = cmj._create_param_when_needed(""" DATA = { frame_frequency = 14 frame_start = 700 jobname = "$MASTERJOBNAME" reference_structure_filename_root = "$MASTERJOBNAME_reference_structure" sitemap_directory = "$MASTERJOBNAME_SiteMap_files" simulation_basename = "$MASTERJOBNAME_sample_" protein_asl = "((( backbone ) OR ( sidechain ) ) OR (fillmol withinbonds 1 (atom.ele P))) OR (atom.ele Mg)" sitemap_writestructs = "no" sitemap_writevis = "no" sitemap_resolution = "high" sitemap_maxsites = 10 } VALIDATE = { frame_frequency = {type = int0} frame_start = {type = int0} jobname = {type = str1} reference_structure_filename_root = {type = str1} sitemap_directory = {type = str1} simulation_basename = {type = str1} protein_asl = {type = str1} sitemap_writestructs = {type = bool} sitemap_writevis = {type = bool} sitemap_resolution = {type = enum range = [low high standard]} sitemap_maxsites = {type = int range = [0 20]} } """)
[docs] def __init__(self, *arg, **kwarg): """ Initialize the aacg-sitemap-multijob stage """ cmj.StageBase.__init__(self, *arg, **kwarg) self._fai_job = []
[docs] def crunch(self): """ Submit the multiple SiteMap jobs """ import schrodinger.application.desmond.packages.topo as topo import schrodinger.application.desmond.packages.traj as traj self._print("debug", "In Aacg_SiteMap_Multijob.crunch") frame_frequency = self.param.frame_frequency.val frame_start = self.param.frame_start.val jobname = self.param.jobname.val reference_structure_filename_root = self.param.reference_structure_filename_root.val sitemap_directory = self.param.sitemap_directory.val simulation_basename = self.param.simulation_basename.val protein_asl = self.param.protein_asl.val sitemap_writestructs = self.param.sitemap_writestructs.val sitemap_writevis = self.param.sitemap_writevis.val if not self.param.sitemap_writestructs.val: sitemap_writestructs = "no" else: sitemap_writestructs = "yes" if not self.param.sitemap_writevis.val: sitemap_writevis = "no" else: sitemap_writevis = "yes" sitemap_resolution = self.param.sitemap_resolution.val sitemap_maxsites = self.param.sitemap_maxsites.val for pj in self.get_prejobs(): dir = cmj.ENGINE.base_dir sitemap_directory_path = os.path.join(dir, sitemap_directory) if not os.path.isdir(sitemap_directory_path): os.makedirs(sitemap_directory_path) if os.path.isfile(reference_structure_filename_root + '.mae'): reference_structure_filename = reference_structure_filename_root + '.mae' elif os.path.isfile(reference_structure_filename_root + '.maegz'): reference_structure_filename = reference_structure_filename_root + '.maegz' elif os.path.isfile(reference_structure_filename_root + '.mae.gz'): reference_structure_filename = reference_structure_filename_root + '.mae.gz' else: reference_structure_filename = None if reference_structure_filename: reference_reader = structure.StructureReader( reference_structure_filename) reference_structure = next(reference_reader) reference_reader.close() else: reference_structure = None for filename in os.listdir(os.curdir): if not filename.startswith(simulation_basename): continue non_base = filename[len(simulation_basename):] if not non_base.isdigit(): continue isim = int(non_base) simulation_directory_name = filename simulation_directory_path = os.path.join( dir, simulation_directory_name) if not os.path.isdir(dir): self._print( "quiet", "skipping directory %s" % simulation_directory_path) continue util.chdir(simulation_directory_path) cms_filename = jobname + '-out.cms' trajectory_directory = os.path.join(simulation_directory_path, jobname + '_trj') cms_model = cms.Cms(cms_filename) tr = traj.read_traj(trajectory_directory) # lazy protein_aids = cms_model.select_atom(protein_asl) protein_gids = topo.aids2gids(cms_model, protein_aids, include_pseudoatoms=False) protein_st = cms_model.extract(protein_aids, copy_props=True) sa = structalign.StructAlign() util.chdir(sitemap_directory_path) for frame_index in range(frame_start, len(tr), frame_frequency): fr = tr[frame_index] protein_st.setXYZ(fr.pos(protein_gids)) build.add_hydrogens(protein_st) # if there is no reference structure use the # first frame in the first trajectory if not reference_structure: reference_structure = protein_st.copy() self._print( "quiet", "reassigning reference_structure to: %i" % reference_structure.handle) sa.alignStructure(reference_structure, protein_st) new_jobname = '%s-%d-%d' % (jobname, isim, frame_index) input_filename = new_jobname + '-input.maegz' input_writer = structure.StructureWriter(input_filename) input_writer.append(protein_st) input_writer.close() outfile = re.sub(r'.(mae|maegz|mae.gz)$', r'_out.\1', input_filename) command = [ Aacg_SiteMap_Multijob.SITEMAP_CMD, '-prot', input_filename, '-writestructs', sitemap_writestructs, '-writevis', sitemap_writevis, '-resolution', sitemap_resolution, '-maxsites', str(sitemap_maxsites) ] self._print("quiet", "SiteMap command: %s" % command) new_job = cmj.Job(new_jobname, parent=pj, stage=self, jlaunch_cmd=command, dir=sitemap_directory_path) new_job.output.set_struct_file(os.path.abspath(outfile)) new_job.need_host = True new_job.num_cpu = 1 self.add_job(new_job) util.chdir(os.pardir)
def _check_partial_success(self): failed_jobs = self.filter_jobs(failed=[True], old=[False]) successful_jobs = self.filter_jobs(status=[cmj.JobStatus.SUCCESS], old=[False]) if not successful_jobs: return False # Some jobs were successful self._print( "quiet", f"\nStage {self._INDEX} partially completed. " f"{len(failed_jobs)} subjobs failed, " f"{len(successful_jobs)} subjobs done.\n") self._print("quiet", "Following subjobs failed") for ajob in failed_jobs: self._print("quiet", "%s" % ajob.jobname) allowed_fai_percent = 2 if len(failed_jobs) > (allowed_fai_percent / 100.0 * self.filter_jobs(old=[False])): self._print( "quiet", "Number of failed jobs is over " + "the given limit. So not proceeding to next stage\n") return False self._print( "quiet", "Number of failed jobs is over " + "the given limit. So not proceeding to next stage\n") return True
[docs]class AverageCell(cmj.StageBase): NAME = "average_cell" PARAM = cmj._create_param_when_needed([ """ DATA = { percent_to_avg = 20.0 } VALIDATE = { percent_to_avg = {type = float+} } """, ])
[docs] def __init__(self, *arg, **kwarg): """ """ cmj.StageBase.__init__(self, *arg, **kwarg)
# __init__
[docs] def crunch(self): """ """ import schrodinger.application.desmond.packages.traj as traj self._print("debug", "In AverageCell.crunch") for pj in self.get_prejobs(): pre_stage = self._PREV_STAGE pre_cms = pj.output.struct_file() model = cms.Cms(file=pre_cms) util.chdir(pj.dir) trj_dn = '' for filename in os.listdir(os.curdir): if filename.endswith('_trj') and os.path.isdir(filename): trj_dn = filename self._print('debug', 'found trj directory: %s.' % trj_dn) break if not trj_dn: self._print('debug', 'trj directory not found.') continue tr = traj.read_traj(trj_dn) boxes = [] for frame in tr: boxes.append(list(frame.box.flat)) if len(boxes) == 0: self._print('debug', 'no frames found in the trajectory.') continue # At least one data point should be used num_data = int( len(boxes) * self.param.percent_to_avg.val / 100.0) or 1 boxes = np.array(boxes[-num_data:]) avg_box = boxes.mean(axis=0) self._print('debug', 'average cell box: %s.' % str(avg_box)) # Backup original cms, before modifying it model.write(pre_cms + '.orig') desmondutils.deform_cms_model(model, avg_box) model.remove_cts_property(cms.Cms.PROP_TRJ) model.write(pre_cms) self.add_jobs(self.get_prejobs()) self._print("debug", "Out AverageCell.crunch")
[docs]class DeformCell(cmj.StructureStageBase): """ Deform cell axes based on the deformation. """ NAME = "deform_cell" PARAM = cmj._create_param_when_needed(""" DATA = { axes_deformation = [1. 1. 1.] remap = 'true' } VALIDATE = { axes_deformation = {type = list size = 3 elem = {type = float+}} remap = {type = bool} } """)
[docs] def run(self, jobname: str, input_fname: str) -> Optional[str]: """ Run stage, return filename of the deformed CMS model. :param str jobname: Jobname for this stage. :param str input_fname: Filename for the input structure. :rtype: str :return: Filename for the output structure. """ import schrodinger.application.desmond.packages.topo as topo from schrodinger.application.matsci.nano import xtal out_fname = f"{jobname}-out.cms" msys_model, cms_model = topo.read_cms(input_fname) params = xtal.get_params_from_chorus(cms_model.box) self._print( 'quiet', 'Input cell (Ang, deg): %.3f %.3f, %.3f, ' '%.3f, %.3f, %.3f' % tuple(params)) deformation = self.param.axes_deformation.val self._print('quiet', 'Axes deformation: %.3f, %.3f, %.3f' % tuple(deformation)) # Update axes (a, b, c) to the deformed params params[:3] = np.asarray(params[:3]) * deformation self._print( 'quiet', 'Deformed cell (Ang, deg): %.3f %.3f, %.3f, ' '%.3f, %.3f, %.3f' % tuple(params)) nchorus = xtal.get_chorus_from_params(params) desmondutils.deform_cms_model(cms_model, nchorus, remap=self.param.remap.val) cms_model.write(out_fname) return out_fname
[docs]class SolvateSlabBuilder(cmj.StructureStageBase): """ Add solvent to the slab. Slab must be along the Z direction. """ NAME = "solvate_slab_builder" PARAM = cmj._create_param_when_needed(""" DATA = { molecules = 100 density = 0.25 scale_vdw = 0.80 seed = 645 interface_vacuum_buffer = 10. interface_buffer = 5 molecules_file = '' } VALIDATE = { molecules = {type = int1} density = {type = "float+"} scale_vdw = {type = "float+"} seed = {type = int} interface_vacuum_buffer = {type = "float+"} interface_buffer = {type = "int+"} molecules_file = {type = str} } """)
[docs] def run(self, jobname: str, input_fname: str) -> Optional[str]: if self.param.molecules_file.val: m_file = os.path.join(cmj.ENGINE.base_dir, self.param.molecules_file.val) if not os.path.isfile(m_file): self._print('quiet', 'ERROR: molecules_file not found.') return else: self._print("quiet", "ERROR: provide molecules_file.") return model = cms.Cms(input_fname) out_fname = f"{jobname}-out.cms" with fileutils.in_temporary_directory(): # Write the input tmp_in_fname = f"{jobname}-in.maegz" model.fsys_ct.write(tmp_in_fname) cmd = [ os.path.join(envir.CONST.SCHRODINGER, "run"), os.path.join( "disordered_system_builder_gui_dir", "disordered_system_builder_driver.py"), "-molecules", str(self.param.molecules.val), "-scale_vdw", str(self.param.scale_vdw.val), "-composition", str(self.param.molecules.val), "-seed", str(self.param.seed.val), "-density", str(self.param.density.val), '-substrate', tmp_in_fname, '-interface_vacuum_buffer', str(self.param.interface_vacuum_buffer.val), '-interface_buffer', str(self.param.interface_buffer.val), '-interface_normal', msutils.get_interface_normal(model.fsys_ct), '-substrate_mode', 'interface', "-no_system", "-no_rezero", '-pbc', 'existing', m_file, jobname ] # yapf: disable self._print("debug", 'DSB command: %s' % ' '.join(cmd)) proc_ret = subprocess.call(cmd) # Check the results tmp_out_fname = f"{jobname}_amorphous.maegz" if proc_ret or not os.path.exists(tmp_out_fname): # Job Failed self._print("quiet", "ERROR: disordered_system_builder_driver failed.") # Print the log if the job fails. for log_fname in glob.iglob("*.log"): with open(log_fname, 'r') as f: for line in f: self._print("quiet", line) return # Job Succeeded adsorbate_model = self._getAdsorbateModel(tmp_out_fname, model) if not adsorbate_model: # error has been reported already return # Update model box. Adsorbate has box modified along Z. for ct in [model.fsys_ct] + model.comp_ct: xtal.set_pbc_properties(ct, adsorbate_model.box) # Extend original model model.comp_ct.extend(adsorbate_model.comp_ct) model.synchronize_fsys_ct() model.write(out_fname) return out_fname
def _getAdsorbateModel(self, amorphous_fn, model): """ Given the amorphous structure and original model, extract adsorbate molecules and assign the same forcefield as model. """ ff_name = model.get_ff() if not ff_name: self._print('quiet', 'ERROR: could not determine force field') return amorphous_ct = structure.Structure.read(amorphous_fn) indices = evaluate_asl(amorphous_ct, 'not atom.%s' % amorphous.SCAFFOLD_ATOM_PROP) if not indices: self._print('quiet', 'ERROR: no adsorbate indices found') return adsorbate_ct = amorphous_ct.extract(indices, copy_props=True) # Hard-code opposite velocity for now for atom in adsorbate_ct.atom: atom_velocity = msutils.get_atom_ffio_velocity(atom) atom_velocity[2] = -1 msutils.set_atom_ffio_velocity(atom, atom_velocity) cms_fn = desmondutils.run_system_builder(adsorbate_ct, 'ads_output', forcefield=ff_name, rezero_system=False, split_components=True) if not cms_fn: self._print('quiet', 'ERROR: could not read adsorbate model') return return cms.Cms(cms_fn)
[docs]class MatSciAnalysis(cmj.StageBase): """ This class sets up and runs automatic analysis for material-science related jobs. """ NAME = "matsci_analysis" DEFAULT_BULK_TYPE = "cohes_e density heat_vap specific_heat pressure_tensor sol_param volume" BULK_TYPE_NUM = len(DEFAULT_BULK_TYPE.split()) PARAM = cmj._create_param_when_needed([ """ DATA = { analysis_type = [""" + DEFAULT_BULK_TYPE + """] } VALIDATE = { analysis_type = {type = list elem = {type = enum range = [""" + DEFAULT_BULK_TYPE + """]}} } """, ])
[docs] def __init__(self, *arg, **kwarg): """ initialization of Material Science Analysis stage """ cmj.StageBase.__init__(self, *arg, **kwarg)
[docs] def getMSJobKeywords(self): """ This returns the keywords used for this job type. :return: Standard job keywords :rtype: list of ark objects """ ark_kw = sea.Map() ark_kw["Bulk"] = sea.Map('Name = "MatSci SID"') ark_kw.Bulk["Type"] = self.param.analysis_type return [ark_kw]
[docs] def crunch(self): """ do all the setup and submit the calculations """ self._print("debug", "In MatSciAnalysis.crunch") for pj in self.get_prejobs(): model_fname = pj.output.struct_file() jobname, dir = self._get_jobname_and_dir(pj) # Makes the directory. if (not os.path.isdir(dir)): os.makedirs(dir) util.chdir(dir) eaf_in_fname = jobname + ".st2" cfg_fname = pj.input.outcfg_file() eaf = open(eaf_in_fname, 'w') m = sea.Map() m["Trajectory"] = model_fname m["Keywords"] = self.getMSJobKeywords() eaf.write(str(m)) eaf.close() all_types = len(self.param.analysis_type.val) == self.BULK_TYPE_NUM ext = '-out.eaf' if all_types else '-out.st2' eaf_out_fname = jobname + ext self._print("quiet", " /------MS Simulation Analysis for Bulks-----") self._print("quiet", " | Input File: %s" % eaf_in_fname) self._print("quiet", " | Output File: %s" % eaf_out_fname) self._print("quiet", " |") self._print("quiet", " | To view results, run:") self._print("quiet", " | $SCHRODINGER/run matsci_event_analysis.py") self._print("quiet", " \\------------------------------------------") trj_fname = pj.output.get('TRJ') cmd = [ os.path.join(envir.CONST.SCHRODINGER, "run"), 'analyze_simulation.py', model_fname, trj_fname, eaf_out_fname, eaf_in_fname, '-sim-cfg', cfg_fname, '-JOBNAME', jobname ] job = cmj.Job(jobname, pj, self, cmd, dir, is_output=False) job.output.add(eaf_out_fname, None) job.output.set_struct_file(pj.output.struct_file()) job.need_host = True self.add_job(job) self.add_jobs(self.get_prejobs()) self._print("debug", "Out MatSciAnalysis.crunch")
[docs]class ScaleEffectiveSolvent(cmj.StructureStageBase): """ Scale all nonbonded interactions by the passed scaling factor. """ NAME = "ses_stage" PARAM = cmj._create_param_when_needed(""" DATA = { scaling_factor = 1.0 } VALIDATE = { scaling_factor = {type = "float+"} } """)
[docs] def run(self, jobname: str, input_fname: str) -> Optional[str]: """ Run stage, return filename of the CMS model with scaled FFIO parameters :param str jobname: Jobname for this stage. :param str input_fname: Filename for the input structure. :rtype: str :return: Filename for the output structure. """ import schrodinger.application.desmond.remd as remd out_fname = f"{jobname}-out.cms" cms_model = cms.Cms(input_fname) scaling_factor = self.param.scaling_factor.val self._print('quiet', f'Scaling the interactions by {scaling_factor}') for comp_ct in cms_model.comp_ct: num_comp = comp_ct.property.get(constants.NUM_COMPONENT) # Get atom list for which rescaling of non-bonded parameters # needs to be done if num_comp: # Get atoms from each first n molecules where n is number of # components atom_list = [ y for x in range(1, num_comp + 1) for y in comp_ct.molecule[x].getAtomIndices() ] elif len(comp_ct.atom) != len(comp_ct.ffio.site): # Get all the sites that are atoms, ignore the pseudo sites atom_list = [ idx for idx, site in enumerate(comp_ct.ffio.site, 1) if site.type == 'atom' ] else: # Get all the atoms in the comp_ct if it does not have split # components atom_list = [x.index for x in comp_ct.atom] # Rescale the parameters try: remd.rescale_ff(comp_ct, atom_list, scaling_factor, False) except IndexError: msg = ('ERROR: The number of atoms in the component is ' 'inconsistent with the FFIO block. Please reassign ' 'forcefield.') self._print('quiet', msg) return cms_model.write(out_fname) return out_fname