Source code for schrodinger.application.desmond.stage.analysis

import copy
import glob
import math
import os
import pathlib
import shutil
import string
import time

import numpy as np
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.picklejar import Picklable
from schrodinger.structure import StructureReader
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess

from .jobs import DesmondJob
from .jobs import FepJob

__all__ = ['Analysis', 'PLAnalysis', 'FepAnalysis', 'Reporter']

PROTEIN_ASL_KEY = 'protein_asl'
LIGAND_ASL_KEY = 'ligand_asl'


[docs]class Reporter(object): """ """
[docs] def __init__(self, fname, plotter="gchart"): """ """ self._fname = fname self._plotter = plotter self._fh = None
def __copy__(self): """ """ return Reporter(self._fname, self._plotter)
[docs] def plot(self, *arg, **kwarg): """ Generates the plot and returns an url to the plot. """ if (self._plotter == "gchart"): import schrodinger.application.desmond.gchart as gchart return gchart.get_xy_url(*arg, **kwarg) if (self._plotter == "mplchart"): import schrodinger.application.desmond.mplchart as mplchart return mplchart.get_xy_plot(*arg, **kwarg)
[docs] def plot_2d(self, *arg, **kwarg): """ Generates the plot and returns an url to the plot. """ if (self._plotter == "mplchart"): import schrodinger.application.desmond.mplchart as mplchart return mplchart.get_2var_plot(*arg, **kwarg)
[docs] def write(self, msg): """ """ if (self._fh is None or self._fh.closed): self._fh = open(self._fname, "a") print(msg, file=self._fh)
[docs] def close(self): """ """ if (self._fh): self._fh.close()
[docs]class AnalysisJob(cmj.Job): """ """
[docs] def __init__(self, model_fname, traj_fname, reporter, title, *arg, **kwarg): """ """ self._model_fname = model_fname self._traj_fname = traj_fname self._reporter = reporter self._title = title cmj.Job.__init__(self, *arg, **kwarg)
# __init__ def __getstate__(self, state=None): """ """ state = state if (state) else copy.copy(self.__dict__) state["_reporter"] = copy.copy(self._reporter) return cmj.Job.__getstate__(self, state)
[docs] def run(self): """ """
[docs]class TimeSeriesAnalysisJob(AnalysisJob): """ """ Y_MARKER = { "distance": ("dist (A)", "diamond", "grey", "black"), "angle": ("angle (deg)", "x", "purple", "red"), "dihedral": ("dihe (deg)", "circle", "skyblue", "blue"), } TS_NAME = { "distance": "dist(t)", "angle": "angle(t)", "dihedral": "dihe(t)", } PR_NAME = { "distance": "P(dist)", "angle": "P(angle)", "dihedral": "P(dihe)", }
[docs] def __init__(self, time_series, prob_profile, *arg, **kwarg): """ """ self._time_series = time_series self._prob_profile = prob_profile AnalysisJob.__init__(self, *arg, **kwarg)
# __init__ def _get_requestname(self, request): """ """ s = str(request[0]) for e in request[1:]: if (isinstance(e, dict)): for e2 in e: s += "_%s_%s" % ( str(e2), e[e2], ) else: s += "_" + str(e) return s def _get_title(self, request): """ """ ts_title = self._title + " " + TimeSeriesAnalysisJob.TS_NAME[request[0]] pr_title = self._title + " " + TimeSeriesAnalysisJob.PR_NAME[request[0]] s = "" for e in request[1:]: if (isinstance(e, dict)): for e2 in e: s += "_%s_%s" % ( str(e2), e[e2], ) else: s += "_" + str(e) return ts_title + " " + s[1:], pr_title + " " + s[1:]
[docs] def run(self): """ """ if (not os.path.isdir(self.dir)): os.makedirs(self.dir) import schrodinger.application.desmond.ana as ana util.chdir(self.dir) ts = ana.calc_time_series(self._time_series, self._model_fname, self._traj_fname) basename = os.path.basename(self._traj_fname) for i, time_data in enumerate(ts): request_name = self._get_requestname(self._time_series[i]) fname = basename + "_time_series-" + request_name fh = open(fname, "w") x = [] data = [] for e in time_data: print(e[0], e[1], file=fh) x.append(e[0]) data.append(e[1]) fh.close() if (self._reporter): ts_title, pr_title = self._get_title(self._time_series[i]) data_label, symbol, ts_color, pr_color = TimeSeriesAnalysisJob.Y_MARKER[ self._time_series[i][0]] x_label = "time (ps)" if (x[-1] >= 10000): x_label = "time (ns)" x = [e / 1000.0 for e in x] url = self._reporter.plot(x, data, x_label=x_label, y_label=data_label, chart_type="scatter", marker=( symbol, ts_color, 5, ), title=ts_title, filename=("%s.png" % fname)) self._reporter.write(util.html_embed_image(url)) fname = basename + "_prob_profile-" + request_name prob = self._prob_profile[i] if (prob is None): datum_min = min(data) datum_max = max(data) datum_range = datum_max - datum_min num_datum = len(data) x_space = int(num_datum / 10) x_space = 300 if (x_space > 300) else (1 if ( x_space < 1) else x_space) bin_width = float(datum_range) / x_space prob = [ bin_width, datum_min, datum_max, False, ] prob_profile = ana.calc_prob_profile(data, *prob) x = [] y = [] fh = open(fname, "w") for e in prob_profile: print(e[0], e[1], file=fh) x.append(e[0]) y.append(e[1]) fh.close() if (self._reporter): url = self._reporter.plot(x, y, x_label=data_label, y_label="prob", color=[ pr_color, ], title=pr_title, filename=("%s.png" % fname)) self._reporter.write(util.html_embed_image(url)) if (self._reporter): self._reporter.close() self.status.set(cmj.JobStatus.SUCCESS)
[docs]class MetadynamicsAnalysisJob(cmj.Job): """ """
[docs] def __init__(self, meta_setting, reporter, title="", *arg, **kwarg): """ """ self._meta_setting = meta_setting # meta_setting.name.val meta_setting.cv_name.val self._reporter = reporter self._title = title cmj.Job.__init__(self, *arg, **kwarg)
# __init__ def __getstate__(self, state=None): """ """ state = state if (state) else copy.copy(self.__dict__) state["_reporter"] = copy.copy(self._reporter) return cmj.Job.__getstate__(self, state)
[docs] def run(self): """ """ if (not os.path.isdir(self.dir)): os.makedirs(self.dir) import schrodinger.application.desmond.meta as meta util.chdir(self.dir) basename = os.path.splitext( os.path.basename(self._meta_setting.name.val))[0] out_fname = basename + "_meta.fes" meta_ana = meta.MetaDynamicsAnalysis(self._meta_setting.name.val, None, self._meta_setting) meta_ana.computeFES(out_fname) if (1 == len(self._meta_setting.cv)): lines = open(out_fname, "r").read().split("\n") x = [] y = [] cv_type = self._meta_setting.cv[0].type.val for line in lines: line = line.strip() if (line != "" and line[0] != "#"): token = line.split() x.append(float(token[0])) y.append(float(token[1])) offset_y = np.subtract(y, np.amin(y)).tolist() url = self._reporter.plot(x, offset_y, y_label="free energy", x_label=cv_type, filename=("%s.png" % out_fname)) self._reporter.write(util.html_embed_image(url)) elif (2 == len(self._meta_setting.cv)): # do 2-D plotting here lines = open(out_fname, "r").read().split("\n") x = [] y = [] z = [] for line in lines: line = line.strip() if (line != "" and line[0] != "#"): token = line.split() x.append(float(token[0])) y.append(float(token[1])) z.append(float(token[2])) # offset FES to zero offset_z = np.subtract(z, np.amin(z)).tolist() self._reporter.plot_2d(list(zip(x, y, offset_z)), fill=True, legend=True, labels=False, legend_orientation='vertical', x_label="CV1", y_label="CV2", size=(600, 400), dpi=200, filename=("%s.png" % out_fname)) self.output.add(out_fname) self.output.add("%s.png" % out_fname) self.output.add(self._reporter._fname) if (self._reporter): self._reporter.close() self.status.set(cmj.JobStatus.SUCCESS)
[docs]class SeaJob(AnalysisJob): """ """
[docs] def __init__(self, SEA, *arg, **kwarg): AnalysisJob.__init__(self, *arg, **kwarg) self.SEA = SEA
[docs] def run(self): """ """ if (not os.path.isdir(self.dir)): os.makedirs(self.dir) util.chdir(self.dir) out_fname = self.SEA.output.val sea_fname = self.SEA.seacfg.val cfg_fname = self.SEA.simcfg.val if ("simcfg" in self.SEA) else None with open(sea_fname, "w") as f: f.write(str(self.SEA)) cmd = [ 'run', 'analyze_simulation.py', self._model_fname, self._traj_fname, out_fname, sea_fname ] cfg_fname and cmd.extend(['-sim-cfg', cfg_fname]) retcode = subprocess.call(cmd) if retcode: self.status.set(cmj.JobStatus.BACKEND_ERROR) else: self.status.set(cmj.JobStatus.SUCCESS)
[docs]class PLAnalysis(cmj.StageBase): """ This class sets up and runs analysis for protein-ligand complex. The input parameters are ligand and protein ASLs, if not specified the class will automatically try to detect appropriate values, if "none" is specified the class will ignore analysis for the parameter and in other cases use the parameter value to run the analysis job. """ NAME = "pl_analysis" PARAM = cmj._create_param_when_needed([ f""" DATA = {{ {LIGAND_ASL_KEY} = "" {PROTEIN_ASL_KEY} = "" }} VALIDATE = {{ {LIGAND_ASL_KEY} = {{type = str}} {PROTEIN_ASL_KEY} = {{type = str}} }} """, ])
[docs] def __init__(self, *arg, **kwarg): """ initialization of pl-analysis stage """ cmj.StageBase.__init__(self, *arg, **kwarg)
# __init__
[docs] def crunch(self): """ do all the setup and submit the calculations """ self._print("debug", "In PLAnalysis.crunch") sea.set_macro_dict(self._get_macro_dict()) sea.update_macro_dict({"$JOBNAME": self.param.jobname.val}) for pj in self.get_prejobs(): model_fname = pj.output.struct_file() jobname, dir = self._get_jobname_and_dir(pj) ligand_asl = self.param.ligand_asl.val protein_asl = self.param.protein_asl.val # Makes the directory. if (not os.path.isdir(dir)): os.makedirs(dir) util.chdir(dir) import schrodinger.application.desmond.automatic_analysis_generator as aag cms_model = cms.Cms(model_fname) for a in cms_model.atom: a.property[constants.ORIGINAL_INDEX] = a.index a.property[ constants.ORIGINAL_MOLECULE_NUMBER] = a.molecule_number if not len(ligand_asl): ligand_st, ligand_asl = aag.getLigand(cms_model) elif ligand_asl.lower() == 'none': ligand_st, ligand_asl = None, None else: ligand_atm_idx = cms_model.select_atom(ligand_asl) ligand_st = cms_model.extract(ligand_atm_idx) if not len(protein_asl): protein_asl = aag.PROTEIN_ASL elif protein_asl.lower() == 'none': protein_asl = None # use the saved reference coordinates frame, ref_struct = 0, None kws = aag.getPLISKWList(cms_model, ligand_st, ligand_asl, ref_struct, frame, protein_asl=protein_asl, want_rmsd=bool(protein_asl), want_prmsf=bool(protein_asl), want_lrmsf=bool(ligand_asl), want_pli=bool(protein_asl and ligand_asl), want_ppi=bool(protein_asl), want_ltorsion=bool(ligand_asl), want_lprops=bool(ligand_asl)) eaf_in_fname = jobname + ".st2" eaf = open(eaf_in_fname, 'w') m = sea.Map() m["Trajectory"] = model_fname m["Keywords"] = kws if ligand_asl: m["LigandASL"] = ligand_asl if protein_asl: m["ProteinASL"] = protein_asl eaf.write(str(m)) eaf.close() eaf_out_fname = jobname + '.eaf' self._print("quiet", " /------Simulation Interactions Diagram-----") self._print("quiet", " | Input File: %s" % eaf_in_fname) self._print("quiet", " | Output File: %s" % eaf_out_fname) self._print("quiet", " | Ligand ASL: '%s'" % ligand_asl) self._print("quiet", " | Protein ASL: '%s'" % protein_asl) self._print("quiet", " |") self._print("quiet", " | To view results, run:") self._print( "quiet", " | $SCHRODINGER/run event_analysis.py %s/%s" % (jobname, eaf_out_fname)) self._print("quiet", " \\------------------------------------------") trj_fname = pj.output.get('TRJ') cmd = [ os.path.join(envir.CONST.SCHRODINGER, "run"), 'analyze_simulation.py', model_fname, trj_fname, eaf_out_fname, eaf_in_fname, '-JOBNAME', jobname ] job = cmj.Job(jobname, pj, self, cmd, dir) job.output.add(eaf_out_fname, None) job.need_host = True self.add_job(job) self.add_jobs(self.get_prejobs()) self._print("debug", "Out PLAnalysis.crunch")
[docs] def hook_captured_successful_job(self, job): for out_fname in job.output: ext = fileutils.splitext(out_fname)[1] if ext == '.eaf': eaf_copy = pathlib.Path(cmj.ENGINE.base_dir, cmj.ENGINE.jobname + '.eaf') shutil.copy(out_fname, eaf_copy) self._files4copy.append(eaf_copy) break if (type(job) == cmj.Job): return "dissolved"
[docs]class Analysis(cmj.StageBase): """ """ NAME = "analysis" PARAM = cmj._create_param_when_needed([ """ DATA = { report = "$MAINJOBNAME_$STAGENO_report.html" plotter = mplchart time_series = [] prob_profile = [] meta.range = [] meta.nbin = [51] SEA = off } VALIDATE = { report = {type = str} plotter = {type = enum range = [mplchart gchart]} time_series = {type = list size = 0 elem = {type = list size = 0}} prob_profile = {type = list size = 0 elem = {type = list size = 0}} meta.range = {type = list size = 0 elem = {type = list size = 0}} meta.nbin = {type = list size = -1 elem = {type = int range = [2 100000000000]}} SEA = [ {type = bool0} {_skip = all} ] } """, ]) DEFAULT_PROB_PROFILE_SETTING = { "dihedral": [2, 0, 360, True], "angle": [2, 0, 180, True], }
[docs] def __init__(self, *arg, **kwarg): """ """ cmj.StageBase.__init__(self, *arg, **kwarg)
# __init__ def _get_prob_profile(self, time_series, prob_profile): """ """ result = [] for i, e in enumerate(time_series): try: p = prob_profile[i] if (p != []): result.append(p) continue except IndexError: pass if (e[0] in Analysis.DEFAULT_PROB_PROFILE_SETTING): result.append(Analysis.DEFAULT_PROB_PROFILE_SETTING[e[0]]) else: result.append(None) return result def _get_title(self, pj): """ """ try: title = "win#%d:" % pj.i_win except AttributeError: if (pj.rid): title = "rep#%d:" % pj.rid else: title = "" return title
[docs] def crunch(self): """ """ self._print("debug", "In Analysis.crunch") sea.set_macro_dict(self._get_macro_dict()) sea.update_macro_dict({"$JOBNAME": self.param.jobname.val}) time_series = self.param.time_series.val prob_profile = self.param.prob_profile.val reporter = Reporter(self.param.report.val, self.param.plotter.val) if ( self.param.report.val) else None for pj in self.get_prejobs(): try: traj_fname = pj.output.get("TRJ") if (traj_fname is None): raise KeyError except KeyError: self._print( "quiet", "WARNING: Job %s has no registered trajectory output." % str(pj)) self._print("quiet", "WARNING: Ignore this job for regular analysis.") else: model_fname = pj.output.struct_file() jobname, dir = self._get_jobname_and_dir(pj) title = self._get_title(pj) if (time_series != []): prob = self._get_prob_profile(time_series, prob_profile) new_job = TimeSeriesAnalysisJob( time_series, prob, model_fname, traj_fname, reporter, title, jobname=jobname, parent=pj, stage=self, dir=dir, jlaunch_cmd=lambda job: job.run(), is_output=False) self.add_job(new_job) if (self.param.SEA.val): SEA = copy.deepcopy(self.param.SEA) if ("output" not in SEA): SEA["output"] = "$JOBNAME_sea-out.st2" SEA.output.val = SEA.output.val # This resets the 'output' with the macro-expanded value. SEA["seacfg"] = "$JOBNAME_sea.st2" SEA.seacfg.val = SEA.seacfg.val # Finds out the "*-out.cfg" file. cfg_file = glob.glob(os.path.join(pj.dir, "*-out.cfg")) if (not cfg_file): self._print( "quiet", "WARNING: The *-out.cfg file of job %s not found." % str(pj)) else: SEA["simcfg"] = cfg_file[0] new_job = SeaJob(SEA, model_fname, traj_fname, reporter, title, jobname=jobname, parent=pj, stage=self, dir=dir, jlaunch_cmd=lambda job: job.run(), is_output=False) self.add_job(new_job) for pj in self.get_prejobs(): try: cfg = sea.Map(open(pj.input.incfg_file(), "r").read()) except: cfg = pj.stage.param if (config.is_meta(cfg)): meta = copy.deepcopy(cfg.meta) jobname, dir = self._get_jobname_and_dir(pj) default_nbin = self.param.meta.nbin[-1].val for i, e in enumerate(meta.cv): try: e["nbin"] = self.param.meta.nbin[i].val except IndexError: e["nbin"] = default_nbin for i, e in enumerate(meta.cv): try: e["range"] = self.param.meta.range[i] except IndexError: e["range"] = [] meta.name.val = pj.output.get("KERSEQ") meta.cv_name.val = pj.output.get("CVSEQ") title = self._get_title(pj) new_job = MetadynamicsAnalysisJob( meta, reporter, title, jobname=jobname, parent=pj, stage=self, dir=dir, jlaunch_cmd=lambda job: job.run(), is_output=False) self.add_job(new_job) self.add_jobs(self.get_prejobs()) self._print("debug", "Out Analysis.crunch")
[docs] def hook_captured_successful_job(self, job): """ """ if (isinstance(job, AnalysisJob) or isinstance(job, MetadynamicsAnalysisJob)): return "dissolved"
[docs]class FepanaJob(DesmondJob, Picklable): """ """ id = 0, Picklable
[docs] def __init__(self, fepid=None, n_win=None, sim_job=None, mutation=None, temperature=300., correct_restr=False, correct_coul=False, correct_vdw=False, bennett=None, reporter=None, fep_type=None, include_sid=True, *arg, **kwarg): """ """ self.fepid = fepid self.n_win = n_win self.sim_job = sim_job self.mutation = mutation self.correct_coul = correct_coul self.correct_vdw = correct_vdw self.correct_restr = correct_restr self.temperature = temperature self.bennett = bennett self.reporter = reporter self.fep_type = fep_type self.include_sid = include_sid # Results self.result = None self.contrib = None self.vdw_correction = None self.coul_correction = None self.repr_maes = [] DesmondJob.__init__(self, *arg, **kwarg)
# __init__ def __getstate__(self, state=None): """ """ state = state or copy.copy(self.__dict__) state["reporter"] = copy.copy(self.reporter) return cmj.Job.__getstate__(self, state)
[docs] def describe(self): """ """ cmj.Job.describe(self) self._print( "quiet", " Mutation : %s\n" " Windows : %d" % (self.mutation, self.n_win))
[docs] def get_endpoint_cmsfiles(self): lambda0 = None lambda1 = None for sj in self.sim_job[self.fepid]: if sj.i_win == 0: lambda0 = sj.output.struct_file() elif sj.i_win == sj.n_win - 1: lambda1 = sj.output.struct_file() return lambda0, lambda1
[docs] def run(self): """ """ util.chdir(self.dir) for sj in self.sim_job[self.fepid]: fname = "gibbs.%d.dE" % sj.i_win self._print("debug", os.path.join(sj.dir, sj.output.get("dE"))) util.symlink(os.path.join(sj.dir, sj.output.get("dE")), fname) random_seed = self.bennett.random_seed.val if random_seed == "time": random_seed = int(time.time()) bar = fepana.init_bennett(self.dir, self.n_win, temperature=self.temperature, random_seed=random_seed, result_file="result", nresamples=100) bar.write_data('# %s' % self.jobname, bar.fout) self.result, err_msg, dF, results = fepana.run_bennett(bar) bar.close_output() if self.correct_coul.val: from schrodinger.application.desmond.fep_net_charge import \ compute_net_charge_corrections (lambda0, lambda1) = self.get_endpoint_cmsfiles() ene0 = lambda0.replace("-out.cms", ".ene") ene1 = lambda1.replace("-out.cms", ".ene") if lambda0 and lambda1 and ene0 and ene1: correction0 = compute_net_charge_corrections( lambda0, ene0, True) correction1 = compute_net_charge_corrections( lambda1, ene1, False) self.coul_correction = np.sum(correction1 - correction0) self.result += self.coul_correction else: self._log( "Net charge correction cannot be computed because output cms or ene files are missing." ) if self.reporter: # Calculates the dE distribution overlaps. self.reporter.write( "<br/>Overlap of work distributions of neighboring windows:<br/>" ) for i in range(self.n_win - 1): try: filename = "dE_overlap_%d_%d.png" % (i, i + 1) url = fepana.plot_lambda_window_overlap( "gibbs.%d.dE" % i, "gibbs.%d.dE" % (i + 1), "dE_prob-%d_%d" % (i, i + 1), ["dE_%d+" % i, "dE_%d-" % (i + 1)], time_range=[100.0, float("inf")], filename=filename, reporter=self.reporter) self.reporter.write(util.html_embed_image(url)) except RuntimeError as e: self.reporter.write( "<br/> Failed to plot the dE_%d+ and dE_%d- distributions. " "Probably the simulation was too short to produce statistics.<br/>" % (i, i + 1)) self._log( "Calculating overlap failed for windows (%i, %i). %s" % (i, i + 1, e)) self.reporter.write("<br/>") self.reporter.close() # Shows error message if there is. if err_msg: self._print( "quiet", "FEP [%s]: Bennett analysis program issued warning or error message:" % self.mutation) err_msg = err_msg.split("\n") if len(err_msg) > 100: err_msg = err_msg[-100:] self._print( "quiet", "FEP [%s]: Last 100 lines of the message:" % self.mutation) for line in err_msg: self._print("quiet", line) self._print("quiet", "(end of message)\n") if self.result is None: self.status.set(cmj.JobStatus.BACKEND_ERROR) return elif not err_msg: util.chdir(self.dir) with open("gibbs.0.dE") as fh: content = fh.readlines() if not content: self._print("quiet", "ERROR: empty gibbs.0.dE file.") self.status.set(cmj.JobStatus.BACKEND_ERROR) return last_line = content[-1].strip() if "#" == last_line[0]: self._print("quiet", "ERROR: no sampling data in gibbs.0.dE file.") self.status.set(cmj.JobStatus.BACKEND_ERROR) return word = last_line.split() last_time = float(word[0]) energy_output = fepana.calc_free_energy( '.', last_time, self.n_win, temperature=self.temperature, bennett_options=self.bennett.val, random_seed=random_seed) forward_time_energy = energy_output.get('forward_time') reversed_time_energy = energy_output.get('reversed_time') sliding_time_energy = energy_output.get('sliding_time') try: url0 = fepana.plot_convergence( forward_time_energy[0], self.bennett.forward_time.name.val, "dF_t-%d_%d", "simulation course", "red", reporter=self.reporter).get('url') except ValueError as e: url0 = None self._print( "quiet", f"WARNING: Error in plotting convergence for forward time: {e}" ) try: url1 = fepana.plot_convergence( reversed_time_energy[0], self.bennett.reversed_time.name.val, "dF_rt-%d_%d", "reversed time", "green", reporter=self.reporter, ).get('url') except ValueError as e: url1 = None self._print( "quiet", f"WARNING: Error in plotting convergence for reversed time: {e}" ) try: url2 = fepana.plot_convergence( sliding_time_energy[0], self.bennett.sliding_time.name.val, "dF_st-%d_%d", "sliding time", "blue", reporter=self.reporter).get('url') except ValueError as e: url2 = None self._print( "quiet", f"WARNING: Error in plotting convergence for sliding time: {e}" ) if self.include_sid: import schrodinger.application.desmond.process_fep_traj as pft import schrodinger.application.desmond.replica_sid_generator as rsg sid_dir = self.parent.dir jobname = self.parent.jobname tasktype = 'lambda_hopping' util.chdir(sid_dir) # write .sid file fep_results_report = rsg.FEPReport( jobname, energy_output, task_type=tasktype, n_win=self.parent.n_win, perturbation_type=self.fep_type) fep_results_report.export() shutil.copyfile(os.path.join(sid_dir, f'{jobname}.sid'), os.path.join(self.dir, f'{jobname}.sid')) # reduce the trajectories and generate representative frames files_to_keep = filter( None.__ne__, pft.postprocess_traj( f'{jobname}.sid', ref_ct_fname=fep_results_report.ref_ct_fname)) for f in files_to_keep: src, dest = os.path.join(sid_dir, f), os.path.join(self.dir, f) if os.path.isdir(src): if os.path.exists(dest): new_dest = fileutils.get_next_filename( os.path.dirname(dest) + os.path.sep, os.path.basename(dest)) print("Folder %s exists. Rename it to %s" % (dest, new_dest)) shutil.move(dest, new_dest) shutil.copytree(src, dest) else: shutil.copyfile(src, dest) # Store a list of generated repr structures if f.endswith('.mae'): self.repr_maes.append(dest) util.chdir(self.dir) if self.reporter: self.reporter.write( "<br/><br/>Overall free energy difference (dG) as a function of time:<br/>" ) def my_write(url): if url: self.reporter.write(util.html_embed_image(url[0])) my_write(url0) my_write(url1) my_write(url2) self.reporter.write("<br/>") self.reporter.write( "<br/><br/>Free energy difference (dF) between neighboring windows" " as a function of time:<br/>") def my_write1(url): if url: for e in url[1:]: self.reporter.write(util.html_embed_image(e)) self.reporter.write("<br/>") my_write1(url0) my_write1(url1) my_write1(url2) self.reporter.write(" <br/>") self.reporter.close() # Calculates the contributions. if self.task == "afep": sj = self.sim_job[self.fepid][0] try: self.contrib = fepana.calc_contrib("result", sj.input.outcfg_file()) except Exception as e: self._print("quiet", "WARNING: %s" % str(e)) # VDW long-range dispersion correction for absolute solvation free energy calculations. if self.task == "afep" and self.correct_vdw.val: self._print( "quiet", "Long-range dispersion correction has been included" " in dE's such that the vdW correction is no longer needed for" " calculating the free energy.") self._print( "quiet", "The fep_analysis.correct_vdw setting has no effects" " now.") if self.task == "afep": lambda0 = None for sj in self.sim_job[self.fepid]: if sj.i_win == 0: lambda0 = sj break if lambda0: try: cross_link = lambda0.cross_link except AttributeError: pass else: self._print("debug", "calculating cross-link correction...") self.cross_link_correction = fepana.calc_free_energy_for_abfe_cross_link_xu( cross_link) self.result -= self.cross_link_correction self._print( "debug", " cross-link correction = %f" % self.cross_link_correction) if self.correct_restr.val: lambda0 = None lambda1 = None for sj in self.sim_job[self.fepid]: if sj.i_win == 0: lambda0 = sj elif sj.i_win == sj.n_win - 1: lambda1 = sj if lambda0 and lambda1: egout0 = lambda0.output.get("eg") egout1 = lambda1.output.get("eg") if egout0 and egout1: self._print("debug", "lambda0 energy group file: %s" % egout0) self._print("debug", "lambda1 energy group file: %s" % egout1) egout0 = os.path.join(lambda0.dir, egout0) egout1 = os.path.join(lambda1.dir, egout1) if os.path.isfile(egout0) and os.path.isfile(egout1): correction = fepana.correct_restr( egout0, egout1, "restr_correction") else: correction = None self._log( "Correction for position restraints cannot be done " "because the following output files are missing:\n %s\n %s" % (egout0, egout1)) if correction: self.result += correction else: self._log( "Correction for position restraints cannot be done because 'energy_group' was turned off." ) self.status.set(cmj.JobStatus.SUCCESS)
[docs]class FepAnalysis(cmj.StageBase): """ """ NAME = "fep_analysis" PARAM = cmj._create_param_when_needed([ string.Template(""" DATA = { report = "$$JOBNAME.html" plotter = mplchart fep_type = $small_molecule bennett = { random_seed = 2111839 _override_temperature = 0.0 forward_time = { name = "freeenergy_time" begin = 100.0 end = inf dt = 150:points } reversed_time = { name = "freeenergy_rtime" begin = 100.0 end = inf dt = 150:points } sliding_time = { name = "freeenergy_stime" begin = 100.0 end = inf dt = 150:points window = 500.0 } } correct_coul = no correct_vdw = no # DEPRECATED! No effects any more. correct_restr = yes should_sync = yes include_sid = yes } VALIDATE = { report = {type = str} plotter = {type = enum range = [mplchart gchart]} fep_type = {type = enum range = [$all_fep_types]} bennett = { random_seed = [ {type = enum range = [time]} {type = int} ] _override_temperature = {type = "float+"} forward_time = { name = {type = str} begin = {type = "float+"} end = {type = "float+"} dt = [{type = "float+"} {type = "$reg_exp"}] } reversed_time = { name = {type = str} begin = {type = "float+"} end = {type = "float+"} dt = [{type = "float+"} {type = "$reg_exp"}] } sliding_time = { name = {type = str1} begin = {type = "float+"} end = {type = "float+"} dt = [{type = "float+"} {type = "$reg_exp"}] window = {type = "float+"} } } correct_coul = {type = bool} correct_vdw = [ {type = bool} {cutoff_radius = {type = "float+"} average_dispersion = {type = "float+"} } ] correct_restr = {type = bool} include_sid = {type = bool} } """).substitute(small_molecule=constants.FEP_TYPES.SMALL_MOLECULE, all_fep_types=str(constants.FEP_TYPES), reg_exp="regex:^\\\\d+:points$") ])
[docs] def crunch(self): """ """ self._print("debug", "In FepAnalysis.crunch") self._log("Checking integrity of simulation results...") fep_job = {} for pj in self.get_prejobs(): if isinstance(pj, FepJob): fep_job.setdefault(pj.fepid, []).append(pj) else: self._log( "WARNING: Simulation performed by %s seems not a FEP simulation." % str(pj)) self._log("WARNING: Ignore this job in FEP analysis.") if not fep_job: self._log("No FEP job was performed.") good_fepid = [] for fepid in fep_job: i_win = set() n_win = fep_job[fepid][0].n_win for pj in fep_job[fepid]: i_win.add(pj.i_win) fragname = fep_job[fepid][0].fragname or "absolute" if len(i_win) == n_win: self._log("FEP [%s] seems well done" % fragname) good_fepid.append(fepid) else: well = sorted(i_win) ill = sorted(set(range(n_win)) - i_win) self._log("FEP [%s] partially finished" % fragname) self._log(" %d lambda windows finished: %s" % ( len(well), str(well)[1:-1], )) self._log(" %d lambda windows not done: %s" % ( len(ill), str(ill)[1:-1], )) self._log(" Ignore this FEP in analysis.") for fepid in good_fepid: # Gets the temperature of the simulation. If not found, it is assumed to be 300.0 K. parent = fep_job[fepid][0] outcfg_fname = parent.input.outcfg_file( ) or parent.input.incfg_file().replace("-in.cfg", "-out.cfg") temperature = 300. try: outcfg = sea.Map(open(outcfg_fname).read()) temperature = outcfg.integrator.temperature.T_ref.val except: self._log( "Trouble reading config file; using default temperature (%d K)" % temperature) if self.param.bennett._override_temperature.val > 0: # NOTE: This is only used for running tests with a small # number of replicas. # When the number of replicas is small, the overlap between # the energies of neighboring replicas is poor. This can prevent # the bennett calculation from converging. By artificially # increasing the temperature, you can force convergence, # because the energy values are scaled by the inverse temperature. temperature = self.param.bennett._override_temperature.val self._log( f"Override bennett temperature for testing, using {temperature} K." ) mutation = "(absolute free energy)" if parent.task == "afep" \ else fep_job[fepid][0].fragname jobname, dir = self._get_jobname_and_dir(parent) reporter = Reporter( self.param.report.val, self.param.plotter.val) if self.param.report.val else None new_job = FepanaJob(jobname=jobname, parent=parent, stage=self, dir=dir, jlaunch_cmd=lambda job: job.run(), fepid=fepid, n_win=parent.n_win, sim_job=fep_job, mutation=mutation, temperature=temperature, correct_restr=self.param.correct_restr, correct_vdw=self.param.correct_vdw, correct_coul=self.param.correct_coul, task=parent.task, bennett=self.param.bennett, reporter=reporter, fep_type=self.param.fep_type.val, include_sid=self.param.include_sid.val, is_output=False) self.add_job(new_job) if (not os.path.isdir(dir)): os.makedirs(dir) self._print("debug", "Out FepAnalysis.crunch")
[docs] def poststage(self): """ """ RESULT_CT_PROPERTY_NAME = constants.FEP_DG_PREFIX job_to_cms = {} # Keys are DesmondJob.gid's. for job in self.jobs: fepid = job.fepid n_win = job.n_win sim_job = job.sim_job[fepid] target_i_win = 0 if self.param.fep_type.val == constants.FEP_TYPES.ABSOLUTE_BINDING else n_win - 1 for sj in sim_job: if sj.i_win == target_i_win: lambda1_cms_fname = sj.output.struct_file() break job_to_cms[job] = cms.Cms(lambda1_cms_fname) mut_ct = job_to_cms[job].mut_ct # Removes all previous "s_des_dG*" and "s_des_ddG*" properties, # but here are some exceptions under certain conditions: # 1. When it's absolute binding FEP, # the following properties are exempted: # `constaints.FEP_DG_CROSSLINK_CORRECTION_REF`, # `constaints.FEP_DG_CROSSLINK_CORRECTION_MUT`, # `constaints.FEP_DDG_CROSSLINK_CORRECTION`, # `constants.FEP_DG_CROSSLINK_CORRECTION` # because these are set by a prior stage of the same job. exempt_keys = () if self.param.fep_type.val == constants.FEP_TYPES.ABSOLUTE_BINDING: exempt_keys = (constants.FEP_DG_CROSSLINK_CORRECTION_REF, constants.FEP_DG_CROSSLINK_CORRECTION_MUT, constants.FEP_DDG_CROSSLINK_CORRECTION, constants.FEP_DG_CROSSLINK_CORRECTION) property_keys = [ key for key in list(mut_ct.property) if ((key.startswith(constants.FEP_DDG_PREFIX) or key.startswith( constants.FEP_DG_PREFIX)) and key not in exempt_keys) ] struc.delete_structure_properties(mut_ct, property_keys) if job.result == "": mut_ct.property[RESULT_CT_PROPERTY_NAME] = "analysis failure" else: if job.task == "afep" and not hasattr(job, "cross_link_correction"): # -RT * ln( R' T ) R = 1.98588E-3 R1 = 0.0820574587 T = job.temperature delta = -R * T * math.log(R1 * T) mut_ct.property[RESULT_CT_PROPERTY_NAME + "_transfer"] = str(job.result) mut_ct.property[RESULT_CT_PROPERTY_NAME + "_solvation"] = str(job.result + delta) if (job.contrib): prefix = RESULT_CT_PROPERTY_NAME + "_contribution_" for name in job.contrib.__dict__: if (name[:4] == "fec_"): mut_ct.property[prefix + name[4:]] = str( job.contrib.__dict__[name]) else: mut_ct.property[RESULT_CT_PROPERTY_NAME] = str(job.result) if (job.vdw_correction): prop_name = RESULT_CT_PROPERTY_NAME + "_long_range_dispersion_correction" mut_ct.property[prop_name] = str(job.vdw_correction) self._log( "Long-range dispersion correction for %s: %f kcal/mol" % ( job.mutation, job.vdw_correction, )) if (job.coul_correction): prop_name = RESULT_CT_PROPERTY_NAME + "_net_charge_correction" mut_ct.property[prop_name] = str(job.coul_correction) self._log("Net Charge correction for %s: %f kcal/mol" % ( job.mutation, job.coul_correction, )) if (hasattr(job, "cross_link_correction")): prop_name = RESULT_CT_PROPERTY_NAME + "_cross_link_correction" mut_ct.property[prop_name] = str(job.cross_link_correction) self._log( "Cross-link-restraint correction for %s: %f kcal/mol" % ( job.mutation, job.cross_link_correction, )) if self.param.fep_type.val == constants.FEP_TYPES.ABSOLUTE_BINDING and job.repr_maes: # In ABFEP, only one repr structure is generated repr_st = StructureReader.read(job.repr_maes[0]) # Preserve the properties needed to calculated the dG repr_st.property[constants.FEP_DG_PREFIX] = mut_ct.property[ constants.FEP_DG_PREFIX] repr_st.property[ constants.FEP_RESTRAINT_CORRECTION] = mut_ct.property[ constants.FEP_RESTRAINT_CORRECTION] job_to_cms[job] = repr_st new_job = [] for pjob, st in job_to_cms.items(): jobname, dir = self._get_jobname_and_dir(pjob) fname_out = os.path.join(dir, jobname + "_" + str(pjob.gid) + "-out.cms") st.write(fname_out) job = cmj.Job(jobname, pjob, self, None, dir) job.output.add(fname_out, None) job.output.add("fep.dG.log", None) new_job.append(job) cmj.StageBase.poststage(self) if (self._NEXT_STAGE is not None): for job in new_job: job.status.set(cmj.JobStatus.SUCCESS) self.add_jobs(new_job)