Source code for schrodinger.application.canvas.r_group_dee

"""
R-Group Analysis Dead End elimination
"""

# RGA Dead End Elimination code

import math
import random
import sys
import time
from past.utils import old_div

import numpy

from schrodinger.utils import log

logger = log.get_output_logger("schrodinger.application.canvas.r_group_dee")


[docs]class SimulatedAnnealing():
[docs] @staticmethod def getChoiceSpectrum(choices): # Build an array that lists each multimatch input structure a number # of times equal to the number of matches it could make a transition # to from its current match. Later, we will select a random entry # from this list in order to select the structure to sample a new # random transition for. spectrum = [] for i, choice in enumerate(choices): # For example, if st[i] has 6 matches, it will be listed 5 # times, because it can make 5 possible transitions: spectrum.extend((choice - 1) * [i]) return spectrum
[docs] def __init__(self, energy_matrix, sa_seed=None, t_factor=None, tmax_mult=None): """ Initializer for simulated annealing class. :param energy_matrix: precalculated pairwise matrix used by SA. :type energy_matrix: `DEE_EnergyMatrix` :param sa_seed: SA random number generator seed :type sa_seed: int :param t_fac: Factor (<1) by which T will be multiplied at each T change :type sa_seed: float :param tmax_mult: Factor by which no. of st. will be multiplied to get starting T :type tmax_mult: float """ # Because this class can be initialized from many places, several of which can # optionally set the keyword arguments, it is most robust for all callers to # use None as the default value and then let this method define the true defaults # as follows. Otherwise, we would have to reset the defaults in all the calling # functions, should we decide that changes are needed: self.sa_seed = (0 if sa_seed is None else sa_seed) self.t_factor = (0.9 if t_factor is None else t_factor) self.tmax_mult = (6.0 if tmax_mult is None else tmax_mult) self.random = random.Random() self.random.seed(self.sa_seed) self.energy_matrix = energy_matrix self.nmax = self.energy_matrix.numPos() # no. input structures self.choices = self.energy_matrix.numChoices( ) # for each st, no. states self.log10_configs = 1 for choice in self.choices: self.log10_configs += math.log10(choice) self.choice_spectrum = SimulatedAnnealing.getChoiceSpectrum( self.choices) self.best_solution = [] # First solution encountered w. current best E # All solutions found with current best E: self.best_solution_set = set() # Best energy found so far, initialize to highest FP value: self.best_energy = sys.float_info.max self.cpu_time = None self.t_max = self.tmax_mult * self.nmax # starting T self.t_min = 0.001 # Quit if T gets this low self.max_t_steps = int(1 + old_div( (math.log(self.t_min) - math.log(self.t_max)), math.log(self.t_factor))) # Epsilon for floating-point comparison: self.fp_eps = 1.0e-10 self.imax = 0 # sum of number of states for each st for c in self.choices: self.imax += c
[docs] def run(self): # initialize random initial solution time_0 = time.perf_counter() clock_0 = time.perf_counter() solution_new = [] for c in self.choices: solution_new.append(self.random.randint(0, c - 1)) e_new = self.energy_matrix.calculateEnergy(solution_new) self.new_best(e_new, solution_new) e_old = e_new solution_old = solution_new[:] iter = 0 t = self.t_max # Normally decrease T after this number of steps max_steps_at_t = 20 * int(math.sqrt(self.imax)) # Decrease T if no rejections in this number of steps: max_steps_all_accept = old_div(max_steps_at_t, 2) # tsteps = 0 # Number of steps so far logger.debug("Starting simulated annealing") logger.debug("SA random seed: %d\n" % self.sa_seed) logger.debug( "No. structures=%d; log10 configs=%.2f; sum of choices= %d" % (self.nmax, self.log10_configs, self.imax)) logger.debug("Max. T levels=%d; T factor=%.2f; Max. steps at T=%d" % (self.max_t_steps, self.t_factor, max_steps_at_t)) # Number of consecutive acceptances that exhibit the current best energy: acceptCnt_best_e = 0 e_diff_max = 0 # Abs value of max energy change during any iteration at any T e_t_start = sys.float_info.max # Will hold E at the start of each T step delta_e = sys.float_info.max # Will hold delta_e for each T step while True: last_delta_e = delta_e last_best_energy = self.best_energy tsteps += 1 if tsteps > self.max_t_steps: # Exceeded max. allowed iterations: break acceptCnt = 0 rejectCnt = 0 totalCnt = 0 for totalCnt in range(1, max_steps_at_t + 1): solution_new, istruct = self.getNewSolution( solution_old) #make a move #e_new = self.energy_matrix.calculateEnergy( solution_new ) e_diff = self.energy_matrix.calculateEnergyDifference( solution_old, e_old, istruct, solution_new) e_diff_max = max(e_diff_max, abs(e_diff)) e_new = e_old + e_diff eligible_for_boltzmann = True # Summation of large number of FP values in E calc leads # to floating-point inaccuracies that the following # attempts to correct for: e_best_delta = abs(old_div((e_new - self.best_energy), e_new)) is_e_best = e_best_delta < self.fp_eps # Close enough for gummint work if is_e_best: # Special branch to handle ground-state degeneracy: # If current state revisits an old best-E state, then # we will always reject and will not try Boltzmann: eligible_for_boltzmann = self.is_another_best_config( solution_new) if eligible_for_boltzmann and self.boltzmann_probability( e_old, e_new, t) > self.random.random(): # We accept the new state: if e_new < self.best_energy: self.new_best(e_new, solution_new) solution_old = solution_new[:] e_old = e_new acceptCnt += 1 if is_e_best: acceptCnt_best_e += 1 else: acceptCnt_best_e = 0 else: # We rejected the current state: rejectCnt += 1 iter += 1 if totalCnt == max_steps_all_accept and acceptCnt == totalCnt: # Go to next T early because we accepted all structures (T very high): break if acceptCnt_best_e == max_steps_at_t: # Break out early because of too many consecutive # acceptances of best_e structures: break # Completed or broke out of a T step; # Unintuitively. e_old is E at the end of the completed T step: self.report_t(t, e_t_start, e_old, tsteps, totalCnt, acceptCnt) delta_e = abs(e_old - e_t_start) delta_best_energy = abs(self.best_energy - last_best_energy) #if tsteps % 50 == 0: # logger.debug( ' %4d T-steps\n' % ( tsteps, ) ) finished = False if (delta_e < self.fp_eps and last_delta_e < self.fp_eps and delta_best_energy < self.fp_eps): msg = 'Converged because energy has stopped going down' finished = True elif totalCnt == rejectCnt: msg = 'Converged because there were no acceptances at last T' finished = True elif acceptCnt_best_e == max_steps_at_t: msg = ('Converged due to massive low-E degeneracy:\n' ' %d consecutive acceptances had best E' % (acceptCnt_best_e,)) finished = True elif tsteps > self.max_t_steps: msg = 'Exiting because exceeded iteration limit' finished = True if finished: #mod = tsteps % 50 #if mod: # pad = ''.rjust( 50 - mod ) # string of mod blanks # logger.debug( '%s %4d T-steps\n' % ( pad, tsteps, ) ) logger.debug(msg) break # If we get here, we're still running; proceed to next temperature: t = self.new_temperature(t) e_t_start = e_old #print "Completed simulated annealing" elapsed_time = time.perf_counter() - time_0 cpu_time = time.perf_counter() - clock_0 self.cpu_time = cpu_time logger.debug("\nTemperature steps: %d" % tsteps) logger.debug("Total iterations: %d" % iter) logger.debug("|DeltaEMax| for an iteration: %.2f" % e_diff_max) logger.debug("exp(-|DeltaEMax|/initialT)= %.2f" % (math.exp(old_div(-e_diff_max, self.t_max)))) logger.debug("Best energy: %.4f" % self.best_energy) #print "First best solution found: ", self.best_solution logger.debug("Number of degenerate best solutions found: %d" % len(self.best_solution_set)) #for sol in self.best_solution_set: # print ' ', sol logger.debug("Elapsed time: %.2f" % elapsed_time) logger.debug("CPU time: %.2f" % cpu_time) return ''
[docs] def is_another_best_config(self, solution): s = tuple(solution[:]) if s in self.best_solution_set: #print ' Found best E again; revisited solution:' #print ' ', s return False else: self.best_solution_set.add(tuple(solution[:])) #print ' Found best E again; new solution:' #print ' ', s return True
[docs] def new_best(self, energy, solution): self.best_energy = energy self.best_solution = tuple(solution[:]) self.best_solution_set = set() self.best_solution_set.add(tuple(solution[:]))
#print ' New best_E=%.10f' % ( energy, )
[docs] def report_t(self, t, start_E, end_E, tsteps, totalCnt, acceptCnt): delta_E = end_E - start_E if tsteps == 1: logger.debug( "T, start_E, end_E, delta_E, best_E=" + " %10.4f %10s %10.4f %10s %10.4f" % (t, '----------', end_E, '----------', self.best_energy)) else: logger.debug("T, start_E, end_E, delta_E, best_E=" + " %10.4f %10.4f %10.4f %10.4f %10.4f" % (t, start_E, end_E, delta_E, self.best_energy)) logger.debug(" T-steps, T-iterations, T-acceptances, %d %d %d" % (tsteps, totalCnt, acceptCnt))
[docs] def getNewSolution(self, solution_old): # Original method: #solution_new = self.neighbor( solution_old ) #return solution_new solution_new = solution_old[:] istruct = self.random.choice(self.choice_spectrum) nchoices = self.choices[istruct] choice_increment = self.random.randint(1, nchoices - 1) old_choice = solution_old[istruct] new_choice = (old_choice + choice_increment) % nchoices #print 'istruct, old_choice, new_choice=', istruct, old_choice, new_choice solution_new[istruct] = new_choice return (solution_new, istruct)
[docs] def neighbor(self, solution): s = solution[:] while True: mol = self.random.randint(0, self.nmax - 1) c = self.random.randint(0, self.choices[mol] - 1) s[mol] = c if s != solution: # make sure we have a new system state break return s
[docs] def new_temperature(self, old_t): t = self.t_factor * old_t return t
[docs] def boltzmann_probability(self, e_old, e_new, t): if e_new < e_old: return 1.0 elif t == 0.0: return 0.0 else: return math.exp(old_div((e_old - e_new), t))
[docs] def getBestMatch(self): return self.best_solution
[docs]class DEE_Backtracking():
[docs] def __init__(self, energy_matrix): self.energy_matrix = energy_matrix self.nmax = self.energy_matrix.numPos() self.choices = self.energy_matrix.numChoices() self.result = self.energy_matrix.initialSolution() self.current_energy = sys.float_info.max if len(self.result) > 0: self.current_energy = self.energy_matrix.calculateEnergy( self.result) #self.result = [] self.count = 0 self.nodeCnt = 0 self.maxCnt = 25000
[docs] def minimize(self): print("starting backtracking algorithm") print("nmax: ", self.nmax, " choices: ", self.choices) print("initial solution: ", self.result) print("initial energy: ", self.current_energy) solution = [] rStr = "" if self.backtrack(solution): rStr = "Backtracking algorithm max node count exceeded. Solution may be suboptimal." print("nodes visited: ", self.nodeCnt) self.output(self.result) print("final solution: ", self.result) print("final energy: ", self.current_energy) return rStr
[docs] def backtrack(self, solution): self.nodeCnt += 1 if self.nodeCnt > self.maxCnt: return True if self.reject(solution): return None if self.accept(solution): self.result = solution[:] s = self.first(solution) while s is not None: if self.backtrack(s): return True s = self.next(s)
[docs] def reject(self, solution): return self.energy_matrix.applyEnergyFilter(solution, self.current_energy)
[docs] def accept(self, solution): if len(solution) == self.nmax: self.count += 1 e = self.energy_matrix.calculateEnergy(solution) if e < self.current_energy: self.current_energy = e #print "solution: ", solution, " new best energy: ", e return True else: #print "current suboptimal solution: ", solution, " e: ", e return False
[docs] def first(self, solution): if len(solution) == self.nmax: return None else: s = solution[:] s.append(0) return s
[docs] def next(self, solution): s = solution[:] if s[len(s) - 1] < self.choices[len(s) - 1] - 1: s[len(s) - 1] = s[len(s) - 1] + 1 return s else: return None
[docs] def output(self, solution): i0 = 0 total_variants = 1 outStr = '' for c, r in enumerate(solution): idx = i0 + r + 1 outStr += str(idx) outStr += ' ' i0 += self.choices[c] total_variants = total_variants * self.choices[c] outStr += "\n"
#print "total variants: ", total_variants, " total checked: ", self.count #print "min energy: ", self.current_energy #print "states: ", outStr
[docs] def getBestMatch(self): return self.result
[docs]class DEE_EnergyMatrix():
[docs] def __init__(self, choices, uij): self.nmax = len(choices) self.choices = choices self.uij = uij self.offset = [] self.singles = [] # calculate total number of matrix rows/columns ns = 0 for i in range(self.nmax): ns += self.choices[i] #print "matrix size ns: ", ns #print "choices: ", self.choices self.umin = numpy.zeros((self.nmax, self.nmax)) self.umin_state = numpy.zeros((ns, self.nmax)) # precalculate min values of uij for each pair of molecules (self.umin) # and each state/molecule pair (self.umin_state) i0 = 0 for i in range(self.nmax): j0 = 0 for j in range(self.nmax): self.umin[i][j] = sys.float_info.max for ia in range(self.choices[i]): idx = i0 + ia self.umin_state[idx][j] = sys.float_info.max for ja in range(self.choices[j]): idy = j0 + ja if self.uij[idx][idy] < self.umin[i, j]: self.umin[i, j] = self.uij[idx][idy] if self.uij[idx][idy] < self.umin_state[idx, j]: self.umin_state[idx, j] = self.uij[idx][idy] j0 += self.choices[j] self.offset.append(i0) i0 += self.choices[i] self.pairs = numpy.zeros((i0, i0))
#print "energy matrix: " #numpy.set_printoptions(precision=2,edgeitems=8) #print self.uij #print "umin: " #print self.umin #print "umin_state: " #print self.umin_state
[docs] def numPos(self): return self.nmax
[docs] def numChoices(self): return self.choices
[docs] def calculateEnergy(self, solution): e = 0.0 s = self.convertSolution(solution) for r in s: e += self.uij[r][r] for c in [x for x in s if x > r]: e += self.uij[r][c] return e
[docs] def calculateEnergyDifference(self, solution_old, e_old, istruct, solution_new): s_old = self.convertSolution(solution_old) s_new = self.convertSolution(solution_new) e_diff = 0.0 c_old = s_old[istruct] c_new = s_new[istruct] for r in s_old: e_diff -= self.uij[r][c_old] for r in s_new: e_diff += self.uij[r][c_new] return e_diff
[docs] def initialSolution(self): solution = [] solution.append(0) for i in range(1, self.nmax): umin = sys.float_info.max imin = -1 for ia in range(self.choices[i]): idx = self.offset[i] + ia if idx in self.singles: continue if self.pairs[0][idx] == 1: continue if self.uij[0][idx] < umin: umin = self.uij[0][idx] imin = ia if imin == -1: blank = [] return blank solution.append(imin) return solution
# this function checks that this solution does not contain eliminated singles # of dead end pairs
[docs] def checkSolution(self, solution): s = self.convertSolution(solution) for r in s: if r in self.singles: return False for c in s: if self.pairs[r][c] == 1: return False return True
[docs] def convertSolution(self, solution): s = [] for c, r in enumerate(solution): idx = self.offset[c] + r s.append(idx) return s
[docs] def applyEnergyFilter(self, solution, current_energy): if len(solution) == self.nmax: return False s = self.convertSolution(solution) if not self.checkSolution(solution): return True e = self.calculateEnergy(solution) # estimate best possible contribution from the remaining states for i in range(self.nmax): if i < len(solution): for j in range(len(solution), self.nmax): e += self.umin_state[s[i]][j] else: e += self.umin[i][i] for j in range(i + 1, self.nmax): e += self.umin[i][j] if e >= current_energy: return True else: return False
[docs] def eliminateSingles(self): #print "eliminate singles using Goldstein rule" for k in range(self.nmax): for a in range(self.choices[k] - 1): ka = self.offset[k] + a if ka in self.singles: continue for b in range(a + 1, self.choices[k]): kb = self.offset[k] + b if kb in self.singles: continue rc = self.applyGoldsteinSingles(k, a, b) if rc == 3: self.singles.append(ka) self.singles.append(kb) if rc == 1: self.singles.append(ka) if rc == 2: self.singles.append(kb)
#print "eliminate singles size: ", len(self.singles) #print "singles: ", self.singles
[docs] def eliminatePairs(self): #print "eliminate pairs" cnt = 0 for k in range(self.nmax - 1): for x in range(k + 1, self.nmax): for a in range(self.choices[k]): ka = self.offset[k] + a for b in range(self.choices[x]): lb = self.offset[x] + b eDiffAB = self.uij[ka][ka] + self.uij[lb][ lb] + self.uij[ka][lb] isElim = False for c in range(self.choices[k]): if isElim: break if c == a: continue kc = self.offset[k] + c for d in range(self.choices[x]): if isElim: break if d == b: continue ld = self.offset[x] + d if self.pairs[kc][ld] == 1: continue eDiffCD = self.uij[kc][kc] + self.uij[ld][ ld] + self.uij[kc][ld] totMinAB = 0.0 totMaxCD = 0.0 for i in range(self.nmax): if i == k or i == x: continue minDiffAB = sys.float_info.max maxDiffCD = -sys.float_info.max for c in range(self.choices[i]): ix = self.offset[i] + c eab = self.uij[ka][ix] + self.uij[lb][ix] ecd = self.uij[kc][ix] + self.uij[ld][ix] if eab < minDiffAB: minDiffAB = eab if ecd > maxDiffCD: maxDiffCD = ecd totMinAB += minDiffAB totMaxCD += maxDiffCD if (eDiffAB + totMinAB) > (eDiffCD + totMaxCD): cnt += 1 isElim = True self.pairs[ka][lb] = 1 self.pairs[lb][ka] = 1
#print "eliminate ka: ", ka, " lb: ", lb, " total count: ", cnt #print "total pairs eliminated: ", cnt
[docs] def applyGoldsteinSingles(self, k, a, b): #print "goldstein singles k: ", k, " a: ", a, " b: ", b ka = self.offset[k] + a kb = self.offset[k] + b bNotA = False # if A can't be eliminated this is true bNotB = False # if B can't be eliminated this is true if ka in self.singles or kb in self.singles: return 0 eDiffAB = self.uij[ka][ka] - self.uij[kb][kb] eDiffBA = -eDiffAB for i in range(self.nmax): if i == k: continue initA = False initB = False minDiffAB = sys.float_info.max minDiffBA = sys.float_info.max for c in range(self.choices[i]): ix = self.offset[i] + c if ix in self.singles: continue if self.pairs[ka][ix] == 1: if self.pairs[kb][ix] == 1: continue else: bNotB = True elif self.pairs[kb][ix] == 1: bNotA = True # check if we can not eliminate either A or B if bNotA and bNotB: return 0 eDiff = self.uij[ka][ix] - self.uij[kb][ix] if self.pairs[ka][ix] == 0: if not initA: minDiffAB = eDiff initA = True if minDiffAB > eDiff: minDiffAB = eDiff if self.pairs[kb][ix] == 0: if not initB: minDiffBA = -eDiff initB = True if minDiffBA > -eDiff: minDiffBA = -eDiff if not (initA or initB): return 3 elif not initA: return 1 elif not initB: return 2 eDiffAB += minDiffAB eDiffBA += minDiffBA #print "diffAB: ", eDiffAB, " diffBA: ", eDiffBA if (not bNotA) and (eDiffAB > 0): return 1 if (not bNotB) and (eDiffBA > 0): return 2 return 0
[docs]def main(): matrix = DEE_EnergyMatrix() backtrack = DEE_Backtracking(matrix) backtrack.minimize()
if __name__ == '__main__': main()