import copy
import glob
import math
import os
import pathlib
import shutil
import string
import time
import numpy as np
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.picklejar import Picklable
from schrodinger.structure import StructureReader
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess
from .jobs import DesmondJob
from .jobs import FepJob
__all__ = ['Analysis', 'PLAnalysis', 'FepAnalysis', 'Reporter']
PROTEIN_ASL_KEY = 'protein_asl'
LIGAND_ASL_KEY = 'ligand_asl'
[docs]class Reporter(object):
"""
"""
[docs] def __init__(self, fname, plotter="gchart"):
"""
"""
self._fname = fname
self._plotter = plotter
self._fh = None
def __copy__(self):
"""
"""
return Reporter(self._fname, self._plotter)
[docs] def plot(self, *arg, **kwarg):
"""
Generates the plot and returns an url to the plot.
"""
if (self._plotter == "gchart"):
import schrodinger.application.desmond.gchart as gchart
return gchart.get_xy_url(*arg, **kwarg)
if (self._plotter == "mplchart"):
import schrodinger.application.desmond.mplchart as mplchart
return mplchart.get_xy_plot(*arg, **kwarg)
[docs] def plot_2d(self, *arg, **kwarg):
"""
Generates the plot and returns an url to the plot.
"""
if (self._plotter == "mplchart"):
import schrodinger.application.desmond.mplchart as mplchart
return mplchart.get_2var_plot(*arg, **kwarg)
[docs] def write(self, msg):
"""
"""
if (self._fh is None or self._fh.closed):
self._fh = open(self._fname, "a")
print(msg, file=self._fh)
[docs] def close(self):
"""
"""
if (self._fh):
self._fh.close()
[docs]class AnalysisJob(cmj.Job):
"""
"""
[docs] def __init__(self, model_fname, traj_fname, reporter, title, *arg, **kwarg):
"""
"""
self._model_fname = model_fname
self._traj_fname = traj_fname
self._reporter = reporter
self._title = title
cmj.Job.__init__(self, *arg, **kwarg)
# __init__
def __getstate__(self, state=None):
"""
"""
state = state if (state) else copy.copy(self.__dict__)
state["_reporter"] = copy.copy(self._reporter)
return cmj.Job.__getstate__(self, state)
[docs]class TimeSeriesAnalysisJob(AnalysisJob):
"""
"""
Y_MARKER = {
"distance": ("dist (A)", "diamond", "grey", "black"),
"angle": ("angle (deg)", "x", "purple", "red"),
"dihedral": ("dihe (deg)", "circle", "skyblue", "blue"),
}
TS_NAME = {
"distance": "dist(t)",
"angle": "angle(t)",
"dihedral": "dihe(t)",
}
PR_NAME = {
"distance": "P(dist)",
"angle": "P(angle)",
"dihedral": "P(dihe)",
}
[docs] def __init__(self, time_series, prob_profile, *arg, **kwarg):
"""
"""
self._time_series = time_series
self._prob_profile = prob_profile
AnalysisJob.__init__(self, *arg, **kwarg)
# __init__
def _get_requestname(self, request):
"""
"""
s = str(request[0])
for e in request[1:]:
if (isinstance(e, dict)):
for e2 in e:
s += "_%s_%s" % (
str(e2),
e[e2],
)
else:
s += "_" + str(e)
return s
def _get_title(self, request):
"""
"""
ts_title = self._title + " " + TimeSeriesAnalysisJob.TS_NAME[request[0]]
pr_title = self._title + " " + TimeSeriesAnalysisJob.PR_NAME[request[0]]
s = ""
for e in request[1:]:
if (isinstance(e, dict)):
for e2 in e:
s += "_%s_%s" % (
str(e2),
e[e2],
)
else:
s += "_" + str(e)
return ts_title + " " + s[1:], pr_title + " " + s[1:]
[docs] def run(self):
"""
"""
if (not os.path.isdir(self.dir)):
os.makedirs(self.dir)
import schrodinger.application.desmond.ana as ana
util.chdir(self.dir)
ts = ana.calc_time_series(self._time_series, self._model_fname,
self._traj_fname)
basename = os.path.basename(self._traj_fname)
for i, time_data in enumerate(ts):
request_name = self._get_requestname(self._time_series[i])
fname = basename + "_time_series-" + request_name
fh = open(fname, "w")
x = []
data = []
for e in time_data:
print(e[0], e[1], file=fh)
x.append(e[0])
data.append(e[1])
fh.close()
if (self._reporter):
ts_title, pr_title = self._get_title(self._time_series[i])
data_label, symbol, ts_color, pr_color = TimeSeriesAnalysisJob.Y_MARKER[
self._time_series[i][0]]
x_label = "time (ps)"
if (x[-1] >= 10000):
x_label = "time (ns)"
x = [e / 1000.0 for e in x]
url = self._reporter.plot(x,
data,
x_label=x_label,
y_label=data_label,
chart_type="scatter",
marker=(
symbol,
ts_color,
5,
),
title=ts_title,
filename=("%s.png" % fname))
self._reporter.write(util.html_embed_image(url))
fname = basename + "_prob_profile-" + request_name
prob = self._prob_profile[i]
if (prob is None):
datum_min = min(data)
datum_max = max(data)
datum_range = datum_max - datum_min
num_datum = len(data)
x_space = int(num_datum / 10)
x_space = 300 if (x_space > 300) else (1 if (
x_space < 1) else x_space)
bin_width = float(datum_range) / x_space
prob = [
bin_width,
datum_min,
datum_max,
False,
]
prob_profile = ana.calc_prob_profile(data, *prob)
x = []
y = []
fh = open(fname, "w")
for e in prob_profile:
print(e[0], e[1], file=fh)
x.append(e[0])
y.append(e[1])
fh.close()
if (self._reporter):
url = self._reporter.plot(x,
y,
x_label=data_label,
y_label="prob",
color=[
pr_color,
],
title=pr_title,
filename=("%s.png" % fname))
self._reporter.write(util.html_embed_image(url))
if (self._reporter):
self._reporter.close()
self.status.set(cmj.JobStatus.SUCCESS)
[docs]class SeaJob(AnalysisJob):
"""
"""
[docs] def __init__(self, SEA, *arg, **kwarg):
AnalysisJob.__init__(self, *arg, **kwarg)
self.SEA = SEA
[docs] def run(self):
"""
"""
if (not os.path.isdir(self.dir)):
os.makedirs(self.dir)
util.chdir(self.dir)
out_fname = self.SEA.output.val
sea_fname = self.SEA.seacfg.val
cfg_fname = self.SEA.simcfg.val if ("simcfg" in self.SEA) else None
with open(sea_fname, "w") as f:
f.write(str(self.SEA))
cmd = [
'run', 'analyze_simulation.py', self._model_fname, self._traj_fname,
out_fname, sea_fname
]
cfg_fname and cmd.extend(['-sim-cfg', cfg_fname])
retcode = subprocess.call(cmd)
if retcode:
self.status.set(cmj.JobStatus.BACKEND_ERROR)
else:
self.status.set(cmj.JobStatus.SUCCESS)
[docs]class PLAnalysis(cmj.StageBase):
"""
This class sets up and runs analysis for protein-ligand complex. The
input parameters are ligand and protein ASLs, if not specified the class
will automatically try to detect appropriate values, if "none" is specified
the class will ignore analysis for the parameter and in other cases use the
parameter value to run the analysis job.
"""
NAME = "pl_analysis"
PARAM = cmj._create_param_when_needed([
f"""
DATA = {{
{LIGAND_ASL_KEY} = ""
{PROTEIN_ASL_KEY} = ""
}}
VALIDATE = {{
{LIGAND_ASL_KEY} = {{type = str}}
{PROTEIN_ASL_KEY} = {{type = str}}
}}
""",
])
[docs] def __init__(self, *arg, **kwarg):
"""
initialization of pl-analysis stage
"""
cmj.StageBase.__init__(self, *arg, **kwarg)
# __init__
[docs] def crunch(self):
"""
do all the setup and submit the calculations
"""
self._print("debug", "In PLAnalysis.crunch")
sea.set_macro_dict(self._get_macro_dict())
sea.update_macro_dict({"$JOBNAME": self.param.jobname.val})
for pj in self.get_prejobs():
model_fname = pj.output.struct_file()
jobname, dir = self._get_jobname_and_dir(pj)
ligand_asl = self.param.ligand_asl.val
protein_asl = self.param.protein_asl.val
# Makes the directory.
if (not os.path.isdir(dir)):
os.makedirs(dir)
util.chdir(dir)
import schrodinger.application.desmond.automatic_analysis_generator as aag
cms_model = cms.Cms(model_fname)
for a in cms_model.atom:
a.property[constants.ORIGINAL_INDEX] = a.index
a.property[
constants.ORIGINAL_MOLECULE_NUMBER] = a.molecule_number
if not len(ligand_asl):
ligand_st, ligand_asl = aag.getLigand(cms_model)
elif ligand_asl.lower() == 'none':
ligand_st, ligand_asl = None, None
else:
ligand_atm_idx = cms_model.select_atom(ligand_asl)
ligand_st = cms_model.extract(ligand_atm_idx)
if not len(protein_asl):
protein_asl = aag.PROTEIN_ASL
elif protein_asl.lower() == 'none':
protein_asl = None
# use the saved reference coordinates
frame, ref_struct = 0, None
kws = aag.getPLISKWList(cms_model,
ligand_st,
ligand_asl,
ref_struct,
frame,
protein_asl=protein_asl,
want_rmsd=bool(protein_asl),
want_prmsf=bool(protein_asl),
want_lrmsf=bool(ligand_asl),
want_pli=bool(protein_asl and ligand_asl),
want_ppi=bool(protein_asl),
want_ltorsion=bool(ligand_asl),
want_lprops=bool(ligand_asl))
eaf_in_fname = jobname + ".st2"
eaf = open(eaf_in_fname, 'w')
m = sea.Map()
m["Trajectory"] = model_fname
m["Keywords"] = kws
if ligand_asl:
m["LigandASL"] = ligand_asl
if protein_asl:
m["ProteinASL"] = protein_asl
eaf.write(str(m))
eaf.close()
eaf_out_fname = jobname + '.eaf'
self._print("quiet", " /------Simulation Interactions Diagram-----")
self._print("quiet", " | Input File: %s" % eaf_in_fname)
self._print("quiet", " | Output File: %s" % eaf_out_fname)
self._print("quiet", " | Ligand ASL: '%s'" % ligand_asl)
self._print("quiet", " | Protein ASL: '%s'" % protein_asl)
self._print("quiet", " |")
self._print("quiet", " | To view results, run:")
self._print(
"quiet", " | $SCHRODINGER/run event_analysis.py %s/%s" %
(jobname, eaf_out_fname))
self._print("quiet",
" \\------------------------------------------")
trj_fname = pj.output.get('TRJ')
cmd = [
os.path.join(envir.CONST.SCHRODINGER,
"run"), 'analyze_simulation.py', model_fname,
trj_fname, eaf_out_fname, eaf_in_fname, '-JOBNAME', jobname
]
job = cmj.Job(jobname, pj, self, cmd, dir)
job.output.add(eaf_out_fname, None)
job.need_host = True
self.add_job(job)
self.add_jobs(self.get_prejobs())
self._print("debug", "Out PLAnalysis.crunch")
[docs] def hook_captured_successful_job(self, job):
for out_fname in job.output:
ext = fileutils.splitext(out_fname)[1]
if ext == '.eaf':
eaf_copy = pathlib.Path(cmj.ENGINE.base_dir,
cmj.ENGINE.jobname + '.eaf')
shutil.copy(out_fname, eaf_copy)
self._files4copy.append(eaf_copy)
break
if (type(job) == cmj.Job):
return "dissolved"
[docs]class Analysis(cmj.StageBase):
"""
"""
NAME = "analysis"
PARAM = cmj._create_param_when_needed([
"""
DATA = {
report = "$MAINJOBNAME_$STAGENO_report.html"
plotter = mplchart
time_series = []
prob_profile = []
meta.range = []
meta.nbin = [51]
SEA = off
}
VALIDATE = {
report = {type = str}
plotter = {type = enum range = [mplchart gchart]}
time_series = {type = list size = 0 elem = {type = list size = 0}}
prob_profile = {type = list size = 0 elem = {type = list size = 0}}
meta.range = {type = list size = 0 elem = {type = list size = 0}}
meta.nbin = {type = list size = -1 elem = {type = int range = [2 100000000000]}}
SEA = [
{type = bool0}
{_skip = all}
]
}
""",
])
DEFAULT_PROB_PROFILE_SETTING = {
"dihedral": [2, 0, 360, True],
"angle": [2, 0, 180, True],
}
[docs] def __init__(self, *arg, **kwarg):
"""
"""
cmj.StageBase.__init__(self, *arg, **kwarg)
# __init__
def _get_prob_profile(self, time_series, prob_profile):
"""
"""
result = []
for i, e in enumerate(time_series):
try:
p = prob_profile[i]
if (p != []):
result.append(p)
continue
except IndexError:
pass
if (e[0] in Analysis.DEFAULT_PROB_PROFILE_SETTING):
result.append(Analysis.DEFAULT_PROB_PROFILE_SETTING[e[0]])
else:
result.append(None)
return result
def _get_title(self, pj):
"""
"""
try:
title = "win#%d:" % pj.i_win
except AttributeError:
if (pj.rid):
title = "rep#%d:" % pj.rid
else:
title = ""
return title
[docs] def crunch(self):
"""
"""
self._print("debug", "In Analysis.crunch")
sea.set_macro_dict(self._get_macro_dict())
sea.update_macro_dict({"$JOBNAME": self.param.jobname.val})
time_series = self.param.time_series.val
prob_profile = self.param.prob_profile.val
reporter = Reporter(self.param.report.val, self.param.plotter.val) if (
self.param.report.val) else None
for pj in self.get_prejobs():
try:
traj_fname = pj.output.get("TRJ")
if (traj_fname is None):
raise KeyError
except KeyError:
self._print(
"quiet",
"WARNING: Job %s has no registered trajectory output." %
str(pj))
self._print("quiet",
"WARNING: Ignore this job for regular analysis.")
else:
model_fname = pj.output.struct_file()
jobname, dir = self._get_jobname_and_dir(pj)
title = self._get_title(pj)
if (time_series != []):
prob = self._get_prob_profile(time_series, prob_profile)
new_job = TimeSeriesAnalysisJob(
time_series,
prob,
model_fname,
traj_fname,
reporter,
title,
jobname=jobname,
parent=pj,
stage=self,
dir=dir,
jlaunch_cmd=lambda job: job.run(),
is_output=False)
self.add_job(new_job)
if (self.param.SEA.val):
SEA = copy.deepcopy(self.param.SEA)
if ("output" not in SEA):
SEA["output"] = "$JOBNAME_sea-out.st2"
SEA.output.val = SEA.output.val # This resets the 'output' with the macro-expanded value.
SEA["seacfg"] = "$JOBNAME_sea.st2"
SEA.seacfg.val = SEA.seacfg.val
# Finds out the "*-out.cfg" file.
cfg_file = glob.glob(os.path.join(pj.dir, "*-out.cfg"))
if (not cfg_file):
self._print(
"quiet",
"WARNING: The *-out.cfg file of job %s not found." %
str(pj))
else:
SEA["simcfg"] = cfg_file[0]
new_job = SeaJob(SEA,
model_fname,
traj_fname,
reporter,
title,
jobname=jobname,
parent=pj,
stage=self,
dir=dir,
jlaunch_cmd=lambda job: job.run(),
is_output=False)
self.add_job(new_job)
for pj in self.get_prejobs():
try:
cfg = sea.Map(open(pj.input.incfg_file(), "r").read())
except:
cfg = pj.stage.param
if (config.is_meta(cfg)):
meta = copy.deepcopy(cfg.meta)
jobname, dir = self._get_jobname_and_dir(pj)
default_nbin = self.param.meta.nbin[-1].val
for i, e in enumerate(meta.cv):
try:
e["nbin"] = self.param.meta.nbin[i].val
except IndexError:
e["nbin"] = default_nbin
for i, e in enumerate(meta.cv):
try:
e["range"] = self.param.meta.range[i]
except IndexError:
e["range"] = []
meta.name.val = pj.output.get("KERSEQ")
meta.cv_name.val = pj.output.get("CVSEQ")
title = self._get_title(pj)
new_job = MetadynamicsAnalysisJob(
meta,
reporter,
title,
jobname=jobname,
parent=pj,
stage=self,
dir=dir,
jlaunch_cmd=lambda job: job.run(),
is_output=False)
self.add_job(new_job)
self.add_jobs(self.get_prejobs())
self._print("debug", "Out Analysis.crunch")
[docs] def hook_captured_successful_job(self, job):
"""
"""
if (isinstance(job, AnalysisJob) or
isinstance(job, MetadynamicsAnalysisJob)):
return "dissolved"
[docs]class FepanaJob(DesmondJob, Picklable):
"""
"""
id = 0, Picklable
[docs] def __init__(self,
fepid=None,
n_win=None,
sim_job=None,
mutation=None,
temperature=300.,
correct_restr=False,
correct_coul=False,
correct_vdw=False,
bennett=None,
reporter=None,
fep_type=None,
include_sid=True,
*arg,
**kwarg):
"""
"""
self.fepid = fepid
self.n_win = n_win
self.sim_job = sim_job
self.mutation = mutation
self.correct_coul = correct_coul
self.correct_vdw = correct_vdw
self.correct_restr = correct_restr
self.temperature = temperature
self.bennett = bennett
self.reporter = reporter
self.fep_type = fep_type
self.include_sid = include_sid
# Results
self.result = None
self.contrib = None
self.vdw_correction = None
self.coul_correction = None
self.repr_maes = []
DesmondJob.__init__(self, *arg, **kwarg)
# __init__
def __getstate__(self, state=None):
"""
"""
state = state or copy.copy(self.__dict__)
state["reporter"] = copy.copy(self.reporter)
return cmj.Job.__getstate__(self, state)
[docs] def describe(self):
"""
"""
cmj.Job.describe(self)
self._print(
"quiet", " Mutation : %s\n"
" Windows : %d" % (self.mutation, self.n_win))
[docs] def get_endpoint_cmsfiles(self):
lambda0 = None
lambda1 = None
for sj in self.sim_job[self.fepid]:
if sj.i_win == 0:
lambda0 = sj.output.struct_file()
elif sj.i_win == sj.n_win - 1:
lambda1 = sj.output.struct_file()
return lambda0, lambda1
[docs] def run(self):
"""
"""
util.chdir(self.dir)
for sj in self.sim_job[self.fepid]:
fname = "gibbs.%d.dE" % sj.i_win
self._print("debug", os.path.join(sj.dir, sj.output.get("dE")))
util.symlink(os.path.join(sj.dir, sj.output.get("dE")), fname)
random_seed = self.bennett.random_seed.val
if random_seed == "time":
random_seed = int(time.time())
bar = fepana.init_bennett(self.dir,
self.n_win,
temperature=self.temperature,
random_seed=random_seed,
result_file="result",
nresamples=100)
bar.write_data('# %s' % self.jobname, bar.fout)
self.result, err_msg, dF, results = fepana.run_bennett(bar)
bar.close_output()
if self.correct_coul.val:
from schrodinger.application.desmond.fep_net_charge import \
compute_net_charge_corrections
(lambda0, lambda1) = self.get_endpoint_cmsfiles()
ene0 = lambda0.replace("-out.cms", ".ene")
ene1 = lambda1.replace("-out.cms", ".ene")
if lambda0 and lambda1 and ene0 and ene1:
correction0 = compute_net_charge_corrections(
lambda0, ene0, True)
correction1 = compute_net_charge_corrections(
lambda1, ene1, False)
self.coul_correction = np.sum(correction1 - correction0)
self.result += self.coul_correction
else:
self._log(
"Net charge correction cannot be computed because output cms or ene files are missing."
)
if self.reporter:
# Calculates the dE distribution overlaps.
self.reporter.write(
"<br/>Overlap of work distributions of neighboring windows:<br/>"
)
for i in range(self.n_win - 1):
try:
filename = "dE_overlap_%d_%d.png" % (i, i + 1)
url = fepana.plot_lambda_window_overlap(
"gibbs.%d.dE" % i,
"gibbs.%d.dE" % (i + 1),
"dE_prob-%d_%d" % (i, i + 1),
["dE_%d+" % i, "dE_%d-" % (i + 1)],
time_range=[100.0, float("inf")],
filename=filename,
reporter=self.reporter)
self.reporter.write(util.html_embed_image(url))
except RuntimeError as e:
self.reporter.write(
"<br/> Failed to plot the dE_%d+ and dE_%d- distributions. "
"Probably the simulation was too short to produce statistics.<br/>"
% (i, i + 1))
self._log(
"Calculating overlap failed for windows (%i, %i). %s" %
(i, i + 1, e))
self.reporter.write("<br/>")
self.reporter.close()
# Shows error message if there is.
if err_msg:
self._print(
"quiet",
"FEP [%s]: Bennett analysis program issued warning or error message:"
% self.mutation)
err_msg = err_msg.split("\n")
if len(err_msg) > 100:
err_msg = err_msg[-100:]
self._print(
"quiet",
"FEP [%s]: Last 100 lines of the message:" % self.mutation)
for line in err_msg:
self._print("quiet", line)
self._print("quiet", "(end of message)\n")
if self.result is None:
self.status.set(cmj.JobStatus.BACKEND_ERROR)
return
elif not err_msg:
util.chdir(self.dir)
with open("gibbs.0.dE") as fh:
content = fh.readlines()
if not content:
self._print("quiet", "ERROR: empty gibbs.0.dE file.")
self.status.set(cmj.JobStatus.BACKEND_ERROR)
return
last_line = content[-1].strip()
if "#" == last_line[0]:
self._print("quiet",
"ERROR: no sampling data in gibbs.0.dE file.")
self.status.set(cmj.JobStatus.BACKEND_ERROR)
return
word = last_line.split()
last_time = float(word[0])
energy_output = fepana.calc_free_energy(
'.',
last_time,
self.n_win,
temperature=self.temperature,
bennett_options=self.bennett.val,
random_seed=random_seed)
forward_time_energy = energy_output.get('forward_time')
reversed_time_energy = energy_output.get('reversed_time')
sliding_time_energy = energy_output.get('sliding_time')
try:
url0 = fepana.plot_convergence(
forward_time_energy[0],
self.bennett.forward_time.name.val,
"dF_t-%d_%d",
"simulation course",
"red",
reporter=self.reporter).get('url')
except ValueError as e:
url0 = None
self._print(
"quiet",
f"WARNING: Error in plotting convergence for forward time: {e}"
)
try:
url1 = fepana.plot_convergence(
reversed_time_energy[0],
self.bennett.reversed_time.name.val,
"dF_rt-%d_%d",
"reversed time",
"green",
reporter=self.reporter,
).get('url')
except ValueError as e:
url1 = None
self._print(
"quiet",
f"WARNING: Error in plotting convergence for reversed time: {e}"
)
try:
url2 = fepana.plot_convergence(
sliding_time_energy[0],
self.bennett.sliding_time.name.val,
"dF_st-%d_%d",
"sliding time",
"blue",
reporter=self.reporter).get('url')
except ValueError as e:
url2 = None
self._print(
"quiet",
f"WARNING: Error in plotting convergence for sliding time: {e}"
)
if self.include_sid:
import schrodinger.application.desmond.process_fep_traj as pft
import schrodinger.application.desmond.replica_sid_generator as rsg
sid_dir = self.parent.dir
jobname = self.parent.jobname
tasktype = 'lambda_hopping'
util.chdir(sid_dir)
# write .sid file
fep_results_report = rsg.FEPReport(
jobname,
energy_output,
task_type=tasktype,
n_win=self.parent.n_win,
perturbation_type=self.fep_type)
fep_results_report.export()
shutil.copyfile(os.path.join(sid_dir, f'{jobname}.sid'),
os.path.join(self.dir, f'{jobname}.sid'))
# reduce the trajectories and generate representative frames
files_to_keep = filter(
None.__ne__,
pft.postprocess_traj(
f'{jobname}.sid',
ref_ct_fname=fep_results_report.ref_ct_fname))
for f in files_to_keep:
src, dest = os.path.join(sid_dir,
f), os.path.join(self.dir, f)
if os.path.isdir(src):
if os.path.exists(dest):
new_dest = fileutils.get_next_filename(
os.path.dirname(dest) + os.path.sep,
os.path.basename(dest))
print("Folder %s exists. Rename it to %s" %
(dest, new_dest))
shutil.move(dest, new_dest)
shutil.copytree(src, dest)
else:
shutil.copyfile(src, dest)
# Store a list of generated repr structures
if f.endswith('.mae'):
self.repr_maes.append(dest)
util.chdir(self.dir)
if self.reporter:
self.reporter.write(
"<br/><br/>Overall free energy difference (dG) as a function of time:<br/>"
)
def my_write(url):
if url:
self.reporter.write(util.html_embed_image(url[0]))
my_write(url0)
my_write(url1)
my_write(url2)
self.reporter.write("<br/>")
self.reporter.write(
"<br/><br/>Free energy difference (dF) between neighboring windows"
" as a function of time:<br/>")
def my_write1(url):
if url:
for e in url[1:]:
self.reporter.write(util.html_embed_image(e))
self.reporter.write("<br/>")
my_write1(url0)
my_write1(url1)
my_write1(url2)
self.reporter.write(" <br/>")
self.reporter.close()
# Calculates the contributions.
if self.task == "afep":
sj = self.sim_job[self.fepid][0]
try:
self.contrib = fepana.calc_contrib("result",
sj.input.outcfg_file())
except Exception as e:
self._print("quiet", "WARNING: %s" % str(e))
# VDW long-range dispersion correction for absolute solvation free energy calculations.
if self.task == "afep" and self.correct_vdw.val:
self._print(
"quiet", "Long-range dispersion correction has been included"
" in dE's such that the vdW correction is no longer needed for"
" calculating the free energy.")
self._print(
"quiet", "The fep_analysis.correct_vdw setting has no effects"
" now.")
if self.task == "afep":
lambda0 = None
for sj in self.sim_job[self.fepid]:
if sj.i_win == 0:
lambda0 = sj
break
if lambda0:
try:
cross_link = lambda0.cross_link
except AttributeError:
pass
else:
self._print("debug", "calculating cross-link correction...")
self.cross_link_correction = fepana.calc_free_energy_for_abfe_cross_link_xu(
cross_link)
self.result -= self.cross_link_correction
self._print(
"debug", " cross-link correction = %f" %
self.cross_link_correction)
if self.correct_restr.val:
lambda0 = None
lambda1 = None
for sj in self.sim_job[self.fepid]:
if sj.i_win == 0:
lambda0 = sj
elif sj.i_win == sj.n_win - 1:
lambda1 = sj
if lambda0 and lambda1:
egout0 = lambda0.output.get("eg")
egout1 = lambda1.output.get("eg")
if egout0 and egout1:
self._print("debug",
"lambda0 energy group file: %s" % egout0)
self._print("debug",
"lambda1 energy group file: %s" % egout1)
egout0 = os.path.join(lambda0.dir, egout0)
egout1 = os.path.join(lambda1.dir, egout1)
if os.path.isfile(egout0) and os.path.isfile(egout1):
correction = fepana.correct_restr(
egout0, egout1, "restr_correction")
else:
correction = None
self._log(
"Correction for position restraints cannot be done "
"because the following output files are missing:\n %s\n %s"
% (egout0, egout1))
if correction:
self.result += correction
else:
self._log(
"Correction for position restraints cannot be done because 'energy_group' was turned off."
)
self.status.set(cmj.JobStatus.SUCCESS)
[docs]class FepAnalysis(cmj.StageBase):
"""
"""
NAME = "fep_analysis"
PARAM = cmj._create_param_when_needed([
string.Template("""
DATA = {
report = "$$JOBNAME.html"
plotter = mplchart
fep_type = $small_molecule
bennett = {
random_seed = 2111839
_override_temperature = 0.0
forward_time = {
name = "freeenergy_time"
begin = 100.0
end = inf
dt = 150:points
}
reversed_time = {
name = "freeenergy_rtime"
begin = 100.0
end = inf
dt = 150:points
}
sliding_time = {
name = "freeenergy_stime"
begin = 100.0
end = inf
dt = 150:points
window = 500.0
}
}
correct_coul = no
correct_vdw = no # DEPRECATED! No effects any more.
correct_restr = yes
should_sync = yes
include_sid = yes
}
VALIDATE = {
report = {type = str}
plotter = {type = enum range = [mplchart gchart]}
fep_type = {type = enum range = [$all_fep_types]}
bennett = {
random_seed = [
{type = enum range = [time]}
{type = int}
]
_override_temperature = {type = "float+"}
forward_time = {
name = {type = str}
begin = {type = "float+"}
end = {type = "float+"}
dt = [{type = "float+"} {type = "$reg_exp"}]
}
reversed_time = {
name = {type = str}
begin = {type = "float+"}
end = {type = "float+"}
dt = [{type = "float+"} {type = "$reg_exp"}]
}
sliding_time = {
name = {type = str1}
begin = {type = "float+"}
end = {type = "float+"}
dt = [{type = "float+"} {type = "$reg_exp"}]
window = {type = "float+"}
}
}
correct_coul = {type = bool}
correct_vdw = [
{type = bool}
{cutoff_radius = {type = "float+"}
average_dispersion = {type = "float+"}
}
]
correct_restr = {type = bool}
include_sid = {type = bool}
}
""").substitute(small_molecule=constants.FEP_TYPES.SMALL_MOLECULE,
all_fep_types=str(constants.FEP_TYPES),
reg_exp="regex:^\\\\d+:points$")
])
[docs] def crunch(self):
"""
"""
self._print("debug", "In FepAnalysis.crunch")
self._log("Checking integrity of simulation results...")
fep_job = {}
for pj in self.get_prejobs():
if isinstance(pj, FepJob):
fep_job.setdefault(pj.fepid, []).append(pj)
else:
self._log(
"WARNING: Simulation performed by %s seems not a FEP simulation."
% str(pj))
self._log("WARNING: Ignore this job in FEP analysis.")
if not fep_job:
self._log("No FEP job was performed.")
good_fepid = []
for fepid in fep_job:
i_win = set()
n_win = fep_job[fepid][0].n_win
for pj in fep_job[fepid]:
i_win.add(pj.i_win)
fragname = fep_job[fepid][0].fragname or "absolute"
if len(i_win) == n_win:
self._log("FEP [%s] seems well done" % fragname)
good_fepid.append(fepid)
else:
well = sorted(i_win)
ill = sorted(set(range(n_win)) - i_win)
self._log("FEP [%s] partially finished" % fragname)
self._log(" %d lambda windows finished: %s" % (
len(well),
str(well)[1:-1],
))
self._log(" %d lambda windows not done: %s" % (
len(ill),
str(ill)[1:-1],
))
self._log(" Ignore this FEP in analysis.")
for fepid in good_fepid:
# Gets the temperature of the simulation. If not found, it is assumed to be 300.0 K.
parent = fep_job[fepid][0]
outcfg_fname = parent.input.outcfg_file(
) or parent.input.incfg_file().replace("-in.cfg", "-out.cfg")
temperature = 300.
try:
outcfg = sea.Map(open(outcfg_fname).read())
temperature = outcfg.integrator.temperature.T_ref.val
except:
self._log(
"Trouble reading config file; using default temperature (%d K)"
% temperature)
if self.param.bennett._override_temperature.val > 0:
# NOTE: This is only used for running tests with a small
# number of replicas.
# When the number of replicas is small, the overlap between
# the energies of neighboring replicas is poor. This can prevent
# the bennett calculation from converging. By artificially
# increasing the temperature, you can force convergence,
# because the energy values are scaled by the inverse temperature.
temperature = self.param.bennett._override_temperature.val
self._log(
f"Override bennett temperature for testing, using {temperature} K."
)
mutation = "(absolute free energy)" if parent.task == "afep" \
else fep_job[fepid][0].fragname
jobname, dir = self._get_jobname_and_dir(parent)
reporter = Reporter(
self.param.report.val,
self.param.plotter.val) if self.param.report.val else None
new_job = FepanaJob(jobname=jobname,
parent=parent,
stage=self,
dir=dir,
jlaunch_cmd=lambda job: job.run(),
fepid=fepid,
n_win=parent.n_win,
sim_job=fep_job,
mutation=mutation,
temperature=temperature,
correct_restr=self.param.correct_restr,
correct_vdw=self.param.correct_vdw,
correct_coul=self.param.correct_coul,
task=parent.task,
bennett=self.param.bennett,
reporter=reporter,
fep_type=self.param.fep_type.val,
include_sid=self.param.include_sid.val,
is_output=False)
self.add_job(new_job)
if (not os.path.isdir(dir)):
os.makedirs(dir)
self._print("debug", "Out FepAnalysis.crunch")
[docs] def poststage(self):
"""
"""
RESULT_CT_PROPERTY_NAME = constants.FEP_DG_PREFIX
job_to_cms = {} # Keys are DesmondJob.gid's.
for job in self.jobs:
fepid = job.fepid
n_win = job.n_win
sim_job = job.sim_job[fepid]
target_i_win = 0 if self.param.fep_type.val == constants.FEP_TYPES.ABSOLUTE_BINDING else n_win - 1
for sj in sim_job:
if sj.i_win == target_i_win:
lambda1_cms_fname = sj.output.struct_file()
break
job_to_cms[job] = cms.Cms(lambda1_cms_fname)
mut_ct = job_to_cms[job].mut_ct
# Removes all previous "s_des_dG*" and "s_des_ddG*" properties,
# but here are some exceptions under certain conditions:
# 1. When it's absolute binding FEP,
# the following properties are exempted:
# `constaints.FEP_DG_CROSSLINK_CORRECTION_REF`,
# `constaints.FEP_DG_CROSSLINK_CORRECTION_MUT`,
# `constaints.FEP_DDG_CROSSLINK_CORRECTION`,
# `constants.FEP_DG_CROSSLINK_CORRECTION`
# because these are set by a prior stage of the same job.
exempt_keys = ()
if self.param.fep_type.val == constants.FEP_TYPES.ABSOLUTE_BINDING:
exempt_keys = (constants.FEP_DG_CROSSLINK_CORRECTION_REF,
constants.FEP_DG_CROSSLINK_CORRECTION_MUT,
constants.FEP_DDG_CROSSLINK_CORRECTION,
constants.FEP_DG_CROSSLINK_CORRECTION)
property_keys = [
key for key in list(mut_ct.property)
if ((key.startswith(constants.FEP_DDG_PREFIX) or key.startswith(
constants.FEP_DG_PREFIX)) and key not in exempt_keys)
]
struc.delete_structure_properties(mut_ct, property_keys)
if job.result == "":
mut_ct.property[RESULT_CT_PROPERTY_NAME] = "analysis failure"
else:
if job.task == "afep" and not hasattr(job,
"cross_link_correction"):
# -RT * ln( R' T )
R = 1.98588E-3
R1 = 0.0820574587
T = job.temperature
delta = -R * T * math.log(R1 * T)
mut_ct.property[RESULT_CT_PROPERTY_NAME +
"_transfer"] = str(job.result)
mut_ct.property[RESULT_CT_PROPERTY_NAME +
"_solvation"] = str(job.result + delta)
if (job.contrib):
prefix = RESULT_CT_PROPERTY_NAME + "_contribution_"
for name in job.contrib.__dict__:
if (name[:4] == "fec_"):
mut_ct.property[prefix + name[4:]] = str(
job.contrib.__dict__[name])
else:
mut_ct.property[RESULT_CT_PROPERTY_NAME] = str(job.result)
if (job.vdw_correction):
prop_name = RESULT_CT_PROPERTY_NAME + "_long_range_dispersion_correction"
mut_ct.property[prop_name] = str(job.vdw_correction)
self._log(
"Long-range dispersion correction for %s: %f kcal/mol" %
(
job.mutation,
job.vdw_correction,
))
if (job.coul_correction):
prop_name = RESULT_CT_PROPERTY_NAME + "_net_charge_correction"
mut_ct.property[prop_name] = str(job.coul_correction)
self._log("Net Charge correction for %s: %f kcal/mol" % (
job.mutation,
job.coul_correction,
))
if (hasattr(job, "cross_link_correction")):
prop_name = RESULT_CT_PROPERTY_NAME + "_cross_link_correction"
mut_ct.property[prop_name] = str(job.cross_link_correction)
self._log(
"Cross-link-restraint correction for %s: %f kcal/mol" %
(
job.mutation,
job.cross_link_correction,
))
if self.param.fep_type.val == constants.FEP_TYPES.ABSOLUTE_BINDING and job.repr_maes:
# In ABFEP, only one repr structure is generated
repr_st = StructureReader.read(job.repr_maes[0])
# Preserve the properties needed to calculated the dG
repr_st.property[constants.FEP_DG_PREFIX] = mut_ct.property[
constants.FEP_DG_PREFIX]
repr_st.property[
constants.FEP_RESTRAINT_CORRECTION] = mut_ct.property[
constants.FEP_RESTRAINT_CORRECTION]
job_to_cms[job] = repr_st
new_job = []
for pjob, st in job_to_cms.items():
jobname, dir = self._get_jobname_and_dir(pjob)
fname_out = os.path.join(dir,
jobname + "_" + str(pjob.gid) + "-out.cms")
st.write(fname_out)
job = cmj.Job(jobname, pjob, self, None, dir)
job.output.add(fname_out, None)
job.output.add("fep.dG.log", None)
new_job.append(job)
cmj.StageBase.poststage(self)
if (self._NEXT_STAGE is not None):
for job in new_job:
job.status.set(cmj.JobStatus.SUCCESS)
self.add_jobs(new_job)