Source code for schrodinger.application.desmond.bennett

# File moved from desmond-src/extensions/Gibbs/scripts 2015-12-02

#!/usr/bin/env desres-exec
#{
# . $DESRES_ROOT/modules/init/sourceme.sh
# module add treetop/142-Desmond/bin
# exec desres-cleanenv -e PYTHONPATH \
#   -m $(treetop Python)/bin \
#   -m $(treetop numpy)/lib-python \
#   -m molfile/1.11.23/lib-python  \
#   -- python $0 "$@"
#}
# @COPYRIGHT@
#
# This script uses the Bennett acceptance ratio method to compute the
# free energy difference between two systems.
#

import optparse
import sys
from math import fabs
from math import log
from math import sqrt
from past.utils import old_div

import numpy
import scipy
import scipy.optimize

try:
    import molfile
    has_molfile = True
except ImportError:
    has_molfile = False

BOLTZMANN = 0.0019872041  # Boltzmann constant, in kcal/mol/K


[docs]def logistic(v, x): t = numpy.tanh(0.5 * (v + x)) return 0.5 * (t.size - t.sum())
[docs]def logistic_function(v1, v2, x): C = log(old_div(float(v1.size), float(v2.size))) - x t1 = logistic(v1, C) t2 = logistic(v2, -C) return t1 - t2
[docs]def logistic_full(v, x): t = numpy.tanh(0.5 * (v + x)) return (0.5 * (t.size - t.sum()), -0.25 * (t.size - (t**2).sum()))
[docs]def logistic_function_full(v1, v2, x): C = log(old_div(float(v1.size), float(v2.size))) - x (t1, dt1) = logistic_full(v1, C) (t2, dt2) = logistic_full(v2, -C) return (t1 - t2, dt1 + dt2)
[docs]def min_func(x, v1, v2): return logistic_function(v1, v2, x)
[docs]class CalcBAR(object): """ Attributes for parsing the observables and setting up BAR calculations. """
[docs] def __init__(self, begin_time=300., end_time=-1., forward_column=2, reverse_column=1, temperature=300., nresamples=100, tolerance=1.e-5, exchange=False, seed=2111839, bootstrap_flavor='stationary', blocksize=False, syncseed=True): self.begin_time = begin_time self.end_time = end_time self.forward_column = forward_column self.reverse_column = reverse_column self.temperature = temperature self.nresamples = nresamples self.tolerance = tolerance self.exchange = exchange self.dE = None self.dE_filter = None self.seed = seed self.bootstrap_flavor = bootstrap_flavor self.blocksize = blocksize self.syncseed = syncseed self.err = '' self.set_output() self.set_seed(self.seed) assert end_time == -1. or (end_time > begin_time)
[docs] def set_nresamples(self, nresamples): self.nresamples = nresamples
[docs] def set_output(self, fn_out='stdout', fn_log=None, fn_err='stderr'): """ Set up output filehandles :param fn_out: Filename for output, stdout for stdout or None for no output :type fn_out: string or None :param fn_log: Filename for log output, stdout for stdout or None for no output :type fn_log: string or None :param fn_err: Filename for error, stderr for stderr or None for no output :type fn_err: string or None """ if fn_out == 'stdout': self.fout = sys.stdout elif fn_out == 'stderr': self.fout = sys.stderr elif fn_out is not None: self.fout = open(fn_out, 'w') else: self.fout = None if fn_log == 'stdout': self.flog = sys.stdout elif fn_log == 'stderr': self.flog = sys.stderr elif fn_log is not None: self.flog = open(fn_log, 'w') else: self.flog = None if fn_err == 'stdout': self.ferr = sys.stdout elif fn_err == 'stderr': self.ferr = sys.stderr elif fn_err is not None: self.ferr = open(fn_err, 'w') else: self.ferr = None
[docs] def set_seed(self, seed): """ Set random seed. """ self.seed = seed numpy.random.seed(seed)
[docs] def close_output(self): """ Close filehandles. """ if self.fout is not None: self.fout.close() self.fout = None if self.flog is not None: self.flog.close() self.flog = None if self.ferr is not None: self.ferr.close() self.ferr = None
[docs] def load_data(self, dE_fns): """ Load data from dE_fns for subsequent analysis. :param dE_fns: List of filenames to load in order :type dE_fns: List of strings """ self.dE = [] for dE_fn in dE_fns: if has_molfile: plugin = molfile.guess_filetype(dE_fn, None) if plugin is None: data = numpy.loadtxt(dE_fn) else: reader = plugin.read(dE_fn) result = [] for f in reader.frames(): result.append( [f.time, getattr(f, 'dE-'), getattr(f, 'dE+')]) data = numpy.array(result) else: data = numpy.loadtxt(dE_fn) self.dE.append(data) self.filter_data(self.begin_time, self.end_time)
[docs] def filter_data(self, begin_time=0, end_time=-1): """ Filter data based on the start, end time :param begin_time: keep data after this time :type begin_time: float :param end_time: keep data before this time :type end_time: float """ if self.dE is None: self.dE_filter = None return self.dE_filter = [] # a better name would be dE_filtered for dE in self.dE: if end_time > 0: keep_idx = numpy.all( [dE[:, 0] >= begin_time, dE[:, 0] <= end_time], axis=0) else: keep_idx = dE[:, 0] > begin_time self.dE_filter.append(dE[keep_idx])
[docs] def analyze_data(self): """ Analyze filtered data. :return: [[dG, bootstrap sigma, analytic sigma],...] :rtype: list of list of floats """ if self.dE_filter is None: return None results = [] for i in range(len(self.dE_filter) - 1): # lambda's # Parse the data try: forward_dE = self.dE_filter[i][:, self.forward_column] except IndexError: forward_dE = numpy.array([]) try: reverse_dE = self.dE_filter[i + 1][:, self.reverse_column] except IndexError: reverse_dE = numpy.array([]) # Bootstrap resample if self.nresamples: if self.syncseed: # Syncseed ensures bootstrap resample synchronized across forward and reverse energies randstate = numpy.random.get_state() fbs = bootstrap(self.nresamples, len(forward_dE), bootstrap_flavor=self.bootstrap_flavor, blocksize=self.blocksize) if self.syncseed: numpy.random.set_state(randstate) rbs = bootstrap(self.nresamples, len(reverse_dE), bootstrap_flavor=self.bootstrap_flavor, blocksize=self.blocksize) dF = 0.0 dF2 = 0.0 sigma_dF = 0.0 dFvals = [] for iblock in range(self.nresamples): # bootstrap samples dFs = self.bennett(forward_dE[fbs[iblock]], reverse_dE[rbs[iblock]]) dF += dFs[0] dFvals.append(dFs[0]) dF2 += dFs[0]**2 sigma_dF += dFs[1] dF /= self.nresamples dF2 /= self.nresamples sigma_dF /= self.nresamples # "analytical" sigma is just the average sigma of each block bootstrap_sigma = sqrt(fabs(dF2 - dF * dF)) fulldf, fullsigma = self.bennett( forward_dE, reverse_dE) # dF using entire trajectory if not self.nresamples: bootstrap_sigma = 0.0 fullsigma = 0.0 result = [fulldf, bootstrap_sigma, fullsigma] results.append(result) # originally results is = list of (average bootstrap dF, standard deviation of # bootstrap df, average bootstrap sigma) len(list) == n-lambda if len(results) > 1: deltaF = [0, 0, 0, 0] for result in results: deltaF[0] += result[0] # delta F deltaF[1] += result[1] * result[1] # bootstrap sigma deltaF[2] += result[2] * result[2] # average sigma deltaF[1] = sqrt(deltaF[1]) deltaF[2] = sqrt(deltaF[2]) results.append(deltaF) return results
[docs] def write_results(self, results): """ Write out the results. :param results: results from analyze data :type results: list of list of floats """ cmt = "# Bootstrapping using random number seed %d" % self.seed if (self.seed == 2111839): cmt += ", B-day of J. Willard Gibbs." self.write_data(cmt, self.fout) header = "# %16s %16s %16s" % ("dF (kcal/mol)", "bootstrap std", "analytical std") self.write_data(header, self.fout) if len(results) == 1: result = results[0] self.write_data("# dF = %8.4f +/- %8.4f (%8.4f) kcal/mol" % \ tuple(result), self.fout) elif len(results) > 1: for result in results[:-1]: self.write_data(" %16.4f %16.4f %16.4f" % tuple(result), self.fout) # Never actually output in the original version #self.write_data("# + " + "-"*59, self.fout) deltaF = results[-1] self.write_data("# deltaF = %8.4f +/- %8.4f %17s kcal/mol" % \ (deltaF[0], deltaF[1], '(%.4f)' % deltaF[2]), self.fout)
[docs] def bennett(self, wf, wr): """ Given a series of forward and reverse work values between two systems, use Bennett's acceptance ratio method to estimate the free energy differences. The notations in this program follow that of Shirts et al. P.R.L. 91, 140601 (2003) :param wf: Forward work values :type wf: numpy array :param wr: Reverse work values :type wr: numpy array :return: bennett dG, bennett sigma :rtype: float, float """ beta = old_div(1.0, (BOLTZMANN * self.temperature)) nf = len(wf) nr = len(wr) bwf = beta * wf bwr = beta * wr # Perturbative estimate of the free energy difference, used as the # initial point for the iterations. if nf > 0: bwfm = numpy.min(bwf) dbFf = bwfm - log(numpy.mean(numpy.exp(bwfm - bwf))) if nr > 0: bwrm = numpy.min(bwr) dbFr = bwrm - log(numpy.mean(numpy.exp(bwrm - bwr))) if nf == 0 and nr == 0: self.err = 'No work found! Please increase the number of '\ 'sampling points or increase the block size '\ 'for bootstrapping.' self.write_data(self.err, self.ferr) raise ValueError(self.err) elif nf == 0: self.err = 'No forward work found: using straightforward '\ 'perturbation formula with the reverse work.' self.write_data(self.err, self.ferr) return old_div(-dbFr, beta), 0.0 elif nr == 0: self.err = 'No reverse work found: using straightforward '\ 'perturbation formula with the forward work.' self.write_data(self.err, self.ferr) return old_div(dbFf, beta), 0.0 dbF = 0.5 * (dbFf - dbFr) data = 'Initial estimates for dF = (%f + %f)/2 = %f' % (old_div( dbFf, beta), old_div(-dbFr, beta), old_div(dbF, beta)) #self.write_data(data, self.flog) dbF, sds = self._root(bwf, bwr, dbFf, dbFr) # The variance in the estimate of \delta F is given by the Fisher # information # # C = log(n_f/n_r) - \beta \delta F # # \sigma_{\delta F} = \frac{2}{\beta^2} ( # ( \sum_i^{n_f} 1/(1+cosh(\beta wf + C)) # + \sum_i^{n_r} 1/(1+cosh(\beta wr - C)))^{-1} - (1/n_f + 1/n_r) ) if sds[1] == 0: self.err = 'WARNING: \n'\ 'The estimated free energy has infinite variance.\n'\ 'Perhaps there is no overlap in phase space between the \n'\ 'two adjacent stages, please check your FEP setup and \n'\ 'make the adjacent stages closer.' self.write_data(self.err, self.ferr) sigma_dF = numpy.inf else: sigma_dF = (old_div( -1.0, (beta**2))) * (old_div(1.0, sds[1]) + (old_div(1.0, nf) + old_div(1.0, nr))) if sigma_dF > 0: sigma_dF = sqrt(sigma_dF) else: self.err = 'Possible numerical error: (sigma_dF)^2 = %g < 0' % sigma_dF self.write_data(self.err, self.ferr) sigma_dF = 0.0 #self.write_data('sigma_dF = %f' % sigma_dF, self.flog) if self.exchange: exg = self.exchange_acceptance_ratio(bwf, bwr) self.write_data('exchange acceptance ratio = %f' % exg, self.fout) return old_div(dbF, beta), sigma_dF
[docs] def exchange_acceptance_ratio(self, bwf, bwr): r""" Computes the expected acceptance ratio for exchange between adjacent windows.:: <a> = \int dx dy min( 1, exp( beta*(dW_f(x) + dW_r(y)) ) ) = 1/(n_f n_r) \sum_{i,j} min( 1, exp( beta( dW_{fi} + dW_{rj} ) ) ) :param bwf: forward work multiplied by beta :type bwf: numpy array :param bwr: reverse work multiplied by beta :type bwr: numpy array """ nf = len(bwf) nr = len(bwr) bwfMat = numpy.zeros([nr, nf]) for i in range(0, nr): bwfMat[i, :] = bwf bwrMat = numpy.zeros([nr, nf]) for i in range(0, nf): bwrMat[:, i] = bwr dbwfr = (bwfMat + bwrMat).flatten() #dbwfr = numpy.array( [ min(0, x) for x in dbwfr ] ) # slow !!! dbwfr[dbwfr > 0] = 0 return old_div(numpy.sum(numpy.exp(dbwfr)), (nf * nr))
def _root(self, bw0, bw1, x0, x1): """ Use the brentq method to find the root of the logistic function :param bw0: forward work values multiplied by beta :type bw0: numpy array :param bw1: reverse work values multiplied by beta :type bw1: numpy array :param x0: Initial estimate of the forward free energy :type x0: float :param x1: Initial estimate of the reverse free energy :type x1: float """ sds0 = logistic_function_full(bw0, bw1, x0) # If x0==x1, we attempt to move x1 to the opposite side of the root as # x0 by using x1 = x0 - 2 y/y' if abs(x1 - x0) < self.tolerance: x1 = x0 - 2 * sds0[0] / sds0[1] sds1 = logistic_function_full(bw0, bw1, x1) if (abs(sds0[0]) < self.tolerance): return x0, sds0 if (abs(sds1[0]) < self.tolerance): return x1, sds1 sds0 = sds0[0] sds1 = sds1[0] # First, we keep doubling the interval until we bracket the root. while (sds0 * sds1 > 0): t1 = 1.5 * x1 - 0.5 * x0 t0 = 1.5 * x0 - 0.5 * x1 x1 = t1 x0 = t0 sds0 = logistic_function(bw0, bw1, x0) sds1 = logistic_function(bw0, bw1, x1) # Now use the brentq algorithm to find the root x = scipy.optimize.brentq(min_func, x0, x1, args=(bw0, bw1), xtol=self.tolerance, maxiter=10000, disp=False) sds = logistic_function_full(bw0, bw1, x) return x, sds
[docs] def write_data(self, data, fh=None): """ Write data to file handle (or do nothing if fh is None) :param data: Data to write :type data: string :param fh: File to write to or None to not write :type fh: Open filehandle or None """ if fh is None: return fh.write(data + '\n')
[docs]def bootstrap(nresamples, size, nblocks=False, bootstrap_flavor='stationary block', blocksize=False): """ Create index sets representing bootstrap resamples of original data Written mostly dan.sindhikara@schrodinger.com :param nresamples: number of resamplings to return :type nresamples: int :param size: size of the data :type size: int :param nblocks: (optional) number of subblocks per resample :type nblocks: int :param bootstrap_flavor: type of bootstrapping :type bootstrap_flavor: str :param blocksize: (optional) size of blocks comprising each resample :type blocksize: int :return i_resamples: list of resampling indices :rtype i_resamples: list-like of list-likes """ valid_flavors = [ 'stationary', 'simple block', 'simple subblock', 'moving block', 'mini-case' ] if bootstrap_flavor not in valid_flavors: raise RuntimeError( "Invalid bootstrap flavor '{}'. Choose from:{}".format( bootstrap_flavor, valid_flavors)) if not nblocks: nblocks = nresamples flatten = lambda mylist: [item for sublist in mylist for item in sublist] if bootstrap_flavor == 'stationary': # Stationary block resampling # Resample with replacement subblocks with random start points with # random (geometric) lengths # len(resample) ~ len(data) bsize = 0.1 * size nblocks = int(old_div(size, bsize)) i_resamples = [] for ir in range(nresamples): resample_i_lists = [[ i % size for i in range(start, start + length) ] for start, length in zip( numpy.random.randint(0, size, nblocks), numpy.random.geometric(old_div(1., bsize), nblocks))] i_resamples.append(flatten(resample_i_lists)) return i_resamples elif bootstrap_flavor == 'simple block': # Resample with replacement ordered subblocks # len(resample) == len(data) bsize = int(old_div(size, nresamples)) i_resamples = [ flatten([ list(range(bsize * i, bsize * (i + 1))) for i in numpy.random.randint(nblocks, size=nblocks) ]) for ir in range(nresamples) ] return i_resamples elif bootstrap_flavor == 'simple subblock': # Resample without replacement ordered subblocks # len(resample) << len(data) bsize = int(old_div(size, nresamples)) i_resamples = [ list(range(bsize * i, bsize * (i + 1))) for i in range(nblocks) ] return i_resamples elif bootstrap_flavor == 'moving block': # Moving block resampling # resample subblocks of length size/nblocks with random start # points with replacement # from reservoir of length "size" # len(resample) == len(data) if not blocksize: bsize = int(old_div(float(size), nblocks)) else: bsize = blocksize reservoir = [list(range(i, i + bsize)) for i in range(size - bsize)] i_resamples = [] for i in range(nresamples): series = [] for j in range(nblocks): series.extend(reservoir[numpy.random.randint(0, size - bsize)]) i_resamples.append(series) return i_resamples elif bootstrap_flavor == 'mini-case': # Dan deemed 'mini-case'-style resampling # nblock average (size/nblock)-sized mini-resamples # I haven't seen this style used in literature # len(resample) << len(data) # Drawbacks: # 1) arbitrarily small mini-resamples yields arbitrary error scale # 2) 'case'-style resampling obscures time-correlated error rand_block = numpy.random.randint(0, nresamples, size) idx = numpy.arange(size) return [idx[rand_block == iblock] for iblock in range(nblocks)]
[docs]def main(argv=None): usage = '%prog -f forward_efile -r reverse_efile\n'\ ' -b begin_time -e end_time\n'\ ' -F forward_ecolumn -R reverse_ecolumn -T temperature\n'\ ' -t tolerance -B <number of data blocks>\n'\ ' -s <F.E.C. energy files prefix>\n'\ ' -l lambda_0 -L lambda_1\n' opt = optparse.OptionParser(usage) opt.add_option('-f', '--forward_eFile', type='string', default='', help='file of forward works') opt.add_option('-r', '--reverse_eFile', type='string', default='', help='file of reverse works') opt.add_option('-b', '--begin_time', type='float', default=0., help='take samples after this beginning time') opt.add_option('-e', '--end_time', type='float', default=-1., help='take samples before this end time') opt.add_option('-F', '--forward_column', type='int', default=2, help='column containing the forward works') opt.add_option('-R', '--reverse_column', type='int', default=1, help='column containing the reverse works') opt.add_option('-T', '--temperature', type='float', default=300., help='temperature of the simulation') opt.add_option('-t', '--tolerance', type='float', default=1.e-5, help='tolerance in iteratively solving the free energy') opt.add_option('-B', '--nresamples', type='int', default=100, help='number of bootstrap resamples') opt.add_option( '--bootstrap_flavor', type='str', default='stationary', # mini-case has been default through 11/2015 help='type of bootstrap resampling') opt.add_option('-s', '--series', type='string', default='', help='The naming pattern of the F.E.C. energy ' \ 'files. For example, if the energy files are ' \ 'prefix.i.suffix, where i is between l_0 and l_1, ' \ 'specified by -l and -L, respectively, the naming ' \ 'pattern should be given as prefix.%d.suffix.') opt.add_option('--dE', action='append', default=[], help='Alternative way to specify a series of deltaE ' \ 'files, appending one after another.') opt.add_option('-l', '--lambda_0', type='int', default=0, help='beginning lambda window') opt.add_option('-L', '--lambda_1', type='int', default=10, help='end lambda window') opt.add_option('--rseed', type='int', default=2111839, help='random number seed for bootstrap') opt.add_option('-x', '--exchange', action='store_true', dest='exchange', help='whether to compute exchange accetance ratio') opt.add_option('-o', '--output', default='stdout', help='output file name, none to not write output') opt.add_option('--blocksize', type=int, default=0, dest='blocksize'), opt.add_option('--syncseed', action='store_true', dest='syncseed', default=True) opt.add_option('--error_file', default='stderr', help='name of file to write error and warning ' 'messages to, none to not write output') opt.add_option('--log_file', default='stdout', help='name of file to write additional log' 'information to, none to not write output') opt.add_option('--description', help='information to print into ' 'the output file') opts, args = opt.parse_args(argv) bar = CalcBAR(opts.begin_time, opts.end_time, opts.forward_column, opts.reverse_column, opts.temperature, opts.nresamples, opts.tolerance, opts.exchange, opts.rseed, opts.bootstrap_flavor, opts.blocksize, opts.syncseed) fn_out = opts.output fn_log = opts.log_file fn_err = opts.error_file if fn_out == 'none': fn_out = None if fn_log == 'none': fn_log = None if fn_err == 'none': fn_err = None bar.set_output(fn_out, fn_log, fn_err) if opts.forward_eFile and opts.reverse_eFile: fns = [opts.forward_eFile, opts.reverse_eFile] else: if (opts.series): fns = [ opts.series % i for i in range(opts.lambda_0, opts.lambda_1 + 1) ] elif (opts.dE): fns = opts.dE else: raise RuntimeError("No input files provided") bar.load_data(fns) results = bar.analyze_data() if opts.description: bar.write_data("# %s" % opts.description, bar.fout) bar.write_results(results) bar.close_output()
if __name__ == '__main__': main()