"""
Various multisim concrete stage classes.
Copyright Schrodinger, LLC. All rights reserved.
"""
import glob
import os
import re
from typing import Optional
import numpy as np
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import util
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci.nano import xtal
from schrodinger.structutils import build
from schrodinger.structutils import structalign
from schrodinger.structutils.analyze import evaluate_asl
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess
RestrainTypes = constants.RestrainTypes
__all__ = [
'Aacg_SiteMap_Multijob', 'AverageCell', 'MatSciAnalysis', 'DeformCell',
'SolvateSlabBuilder', 'ScaleEffectiveSolvent'
]
[docs]class Aacg_SiteMap_Multijob(cmj.StageBase):
"""
This class runs multiple SiteMap jobs for mixed resolution cosolvent simulations
"""
NAME = "aacg_sitemap"
SITEMAP_CMD = os.path.join(envir.CONST.SCHRODINGER, 'sitemap')
PARAM = cmj._create_param_when_needed("""
DATA = {
frame_frequency = 14
frame_start = 700
jobname = "$MASTERJOBNAME"
reference_structure_filename_root = "$MASTERJOBNAME_reference_structure"
sitemap_directory = "$MASTERJOBNAME_SiteMap_files"
simulation_basename = "$MASTERJOBNAME_sample_"
protein_asl = "((( backbone ) OR ( sidechain ) ) OR (fillmol withinbonds 1 (atom.ele P))) OR (atom.ele Mg)"
sitemap_writestructs = "no"
sitemap_writevis = "no"
sitemap_resolution = "high"
sitemap_maxsites = 10
}
VALIDATE = {
frame_frequency = {type = int0}
frame_start = {type = int0}
jobname = {type = str1}
reference_structure_filename_root = {type = str1}
sitemap_directory = {type = str1}
simulation_basename = {type = str1}
protein_asl = {type = str1}
sitemap_writestructs = {type = bool}
sitemap_writevis = {type = bool}
sitemap_resolution = {type = enum range = [low high standard]}
sitemap_maxsites = {type = int range = [0 20]}
}
""")
[docs] def __init__(self, *arg, **kwarg):
"""
Initialize the aacg-sitemap-multijob stage
"""
cmj.StageBase.__init__(self, *arg, **kwarg)
self._fai_job = []
[docs] def crunch(self):
"""
Submit the multiple SiteMap jobs
"""
import schrodinger.application.desmond.packages.topo as topo
import schrodinger.application.desmond.packages.traj as traj
self._print("debug", "In Aacg_SiteMap_Multijob.crunch")
frame_frequency = self.param.frame_frequency.val
frame_start = self.param.frame_start.val
jobname = self.param.jobname.val
reference_structure_filename_root = self.param.reference_structure_filename_root.val
sitemap_directory = self.param.sitemap_directory.val
simulation_basename = self.param.simulation_basename.val
protein_asl = self.param.protein_asl.val
sitemap_writestructs = self.param.sitemap_writestructs.val
sitemap_writevis = self.param.sitemap_writevis.val
if not self.param.sitemap_writestructs.val:
sitemap_writestructs = "no"
else:
sitemap_writestructs = "yes"
if not self.param.sitemap_writevis.val:
sitemap_writevis = "no"
else:
sitemap_writevis = "yes"
sitemap_resolution = self.param.sitemap_resolution.val
sitemap_maxsites = self.param.sitemap_maxsites.val
for pj in self.get_prejobs():
dir = cmj.ENGINE.base_dir
sitemap_directory_path = os.path.join(dir, sitemap_directory)
if not os.path.isdir(sitemap_directory_path):
os.makedirs(sitemap_directory_path)
if os.path.isfile(reference_structure_filename_root + '.mae'):
reference_structure_filename = reference_structure_filename_root + '.mae'
elif os.path.isfile(reference_structure_filename_root + '.maegz'):
reference_structure_filename = reference_structure_filename_root + '.maegz'
elif os.path.isfile(reference_structure_filename_root + '.mae.gz'):
reference_structure_filename = reference_structure_filename_root + '.mae.gz'
else:
reference_structure_filename = None
if reference_structure_filename:
reference_reader = structure.StructureReader(
reference_structure_filename)
reference_structure = next(reference_reader)
reference_reader.close()
else:
reference_structure = None
for filename in os.listdir(os.curdir):
if not filename.startswith(simulation_basename):
continue
non_base = filename[len(simulation_basename):]
if not non_base.isdigit():
continue
isim = int(non_base)
simulation_directory_name = filename
simulation_directory_path = os.path.join(
dir, simulation_directory_name)
if not os.path.isdir(dir):
self._print(
"quiet",
"skipping directory %s" % simulation_directory_path)
continue
util.chdir(simulation_directory_path)
cms_filename = jobname + '-out.cms'
trajectory_directory = os.path.join(simulation_directory_path,
jobname + '_trj')
cms_model = cms.Cms(cms_filename)
tr = traj.read_traj(trajectory_directory) # lazy
protein_aids = cms_model.select_atom(protein_asl)
protein_gids = topo.aids2gids(cms_model,
protein_aids,
include_pseudoatoms=False)
protein_st = cms_model.extract(protein_aids, copy_props=True)
sa = structalign.StructAlign()
util.chdir(sitemap_directory_path)
for frame_index in range(frame_start, len(tr), frame_frequency):
fr = tr[frame_index]
protein_st.setXYZ(fr.pos(protein_gids))
build.add_hydrogens(protein_st)
# if there is no reference structure use the
# first frame in the first trajectory
if not reference_structure:
reference_structure = protein_st.copy()
self._print(
"quiet", "reassigning reference_structure to: %i" %
reference_structure.handle)
sa.alignStructure(reference_structure, protein_st)
new_jobname = '%s-%d-%d' % (jobname, isim, frame_index)
input_filename = new_jobname + '-input.maegz'
input_writer = structure.StructureWriter(input_filename)
input_writer.append(protein_st)
input_writer.close()
outfile = re.sub(r'.(mae|maegz|mae.gz)$', r'_out.\1',
input_filename)
command = [
Aacg_SiteMap_Multijob.SITEMAP_CMD, '-prot',
input_filename, '-writestructs', sitemap_writestructs,
'-writevis', sitemap_writevis, '-resolution',
sitemap_resolution, '-maxsites',
str(sitemap_maxsites)
]
self._print("quiet", "SiteMap command: %s" % command)
new_job = cmj.Job(new_jobname,
parent=pj,
stage=self,
jlaunch_cmd=command,
dir=sitemap_directory_path)
new_job.output.set_struct_file(os.path.abspath(outfile))
new_job.need_host = True
new_job.num_cpu = 1
self.add_job(new_job)
util.chdir(os.pardir)
def _check_partial_success(self):
failed_jobs = self.filter_jobs(failed=[True], old=[False])
successful_jobs = self.filter_jobs(status=[cmj.JobStatus.SUCCESS],
old=[False])
if not successful_jobs:
return False
# Some jobs were successful
self._print(
"quiet", f"\nStage {self._INDEX} partially completed. "
f"{len(failed_jobs)} subjobs failed, "
f"{len(successful_jobs)} subjobs done.\n")
self._print("quiet", "Following subjobs failed")
for ajob in failed_jobs:
self._print("quiet", "%s" % ajob.jobname)
allowed_fai_percent = 2
if len(failed_jobs) > (allowed_fai_percent / 100.0 *
self.filter_jobs(old=[False])):
self._print(
"quiet", "Number of failed jobs is over " +
"the given limit. So not proceeding to next stage\n")
return False
self._print(
"quiet", "Number of failed jobs is over " +
"the given limit. So not proceeding to next stage\n")
return True
[docs]class AverageCell(cmj.StageBase):
NAME = "average_cell"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
percent_to_avg = 20.0
}
VALIDATE = {
percent_to_avg = {type = float+}
}
""",
])
[docs] def __init__(self, *arg, **kwarg):
"""
"""
cmj.StageBase.__init__(self, *arg, **kwarg)
# __init__
[docs] def crunch(self):
"""
"""
import schrodinger.application.desmond.packages.traj as traj
self._print("debug", "In AverageCell.crunch")
for pj in self.get_prejobs():
pre_stage = self._PREV_STAGE
pre_cms = pj.output.struct_file()
model = cms.Cms(file=pre_cms)
util.chdir(pj.dir)
trj_dn = ''
for filename in os.listdir(os.curdir):
if filename.endswith('_trj') and os.path.isdir(filename):
trj_dn = filename
self._print('debug', 'found trj directory: %s.' % trj_dn)
break
if not trj_dn:
self._print('debug', 'trj directory not found.')
continue
tr = traj.read_traj(trj_dn)
boxes = []
for frame in tr:
boxes.append(list(frame.box.flat))
if len(boxes) == 0:
self._print('debug', 'no frames found in the trajectory.')
continue
# At least one data point should be used
num_data = int(
len(boxes) * self.param.percent_to_avg.val / 100.0) or 1
boxes = np.array(boxes[-num_data:])
avg_box = boxes.mean(axis=0)
self._print('debug', 'average cell box: %s.' % str(avg_box))
# Backup original cms, before modifying it
model.write(pre_cms + '.orig')
desmondutils.deform_cms_model(model, avg_box)
model.remove_cts_property(cms.Cms.PROP_TRJ)
model.write(pre_cms)
self.add_jobs(self.get_prejobs())
self._print("debug", "Out AverageCell.crunch")
[docs]class SolvateSlabBuilder(cmj.StructureStageBase):
"""
Add solvent to the slab. Slab must be along the Z direction.
"""
NAME = "solvate_slab_builder"
PARAM = cmj._create_param_when_needed("""
DATA = {
molecules = 100
density = 0.25
scale_vdw = 0.80
seed = 645
interface_vacuum_buffer = 10.
interface_buffer = 5
molecules_file = ''
}
VALIDATE = {
molecules = {type = int1}
density = {type = "float+"}
scale_vdw = {type = "float+"}
seed = {type = int}
interface_vacuum_buffer = {type = "float+"}
interface_buffer = {type = "int+"}
molecules_file = {type = str}
}
""")
[docs] def run(self, jobname: str, input_fname: str) -> Optional[str]:
if self.param.molecules_file.val:
m_file = os.path.join(cmj.ENGINE.base_dir,
self.param.molecules_file.val)
if not os.path.isfile(m_file):
self._print('quiet', 'ERROR: molecules_file not found.')
return
else:
self._print("quiet", "ERROR: provide molecules_file.")
return
model = cms.Cms(input_fname)
out_fname = f"{jobname}-out.cms"
with fileutils.in_temporary_directory():
# Write the input
tmp_in_fname = f"{jobname}-in.maegz"
model.fsys_ct.write(tmp_in_fname)
cmd = [
os.path.join(envir.CONST.SCHRODINGER, "run"),
os.path.join(
"disordered_system_builder_gui_dir",
"disordered_system_builder_driver.py"),
"-molecules", str(self.param.molecules.val),
"-scale_vdw", str(self.param.scale_vdw.val),
"-composition", str(self.param.molecules.val),
"-seed", str(self.param.seed.val),
"-density", str(self.param.density.val),
'-substrate', tmp_in_fname,
'-interface_vacuum_buffer',
str(self.param.interface_vacuum_buffer.val),
'-interface_buffer', str(self.param.interface_buffer.val),
'-interface_normal', msutils.get_interface_normal(model.fsys_ct),
'-substrate_mode', 'interface',
"-no_system", "-no_rezero", '-pbc', 'existing',
m_file, jobname
] # yapf: disable
self._print("debug", 'DSB command: %s' % ' '.join(cmd))
proc_ret = subprocess.call(cmd)
# Check the results
tmp_out_fname = f"{jobname}_amorphous.maegz"
if proc_ret or not os.path.exists(tmp_out_fname):
# Job Failed
self._print("quiet",
"ERROR: disordered_system_builder_driver failed.")
# Print the log if the job fails.
for log_fname in glob.iglob("*.log"):
with open(log_fname, 'r') as f:
for line in f:
self._print("quiet", line)
return
# Job Succeeded
adsorbate_model = self._getAdsorbateModel(tmp_out_fname, model)
if not adsorbate_model:
# error has been reported already
return
# Update model box. Adsorbate has box modified along Z.
for ct in [model.fsys_ct] + model.comp_ct:
xtal.set_pbc_properties(ct, adsorbate_model.box)
# Extend original model
model.comp_ct.extend(adsorbate_model.comp_ct)
model.synchronize_fsys_ct()
model.write(out_fname)
return out_fname
def _getAdsorbateModel(self, amorphous_fn, model):
""" Given the amorphous structure and original model, extract adsorbate
molecules and assign the same forcefield as model. """
ff_name = model.get_ff()
if not ff_name:
self._print('quiet', 'ERROR: could not determine force field')
return
amorphous_ct = structure.Structure.read(amorphous_fn)
indices = evaluate_asl(amorphous_ct,
'not atom.%s' % amorphous.SCAFFOLD_ATOM_PROP)
if not indices:
self._print('quiet', 'ERROR: no adsorbate indices found')
return
adsorbate_ct = amorphous_ct.extract(indices, copy_props=True)
# Hard-code opposite velocity for now
for atom in adsorbate_ct.atom:
atom_velocity = msutils.get_atom_ffio_velocity(atom)
atom_velocity[2] = -1
msutils.set_atom_ffio_velocity(atom, atom_velocity)
cms_fn = desmondutils.run_system_builder(adsorbate_ct,
'ads_output',
forcefield=ff_name,
rezero_system=False,
split_components=True)
if not cms_fn:
self._print('quiet', 'ERROR: could not read adsorbate model')
return
return cms.Cms(cms_fn)
[docs]class MatSciAnalysis(cmj.StageBase):
"""
This class sets up and runs automatic analysis for material-science related
jobs.
"""
NAME = "matsci_analysis"
DEFAULT_BULK_TYPE = "cohes_e density heat_vap specific_heat pressure_tensor sol_param volume"
BULK_TYPE_NUM = len(DEFAULT_BULK_TYPE.split())
PARAM = cmj._create_param_when_needed([
"""
DATA = {
analysis_type = [""" + DEFAULT_BULK_TYPE + """]
}
VALIDATE = {
analysis_type = {type = list
elem = {type = enum
range = [""" + DEFAULT_BULK_TYPE + """]}}
}
""",
])
[docs] def __init__(self, *arg, **kwarg):
"""
initialization of Material Science Analysis stage
"""
cmj.StageBase.__init__(self, *arg, **kwarg)
[docs] def getMSJobKeywords(self):
"""
This returns the keywords used for this job type.
:return: Standard job keywords
:rtype: list of ark objects
"""
ark_kw = sea.Map()
ark_kw["Bulk"] = sea.Map('Name = "MatSci SID"')
ark_kw.Bulk["Type"] = self.param.analysis_type
return [ark_kw]
[docs] def crunch(self):
"""
do all the setup and submit the calculations
"""
self._print("debug", "In MatSciAnalysis.crunch")
for pj in self.get_prejobs():
model_fname = pj.output.struct_file()
jobname, dir = self._get_jobname_and_dir(pj)
# Makes the directory.
if (not os.path.isdir(dir)):
os.makedirs(dir)
util.chdir(dir)
eaf_in_fname = jobname + ".st2"
cfg_fname = pj.input.outcfg_file()
eaf = open(eaf_in_fname, 'w')
m = sea.Map()
m["Trajectory"] = model_fname
m["Keywords"] = self.getMSJobKeywords()
eaf.write(str(m))
eaf.close()
all_types = len(self.param.analysis_type.val) == self.BULK_TYPE_NUM
ext = '-out.eaf' if all_types else '-out.st2'
eaf_out_fname = jobname + ext
self._print("quiet",
" /------MS Simulation Analysis for Bulks-----")
self._print("quiet", " | Input File: %s" % eaf_in_fname)
self._print("quiet", " | Output File: %s" % eaf_out_fname)
self._print("quiet", " |")
self._print("quiet", " | To view results, run:")
self._print("quiet",
" | $SCHRODINGER/run matsci_event_analysis.py")
self._print("quiet",
" \\------------------------------------------")
trj_fname = pj.output.get('TRJ')
cmd = [
os.path.join(envir.CONST.SCHRODINGER, "run"),
'analyze_simulation.py', model_fname, trj_fname, eaf_out_fname,
eaf_in_fname, '-sim-cfg', cfg_fname, '-JOBNAME', jobname
]
job = cmj.Job(jobname, pj, self, cmd, dir, is_output=False)
job.output.add(eaf_out_fname, None)
job.output.set_struct_file(pj.output.struct_file())
job.need_host = True
self.add_job(job)
self.add_jobs(self.get_prejobs())
self._print("debug", "Out MatSciAnalysis.crunch")
[docs]class ScaleEffectiveSolvent(cmj.StructureStageBase):
"""
Scale all nonbonded interactions by the passed scaling factor.
"""
NAME = "ses_stage"
PARAM = cmj._create_param_when_needed("""
DATA = {
scaling_factor = 1.0
}
VALIDATE = {
scaling_factor = {type = "float+"}
}
""")
[docs] def run(self, jobname: str, input_fname: str) -> Optional[str]:
"""
Run stage, return filename of the CMS model with scaled FFIO parameters
:param str jobname: Jobname for this stage.
:param str input_fname: Filename for the input structure.
:rtype: str
:return: Filename for the output structure.
"""
import schrodinger.application.desmond.remd as remd
out_fname = f"{jobname}-out.cms"
cms_model = cms.Cms(input_fname)
scaling_factor = self.param.scaling_factor.val
self._print('quiet', f'Scaling the interactions by {scaling_factor}')
for comp_ct in cms_model.comp_ct:
num_comp = comp_ct.property.get(constants.NUM_COMPONENT)
# Get atom list for which rescaling of non-bonded parameters
# needs to be done
if num_comp:
# Get atoms from each first n molecules where n is number of
# components
atom_list = [
y for x in range(1, num_comp + 1)
for y in comp_ct.molecule[x].getAtomIndices()
]
elif len(comp_ct.atom) != len(comp_ct.ffio.site):
# Get all the sites that are atoms, ignore the pseudo sites
atom_list = [
idx for idx, site in enumerate(comp_ct.ffio.site, 1)
if site.type == 'atom'
]
else:
# Get all the atoms in the comp_ct if it does not have split
# components
atom_list = [x.index for x in comp_ct.atom]
# Rescale the parameters
try:
remd.rescale_ff(comp_ct, atom_list, scaling_factor, False)
except IndexError:
msg = ('ERROR: The number of atoms in the component is '
'inconsistent with the FFIO block. Please reassign '
'forcefield.')
self._print('quiet', msg)
return
cms_model.write(out_fname)
return out_fname