Source code for schrodinger.application.desmond.replica_dE

import os
from past.utils import old_div
from typing import Optional

import numpy

import schrodinger.utils.sea as sea

from . import replica_exchange_review as rer

# round the results to save disk space
_NUM_DIGITS = 3


[docs]class replica_energy(object):
[docs] def __init__(self, rep_number, filename=None, de_array=None): assert bool(filename) ^ ( de_array is not None), "Specify filename or de_array, not both" self.replica_number = rep_number time = [] reverse = [] forward = [] if filename is not None: with open(filename) as f: for line in f: if line[0] != '#' and len(line.strip()) != 0: t, r, f = list(map(float, line.split())) time.append(t) reverse.append(r) forward.append(f) elif de_array is not None: de_array = de_array.T time = de_array[0] reverse = de_array[1] forward = de_array[2] self.reverse = [r * -1. for r in reverse] self.forward = forward self.time = time self.reverse_range = (min(self.reverse), max(self.reverse)) self.forward_range = (min(self.forward), max(self.forward))
[docs] def getNumber(self): return self.replica_number
[docs] def getRMin(self): return self.reverse_range[0]
[docs] def getRMax(self): return self.reverse_range[1]
[docs] def getFMin(self): return self.forward_range[0]
[docs] def getFMax(self): return self.forward_range[1]
[docs] def getRMean(self): return numpy.mean(self.reverse)
[docs] def getFMean(self): return numpy.mean(self.forward)
[docs] def getRHistogram(self, his_min, his_max, nbins): d, b = numpy.histogram(self.reverse, bins=nbins, density=True, range=(his_min, his_max)) d = old_div(d, d.sum()) return d, b
[docs] def getFHistogram(self, his_min, his_max, nbins): d, b = numpy.histogram(self.forward, bins=nbins, density=True, range=(his_min, his_max)) d = old_div(d, d.sum()) return d, b
[docs]class replica_container:
[docs] def __init__(self, basename, energy_output, de_array: Optional[numpy.ndarray] = None, task_type=None, n_win=12): # `basename` is used to search for dE files. # It is not required if `de_array` (dE values) is provided. assert bool(basename) ^ bool (de_array is not None), \ "Specify filename or de_array, not both" self.basename = basename self._de_array = de_array self.task_type = task_type self.nreplicas = n_win self.replica_list = [] self.replica_numb = [] self.overlap = [] self.overlap_centers = [] self.nbins = 100 self.fn_template = None self._setup_fn_template() self.read_dE_Replicas() data0, results0 = energy_output['forward_time'] self.bennett_forward_tot, self.bennett_forward_replica = self.process_bennett_log( results0) self.dG_forward_data = data0 data1, results1 = energy_output['reversed_time'] self.bennett_reverse_tot, self.bennett_reverse_replica = self.process_bennett_log( results1) self.dG_reverse_data = data1 data2, results2 = energy_output['sliding_time'] self.bennett_sliding_tot, self.bennett_sliding_replica = self.process_bennett_log( results2) self.dG_sliding_data = data2
def _setup_fn_template(self): if self.basename is None: # Assume the `de_array` is passed directly to `__init__` return self.fn_template = self.basename + '_replica%i.dE' if self.task_type == 'fep': directory = self.basename + '_lambda%i/' self.fn_template = directory + self.basename + '_lambda%i.dE' def _addReplica(self, replica): self.replica_list.append(replica) self.replica_numb.append(replica.getNumber())
[docs] def read_dE_Replicas(self): if self._de_array is not None: for irep, arr in enumerate(self._de_array): new_replica = replica_energy(irep, filename=None, de_array=arr) self._addReplica(new_replica) else: for irep in range(self.nreplicas): if self.task_type == 'fep': filename = self.fn_template % (irep, irep) else: filename = self.fn_template % irep if not os.path.isfile(filename): break new_replica = replica_energy(irep, filename) self._addReplica(new_replica)
[docs] def getReplica(self, number): if number > len(self.replica_list): return None idx = self.replica_numb.index(number) return self.replica_list[idx]
[docs] def get_nrep(self): return len(self.replica_numb)
[docs] def printInfo(self): print('Number or replicas:', self.get_nrep()) print('Replica Names:', self.replica_numb)
[docs] def export(self): m = sea.Map() if self.dG_forward_data: m['dGForward'] = self.export_dE_data(self.dG_forward_data, self.bennett_forward_tot, self.bennett_forward_replica) if self.dG_reverse_data: m['dGReverse'] = self.export_dE_data(self.dG_reverse_data, self.bennett_reverse_tot, self.bennett_reverse_replica) if self.dG_sliding_data: m['dGSliding'] = self.export_dE_data(self.dG_sliding_data, self.bennett_sliding_tot, self.bennett_sliding_replica) return m
[docs] def process_bennett_log(self, data): bennett_data = [(a[0], a[1], a[2]) for a in data[:-1]] dG = (data[-1][0], data[-1][1], data[-1][2]) return dG, bennett_data
[docs] def export_dE_data(self, data, bennett_dG, bennett_per_replica): if not data: return None x, y, err = [], [], [] for d in data: x.append(d[0]) y.append(d[1].val) err.append(d[1].unc) m = sea.Map() m['StartTime_ns'] = x[0] / 1000.0 m['EndTime_ns'] = x[-1] / 1000. m['dG'] = [round(_, _NUM_DIGITS) for _ in y] m['dG_error'] = [round(_, _NUM_DIGITS) for _ in err] m['Bennett'] = sea.Map() m['Bennett']['dG'] = bennett_dG[0] m['Bennett']['Bootstrap_std'] = bennett_dG[1] m['Bennett']['Analytical_std'] = bennett_dG[2] m['Bennett']['dF_Per_Replica'] = [ [round(_, _NUM_DIGITS) for _ in row] for row in bennett_per_replica ] kw_list = sea.List() num_dF = len(data[0][2]) for i in range(num_dF): n = sea.Map() n['repA'] = i n['repB'] = i + 1 y, err = [], [] for d in data: y.append(round(d[2][i].val, _NUM_DIGITS)) err.append(round(d[2][i].unc, _NUM_DIGITS)) n['dF'] = y n['dF_error'] = err kw_list.append(n) m['dG_pairs'] = kw_list return m
[docs]class replicas_monitor(object):
[docs] def __init__(self, basename, cfg, task_type): self.basename = basename self.cfg = cfg self.task_type = task_type # Use remd.dat if available. try: log_file = self.cfg.remd.exchange_dat.name.val except AttributeError: log_file = basename + '.log' self.replicas, self.array_list = \ rer.read_data_from_log_file(log_file, self.cfg) self.nbins = len(self.array_list)
[docs] def get_histogram(self, times, data): ''' Converts a sparse doubled-time list into a count of the time that each replica spends in each state. :param times: Time series :param data: State the replica was in at each time point :type times: numpy array of floats :type data: numpy array of int :return: Counts of occurence in each state :rtype: numpy array of int Note: the precision of the time in the logfile can be lower than the exchange frequency and thus result in small rounding errors when computing duration. This should be resolved by dividing by the interval and rounding. ''' counts = numpy.zeros(self.nbins, dtype=float) durations = numpy.diff(times) for duration, rep_num in zip(durations, data): counts[rep_num] += duration counts /= self.cfg.remd.interval.val return counts.round(0).astype(int)
[docs] def get_nrep(self): return len(self.array_list)
[docs] def export(self): kw_list = sea.List() for rep_num in range(len(self.array_list)): times, temps = self.array_list[rep_num] temps = temps.astype(numpy.int) m = sea.Map() m['Number'] = rep_num m['HistoryDistribution'] = self.get_histogram(times, temps).tolist() kw_list.append(m) r = sea.Map() r['Replica'] = kw_list return r